#include <Transport.h>
Inheritance diagram for TAO_Transport:
Template methods | |
The Transport class uses the Template Method Pattern to implement the protocol specific functionality. Implementors of a pluggable protocol should override the following methods with the semantics documented below. | |
enum | TAO_Message_Semantics { TAO_ONEWAY_REQUEST = 0, TAO_TWOWAY_REQUEST = 1, TAO_REPLY } |
virtual ACE_Event_Handler * | event_handler_i (void)=0 |
bool | is_connected (void) const |
Is this transport really connected. | |
bool | post_open (size_t id) |
Perform all the actions when this transport gets opened. | |
void | pre_close (void) |
Do what needs to be done when closing the transport. | |
TAO_Connection_Handler * | connection_handler (void) |
Get the connection handler for this transport. | |
TAO_OutputCDR & | out_stream (void) |
Accessor for the output CDR stream. | |
int | generate_locate_request (TAO_Target_Specification &spec, TAO_Operation_Details &opdetails, TAO_OutputCDR &output) |
virtual int | generate_request_header (TAO_Operation_Details &opd, TAO_Target_Specification &spec, TAO_OutputCDR &msg) |
int | recache_transport (TAO_Transport_Descriptor_Interface *desc) |
Recache ourselves in the cache. | |
virtual int | handle_input (TAO_Resume_Handle &rh, ACE_Time_Value *max_wait_time=0) |
Callback to read incoming data. | |
virtual int | send_request (TAO_Stub *stub, TAO_ORB_Core *orb_core, TAO_OutputCDR &stream, TAO_Message_Semantics message_semantics, ACE_Time_Value *max_time_wait)=0 |
virtual int | send_message (TAO_OutputCDR &stream, TAO_Stub *stub=0, TAO_Message_Semantics message_semantics=TAO_Transport::TAO_TWOWAY_REQUEST, ACE_Time_Value *max_time_wait=0)=0 |
virtual int | send_message_shared (TAO_Stub *stub, TAO_Message_Semantics message_semantics, const ACE_Message_Block *message_block, ACE_Time_Value *max_wait_time) |
Send the contents of message_block. | |
int | format_queue_message (TAO_OutputCDR &stream, ACE_Time_Value *max_wait_time) |
int | send_message_block_chain (const ACE_Message_Block *message_block, size_t &bytes_transferred, ACE_Time_Value *max_wait_time=0) |
Send a message block chain. | |
int | send_message_block_chain_i (const ACE_Message_Block *message_block, size_t &bytes_transferred, ACE_Time_Value *max_wait_time) |
Send a message block chain, assuming the lock is held. | |
int | purge_entry (void) |
Cache management. | |
int | make_idle (void) |
Cache management. | |
int | update_transport (void) |
Cache management. | |
int | handle_timeout (const ACE_Time_Value &current_time, const void *act) |
size_t | recv_buffer_size (void) const |
Accessor to recv_buffer_size_. | |
size_t | sent_byte_count (void) const |
Accessor to sent_byte_count_. | |
TAO_Codeset_Translator_Base * | char_translator (void) const |
CodeSet Negotiation - Get the char codeset translator factory. | |
TAO_Codeset_Translator_Base * | wchar_translator (void) const |
CodeSet Negotiation - Get the wchar codeset translator factory. | |
void | char_translator (TAO_Codeset_Translator_Base *) |
CodeSet negotiation - Set the char codeset translator factory. | |
void | wchar_translator (TAO_Codeset_Translator_Base *) |
CodeSet negotiation - Set the wchar codeset translator factory. | |
void | assign_translators (TAO_InputCDR *, TAO_OutputCDR *) |
void | clear_translators (TAO_InputCDR *, TAO_OutputCDR *) |
CORBA::Boolean | is_tcs_set () const |
Return true if the tcs has been set. | |
void | first_request_sent () |
Set the state of the first_request_ flag to 0. | |
void | send_connection_closed_notifications (void) |
TAO::Transport::Stats * | stats (void) const |
Transport statistics. | |
virtual TAO_Connection_Handler * | connection_handler_i (void)=0 |
int | process_parsed_messages (TAO_Queued_Data *qd, TAO_Resume_Handle &rh) |
int | send_message_shared_i (TAO_Stub *stub, TAO_Message_Semantics message_semantics, const ACE_Message_Block *message_block, ACE_Time_Value *max_wait_time) |
int | queue_message_i (const ACE_Message_Block *message_block, ACE_Time_Value *max_wait_time, bool back=true) |
CORBA::ULong const | tag_ |
IOP protocol tag. | |
TAO_ORB_Core *const | orb_core_ |
Global orbcore resource. | |
TAO::Transport_Cache_Manager::HASH_MAP_ENTRY * | cache_map_entry_ |
TAO_Transport_Mux_Strategy * | tms_ |
TAO_Wait_Strategy * | ws_ |
Strategy for waiting for the reply after sending the request. | |
int | bidirectional_flag_ |
TAO::Connection_Role | opening_connection_role_ |
TAO_Queued_Message * | head_ |
Implement the outgoing data queue. | |
TAO_Queued_Message * | tail_ |
TAO_Incoming_Message_Queue | incoming_message_queue_ |
Queue of the consolidated, incoming messages. | |
TAO::Incoming_Message_Stack | incoming_message_stack_ |
ACE_Time_Value | current_deadline_ |
long | flush_timer_id_ |
The timer ID. | |
TAO_Transport_Timer | transport_timer_ |
The adapter used to receive timeout callbacks from the Reactor. | |
ACE_Lock * | handler_lock_ |
size_t | id_ |
A unique identifier for the transport. | |
unsigned long | purging_order_ |
Used by the LRU, LFU and FIFO Connection Purging Strategies. | |
size_t | recv_buffer_size_ |
Size of the buffer received. | |
size_t | sent_byte_count_ |
Number of bytes sent. | |
bool | is_connected_ |
TAO_GIOP_Message_Base * | messaging_object_ |
Our messaging object. | |
TAO::Transport_Cache_Manager & | transport_cache_manager (void) |
Helper method that returns the Transport Cache Manager. | |
int | drain_queue (ACE_Time_Value *max_wait_time) |
Send some of the data in the queue. | |
int | drain_queue_i (ACE_Time_Value *max_wait_time) |
Implement drain_queue() assuming the lock is held. | |
bool | queue_is_empty_i (void) |
Check if there are messages pending in the queue. | |
int | drain_queue_helper (int &iovcnt, iovec iov[], ACE_Time_Value *max_wait_time) |
A helper routine used in drain_queue_i(). | |
int | schedule_output_i (void) |
Schedule handle_output() callbacks. | |
int | cancel_output_i (void) |
Cancel handle_output() callbacks. | |
void | cleanup_queue (size_t byte_count) |
Cleanup the queue. | |
void | cleanup_queue_i () |
Cleanup the complete queue. | |
int | check_buffering_constraints_i (TAO_Stub *stub, bool &must_flush) |
Check if the buffering constraints have been reached. | |
int | send_synchronous_message_i (const ACE_Message_Block *message_block, ACE_Time_Value *max_wait_time) |
int | send_reply_message_i (const ACE_Message_Block *message_block, ACE_Time_Value *max_wait_time) |
int | send_asynchronous_message_i (TAO_Stub *stub, const ACE_Message_Block *message_block, ACE_Time_Value *max_wait_time) |
int | send_synch_message_helper_i (TAO_Synch_Queued_Message &s, ACE_Time_Value *max_wait_time) |
int | flush_timer_pending (void) const |
Check if the flush timer is still pending. | |
void | reset_flush_timer (void) |
void | report_invalid_event_handler (const char *caller) |
Print out error messages if the event handler is not valid. | |
int | handle_input_missing_data (TAO_Resume_Handle &rh, ACE_Time_Value *max_wait_time, TAO_Queued_Data *q_data) |
int | handle_input_parse_data (TAO_Resume_Handle &rh, ACE_Time_Value *max_wait_time) |
int | handle_input_parse_extra_messages (ACE_Message_Block &message_block) |
int | consolidate_enqueue_message (TAO_Queued_Data *qd) |
int | consolidate_process_message (TAO_Queued_Data *qd, TAO_Resume_Handle &rh) |
int | process_queue_head (TAO_Resume_Handle &rh) |
int | notify_reactor (void) |
void | send_connection_closed_notifications_i (void) |
Assume the lock is held. | |
void | allocate_partial_message_block (void) |
TAO_Transport (const TAO_Transport &) | |
void | operator= (const TAO_Transport &) |
TAO_Codeset_Translator_Base * | char_translator_ |
Additional member values required to support codeset translation. | |
TAO_Codeset_Translator_Base * | wchar_translator_ |
CORBA::Boolean | tcs_set_ |
CORBA::Boolean | first_request_ |
ACE_Message_Block * | partial_message_ |
Holds the partial GIOP message (if there is one). | |
TAO::Transport::Stats * | stats_ |
Statistics. | |
class | TAO_Reactive_Flushing_Strategy |
class | TAO_Leader_Follower_Flushing_Strategy |
class | TAO_Thread_Per_Connection_Handler |
Public Member Functions | |
TAO_Transport (CORBA::ULong tag, TAO_ORB_Core *orb_core, size_t input_cdr_size=ACE_CDR::DEFAULT_BUFSIZE) | |
Default creator, requires the tag value be supplied. | |
virtual | ~TAO_Transport (void) |
Destructor. | |
CORBA::ULong | tag (void) const |
Return the protocol tag. | |
TAO_ORB_Core * | orb_core (void) const |
Access the ORB that owns this connection. | |
TAO_Transport_Mux_Strategy * | tms (void) const |
Get the TAO_Transport_Mux_Strategy used by this object. | |
TAO_Wait_Strategy * | wait_strategy (void) const |
Return the TAO_Wait_Strategy used by this object. | |
int | handle_output (ACE_Time_Value *max_wait_time) |
Callback method to reactively drain the outgoing data queue. | |
int | bidirectional_flag (void) const |
Get the bidirectional flag. | |
void | bidirectional_flag (int flag) |
Set the bidirectional flag. | |
void | cache_map_entry (TAO::Transport_Cache_Manager::HASH_MAP_ENTRY *entry) |
Set the Cache Map entry. | |
TAO::Transport_Cache_Manager::HASH_MAP_ENTRY * | cache_map_entry (void) |
Get the Cache Map entry. | |
size_t | id (void) const |
Set and Get the identifier for this transport instance. | |
void | id (size_t id) |
TAO::Connection_Role | opened_as (void) const |
void | opened_as (TAO::Connection_Role) |
unsigned long | purging_order (void) const |
void | purging_order (unsigned long value) |
bool | queue_is_empty (void) |
Check if there are messages pending in the queue. | |
void | provide_handler (TAO::Connection_Handler_Set &handlers) |
Add the event handler to the handlers set. | |
bool | provide_blockable_handler (TAO::Connection_Handler_Set &handlers) |
virtual int | register_handler (void) |
Register the handler with the reactor. | |
virtual ssize_t | send (iovec *iov, int iovcnt, size_t &bytes_transferred, const ACE_Time_Value *timeout=0)=0 |
Write the complete Message_Block chain to the connection. | |
virtual ssize_t | recv (char *buffer, size_t len, const ACE_Time_Value *timeout=0)=0 |
Read len bytes into buffer. | |
Control connection lifecycle | |
These methods are routed through the TMS object. The TMS strategies implement them correctly. | |
bool | idle_after_send (void) |
bool | idle_after_reply (void) |
virtual void | close_connection (void) |
Call the implementation method after obtaining the lock. | |
Template methods | |
The Transport class uses the Template Method Pattern to implement the protocol specific functionality. Implementors of a pluggable protocol should override the following methods with the semantics documented below. | |
void | messaging_init (TAO_GIOP_Message_Version const &version) |
virtual int | tear_listen_point_list (TAO_InputCDR &cdr) |
virtual bool | post_connect_hook (void) |
Hooks that can be overridden in concrete transports. | |
ACE_Event_Handler::Reference_Count | add_reference (void) |
Memory management routines. | |
ACE_Event_Handler::Reference_Count | remove_reference (void) |
TAO_GIOP_Message_Base * | messaging_object (void) |
The transport object is created in the Service handler constructor and deleted in the Service Handler's destructor!!
The main responsibility of a Transport object is to encapsulate a connection, and provide a transport independent way to send and receive data. Since TAO is heavily based on the Reactor for most if not all of its I/O, the Transport class is usually implemented with a helper Connection Handler that adapts the generic Transport interface to the Reactor types.
One of the responsibilities of the TAO_Transport class is to send out GIOP messages as efficiently as possible. In most cases messages are put out in FIFO order, the transport object will put out the message using a single system call and return control to the application. However, for oneways and AMI requests it may be more efficient (or required if the SYNC_NONE policy is in effect) to queue the messages until a large enough data set is available. Another reason to queue is that some applications cannot block for I/O, yet they want to send messages so large that a single write() operation would not be able to cope with them. In such cases we need to queue the data and use the Reactor to drain the queue.
Therefore, the Transport class may need to use a queue to temporarily hold the messages, and, in some configurations, it may need to use the Reactor to concurrently drain such queues.
TAO provides explicit policies to send 'urgent' messages. Such messages may be put at the head of the queue. However, they cannot be sent immediately because the transport may already be sending another message in a reactive fashion.
Consequently, the Transport must also know if the head of the queue has been partially sent. In that case new messages can only follow the head. Only once the head is completely sent we can start sending new messages.
One or more threads can be blocked waiting for the connection to completely send the message. The thread should return as soon as its message has been sent, so a per-thread condition is required. This suggests that simply using an ACE_Message_Queue would not be enough: there is a significant amount of ancillary information to keep on each message that the Message_Block class does not provide room for.
Blocking I/O is still attractive for some applications. First, by eliminating the Reactor overhead, performance is improved when sending large blocks of data. Second, using the Reactor to send out data opens the door for nested upcalls, yet some applications cannot deal with the reentrancy issues in this case.
Some or all messages could have a timeout period attached to them. The timeout source could either be some high-level policy or maybe some strategy to prevent denial of service attacks. In any case the timeouts are per-message, and later messages could have shorter timeouts. In fact, some kind of scheduling (such as EDF) could be required in a few applications.
The outgoing data path consists of several components:
The Transport object provides a single method to send request messages (send_request_message ()).
One of the main responsibilities of the transport is to read and process the incoming GIOP message as quickly and efficiently as possible. There are other forces that need to be given due consideration. They are:
The messages should be checked for validity and the right information should be sent to the higher layer for processing. The process of doing a sanity check and preparing the messages for the higher layers of the ORB are done by the messaging protocol.
To keep things as efficient as possible for medium sized requests, it would be good to minimise data copying and locking along the incoming path, i.e., from the time of reading the data from the handle to the application. We achieve this by creating a buffer on stack and reading the data from the handle into the buffer. We then pass the same data block (the buffer is encapsulated into a data block) to the higher layers of the ORB. The problems stem from the following: (a) Data is bigger than the buffer that we have on stack (b) Transports like TCP do not guarantee availability of the whole chunk of data in one shot. Data could trickle in byte by byte. (c) Single read gives multiple messages.
We solve the problems as follows
(a) First do a read with the buffer on stack. Query the underlying messaging object whether the message has any incomplete portion. If so, the data will be copied into a new buffer large enough to hold the full message, and that buffer is queued; succeeding events will read data from the socket and write directly into this buffer. Otherwise, if the message in the local buffer is complete, we free the handle and then send the message to the higher layers of the ORB for processing.
(b) If a buffer with an incomplete message has been enqueued, while trying to do the above, the reactor will call us back when the handle becomes read ready. The read-operation will copy data directly into the enqueued buffer. If the message has been read completely the message is sent to the higher layers of the ORB for processing.
(c) If we get multiple messages (possible if the client connected to the server sends oneways or AMI requests), we parse and split the messages. Every message is put in the queue. Once the messages are queued, the thread picks up one message to send to the higher layers of the ORB. Before doing that, if it finds more messages, it sends a notify to the reactor without resuming the handle. The next thread picks up a message from the queue and processes that. Once the queue is drained the last thread resumes the handle.
We could use the outgoing path of the ORB to send replies. This would allow us to reuse most of the code in the outgoing data path. We were doing this till TAO-1.2.3. We ran into problems. When writing the reply the ORB gets flow controlled, and the ORB tries to flush the message by going into the reactor. This resulted in unnecessary nesting. The thread that gets into the Reactor could potentially handle other messages (incoming or outgoing) and the stack starts growing leading to crashes.
The solution that we (plan to) adopt is pretty straightforward. The thread sending replies will not block to send the replies but queue the replies and return to the Reactor. (Note the careful usages of the terms "blocking in the Reactor" as opposed to "return back to the Reactor".)
See Also:
Definition at line 245 of file Transport.h.
Definition at line 599 of file Transport.h.
00600 { 00601 TAO_ONEWAY_REQUEST = 0, 00602 TAO_TWOWAY_REQUEST = 1, 00603 TAO_REPLY 00604 };
TAO_Transport::TAO_Transport | ( | CORBA::ULong | tag, | |
TAO_ORB_Core * | orb_core, | |||
size_t | input_cdr_size = ACE_CDR::DEFAULT_BUFSIZE | |||
) |
Default creator, requires the tag value be supplied.
Definition at line 127 of file Transport.cpp.
References ACE_NEW, ACE_NEW_THROW_EX, TAO_ORB_Core::client_factory(), TAO_Client_Strategy_Factory::create_transport_mux_strategy(), TAO_Client_Strategy_Factory::create_wait_strategy(), orb_core(), orb_core_, tms_, and ws_.
00130 : tag_ (tag) 00131 , orb_core_ (orb_core) 00132 , cache_map_entry_ (0) 00133 , bidirectional_flag_ (-1) 00134 , opening_connection_role_ (TAO::TAO_UNSPECIFIED_ROLE) 00135 , head_ (0) 00136 , tail_ (0) 00137 , incoming_message_queue_ (orb_core) 00138 , current_deadline_ (ACE_Time_Value::zero) 00139 , flush_timer_id_ (-1) 00140 , transport_timer_ (this) 00141 , handler_lock_ (orb_core->resource_factory ()->create_cached_connection_lock ()) 00142 , id_ ((size_t) this) 00143 , purging_order_ (0) 00144 , recv_buffer_size_ (0) 00145 , sent_byte_count_ (0) 00146 , is_connected_ (false) 00147 , messaging_object_ (0) 00148 , char_translator_ (0) 00149 , wchar_translator_ (0) 00150 , tcs_set_ (0) 00151 , first_request_ (1) 00152 , partial_message_ (0) 00153 #if TAO_HAS_SENDFILE == 1 00154 // The ORB has been configured to use the MMAP allocator, meaning 00155 // we could/should use sendfile() to send data. Cast once rather 00156 // here rather than during each send. This assumes that all 00157 // TAO_OutputCDR instances are using the same TAO_MMAP_Allocator 00158 // instance as the underlying output CDR buffer allocator. 00159 , mmap_allocator_ ( 00160 dynamic_cast<TAO_MMAP_Allocator *> ( 00161 orb_core->output_cdr_buffer_allocator ())) 00162 #endif /* TAO_HAS_SENDFILE==1 */ 00163 #if TAO_HAS_TRANSPORT_CURRENT == 1 00164 , stats_ (0) 00165 #endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */ 00166 { 00167 ACE_NEW (this->messaging_object_, 00168 TAO_GIOP_Message_Base (orb_core, 00169 this, 00170 input_cdr_size)); 00171 00172 TAO_Client_Strategy_Factory *cf = 00173 this->orb_core_->client_factory (); 00174 00175 // Create WS now. 00176 this->ws_ = cf->create_wait_strategy (this); 00177 00178 // Create TMS now. 
00179 this->tms_ = cf->create_transport_mux_strategy (this); 00180 00181 #if TAO_HAS_TRANSPORT_CURRENT == 1 00182 // Allocate stats 00183 ACE_NEW_THROW_EX (this->stats_, 00184 TAO::Transport::Stats, 00185 CORBA::NO_MEMORY ()); 00186 #endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */ 00187 00188 /* 00189 * Hook to add code that initializes components that 00190 * belong to the concrete protocol implementation. 00191 * Further additions to this Transport class will 00192 * need to add code *before* this hook. 00193 */ 00194 //@@ TAO_TRANSPORT_SPL_CONSTRUCTOR_ADD_HOOK 00195 }
TAO_Transport::~TAO_Transport | ( | void | ) | [virtual] |
Destructor.
Definition at line 197 of file Transport.cpp.
References ACE_ASSERT, cleanup_queue_i(), handler_lock_, messaging_object_, purge_entry(), ACE_Message_Block::release(), stats_, tms_, and ws_.
00198 { 00199 delete this->messaging_object_; 00200 00201 delete this->ws_; 00202 00203 delete this->tms_; 00204 00205 delete this->handler_lock_; 00206 00207 if (!this->is_connected_) 00208 { 00209 // When we have a not connected transport we could have buffered 00210 // messages on this transport which we have to cleanup now. 00211 this->cleanup_queue_i(); 00212 00213 // Cleanup our cache entry 00214 this->purge_entry(); 00215 } 00216 00217 // Release the partial message block, however we may 00218 // have never allocated one. 00219 ACE_Message_Block::release (this->partial_message_); 00220 00221 // By the time the destructor is reached here all the connection stuff 00222 // *must* have been cleaned up. 00223 00224 // The following assert is needed for the test "Bug_2494_Regression". 00225 // See the bugzilla bug #2494 for details. 00226 ACE_ASSERT (this->head_ == 0); 00227 ACE_ASSERT (this->cache_map_entry_ == 0); 00228 00229 #if TAO_HAS_TRANSPORT_CURRENT == 1 00230 delete this->stats_; 00231 #endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */ 00232 00233 /* 00234 * Hook to add code that cleans up components 00235 * belong to the concrete protocol implementation. 00236 * Further additions to this Transport class will 00237 * need to add code *before* this hook. 00238 */ 00239 //@@ TAO_TRANSPORT_SPL_DESTRUCTOR_ADD_HOOK 00240 }
TAO_Transport::TAO_Transport | ( | const TAO_Transport & | ) | [private] |
ACE_Event_Handler::Reference_Count TAO_Transport::add_reference | ( | void | ) |
Memory management routines.
Definition at line 2582 of file Transport.cpp.
References ACE_Event_Handler::add_reference(), and event_handler_i().
Referenced by TAO::Cache_IntId::Cache_IntId(), TAO_Connection_Handler::connection_pending(), TAO::Cache_IntId::operator=(), TAO::Transport_Cache_Manager::purge(), and TAO_Asynch_Reply_Dispatcher_Base::transport().
02583 { 02584 return this->event_handler_i ()->add_reference (); 02585 }
void TAO_Transport::allocate_partial_message_block | ( | void | ) | [private] |
Allocate a partial message block and store it in our partial_message_ data member.
Definition at line 2662 of file Transport.cpp.
References ACE_NEW, TAO_GIOP_Message_Base::header_length(), and messaging_object().
Referenced by handle_input_parse_data().
02663 { 02664 if (this->partial_message_ == 0) 02665 { 02666 // This value must be at least large enough to hold a GIOP message 02667 // header plus a GIOP fragment header 02668 size_t const partial_message_size = 02669 this->messaging_object ()->header_length (); 02670 // + this->messaging_object ()->fragment_header_length (); 02671 // deprecated, conflicts with not-single_read_opt. 02672 02673 ACE_NEW (this->partial_message_, 02674 ACE_Message_Block (partial_message_size)); 02675 } 02676 }
void TAO_Transport::assign_translators | ( | TAO_InputCDR * | , | |
TAO_OutputCDR * | ||||
) |
Use the Transport's codeset factories to set the translator for input and output CDRs.
Definition at line 2552 of file Transport.cpp.
References TAO_Codeset_Translator_Base::assign(), char_translator_, and wchar_translator_.
Referenced by TAO::LocateRequest_Invocation::check_reply(), TAO::Synch_Twoway_Invocation::check_reply_status(), TAO_GIOP_Message_Base::process_reply_message(), TAO_GIOP_Message_Base::process_request(), TAO_GIOP_Message_Base::process_request_message(), and TAO::Remote_Invocation::write_header().
02553 { 02554 if (this->char_translator_) 02555 { 02556 this->char_translator_->assign (inp); 02557 this->char_translator_->assign (outp); 02558 } 02559 if (this->wchar_translator_) 02560 { 02561 this->wchar_translator_->assign (inp); 02562 this->wchar_translator_->assign (outp); 02563 } 02564 }
ACE_INLINE void TAO_Transport::bidirectional_flag | ( | int | flag | ) |
Set the bidirectional flag.
Definition at line 39 of file Transport.inl.
References bidirectional_flag_.
00040 { 00041 this->bidirectional_flag_ = flag; 00042 }
ACE_INLINE int TAO_Transport::bidirectional_flag | ( | void | ) | const |
Get the bidirectional flag.
Definition at line 33 of file Transport.inl.
References bidirectional_flag_.
Referenced by TAO_IIOP_Transport::generate_request_header(), TAO_Muxed_TMS::request_id(), TAO_Exclusive_TMS::request_id(), and TAO_IIOP_Transport::tear_listen_point_list().
00034 { 00035 return this->bidirectional_flag_; 00036 }
ACE_INLINE TAO::Transport_Cache_Manager::HASH_MAP_ENTRY * TAO_Transport::cache_map_entry | ( | void | ) |
Get the Cache Map entry.
Definition at line 57 of file Transport.inl.
References cache_map_entry_.
00058 { 00059 return this->cache_map_entry_; 00060 }
ACE_INLINE void TAO_Transport::cache_map_entry | ( | TAO::Transport_Cache_Manager::HASH_MAP_ENTRY * | entry | ) |
Set the Cache Map entry.
Definition at line 63 of file Transport.inl.
References cache_map_entry_.
Referenced by TAO::Transport_Cache_Manager::bind_i().
00065 { 00066 this->cache_map_entry_ = entry; 00067 }
int TAO_Transport::cancel_output_i | ( | void | ) | [private] |
Cancel handle_output() callbacks.
Definition at line 805 of file Transport.cpp.
References ACE_DEBUG, ACE_TEXT, ACE_Reactor::cancel_wakeup(), event_handler_i(), LM_DEBUG, ACE_Event_Handler::reactor(), TAO_debug_level, and ACE_Event_Handler::WRITE_MASK.
Referenced by TAO_Reactive_Flushing_Strategy::cancel_output(), and TAO_Leader_Follower_Flushing_Strategy::cancel_output().
00806 { 00807 ACE_Event_Handler * const eh = this->event_handler_i (); 00808 ACE_Reactor *const reactor = eh->reactor (); 00809 00810 if (TAO_debug_level > 3) 00811 { 00812 ACE_DEBUG ((LM_DEBUG, 00813 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cancel_output_i\n"), 00814 this->id ())); 00815 } 00816 00817 return reactor->cancel_wakeup (eh, ACE_Event_Handler::WRITE_MASK); 00818 }
ACE_INLINE void TAO_Transport::char_translator | ( | TAO_Codeset_Translator_Base * | ) |
CodeSet negotiation - Set the char codeset translator factory.
Definition at line 143 of file Transport.inl.
References char_translator_, and tcs_set_.
00144 { 00145 this->char_translator_ = tf; 00146 this->tcs_set_ = 1; 00147 }
ACE_INLINE TAO_Codeset_Translator_Base * TAO_Transport::char_translator | ( | void | ) | const |
CodeSet Negotiation - Get the char codeset translator factory.
Definition at line 131 of file Transport.inl.
References char_translator_.
00132 { 00133 return this->char_translator_; 00134 }
int TAO_Transport::check_buffering_constraints_i | ( | TAO_Stub * | stub, | |
bool & | must_flush | |||
) | [private] |
Check if the buffering constraints have been reached.
Definition at line 1152 of file Transport.cpp.
References ACE_Reactor::cancel_timer(), current_deadline_, event_handler_i(), flush_timer_id_, ACE_OS::gettimeofday(), ACE_Event_Handler::reactor(), ACE_Reactor::schedule_timer(), and TAO_Stub::transport_queueing_strategy().
Referenced by send_asynchronous_message_i().
01153 { 01154 // First let's compute the size of the queue: 01155 size_t msg_count = 0; 01156 size_t total_bytes = 0; 01157 01158 for (TAO_Queued_Message *i = this->head_; i != 0; i = i->next ()) 01159 { 01160 ++msg_count; 01161 total_bytes += i->message_length (); 01162 } 01163 01164 bool set_timer = false; 01165 ACE_Time_Value new_deadline; 01166 01167 TAO::Transport_Queueing_Strategy *queue_strategy = 01168 stub->transport_queueing_strategy (); 01169 01170 bool constraints_reached = true; 01171 01172 if (queue_strategy) 01173 { 01174 constraints_reached = 01175 queue_strategy->buffering_constraints_reached (stub, 01176 msg_count, 01177 total_bytes, 01178 must_flush, 01179 this->current_deadline_, 01180 set_timer, 01181 new_deadline); 01182 } 01183 else 01184 { 01185 must_flush = false; 01186 } 01187 01188 // ... set the new timer, also cancel any previous timers ... 01189 if (set_timer) 01190 { 01191 ACE_Event_Handler *eh = this->event_handler_i (); 01192 ACE_Reactor * const reactor = eh->reactor (); 01193 this->current_deadline_ = new_deadline; 01194 ACE_Time_Value delay = new_deadline - ACE_OS::gettimeofday (); 01195 01196 if (this->flush_timer_pending ()) 01197 { 01198 reactor->cancel_timer (this->flush_timer_id_); 01199 } 01200 01201 this->flush_timer_id_ = 01202 reactor->schedule_timer (&this->transport_timer_, 01203 &this->current_deadline_, 01204 delay); 01205 } 01206 01207 return constraints_reached; 01208 }
void TAO_Transport::cleanup_queue | ( | size_t | byte_count | ) | [private] |
Cleanup the queue.
Exactly byte_count bytes have been sent, the queue must be cleaned up as potentially several messages have been completely sent out. It leaves on head_ the next message to send out.
Definition at line 1115 of file Transport.cpp.
References ACE_DEBUG, ACE_TEXT, TAO_Queued_Message::all_data_sent(), TAO_Queued_Message::bytes_transferred(), TAO_Queued_Message::destroy(), head_, LM_DEBUG, TAO_Queued_Message::message_length(), TAO_Queued_Message::remove_from_list(), and TAO_debug_level.
Referenced by drain_queue_helper().
01116 { 01117 while (this->head_ != 0 && byte_count > 0) 01118 { 01119 TAO_Queued_Message *i = this->head_; 01120 01121 if (TAO_debug_level > 4) 01122 { 01123 ACE_DEBUG ((LM_DEBUG, 01124 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue, ") 01125 ACE_TEXT ("byte_count = %d\n"), 01126 this->id (), byte_count)); 01127 } 01128 01129 // Update the state of the first message 01130 i->bytes_transferred (byte_count); 01131 01132 if (TAO_debug_level > 4) 01133 { 01134 ACE_DEBUG ((LM_DEBUG, 01135 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue, ") 01136 ACE_TEXT ("after transfer, bc = %d, all_sent = %d, ml = %d\n"), 01137 this->id (), byte_count, i->all_data_sent (), 01138 i->message_length ())); 01139 } 01140 01141 // ... if all the data was sent the message must be removed from 01142 // the queue... 01143 if (i->all_data_sent ()) 01144 { 01145 i->remove_from_list (this->head_, this->tail_); 01146 i->destroy (); 01147 } 01148 } 01149 }
void TAO_Transport::cleanup_queue_i | ( | ) | [private] |
Cleanup the complete queue.
Definition at line 1072 of file Transport.cpp.
References ACE_DEBUG, ACE_TEXT, TAO_Queued_Message::destroy(), head_, TAO_LF_Event::LFS_CONNECTION_CLOSED, LM_DEBUG, TAO_Queued_Message::message_length(), TAO_Queued_Message::remove_from_list(), TAO_LF_Event::state_changed(), and TAO_debug_level.
Referenced by pre_close(), send_connection_closed_notifications_i(), and ~TAO_Transport().
01073 { 01074 if (TAO_debug_level > 4) 01075 { 01076 ACE_DEBUG ((LM_DEBUG, 01077 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue_i, ") 01078 ACE_TEXT ("cleaning up complete queue\n"), 01079 this->id ())); 01080 } 01081 01082 size_t byte_count = 0; 01083 int msg_count = 0; 01084 01085 // Cleanup all messages 01086 while (this->head_ != 0) 01087 { 01088 TAO_Queued_Message *i = this->head_; 01089 01090 if (TAO_debug_level > 4) 01091 { 01092 byte_count += i->message_length(); 01093 ++msg_count; 01094 } 01095 // @@ This is a good point to insert a flag to indicate that a 01096 // CloseConnection message was successfully received. 01097 i->state_changed (TAO_LF_Event::LFS_CONNECTION_CLOSED, 01098 this->orb_core_->leader_follower ()); 01099 01100 i->remove_from_list (this->head_, this->tail_); 01101 01102 i->destroy (); 01103 } 01104 01105 if (TAO_debug_level > 4) 01106 { 01107 ACE_DEBUG ((LM_DEBUG, 01108 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue_i, ") 01109 ACE_TEXT ("discarded %d messages, %u bytes.\n"), 01110 this->id (), msg_count, byte_count)); 01111 } 01112 }
void TAO_Transport::clear_translators | ( | TAO_InputCDR * | , | |
TAO_OutputCDR * | ||||
) |
It is necessary to clear the codeset translator when a CDR stream is used for more than one GIOP message. This is required since the header must not be translated, whereas the body must be.
Definition at line 2567 of file Transport.cpp.
References ACE_OutputCDR::char_translator(), ACE_InputCDR::char_translator(), ACE_OutputCDR::wchar_translator(), and ACE_InputCDR::wchar_translator().
Referenced by TAO::Remote_Invocation::write_header().
02568 { 02569 if (inp) 02570 { 02571 inp->char_translator (0); 02572 inp->wchar_translator (0); 02573 } 02574 if (outp) 02575 { 02576 outp->char_translator (0); 02577 outp->wchar_translator (0); 02578 } 02579 }
void TAO_Transport::close_connection | ( | void | ) | [virtual] |
Call the implementation method after obtaining the lock.
Definition at line 329 of file Transport.cpp.
References TAO_Connection_Handler::close_connection(), and connection_handler_i().
Referenced by TAO_IIOP_Connector::complete_connection(), TAO::Synch_Twoway_Invocation::remote_twoway(), TAO_GIOP_Message_Base::send_close_connection(), TAO::Remote_Invocation::send_message(), send_message_shared(), and TAO::Synch_Twoway_Invocation::wait_for_reply().
00330 { 00331 this->connection_handler_i ()->close_connection (); 00332 }
ACE_INLINE TAO_Connection_Handler * TAO_Transport::connection_handler | ( | void | ) |
Get the connection handler for this transport.
Definition at line 181 of file Transport.inl.
References connection_handler_i().
Referenced by TAO_IIOP_Connector::complete_connection(), TAO::Remote_Invocation::send_message(), TAO_Connect_Strategy::wait(), and TAO_Connector::wait_for_connection_completion().
00182 { 00183 return this->connection_handler_i(); 00184 }
virtual TAO_Connection_Handler* TAO_Transport::connection_handler_i | ( | void | ) | [protected, pure virtual] |
int TAO_Transport::consolidate_enqueue_message | ( | TAO_Queued_Data * | qd | ) | [private] |
Definition at line 1716 of file Transport.cpp.
References ACE_ERROR, ACE_TEXT, LM_ERROR, TAO_Queued_Data::missing_data(), TAO_Queued_Data::more_fragments(), TAO_Queued_Data::msg_type(), TAO_Queued_Data::release(), and TAO_debug_level.
01717 { 01718 // consolidate message on top of stack, only for fragmented messages 01719 01720 // paranoid check 01721 if (q_data->missing_data () != 0) 01722 { 01723 return -1; 01724 } 01725 01726 if (q_data->more_fragments () || 01727 q_data->msg_type () == GIOP::Fragment) 01728 { 01729 TAO_Queued_Data *new_q_data = 0; 01730 01731 switch (this->messaging_object()->consolidate_fragmented_message (q_data, new_q_data)) 01732 { 01733 case -1: // error 01734 return -1; 01735 01736 case 0: // returning consolidated message in new_q_data 01737 if (!new_q_data) 01738 { 01739 if (TAO_debug_level > 0) 01740 { 01741 ACE_ERROR ((LM_ERROR, 01742 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_enqueue_message, ") 01743 ACE_TEXT ("error, consolidated message is NULL\n"), 01744 this->id ())); 01745 } 01746 return -1; 01747 } 01748 01749 if (this->incoming_message_queue_.enqueue_tail (new_q_data) != 0) 01750 { 01751 TAO_Queued_Data::release (new_q_data); 01752 return -1; 01753 } 01754 break; 01755 01756 case 1: // fragment has been stored in messaging_oject() 01757 break; 01758 } 01759 } 01760 else 01761 { 01762 if (this->incoming_message_queue_.enqueue_tail (q_data) != 0) 01763 { 01764 TAO_Queued_Data::release (q_data); 01765 return -1; 01766 } 01767 } 01768 01769 return 0; // success 01770 }
int TAO_Transport::consolidate_process_message | ( | TAO_Queued_Data * | qd, | |
TAO_Resume_Handle & | rh | |||
) | [private] |
Definition at line 1629 of file Transport.cpp.
References ACE_ERROR, ACE_TEXT, LM_ERROR, TAO_Queued_Data::missing_data(), TAO_Queued_Data::more_fragments(), TAO_Queued_Data::msg_type(), TAO_Queued_Data::release(), and TAO_debug_level.
01631 { 01632 // paranoid check 01633 if (q_data->missing_data () != 0) 01634 { 01635 if (TAO_debug_level > 0) 01636 { 01637 ACE_ERROR ((LM_ERROR, 01638 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ") 01639 ACE_TEXT ("missing data\n"), 01640 this->id ())); 01641 } 01642 return -1; 01643 } 01644 01645 if (q_data->more_fragments () || 01646 q_data->msg_type () == GIOP::Fragment) 01647 { 01648 // consolidate message on top of stack, only for fragmented messages 01649 TAO_Queued_Data *new_q_data = 0; 01650 01651 switch (this->messaging_object()->consolidate_fragmented_message (q_data, new_q_data)) 01652 { 01653 case -1: // error 01654 return -1; 01655 01656 case 0: // returning consolidated message in q_data 01657 if (!new_q_data) 01658 { 01659 if (TAO_debug_level > 0) 01660 { 01661 ACE_ERROR ((LM_ERROR, 01662 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ") 01663 ACE_TEXT ("error, consolidated message is NULL\n"), 01664 this->id ())); 01665 } 01666 return -1; 01667 } 01668 01669 01670 if (this->process_parsed_messages (new_q_data, rh) == -1) 01671 { 01672 TAO_Queued_Data::release (new_q_data); 01673 01674 if (TAO_debug_level > 0) 01675 { 01676 ACE_ERROR ((LM_ERROR, 01677 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ") 01678 ACE_TEXT ("error processing consolidated message\n"), 01679 this->id ())); 01680 } 01681 return -1; 01682 } 01683 01684 TAO_Queued_Data::release (new_q_data); 01685 01686 break; 01687 01688 case 1: // fragment has been stored in messaging_oject() 01689 break; 01690 } 01691 } 01692 else 01693 { 01694 if (this->process_parsed_messages (q_data, rh) == -1) 01695 { 01696 TAO_Queued_Data::release (q_data); 01697 01698 if (TAO_debug_level > 0) 01699 { 01700 ACE_ERROR ((LM_ERROR, 01701 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ") 01702 ACE_TEXT ("error processing message\n"), 01703 this->id ())); 01704 } 01705 return -1; 01706 } 01707 01708 
TAO_Queued_Data::release (q_data); 01709 01710 } 01711 01712 return 0; 01713 }
int TAO_Transport::drain_queue | ( | ACE_Time_Value * | max_wait_time | ) | [private] |
Send some of the data in the queue.
As the outgoing data is drained this method is invoked to send as much of the current message as possible.
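Returns -1 if there was an error and 0 otherwise. The underlying drain_queue_i() reports 1 when the queue is empty after sending; in that case drain_queue() cancels output on the flushing strategy and returns 0, so callers never see 1.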
Definition at line 862 of file Transport.cpp.
References ACE_GUARD_RETURN, TAO_Flushing_Strategy::cancel_output(), drain_queue_i(), TAO_ORB_Core::flushing_strategy(), and orb_core().
Referenced by handle_output().
00863 { 00864 ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1); 00865 int const retval = this->drain_queue_i (max_wait_time); 00866 00867 if (retval == 1) 00868 { 00869 // ... there is no current message or it was completely 00870 // sent, cancel output... 00871 TAO_Flushing_Strategy *flushing_strategy = 00872 this->orb_core ()->flushing_strategy (); 00873 00874 flushing_strategy->cancel_output (this); 00875 00876 return 0; 00877 } 00878 00879 return retval; 00880 }
int TAO_Transport::drain_queue_helper | ( | int & | iovcnt, | |
iovec | iov[], | |||
ACE_Time_Value * | max_wait_time | |||
) | [private] |
A helper routine used in drain_queue_i().
Definition at line 883 of file Transport.cpp.
References ACE_DEBUG, ACE_TEXT, cleanup_queue(), dump_iov(), head_, LM_DEBUG, send(), sendfile(), sent_byte_count_, and TAO_debug_level.
Referenced by drain_queue_i().
00884 { 00885 size_t byte_count = 0; 00886 ACE_Countdown_Time countdown (max_wait_time); 00887 00888 // ... send the message ... 00889 ssize_t retval = -1; 00890 00891 #if TAO_HAS_SENDFILE == 1 00892 if (this->mmap_allocator_) 00893 retval = this->sendfile (this->mmap_allocator_, 00894 iov, 00895 iovcnt, 00896 byte_count); 00897 else 00898 #endif /* TAO_HAS_SENDFILE==1 */ 00899 retval = this->send (iov, iovcnt, byte_count, max_wait_time); 00900 00901 if (TAO_debug_level == 5) 00902 { 00903 dump_iov (iov, iovcnt, this->id (), 00904 byte_count, "drain_queue_helper"); 00905 } 00906 00907 // ... now we need to update the queue, removing elements 00908 // that have been sent, and updating the last element if it 00909 // was only partially sent ... 00910 this->cleanup_queue (byte_count); 00911 iovcnt = 0; 00912 00913 if (retval == 0) 00914 { 00915 if (TAO_debug_level > 4) 00916 { 00917 ACE_DEBUG ((LM_DEBUG, 00918 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_helper, ") 00919 ACE_TEXT ("send() returns 0\n"), 00920 this->id ())); 00921 } 00922 return -1; 00923 } 00924 else if (retval == -1) 00925 { 00926 if (TAO_debug_level > 4) 00927 { 00928 ACE_DEBUG ((LM_DEBUG, 00929 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_helper, ") 00930 ACE_TEXT ("error during send (errno: %d) - %m\n"), 00931 this->id (), errno)); 00932 } 00933 00934 if (errno == EWOULDBLOCK || errno == EAGAIN) 00935 { 00936 return 0; 00937 } 00938 00939 return -1; 00940 } 00941 00942 // ... start over, how do we guarantee progress? Because if 00943 // no bytes are sent send() can only return 0 or -1 00944 00945 // Total no. of bytes sent for a send call 00946 this->sent_byte_count_ += byte_count; 00947 00948 if (TAO_debug_level > 4) 00949 { 00950 ACE_DEBUG ((LM_DEBUG, 00951 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_helper, ") 00952 ACE_TEXT ("byte_count = %d, head_is_empty = %d\n"), 00953 this->id(), byte_count, (this->head_ == 0))); 00954 } 00955 00956 return 1; 00957 }
int TAO_Transport::drain_queue_i | ( | ACE_Time_Value * | max_wait_time | ) | [private] |
Implement drain_queue() assuming the lock is held.
Definition at line 960 of file Transport.cpp.
References ACE_DEBUG, ACE_IOV_MAX, ACE_TEXT, ACE_Reactor::cancel_timer(), TAO_Queued_Message::destroy(), drain_queue_helper(), TAO_Queued_Message::fill_iov(), ACE_High_Res_Timer::gettimeofday_hr(), head_, TAO_Queued_Message::is_expired(), TAO_LF_Event::LFS_TIMEOUT, LM_DEBUG, TAO_Queued_Message::next(), TAO_Queued_Message::remove_from_list(), reset_flush_timer(), sent_byte_count_, TAO_LF_Event::state_changed(), and TAO_debug_level.
Referenced by drain_queue(), send_message_block_chain_i(), and send_synch_message_helper_i().
00961 { 00962 // This is the vector used to send data, it must be declared outside 00963 // the loop because after the loop there may still be data to be 00964 // sent 00965 int iovcnt = 0; 00966 #if defined (ACE_INITIALIZE_MEMORY_BEFORE_USE) 00967 iovec iov[ACE_IOV_MAX] = { { 0 , 0 } }; 00968 #else 00969 iovec iov[ACE_IOV_MAX]; 00970 #endif /* ACE_INITIALIZE_MEMORY_BEFORE_USE */ 00971 00972 // We loop over all the elements in the queue ... 00973 TAO_Queued_Message *i = this->head_; 00974 00975 // Reset the value so that the counting is done for each new send 00976 // call. 00977 this->sent_byte_count_ = 0; 00978 00979 // Avoid calling this expensive function each time through the loop. Instead 00980 // we'll assume that the time is unlikely to change much during the loop. 00981 // If we are forced to send in the loop then we'll recompute the time. 00982 ACE_Time_Value now = ACE_High_Res_Timer::gettimeofday_hr (); 00983 00984 while (i != 0) 00985 { 00986 if (i->is_expired (now)) 00987 { 00988 if (TAO_debug_level > 3) 00989 { 00990 ACE_DEBUG ((LM_DEBUG, 00991 ACE_TEXT ("TAO (%P|%t - Transport[%d]::drain_queue_i, ") 00992 ACE_TEXT ("Discarding expired queued message.\n"), 00993 this->id ())); 00994 } 00995 TAO_Queued_Message *next = i->next (); 00996 i->state_changed (TAO_LF_Event::LFS_TIMEOUT, 00997 this->orb_core_->leader_follower ()); 00998 i->remove_from_list (this->head_, this->tail_); 00999 i->destroy (); 01000 i = next; 01001 continue; 01002 } 01003 // ... each element fills the iovector ... 01004 i->fill_iov (ACE_IOV_MAX, iovcnt, iov); 01005 01006 // ... the vector is full, no choice but to send some data out. 01007 // We need to loop because a single message can span multiple 01008 // IOV_MAX elements ... 
01009 if (iovcnt == ACE_IOV_MAX) 01010 { 01011 int const retval = this->drain_queue_helper (iovcnt, iov, 01012 max_wait_time); 01013 01014 now = ACE_High_Res_Timer::gettimeofday_hr (); 01015 01016 if (TAO_debug_level > 4) 01017 { 01018 ACE_DEBUG ((LM_DEBUG, 01019 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_i, ") 01020 ACE_TEXT ("helper retval = %d\n"), 01021 this->id (), retval)); 01022 } 01023 01024 if (retval != 1) 01025 { 01026 return retval; 01027 } 01028 01029 i = this->head_; 01030 continue; 01031 } 01032 // ... notice that this line is only reached if there is still 01033 // room in the iovector ... 01034 i = i->next (); 01035 } 01036 01037 if (iovcnt != 0) 01038 { 01039 int const retval = this->drain_queue_helper (iovcnt, iov, max_wait_time); 01040 01041 if (TAO_debug_level > 4) 01042 { 01043 ACE_DEBUG ((LM_DEBUG, 01044 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_i, ") 01045 ACE_TEXT ("helper retval = %d\n"), 01046 this->id (), retval)); 01047 } 01048 01049 if (retval != 1) 01050 { 01051 return retval; 01052 } 01053 } 01054 01055 if (this->head_ == 0) 01056 { 01057 if (this->flush_timer_pending ()) 01058 { 01059 ACE_Event_Handler *eh = this->event_handler_i (); 01060 ACE_Reactor * const reactor = eh->reactor (); 01061 reactor->cancel_timer (this->flush_timer_id_); 01062 this->reset_flush_timer (); 01063 } 01064 01065 return 1; 01066 } 01067 01068 return 0; 01069 }
virtual ACE_Event_Handler* TAO_Transport::event_handler_i | ( | void | ) | [pure virtual] |
Normally a concrete TAO_Transport object has an ACE_Event_Handler member that functions as an adapter between the ACE_Reactor framework and the TAO pluggable protocol framework. In all the protocols implemented so far this role is fulfilled by an instance of ACE_Svc_Handler.
Implemented in TAO_IIOP_Transport.
Referenced by add_reference(), cancel_output_i(), check_buffering_constraints_i(), TAO::Transport_Cache_Manager::find_transport(), notify_reactor(), remove_reference(), schedule_output_i(), TAO_Connection_Handler::transport(), and TAO_Wait_On_Read::wait().
ACE_INLINE void TAO_Transport::first_request_sent | ( | ) |
Set the state of the first_request_ flag to 0.
Definition at line 164 of file Transport.inl.
References first_request_.
Referenced by TAO_IIOP_Transport::send_request().
00165 { 00166 this->first_request_ = 0; 00167 }
ACE_INLINE int TAO_Transport::flush_timer_pending | ( | void | ) | const [private] |
Check if the flush timer is still pending.
Definition at line 108 of file Transport.inl.
References flush_timer_id_.
00109 { 00110 return this->flush_timer_id_ != -1; 00111 }
int TAO_Transport::format_queue_message | ( | TAO_OutputCDR & | stream, | |
ACE_Time_Value * | max_wait_time | |||
) |
Format the message in stream and queue it.
max_wait_time | The maximum time that the operation can block, used in the implementation of timeouts. |
Definition at line 515 of file Transport.cpp.
References ACE_OutputCDR::begin(), and queue_message_i().
Referenced by TAO::Synch_Oneway_Invocation::remote_oneway().
00517 { 00518 if (this->messaging_object ()->format_message (stream) != 0) 00519 return -1; 00520 00521 return this->queue_message_i (stream.begin (), max_wait_time); 00522 }
int TAO_Transport::generate_locate_request | ( | TAO_Target_Specification & | spec, | |
TAO_Operation_Details & | opdetails, | |||
TAO_OutputCDR & | output | |||
) |
This is a request for the transport object to write a LocateRequest header before it is sent out.
Definition at line 388 of file Transport.cpp.
References ACE_DEBUG, ACE_TEXT, LM_DEBUG, and TAO_debug_level.
Referenced by TAO::LocateRequest_Invocation::invoke().
00392 { 00393 if (this->messaging_object ()->generate_locate_request_header (opdetails, 00394 spec, 00395 output) == -1) 00396 { 00397 if (TAO_debug_level > 0) 00398 { 00399 ACE_DEBUG ((LM_DEBUG, 00400 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::generate_locate_request, ") 00401 ACE_TEXT ("error while marshalling the LocateRequest header\n"), 00402 this->id ())); 00403 } 00404 00405 return -1; 00406 } 00407 00408 return 0; 00409 }
int TAO_Transport::generate_request_header | ( | TAO_Operation_Details & | opd, | |
TAO_Target_Specification & | spec, | |||
TAO_OutputCDR & | msg | |||
) | [virtual] |
This is a request for the transport object to write a request header before it sends out the request.
Reimplemented in TAO_IIOP_Transport.
Definition at line 412 of file Transport.cpp.
References ACE_DEBUG, ACE_TEXT, TAO_ORB_Core::codeset_manager(), TAO_Codeset_Manager::generate_service_context(), LM_DEBUG, orb_core(), and TAO_debug_level.
Referenced by TAO_IIOP_Transport::generate_request_header().
00416 { 00417 // codeset service context is only supposed to be sent in the first request 00418 // on a particular connection. 00419 if (this->first_request_) 00420 { 00421 TAO_Codeset_Manager * const csm = this->orb_core ()->codeset_manager (); 00422 if (csm) 00423 csm->generate_service_context (opdetails,*this); 00424 } 00425 00426 if (this->messaging_object ()->generate_request_header (opdetails, 00427 spec, 00428 output) == -1) 00429 { 00430 if (TAO_debug_level > 0) 00431 { 00432 ACE_DEBUG ((LM_DEBUG, 00433 ACE_TEXT ("(%P|%t) - Transport[%d]::generate_request_header, ") 00434 ACE_TEXT ("error while marshalling the Request header\n"), 00435 this->id())); 00436 } 00437 00438 return -1; 00439 } 00440 00441 return 0; 00442 }
int TAO_Transport::handle_input | ( | TAO_Resume_Handle & | rh, | |
ACE_Time_Value * | max_wait_time = 0 | |||
) | [virtual] |
Callback to read incoming data.
The ACE_Event_Handler adapter invokes this method as part of its handle_input() operation.
max_wait_time | In some cases the I/O is synchronous, e.g. a thread-per-connection server or when Wait_On_Read is enabled. In those cases a maximum read time can be specified. |
Definition at line 1555 of file Transport.cpp.
References ACE_DEBUG, ACE_ERROR, ACE_TEXT, LM_DEBUG, LM_ERROR, TAO_Queued_Data::missing_data(), process_queue_head(), TAO_debug_level, and TAO_MISSING_DATA_UNDEFINED.
Referenced by TAO_Connection_Handler::handle_input_internal(), TAO_Connection_Handler::svc_i(), and TAO_Wait_On_Read::wait().
01557 { 01558 if (TAO_debug_level > 3) 01559 { 01560 ACE_DEBUG ((LM_DEBUG, 01561 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input\n"), 01562 this->id ())); 01563 } 01564 01565 // First try to process messages of the head of the incoming queue. 01566 int const retval = this->process_queue_head (rh); 01567 01568 if (retval <= 0) 01569 { 01570 if (retval == -1) 01571 { 01572 if (TAO_debug_level > 2) 01573 { 01574 ACE_DEBUG ((LM_DEBUG, 01575 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input, ") 01576 ACE_TEXT ("error while parsing the head of the queue\n"), 01577 this->id())); 01578 01579 } 01580 return -1; 01581 } 01582 else 01583 { 01584 // retval == 0 01585 01586 // Processed a message in queue successfully. This 01587 // thread must return to thread-pool now. 01588 return 0; 01589 } 01590 } 01591 01592 TAO_Queued_Data *q_data = 0; 01593 01594 if (this->incoming_message_stack_.top (q_data) != -1 01595 && q_data->missing_data () != TAO_MISSING_DATA_UNDEFINED) 01596 { 01597 /* PRE: q_data->missing_data_ > 0 as all QD on stack must be incomplete */ 01598 if (this->handle_input_missing_data (rh, max_wait_time, q_data) == -1) 01599 { 01600 if (TAO_debug_level > 0) 01601 { 01602 ACE_ERROR ((LM_ERROR, 01603 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input, ") 01604 ACE_TEXT ("error consolidating incoming message\n"), 01605 this->id ())); 01606 } 01607 return -1; 01608 } 01609 } 01610 else 01611 { 01612 if (this->handle_input_parse_data (rh, max_wait_time) == -1) 01613 { 01614 if (TAO_debug_level > 0) 01615 { 01616 ACE_ERROR ((LM_ERROR, 01617 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input, ") 01618 ACE_TEXT ("error parsing incoming message\n"), 01619 this->id ())); 01620 } 01621 return -1; 01622 } 01623 } 01624 01625 return 0; 01626 }
int TAO_Transport::handle_input_missing_data | ( | TAO_Resume_Handle & | rh, | |
ACE_Time_Value * | max_wait_time, | |||
TAO_Queued_Data * | q_data | |||
) | [private] |
Invoked by the handle_input operation. It consolidates the message on top of incoming_message_stack_. The amount of missing data is known, and the recv operation copies data directly into the message buffer, as much as a single recv invocation provides.
Definition at line 1773 of file Transport.cpp.
References ACE_DEBUG, ACE_TEXT, ACE_CDR::grow(), ACE_Message_Block::length(), LM_DEBUG, TAO_Queued_Data::missing_data(), TAO_Queued_Data::msg_block(), recv(), recv_buffer_size_, ACE_Message_Block::space(), TAO_debug_level, and ACE_Message_Block::wr_ptr().
01776 { 01777 // paranoid check 01778 if (q_data == 0) 01779 { 01780 return -1; 01781 } 01782 01783 if (TAO_debug_level > 3) 01784 { 01785 ACE_DEBUG ((LM_DEBUG, 01786 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_missing_data_message, ") 01787 ACE_TEXT ("enter (missing data == %d)\n"), 01788 this->id (), q_data->missing_data ())); 01789 } 01790 01791 size_t const recv_size = q_data->missing_data (); 01792 01793 if (q_data->msg_block ()->space() < recv_size) 01794 { 01795 // make sure the message_block has enough space 01796 size_t const message_size = recv_size + q_data->msg_block ()->length(); 01797 01798 if (ACE_CDR::grow (q_data->msg_block (), message_size) == -1) 01799 { 01800 return -1; 01801 } 01802 } 01803 01804 // Saving the size of the received buffer in case any one needs to 01805 // get the size of the message thats received in the 01806 // context. Obviously the value will be changed for each recv call 01807 // and the user is supposed to invoke the accessor only in the 01808 // invocation context to get meaningful information. 01809 this->recv_buffer_size_ = recv_size; 01810 01811 // Read the message into the existing message block on heap 01812 ssize_t const n = this->recv (q_data->msg_block ()->wr_ptr(), 01813 recv_size, 01814 max_wait_time); 01815 01816 if (n <= 0) 01817 { 01818 return n; 01819 } 01820 01821 if (TAO_debug_level > 3) 01822 { 01823 ACE_DEBUG ((LM_DEBUG, 01824 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_missing_data_message, ") 01825 ACE_TEXT ("read bytes %d\n"), 01826 this->id (), n)); 01827 } 01828 01829 q_data->msg_block ()->wr_ptr(n); 01830 q_data->missing_data (q_data->missing_data () - n); 01831 01832 if (q_data->missing_data () == 0) 01833 { 01834 // paranoid check 01835 if (this->incoming_message_stack_.pop (q_data) == -1) 01836 { 01837 return -1; 01838 } 01839 01840 if (this->consolidate_process_message (q_data, rh) == -1) 01841 { 01842 return -1; 01843 } 01844 } 01845 01846 return 0; 01847 }
int TAO_Transport::handle_input_parse_data | ( | TAO_Resume_Handle & | rh, | |
ACE_Time_Value * | max_wait_time | |||
) | [private] |
Invoked by the handle_input operation. It parses new messages from the input stream, or consolidates a message whose header has only been partially read and whose size is therefore still unknown. It parses as much data as a single recv invocation provides.
Definition at line 1894 of file Transport.cpp.
References ACE_DEBUG, ACE_ERROR, ACE_TEXT, allocate_partial_message_block(), ACE_Message_Block::DONT_DELETE, TAO_Queued_Data::duplicate(), ACE_CDR::grow(), handle_input_parse_extra_messages(), TAO_GIOP_Message_Base::header_length(), incoming_message_stack_, TAO_ORB_Core::input_cdr_dblock_allocator(), ACE_Message_Block::length(), LM_DEBUG, LM_ERROR, TAO_ORB_Core::locking_strategy(), ACE_CDR::MAX_ALIGNMENT, ACE_CDR::mb_align(), ACE_Message_Block::MB_DATA, ACE_OS::memset(), messaging_object(), TAO_Queued_Data::missing_data(), TAO_Queued_Data::msg_block(), notify_reactor(), orb_core_, partial_message_, ACE_Message_Block::rd_ptr(), recv(), recv_buffer_size_, ACE_Message_Block::reset(), TAO_Resume_Handle::set_flag(), TAO_debug_level, TAO_Resume_Handle::TAO_HANDLE_LEAVE_SUSPENDED, TAO_Resume_Handle::TAO_HANDLE_RESUMABLE, TAO_MAXBUFSIZE, and TAO_MISSING_DATA_UNDEFINED.
01896 { 01897 01898 if (TAO_debug_level > 3) 01899 { 01900 ACE_DEBUG ((LM_DEBUG, 01901 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ") 01902 ACE_TEXT ("enter\n"), 01903 this->id ())); 01904 } 01905 01906 01907 // The buffer on the stack which will be used to hold the input 01908 // messages, ACE_CDR::MAX_ALIGNMENT compensates the 01909 // memory-alignment. This improves performance with SUN-Java-ORB-1.4 01910 // and higher that sends fragmented requests of size 1024 bytes. 01911 char buf [TAO_MAXBUFSIZE + ACE_CDR::MAX_ALIGNMENT]; 01912 01913 #if defined (ACE_INITIALIZE_MEMORY_BEFORE_USE) 01914 (void) ACE_OS::memset (buf, 01915 '\0', 01916 sizeof buf); 01917 #endif /* ACE_INITIALIZE_MEMORY_BEFORE_USE */ 01918 01919 // Create a data block 01920 ACE_Data_Block db (sizeof (buf), 01921 ACE_Message_Block::MB_DATA, 01922 buf, 01923 this->orb_core_->input_cdr_buffer_allocator (), 01924 this->orb_core_->locking_strategy (), 01925 ACE_Message_Block::DONT_DELETE, 01926 this->orb_core_->input_cdr_dblock_allocator ()); 01927 01928 // Create a message block 01929 ACE_Message_Block message_block (&db, 01930 ACE_Message_Block::DONT_DELETE, 01931 this->orb_core_->input_cdr_msgblock_allocator ()); 01932 01933 01934 // Align the message block 01935 ACE_CDR::mb_align (&message_block); 01936 01937 size_t recv_size = 0; // Note: unsigned integer 01938 01939 // Pointer to newly parsed message 01940 TAO_Queued_Data *q_data = 0; 01941 01942 // optimizing access of constants 01943 size_t const header_length = this->messaging_object ()->header_length (); 01944 01945 // paranoid check 01946 if (header_length > message_block.space ()) 01947 { 01948 return -1; 01949 } 01950 01951 if (this->orb_core_->orb_params ()->single_read_optimization ()) 01952 { 01953 recv_size = message_block.space (); 01954 } 01955 else 01956 { 01957 // Single read optimization has been de-activated. 
That means 01958 // that we need to read from transport the GIOP header first 01959 // before the payload. This codes first checks the incoming 01960 // stack for partial messages which needs to be 01961 // consolidated. Otherwise we are in new cycle, reading complete 01962 // GIOP header of new incoming message. 01963 if (this->incoming_message_stack_.top (q_data) != -1 01964 && q_data->missing_data () == TAO_MISSING_DATA_UNDEFINED) 01965 { 01966 // There is a partial message on incoming_message_stack_ 01967 // whose length is unknown so far. We need to consolidate 01968 // the GIOP header to get to know the payload size, 01969 recv_size = header_length - q_data->msg_block ()->length (); 01970 } 01971 else 01972 { 01973 // Read amount of data forming GIOP header of new incoming 01974 // message. 01975 recv_size = header_length; 01976 } 01977 // POST: 0 <= recv_size <= header_length 01978 } 01979 // POST: 0 <= recv_size <= message_block->space () 01980 01981 // If we have a partial message, copy it into our message block and 01982 // clear out the partial message. 01983 if (this->partial_message_ != 0 && this->partial_message_->length () > 0) 01984 { 01985 // (*) Copy back the partial message into current read-buffer, 01986 // verify that the read-strategy of "recv_size" bytes is not 01987 // exceeded. 
The latter check guarantees that recv_size does not 01988 // roll-over and keeps in range 01989 // 0<=recv_size<=message_block->space() 01990 if (this->partial_message_->length () <= recv_size && 01991 message_block.copy (this->partial_message_->rd_ptr (), 01992 this->partial_message_->length ()) == 0) 01993 { 01994 01995 recv_size -= this->partial_message_->length (); 01996 this->partial_message_->reset (); 01997 } 01998 else 01999 { 02000 return -1; 02001 } 02002 } 02003 // POST: 0 <= recv_size <= buffer_space 02004 02005 if (0 >= recv_size) // paranoid: the check above (*) guarantees recv_size>=0 02006 { 02007 // This event would cause endless looping, trying frequently to 02008 // read zero bytes from stream. This might happen, if TAOs 02009 // protocol implementation is not correct and tries to read data 02010 // beyond header without "single_read_optimazation" being 02011 // activated. 02012 if (TAO_debug_level > 0) 02013 { 02014 ACE_ERROR ((LM_ERROR, 02015 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ") 02016 ACE_TEXT ("Error - endless loop detection, closing connection"), 02017 this->id ())); 02018 } 02019 return -1; 02020 } 02021 02022 // Saving the size of the received buffer in case any one needs to 02023 // get the size of the message thats received in the 02024 // context. Obviously the value will be changed for each recv call 02025 // and the user is supposed to invoke the accessor only in the 02026 // invocation context to get meaningful information. 02027 this->recv_buffer_size_ = recv_size; 02028 02029 // Read the message into the message block that we have created on 02030 // the stack. 02031 ssize_t const n = this->recv (message_block.wr_ptr (), 02032 recv_size, 02033 max_wait_time); 02034 02035 // If there is an error return to the reactor.. 
02036 if (n <= 0) 02037 { 02038 return n; 02039 } 02040 02041 if (TAO_debug_level > 3) 02042 { 02043 ACE_DEBUG ((LM_DEBUG, 02044 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ") 02045 ACE_TEXT ("read %d bytes\n"), 02046 this->id (), n)); 02047 } 02048 02049 // Set the write pointer in the stack buffer 02050 message_block.wr_ptr (n); 02051 02052 // 02053 // STACK PROCESSING OR MESSAGE CONSOLIDATION 02054 // 02055 02056 // PRE: data in buffer is aligned && message_block.length() > 0 02057 02058 if (this->incoming_message_stack_.top (q_data) != -1 02059 && q_data->missing_data () == TAO_MISSING_DATA_UNDEFINED) 02060 { 02061 // 02062 // MESSAGE CONSOLIDATION 02063 // 02064 02065 // Partial message on incoming_message_stack_ needs to be 02066 // consolidated. The message header could not be parsed so far 02067 // and therefor the message size is unknown yet. Consolidating 02068 // the message destroys the memory alignment of succeeding 02069 // messages sharing the buffer, for that reason consolidation 02070 // and stack based processing are mutial exclusive. 
02071 if (this->messaging_object ()->consolidate_node (q_data, 02072 message_block) == -1) 02073 { 02074 if (TAO_debug_level > 0) 02075 { 02076 ACE_ERROR ((LM_ERROR, 02077 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ") 02078 ACE_TEXT ("error consolidating message from input buffer\n"), 02079 this->id () )); 02080 } 02081 return -1; 02082 } 02083 02084 // Complete message are to be enqueued and later processed 02085 if (q_data->missing_data () == 0) 02086 { 02087 if (this->incoming_message_stack_.pop (q_data) == -1) 02088 { 02089 return -1; 02090 } 02091 02092 if (this->consolidate_enqueue_message (q_data) == -1) 02093 { 02094 return -1; 02095 } 02096 } 02097 02098 if (message_block.length () > 0 02099 && this->handle_input_parse_extra_messages (message_block) == -1) 02100 { 02101 return -1; 02102 } 02103 02104 // In any case try to process the enqueued messages 02105 if (this->process_queue_head (rh) == -1) 02106 { 02107 return -1; 02108 } 02109 } 02110 else 02111 { 02112 // 02113 // STACK PROCESSING (critical path) 02114 // 02115 02116 // Process the first message in buffer on stack 02117 02118 // (PRE: first message resides in aligned memory) Make a node of 02119 // the message-block.. 
02120 02121 TAO_Queued_Data qd (&message_block, 02122 this->orb_core_->transport_message_buffer_allocator ()); 02123 02124 size_t mesg_length = 0; 02125 02126 if (this->messaging_object ()->parse_next_message (qd, 02127 mesg_length) == -1 02128 || (qd.missing_data () == 0 02129 && mesg_length > message_block.length ()) ) 02130 { 02131 // extracting message failed 02132 return -1; 02133 } 02134 // POST: qd.missing_data_ == 0 --> mesg_length <= message_block.length() 02135 // This prevents seeking rd_ptr behind the wr_ptr 02136 02137 if (qd.missing_data () != 0 || 02138 qd.more_fragments () || 02139 qd.msg_type () == GIOP::Fragment) 02140 { 02141 if (qd.missing_data () == 0) 02142 { 02143 // Dealing with a fragment 02144 TAO_Queued_Data *nqd = 02145 TAO_Queued_Data::duplicate (qd); 02146 02147 if (nqd == 0) 02148 { 02149 return -1; 02150 } 02151 02152 // mark the end of message in new buffer 02153 char* end_mark = nqd->msg_block ()->rd_ptr () 02154 + mesg_length; 02155 nqd->msg_block ()->wr_ptr (end_mark); 02156 02157 // move the read pointer forward in old buffer 02158 message_block.rd_ptr (mesg_length); 02159 02160 // enqueue the message 02161 if (this->consolidate_enqueue_message (nqd) == -1) 02162 { 02163 return -1; 02164 } 02165 02166 if (message_block.length () > 0 02167 && this->handle_input_parse_extra_messages (message_block) == -1) 02168 { 02169 return -1; 02170 } 02171 02172 // In any case try to process the enqueued messages 02173 if (this->process_queue_head (rh) == -1) 02174 { 02175 return -1; 02176 } 02177 } 02178 else if (qd.missing_data () != TAO_MISSING_DATA_UNDEFINED) 02179 { 02180 // Incomplete message, must be the last one in buffer 02181 02182 if (qd.missing_data () != TAO_MISSING_DATA_UNDEFINED && 02183 qd.missing_data () > message_block.space ()) 02184 { 02185 // Re-Allocate correct size on heap 02186 if (ACE_CDR::grow (qd.msg_block (), 02187 message_block.length () 02188 + qd.missing_data ()) == -1) 02189 { 02190 return -1; 02191 } 02192 } 
02193 02194 TAO_Queued_Data *nqd = 02195 TAO_Queued_Data::duplicate (qd); 02196 02197 if (nqd == 0) 02198 { 02199 return -1; 02200 } 02201 02202 // move read-pointer to end of buffer 02203 message_block.rd_ptr (message_block.length()); 02204 02205 this->incoming_message_stack_.push (nqd); 02206 } 02207 } 02208 else 02209 { 02210 // 02211 // critical path 02212 // 02213 02214 // We cant process the message on stack right now. First we 02215 // have got to parse extra messages from message_block, 02216 // putting them into queue. When this is done we can return 02217 // to process this message, and notifying other threads to 02218 // process the messages in queue. 02219 02220 char * end_marker = message_block.rd_ptr () 02221 + mesg_length; 02222 02223 if (message_block.length () > mesg_length) 02224 { 02225 // There are more message in data stream to be parsed. 02226 // Safe the rd_ptr to restore later. 02227 char *rd_ptr_stack_mesg = message_block.rd_ptr (); 02228 02229 // Skip parsed message, jump to next message in buffer 02230 // PRE: mesg_length <= message_block.length () 02231 message_block.rd_ptr (mesg_length); 02232 02233 // Extract remaining messages and enqueue them for later 02234 // heap processing 02235 if (this->handle_input_parse_extra_messages (message_block) == -1) 02236 { 02237 return -1; 02238 } 02239 02240 // correct the end_marker 02241 end_marker = message_block.rd_ptr (); 02242 02243 // Restore rd_ptr 02244 message_block.rd_ptr (rd_ptr_stack_mesg); 02245 } 02246 02247 // The following if-else has been copied from 02248 // process_queue_head(). While process_queue_head() 02249 // processes message on heap, here we will process a message 02250 // on stack. 02251 02252 // Now that we have one message on stack to be processed, 02253 // check whether we have one more message in the queue... 
02254 if (this->incoming_message_queue_.queue_length () > 0) 02255 { 02256 if (TAO_debug_level > 0) 02257 { 02258 ACE_DEBUG ((LM_DEBUG, 02259 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ") 02260 ACE_TEXT ("notify reactor\n"), 02261 this->id ())); 02262 02263 } 02264 02265 const int retval = this->notify_reactor (); 02266 02267 if (retval == 1) 02268 { 02269 // Let the class know that it doesn't need to resume the 02270 // handle.. 02271 rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_LEAVE_SUSPENDED); 02272 } 02273 else if (retval < 0) 02274 return -1; 02275 } 02276 else 02277 { 02278 // As there are no further messages in queue just resume 02279 // the handle. Set the flag incase someone had reset the flag.. 02280 rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_RESUMABLE); 02281 } 02282 02283 // PRE: incoming_message_queue is empty 02284 if (this->process_parsed_messages (&qd, 02285 rh) == -1) 02286 { 02287 return -1; 02288 } 02289 02290 // move the rd_ptr tp position of end_marker 02291 message_block.rd_ptr (end_marker); 02292 } 02293 } 02294 02295 // Now that all cases have been processed, there might be kept some data 02296 // in buffer that needs to be safed for next "handle_input" invocations. 02297 if (message_block.length () > 0) 02298 { 02299 if (this->partial_message_ == 0) 02300 { 02301 this->allocate_partial_message_block (); 02302 } 02303 02304 if (this->partial_message_ != 0 && 02305 this->partial_message_->copy (message_block.rd_ptr (), 02306 message_block.length ()) == 0) 02307 { 02308 message_block.rd_ptr (message_block.length ()); 02309 } 02310 else 02311 { 02312 return -1; 02313 } 02314 } 02315 02316 return 0; 02317 }
int TAO_Transport::handle_input_parse_extra_messages | ( | ACE_Message_Block & | message_block | ) | [private] |
Is invoked by handle_input_parse_data. Parses all messages remaining in message_block.
Definition at line 1851 of file Transport.cpp.
References TAO_GIOP_Message_Base::extract_next_message(), incoming_message_stack_, ACE_Message_Block::length(), messaging_object(), and TAO_Queued_Data::missing_data().
Referenced by handle_input_parse_data().
01852 { 01853 01854 // store buffer status of last extraction: -1 parse error, 0 01855 // incomplete message header in buffer, 1 complete messages header 01856 // parsed 01857 int buf_status = 0; 01858 01859 TAO_Queued_Data *q_data = 0; // init 01860 01861 // parse buffer until all messages have been extracted, consolidate 01862 // and enqueue complete messages, if the last message being parsed 01863 // has missin data, it is stays on top of incoming_message_stack. 01864 while (message_block.length () > 0 && 01865 (buf_status = this->messaging_object ()->extract_next_message 01866 (message_block, q_data)) != -1 && 01867 q_data != 0) // paranoid check 01868 { 01869 if (q_data->missing_data () == 0) 01870 { 01871 if (this->consolidate_enqueue_message (q_data) == -1) 01872 { 01873 return -1; 01874 } 01875 } 01876 else // incomplete message read, probably the last message in buffer 01877 { 01878 // can not fail 01879 this->incoming_message_stack_.push (q_data); 01880 } 01881 01882 q_data = 0; // reset 01883 } // while 01884 01885 if (buf_status == -1) 01886 { 01887 return -1; 01888 } 01889 01890 return 0; 01891 }
int TAO_Transport::handle_output | ( | ACE_Time_Value * | max_wait_time | ) |
Callback method to reactively drain the outgoing data queue.
Definition at line 487 of file Transport.cpp.
References ACE_DEBUG, ACE_TEXT, drain_queue(), LM_DEBUG, and TAO_debug_level.
Referenced by TAO_Block_Flushing_Strategy::flush_message(), TAO_Block_Flushing_Strategy::flush_transport(), and TAO_Connection_Handler::handle_output_eh().
00488 { 00489 if (TAO_debug_level > 3) 00490 { 00491 ACE_DEBUG ((LM_DEBUG, 00492 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_output\n"), 00493 this->id ())); 00494 } 00495 00496 // The flushing strategy (potentially via the Reactor) wants to send 00497 // more data, first check if there is a current message that needs 00498 // more sending... 00499 int const retval = this->drain_queue (max_wait_time); 00500 00501 if (TAO_debug_level > 3) 00502 { 00503 ACE_DEBUG ((LM_DEBUG, 00504 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_output, ") 00505 ACE_TEXT ("drain_queue returns %d/%d\n"), 00506 this->id (), 00507 retval, errno)); 00508 } 00509 00510 // Any errors are returned directly to the Reactor 00511 return retval; 00512 }
int TAO_Transport::handle_timeout | ( | const ACE_Time_Value & | current_time, | |
const void * | act | |||
) |
current_time | The current time as reported from the Reactor | |
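act | The Asynchronous Completion Token. Currently it is interpreted as follows: if the ACT is the address of this->current_deadline_ and a flush timer is pending, the timer is marked as not pending and output is scheduled (or flushed) through the flushing strategy; any other ACT is rejected and -1 is returned.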
|
This is the only legal ACT in the current configuration....
Definition at line 821 of file Transport.cpp.
References ACE_DEBUG, ACE_GUARD_RETURN, ACE_TEXT, TAO_Flushing_Strategy::flush_transport(), TAO_ORB_Core::flushing_strategy(), LM_DEBUG, TAO_Flushing_Strategy::MUST_FLUSH, orb_core(), TAO_Flushing_Strategy::schedule_output(), and TAO_debug_level.
Referenced by TAO_Transport_Timer::handle_timeout().
00823 { 00824 if (TAO_debug_level > 6) 00825 { 00826 ACE_DEBUG ((LM_DEBUG, 00827 ACE_TEXT ("TAO (%P|%t) - TAO_Transport[%d]::handle_timeout, ") 00828 ACE_TEXT ("timer expired\n"), 00829 this->id ())); 00830 } 00831 00832 /// This is the only legal ACT in the current configuration.... 00833 if (act != &this->current_deadline_) 00834 { 00835 return -1; 00836 } 00837 00838 if (this->flush_timer_pending ()) 00839 { 00840 // The timer is always a oneshot timer, so mark is as not 00841 // pending. 00842 this->reset_flush_timer (); 00843 00844 TAO_Flushing_Strategy *flushing_strategy = 00845 this->orb_core ()->flushing_strategy (); 00846 int const result = flushing_strategy->schedule_output (this); 00847 if (result == TAO_Flushing_Strategy::MUST_FLUSH) 00848 { 00849 typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK; 00850 TAO_REVERSE_LOCK reverse (*this->handler_lock_); 00851 ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1); 00852 if (flushing_strategy->flush_transport (this, 0) == -1) { 00853 return -1; 00854 } 00855 } 00856 } 00857 00858 return 0; 00859 }
ACE_INLINE void TAO_Transport::id | ( | size_t | id | ) |
ACE_INLINE size_t TAO_Transport::id | ( | void | ) | const |
Set and Get the identifier for this transport instance.
If not set, this will return an integer representation of the this
pointer for the instance on which it's called.
Definition at line 86 of file Transport.inl.
References id_.
Referenced by TAO::Transport_Cache_Manager::bind_i(), TAO_Connection_Handler::close_connection_eh(), TAO_IIOP_Connector::complete_connection(), TAO_Connector::connect(), TAO::Transport_Cache_Manager::find_transport(), TAO_Connection_Handler::handle_input_internal(), TAO_Connector::parallel_connect(), TAO_GIOP_Message_Base::process_reply_message(), TAO::Transport_Cache_Manager::purge(), TAO_GIOP_Message_Base::send_close_connection(), TAO_GIOP_Message_Base::send_error(), TAO_Connector::wait_for_connection_completion(), and TAO_Leader_Follower::wait_for_event().
00087 { 00088 return this->id_; 00089 }
bool TAO_Transport::idle_after_reply | ( | void | ) |
Request is sent and the reply is received. Idle the transport now.
Definition at line 271 of file Transport.cpp.
References TAO_Transport_Mux_Strategy::idle_after_reply(), and tms().
Referenced by TAO::Synch_Twoway_Invocation::remote_twoway().
00272 { 00273 return this->tms ()->idle_after_reply (); 00274 }
bool TAO_Transport::idle_after_send | ( | void | ) |
Request has been just sent, but the reply is not received. Idle the transport now.
Definition at line 265 of file Transport.cpp.
References TAO_Transport_Mux_Strategy::idle_after_send(), and tms().
Referenced by TAO::Synch_Twoway_Invocation::remote_twoway().
00266 { 00267 return this->tms ()->idle_after_send (); 00268 }
ACE_INLINE bool TAO_Transport::is_connected | ( | void | ) | const |
Is this transport really connected.
Definition at line 170 of file Transport.inl.
References ACE_GUARD_RETURN, and is_connected_.
Referenced by TAO::Cache_IntId::Cache_IntId(), TAO_IIOP_Connector::complete_connection(), TAO_Connector::connect(), TAO::Synch_Oneway_Invocation::remote_oneway(), and TAO_Connector::wait_for_connection_completion().
00171 { 00172 ACE_GUARD_RETURN (ACE_Lock, 00173 ace_mon, 00174 *this->handler_lock_, 00175 false); 00176 00177 return this->is_connected_; 00178 }
ACE_INLINE CORBA::Boolean TAO_Transport::is_tcs_set | ( | ) | const |
Return true if the tcs has been set.
Definition at line 158 of file Transport.inl.
References tcs_set_.
00159 { 00160 return tcs_set_; 00161 }
int TAO_Transport::make_idle | ( | void | ) |
Cache management.
Definition at line 463 of file Transport.cpp.
References ACE_DEBUG, ACE_TEXT, LM_DEBUG, TAO_debug_level, and transport_cache_manager().
Referenced by TAO_IIOP_Connection_Handler::process_listen_point_list().
00464 { 00465 if (TAO_debug_level > 3) 00466 { 00467 ACE_DEBUG ((LM_DEBUG, 00468 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::make_idle\n"), 00469 this->id ())); 00470 } 00471 00472 return this->transport_cache_manager ().make_idle (this->cache_map_entry_); 00473 }
void TAO_Transport::messaging_init | ( | TAO_GIOP_Message_Version const & | version | ) |
Initialising the messaging object. This would be used by the connector side. On the acceptor side the connection handler would take care of the messaging objects.
Definition at line 2600 of file Transport.cpp.
References TAO_GIOP_Message_Base::init(), TAO_GIOP_Message_Version::major, messaging_object(), and TAO_GIOP_Message_Version::minor.
02601 { 02602 this->messaging_object ()->init (version.major, version.minor); 02603 }
ACE_INLINE TAO_GIOP_Message_Base * TAO_Transport::messaging_object | ( | void | ) |
Return the messaging object that is used to format the data that needs to be sent.
Definition at line 121 of file Transport.inl.
References messaging_object_.
Referenced by allocate_partial_message_block(), handle_input_parse_data(), handle_input_parse_extra_messages(), messaging_init(), and out_stream().
00122 { 00123 return this->messaging_object_; 00124 }
int TAO_Transport::notify_reactor | ( | void | ) | [private] |
Definition at line 2508 of file Transport.cpp.
References ACE_DEBUG, ACE_TEXT, event_handler_i(), LM_DEBUG, ACE_Reactor::notify(), orb_core(), TAO_ORB_Core::reactor(), ACE_Event_Handler::READ_MASK, and TAO_debug_level.
Referenced by handle_input_parse_data(), and process_queue_head().
02509 { 02510 if (!this->ws_->is_registered ()) 02511 { 02512 return 0; 02513 } 02514 02515 ACE_Event_Handler *eh = this->event_handler_i (); 02516 02517 // Get the reactor associated with the event handler 02518 ACE_Reactor *reactor = this->orb_core ()->reactor (); 02519 02520 if (TAO_debug_level > 0) 02521 { 02522 ACE_DEBUG ((LM_DEBUG, 02523 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::notify_reactor, ") 02524 ACE_TEXT ("notify to Reactor\n"), 02525 this->id ())); 02526 } 02527 02528 02529 // Send a notification to the reactor... 02530 int const retval = reactor->notify (eh, ACE_Event_Handler::READ_MASK); 02531 02532 if (retval < 0 && TAO_debug_level > 2) 02533 { 02534 // @todo: need to think about what is the action that 02535 // we can take when we get here. 02536 ACE_DEBUG ((LM_DEBUG, 02537 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::notify_reactor, ") 02538 ACE_TEXT ("notify to the reactor failed..\n"), 02539 this->id ())); 02540 } 02541 02542 return 1; 02543 }
ACE_INLINE void TAO_Transport::opened_as | ( | TAO::Connection_Role | ) |
Definition at line 51 of file Transport.inl.
References opening_connection_role_.
00052 { 00053 this->opening_connection_role_ = role; 00054 }
ACE_INLINE TAO::Connection_Role TAO_Transport::opened_as | ( | void | ) | const |
Methods dealing with the role of the connection, e.g., CLIENT or SERVER. See CORBA 2.6 Specification, Section 15.5.1 for origin of definitions.
Definition at line 45 of file Transport.inl.
References opening_connection_role_.
Referenced by TAO_Connector::connect(), and TAO_IIOP_Connection_Handler::open().
00046 { 00047 return this->opening_connection_role_; 00048 }
void TAO_Transport::operator= | ( | const TAO_Transport & | ) | [private] |
ACE_INLINE TAO_ORB_Core * TAO_Transport::orb_core | ( | void | ) | const |
Access the ORB that owns this connection.
Definition at line 14 of file Transport.inl.
References orb_core_.
Referenced by TAO::Wait_On_LF_No_Upcall::can_process_upcalls(), TAO_Connection_Handler::close_connection_eh(), drain_queue(), TAO::Transport_Cache_Manager::find_transport(), TAO_Reactive_Flushing_Strategy::flush_message(), TAO_Leader_Follower_Flushing_Strategy::flush_message(), TAO_Reactive_Flushing_Strategy::flush_transport(), TAO_Leader_Follower_Flushing_Strategy::flush_transport(), generate_request_header(), handle_timeout(), notify_reactor(), send_asynchronous_message_i(), send_reply_message_i(), TAO_IIOP_Transport::send_request(), send_synchronous_message_i(), TAO_IIOP_Transport::set_bidir_context_info(), TAO_Transport(), TAO_Wait_On_Read::wait(), TAO_Wait_On_Reactor::wait(), and TAO_Wait_On_Leader_Follower::wait().
00015 { 00016 return this->orb_core_; 00017 }
TAO_OutputCDR & TAO_Transport::out_stream | ( | void | ) |
Accessor for the output CDR stream.
Definition at line 2594 of file Transport.cpp.
References messaging_object(), and TAO_GIOP_Message_Base::out_stream().
Referenced by TAO::LocateRequest_Invocation::invoke(), TAO::Synch_Oneway_Invocation::remote_oneway(), and TAO::Synch_Twoway_Invocation::remote_twoway().
02595 { 02596 return this->messaging_object ()->out_stream (); 02597 }
bool TAO_Transport::post_connect_hook | ( | void | ) | [virtual] |
Hooks that can be overridden in concrete transports.
These hooks are invoked just after connection establishment (or after a connection is fetched from cache). The return value signifies whether the invoker should proceed with post connection establishment activities. Protocols like SSLIOP need this to verify whether connections already established have valid certificates. There are no pre_connect_hooks () since the transport doesn't exist before a connection establishment. :-)
Definition at line 323 of file Transport.cpp.
Referenced by TAO_Connector::connect().
bool TAO_Transport::post_open | ( | size_t | id | ) |
Perform all the actions when this transport gets opened. | |
Definition at line 2617 of file Transport.cpp.
References ACE_ERROR, ACE_GUARD_RETURN, ACE_TEXT, id_, is_connected_, LM_ERROR, purge_entry(), and TAO_debug_level.
02618 { 02619 this->id_ = id; 02620 02621 { 02622 ACE_GUARD_RETURN (ACE_Lock, 02623 ace_mon, 02624 *this->handler_lock_, 02625 false); 02626 this->is_connected_ = true; 02627 } 02628 02629 // When we have data in our outgoing queue schedule ourselves 02630 // for output 02631 if (this->queue_is_empty_i ()) 02632 return true; 02633 02634 // If the wait strategy wants us to be registered with the reactor 02635 // then we do so. If registeration is required and it succeeds, 02636 // #REFCOUNT# becomes two. 02637 if (this->wait_strategy ()->register_handler () != 0) 02638 { 02639 // Registration failures. 02640 02641 // Purge from the connection cache, if we are not in the cache, this 02642 // just does nothing. 02643 (void) this->purge_entry (); 02644 02645 // Close the handler. 02646 (void) this->close_connection (); 02647 02648 if (TAO_debug_level > 0) 02649 ACE_ERROR ((LM_ERROR, 02650 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::post_connect , ") 02651 ACE_TEXT ("could not register the transport ") 02652 ACE_TEXT ("in the reactor.\n"), 02653 this->id ())); 02654 02655 return false; 02656 } 02657 02658 return true; 02659 }
void TAO_Transport::pre_close | ( | void | ) |
do what needs to be done when closing the transport
Definition at line 2606 of file Transport.cpp.
References ACE_GUARD, cleanup_queue_i(), is_connected_, and purge_entry().
Referenced by TAO_Connection_Handler::close_connection_eh().
02607 { 02608 this->is_connected_ = false; 02609 this->purge_entry (); 02610 { 02611 ACE_MT (ACE_GUARD (ACE_Lock, guard, *this->handler_lock_)); 02612 this->cleanup_queue_i (); 02613 } 02614 }
int TAO_Transport::process_parsed_messages | ( | TAO_Queued_Data * | qd, | |
TAO_Resume_Handle & | rh | |||
) | [protected] |
Process the message by sending it to the higher layers of the ORB.
Definition at line 2321 of file Transport.cpp.
References ACE_DEBUG, ACE_ERROR, ACE_TEXT, ACE_Message_Block::length(), LM_DEBUG, LM_ERROR, TAO::Transport::Stats::messages_received(), TAO_Queued_Data::missing_data(), TAO_Queued_Data::msg_block(), TAO_Queued_Data::msg_type(), TAO_Resume_Handle::resume_handle(), stats_, and TAO_debug_level.
02323 { 02324 if (TAO_debug_level > 7) 02325 { 02326 ACE_DEBUG ((LM_DEBUG, 02327 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ") 02328 ACE_TEXT ("entering (missing data == %d)\n"), 02329 this->id(), qd->missing_data ())); 02330 } 02331 02332 #if TAO_HAS_TRANSPORT_CURRENT == 1 02333 // Update stats, if any 02334 if (this->stats_ != 0) 02335 this->stats_->messages_received (qd->msg_block ()->length ()); 02336 #endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */ 02337 02338 switch (qd->msg_type ()) 02339 { 02340 case GIOP::CloseConnection: 02341 { 02342 if (TAO_debug_level > 0) 02343 ACE_DEBUG ((LM_DEBUG, 02344 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ") 02345 ACE_TEXT ("received CloseConnection message - %m\n"), 02346 this->id())); 02347 02348 // Return a "-1" so that the next stage can take care of 02349 // closing connection and the necessary memory management. 02350 return -1; 02351 } 02352 break; 02353 case GIOP::Request: 02354 case GIOP::LocateRequest: 02355 { 02356 // Let us resume the handle before we go ahead to process the 02357 // request. This will open up the handle for other threads. 02358 rh.resume_handle (); 02359 02360 if (this->messaging_object ()->process_request_message ( 02361 this, 02362 qd) == -1) 02363 { 02364 // Return a "-1" so that the next stage can take care of 02365 // closing connection and the necessary memory management. 
02366 return -1; 02367 } 02368 } 02369 break; 02370 case GIOP::Reply: 02371 case GIOP::LocateReply: 02372 { 02373 rh.resume_handle (); 02374 02375 TAO_Pluggable_Reply_Params params (this); 02376 02377 if (this->messaging_object ()->process_reply_message (params, qd) == -1) 02378 { 02379 if (TAO_debug_level > 0) 02380 ACE_DEBUG ((LM_DEBUG, 02381 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ") 02382 ACE_TEXT ("error in process_reply_message - %m\n"), 02383 this->id ())); 02384 02385 return -1; 02386 } 02387 02388 } 02389 break; 02390 case GIOP::CancelRequest: 02391 { 02392 // The associated request might be incomplete residing 02393 // fragmented in messaging object. We must make sure the 02394 // resources allocated by fragments are released. 02395 if (this->messaging_object ()->discard_fragmented_message (qd) == -1) 02396 { 02397 if (TAO_debug_level > 0) 02398 { 02399 ACE_ERROR ((LM_ERROR, 02400 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ") 02401 ACE_TEXT ("error processing CancelRequest\n"), 02402 this->id ())); 02403 } 02404 } 02405 02406 // We are not able to cancel requests being processed already; 02407 // this is declared as optional feature by CORBA, and TAO does 02408 // not support this currently. 02409 02410 // Just continue processing, CancelRequest does not mean to cut 02411 // off the connection. 02412 } 02413 break; 02414 case GIOP::MessageError: 02415 { 02416 if (TAO_debug_level > 0) 02417 { 02418 ACE_ERROR ((LM_ERROR, 02419 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ") 02420 ACE_TEXT ("received MessageError, closing connection\n"), 02421 this->id ())); 02422 } 02423 return -1; 02424 } 02425 break; 02426 case GIOP::Fragment: 02427 { 02428 // Nothing to be done. 02429 } 02430 break; 02431 } 02432 02433 // If not, just return back.. 02434 return 0; 02435 }
int TAO_Transport::process_queue_head | ( | TAO_Resume_Handle & | rh | ) | [private] |
Definition at line 2438 of file Transport.cpp.
References ACE_DEBUG, ACE_TEXT, incoming_message_queue_, LM_DEBUG, notify_reactor(), TAO_Incoming_Message_Queue::queue_length(), TAO_Queued_Data::release(), TAO_Resume_Handle::set_flag(), TAO_debug_level, TAO_Resume_Handle::TAO_HANDLE_LEAVE_SUSPENDED, and TAO_Resume_Handle::TAO_HANDLE_RESUMABLE.
Referenced by handle_input().
02439 { 02440 if (TAO_debug_level > 3) 02441 { 02442 ACE_DEBUG ((LM_DEBUG, 02443 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_queue_head, %d enqueued\n"), 02444 this->id (), this->incoming_message_queue_.queue_length () )); 02445 } 02446 02447 // See if message in queue ... 02448 if (this->incoming_message_queue_.queue_length () > 0) 02449 { 02450 // Get the message on the head of the queue.. 02451 TAO_Queued_Data *qd = 02452 this->incoming_message_queue_.dequeue_head (); 02453 02454 if (TAO_debug_level > 3) 02455 { 02456 ACE_DEBUG ((LM_DEBUG, 02457 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_queue_head, ") 02458 ACE_TEXT ("the size of the queue is [%d]\n"), 02459 this->id (), 02460 this->incoming_message_queue_.queue_length())); 02461 } 02462 // Now that we have pulled out out one message out of the queue, 02463 // check whether we have one more message in the queue... 02464 if (this->incoming_message_queue_.queue_length () > 0) 02465 { 02466 if (TAO_debug_level > 0) 02467 { 02468 ACE_DEBUG ((LM_DEBUG, 02469 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_queue_head, ") 02470 ACE_TEXT ("notify reactor\n"), 02471 this->id ())); 02472 } 02473 02474 int const retval = this->notify_reactor (); 02475 02476 if (retval == 1) 02477 { 02478 // Let the class know that it doesn't need to resume the 02479 // handle.. 02480 rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_LEAVE_SUSPENDED); 02481 } 02482 else if (retval < 0) 02483 return -1; 02484 } 02485 else 02486 { 02487 // As we are ready to process the last message just resume 02488 // the handle. Set the flag incase someone had reset the flag.. 02489 rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_RESUMABLE); 02490 } 02491 02492 // Process the message... 02493 if (this->process_parsed_messages (qd, rh) == -1) 02494 { 02495 return -1; 02496 } 02497 02498 // Delete the Queued_Data.. 02499 TAO_Queued_Data::release (qd); 02500 02501 return 0; 02502 } 02503 02504 return 1; 02505 }
bool TAO_Transport::provide_blockable_handler | ( | TAO::Connection_Handler_Set & | handlers | ) |
Called by the cache when the ORB is shutting down.
handlers | The TAO_Connection_Handler_Set into which the transport should place its handler if the transport has RW strategy on. |
Definition at line 251 of file Transport.cpp.
References opening_connection_role_, and TAO::TAO_SERVER_ROLE.
00252 { 00253 if (this->ws_->non_blocking () || 00254 this->opening_connection_role_ == TAO::TAO_SERVER_ROLE) 00255 return false; 00256 00257 (void) this->add_reference (); 00258 00259 h.insert (this->connection_handler_i ()); 00260 00261 return true; 00262 }
void TAO_Transport::provide_handler | ( | TAO::Connection_Handler_Set & | handlers | ) |
Add the event handler to the handlers set.
Called by the cache when the cache is closing.
handlers | The TAO_Connection_Handler_Set into which the transport should place its handler |
Definition at line 243 of file Transport.cpp.
00244 { 00245 (void) this->add_reference (); 00246 00247 handlers.insert (this->connection_handler_i ()); 00248 }
int TAO_Transport::purge_entry | ( | void | ) |
Cache management.
Definition at line 457 of file Transport.cpp.
References transport_cache_manager().
Referenced by TAO_Connection_Handler::close_handler(), TAO_IIOP_Connector::complete_connection(), TAO_Connector::connect(), post_open(), pre_close(), recache_transport(), and ~TAO_Transport().
00458 { 00459 return this->transport_cache_manager ().purge_entry (this->cache_map_entry_); 00460 }
ACE_INLINE void TAO_Transport::purging_order | ( | unsigned long | value | ) |
Definition at line 76 of file Transport.inl.
References purging_order_.
00077 { 00078 // This should only be called by the Transport Cache Manager when 00079 // it is holding it's lock. 00080 // The transport should still be here since the cache manager still 00081 // has a reference to it. 00082 this->purging_order_ = value; 00083 }
ACE_INLINE unsigned long TAO_Transport::purging_order | ( | void | ) | const |
Get and Set the purging order. The purging strategy uses the set version to set the purging order.
Definition at line 70 of file Transport.inl.
References purging_order_.
Referenced by TAO_LRU_Connection_Purging_Strategy::update_item().
00071 { 00072 return this->purging_order_; 00073 }
ACE_INLINE bool TAO_Transport::queue_is_empty | ( | void | ) |
Check if there are messages pending in the queue.
Definition at line 98 of file Transport.inl.
References ACE_GUARD_RETURN, and queue_is_empty_i().
Referenced by TAO_Reactive_Flushing_Strategy::flush_transport(), TAO_Leader_Follower_Flushing_Strategy::flush_transport(), and TAO_Block_Flushing_Strategy::flush_transport().
00099 { 00100 ACE_GUARD_RETURN (ACE_Lock, 00101 ace_mon, 00102 *this->handler_lock_, 00103 false); 00104 return this->queue_is_empty_i (); 00105 }
bool TAO_Transport::queue_is_empty_i | ( | void | ) | [private] |
Check if there are messages pending in the queue.
This version assumes that the lock is already held. Use with care!
Definition at line 755 of file Transport.cpp.
Referenced by queue_is_empty().
int TAO_Transport::queue_message_i | ( | const ACE_Message_Block * | message_block, | |
ACE_Time_Value * | max_wait_time, | |||
bool | back = true | |||
) | [protected] |
Queue a message for message_block
max_wait_time | The maximum time that the operation can block, used in the implementation of timeouts. | |
back | If true, the message will be pushed to the back of the queue. If false, the message will be pushed to the front of the queue. |
Definition at line 1527 of file Transport.cpp.
References ACE_NEW_RETURN, TAO_Queued_Message::push_back(), and TAO_Queued_Message::push_front().
Referenced by format_queue_message().
01529 { 01530 TAO_Queued_Message *queued_message = 0; 01531 ACE_NEW_RETURN (queued_message, 01532 TAO_Asynch_Queued_Message (message_block, 01533 this->orb_core_, 01534 max_wait_time, 01535 0, 01536 true), 01537 -1); 01538 if (back) { 01539 queued_message->push_back (this->head_, this->tail_); 01540 } 01541 else { 01542 queued_message->push_front (this->head_, this->tail_); 01543 } 01544 01545 return 0; 01546 }
int TAO_Transport::recache_transport | ( | TAO_Transport_Descriptor_Interface * | desc | ) |
Recache ourselves in the cache.
purge_entry has a return value, use it
Definition at line 447 of file Transport.cpp.
References purge_entry(), and transport_cache_manager().
00448 { 00449 // First purge our entry 00450 this->purge_entry (); 00451 00452 // Then add ourselves to the cache 00453 return this->transport_cache_manager ().cache_transport (desc, this); 00454 }
virtual ssize_t TAO_Transport::recv | ( | char * | buffer, | |
size_t | len, | |||
const ACE_Time_Value * | timeout = 0 | |||
) | [pure virtual] |
Read len bytes into buffer.
This method serializes on handler_lock_, guaranteeing that only one thread can execute it on the same instance concurrently.
buffer | ORB allocated buffer where the data should be placed. Note: the ACE_Time_Value *timeout is just a placeholder for now. It is not clear that this is the best place to specify this. The actual timeout values will be kept in the Policies. |
Implemented in TAO_IIOP_Transport.
Referenced by handle_input_missing_data(), and handle_input_parse_data().
ACE_INLINE size_t TAO_Transport::recv_buffer_size | ( | void | ) | const |
Accessor to recv_buffer_size_.
Definition at line 187 of file Transport.inl.
References recv_buffer_size_.
00188 { 00189 return this->recv_buffer_size_; 00190 }
int TAO_Transport::register_handler | ( | void | ) | [virtual] |
Register the handler with the reactor.
Register the handler with the reactor. This method is used by the Wait_On_Reactor strategy. The transport must register its event handler with the ORB's Reactor.
Definition at line 335 of file Transport.cpp.
References ACE_DEBUG, ACE_GUARD_RETURN, ACE_TEXT, TAO_Wait_Strategy::is_registered(), LM_DEBUG, orb_core_, TAO_ORB_Core::reactor(), ACE_Event_Handler::READ_MASK, ACE_Reactor::register_handler(), TAO_debug_level, and ws_.
Referenced by TAO_Wait_On_Reactor::register_handler(), TAO_Wait_On_Leader_Follower::register_handler(), and TAO_Wait_On_Leader_Follower::sending_request().
00336 { 00337 if (TAO_debug_level > 4) 00338 { 00339 ACE_DEBUG ((LM_DEBUG, 00340 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::register_handler\n"), 00341 this->id ())); 00342 } 00343 00344 ACE_Reactor * const r = this->orb_core_->reactor (); 00345 00346 // @@note: This should be okay since the register handler call will 00347 // not make a nested call into the transport. 00348 ACE_GUARD_RETURN (ACE_Lock, 00349 ace_mon, 00350 *this->handler_lock_, 00351 false); 00352 00353 if (r == this->event_handler_i ()->reactor ()) 00354 { 00355 return 0; 00356 } 00357 00358 // Set the flag in the Connection Handler and in the Wait Strategy 00359 // @@Maybe we should set these flags after registering with the 00360 // reactor. What if the registration fails??? 00361 this->ws_->is_registered (true); 00362 00363 // Register the handler with the reactor 00364 return r->register_handler (this->event_handler_i (), 00365 ACE_Event_Handler::READ_MASK); 00366 }
ACE_Event_Handler::Reference_Count TAO_Transport::remove_reference | ( | void | ) |
Remove a reference. The call is forwarded to remove_reference() on this transport's event handler, and that call's result is returned.
Definition at line 2588 of file Transport.cpp.
References event_handler_i(), and ACE_Event_Handler::remove_reference().
Referenced by TAO_Connection_Handler::cancel_pending_connection(), TAO_Connection_Handler::close_handler(), TAO::Cache_IntId::operator=(), TAO_Asynch_Reply_Dispatcher_Base::transport(), TAO::Cache_IntId::~Cache_IntId(), TAO_Asynch_Reply_Dispatcher_Base::~TAO_Asynch_Reply_Dispatcher_Base(), and TAO_Thread_Per_Connection_Handler::~TAO_Thread_Per_Connection_Handler().
02589 { 02590 return this->event_handler_i ()->remove_reference (); 02591 }
void TAO_Transport::report_invalid_event_handler | ( | const char * | caller | ) | [private] |
Print out error messages if the event handler is not valid.
Definition at line 1211 of file Transport.cpp.
References ACE_DEBUG, ACE_TEXT, ACE_TEXT_CHAR_TO_TCHAR, LM_DEBUG, tag_, and TAO_debug_level.
01212 { 01213 if (TAO_debug_level > 0) 01214 { 01215 ACE_DEBUG ((LM_DEBUG, 01216 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::report_invalid_event_handler") 01217 ACE_TEXT ("(%s) no longer associated with handler [tag=%d]\n"), 01218 this->id (), ACE_TEXT_CHAR_TO_TCHAR (caller), this->tag_)); 01219 } 01220 }
ACE_INLINE void TAO_Transport::reset_flush_timer | ( | void | ) | [private] |
The flush timer expired or was explicitly cancelled, mark it as not pending
Definition at line 114 of file Transport.inl.
References current_deadline_, flush_timer_id_, and ACE_Time_Value::zero.
Referenced by drain_queue_i().
00115 { 00116 this->flush_timer_id_ = -1; 00117 this->current_deadline_ = ACE_Time_Value::zero; 00118 }
int TAO_Transport::schedule_output_i | ( | void | ) | [private] |
Schedule handle_output() callbacks.
Definition at line 762 of file Transport.cpp.
References ACE_DEBUG, ACE_TEXT, event_handler_i(), ACE_Reactor::find_handler(), ACE_Event_Handler::get_handle(), LM_DEBUG, ACE_Event_Handler::reactor(), ACE_Event_Handler::remove_reference(), ACE_Reactor::schedule_wakeup(), TAO_debug_level, and ACE_Event_Handler::WRITE_MASK.
00763 { 00764 ACE_Event_Handler * const eh = this->event_handler_i (); 00765 ACE_Reactor * const reactor = eh->reactor (); 00766 00767 if (reactor == 0) 00768 return -1; 00769 00770 // Check to see if our event handler is still registered with the 00771 // reactor. It's possible for another thread to have run close_connection() 00772 // since we last used the event handler. 00773 ACE_Event_Handler * const found = reactor->find_handler (eh->get_handle ()); 00774 if (found) 00775 { 00776 found->remove_reference (); 00777 00778 if (found != eh) 00779 { 00780 if (TAO_debug_level > 3) 00781 { 00782 ACE_DEBUG ((LM_DEBUG, 00783 ACE_TEXT ("TAO (%P|%t) - ") 00784 ACE_TEXT ("Transport[%d]::schedule_output_i ") 00785 ACE_TEXT ("event handler not found in reactor,") 00786 ACE_TEXT ("returning -1\n"), 00787 this->id ())); 00788 } 00789 00790 return -1; 00791 } 00792 } 00793 00794 if (TAO_debug_level > 3) 00795 { 00796 ACE_DEBUG ((LM_DEBUG, 00797 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::schedule_output_i\n"), 00798 this->id ())); 00799 } 00800 00801 return reactor->schedule_wakeup (eh, ACE_Event_Handler::WRITE_MASK); 00802 }
virtual ssize_t TAO_Transport::send | ( | iovec * | iov, | |
int | iovcnt, | |||
size_t & | bytes_transferred, | |||
const ACE_Time_Value * | timeout = 0 | |||
) | [pure virtual] |
Write the complete Message_Block chain to the connection.
This method serializes on handler_lock_, guaranteeing that only one thread can execute it on the same instance at a time.
Often the implementation simply forwards the arguments to the underlying ACE_Svc_Handler class, using the code factored out into ACE.
Be careful with protocols that perform non-trivial transformations of the data, such as SSLIOP or protocols that compress the stream.
iov | contains the data that must be sent. | |
timeout | is the maximum time that the application is willing to wait for the data to be sent, useful in platforms that implement timed writes. The timeout value is obtained from the policies set by the application. | |
bytes_transferred | should return the total number of bytes successfully transferred before the connection blocked. This is required because in some platforms and/or protocols multiple system calls may be required to send the chain of message blocks. The first few calls can work successfully, but the final one can fail or signal a flow control situation (via EAGAIN). In this case the ORB expects the function to return -1, errno to be appropriately set and this argument to return the number of bytes already on the OS I/O subsystem. |
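This call can also fail if the transport instance is no longer associated with a connection (e.g., the connection handler closed down). In that case, it returns -1 and sets errno to ENOENT.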
Implemented in TAO_IIOP_Transport.
Referenced by drain_queue_helper().
int TAO_Transport::send_asynchronous_message_i | ( | TAO_Stub * | stub, | |
const ACE_Message_Block * | message_block, | |||
ACE_Time_Value * | max_wait_time | |||
) | [private] |
Send an asynchronous message, i.e. do not block until the message is on the wire
Definition at line 1279 of file Transport.cpp.
References CORBA::SystemException::_tao_minor_code(), ACE_DEBUG, ACE_ERROR, ACE_GUARD_RETURN, ACE_TEXT, check_buffering_constraints_i(), CORBA::COMPLETED_NO, ACE_Message_Block::cont(), TAO_Flushing_Strategy::flush_transport(), TAO_ORB_Core::flushing_strategy(), head_, ACE_Message_Block::length(), LM_DEBUG, LM_ERROR, TAO_Flushing_Strategy::MUST_FLUSH, orb_core(), TAO_Flushing_Strategy::schedule_output(), send_message_block_chain_i(), sent_byte_count_, TAO_debug_level, TAO_TIMEOUT_SEND_MINOR_CODE, ACE_Message_Block::total_length(), and TAO_Stub::transport_queueing_strategy().
Referenced by send_message_shared_i().
01282 { 01283 // Let's figure out if the message should be queued without trying 01284 // to send first: 01285 bool try_sending_first = true; 01286 01287 bool const queue_empty = (this->head_ == 0); 01288 01289 TAO::Transport_Queueing_Strategy *queue_strategy = 01290 stub->transport_queueing_strategy (); 01291 01292 if (!queue_empty) 01293 { 01294 try_sending_first = false; 01295 } 01296 else if (queue_strategy) 01297 { 01298 if (queue_strategy->must_queue (queue_empty)) 01299 { 01300 try_sending_first = false; 01301 } 01302 } 01303 01304 bool partially_sent = false; 01305 bool timeout_encountered = false; 01306 01307 if (try_sending_first) 01308 { 01309 ssize_t n = 0; 01310 size_t byte_count = 0; 01311 // ... in this case we must try to send the message first ... 01312 01313 size_t const total_length = message_block->total_length (); 01314 01315 if (TAO_debug_level > 6) 01316 { 01317 ACE_DEBUG ((LM_DEBUG, 01318 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ") 01319 ACE_TEXT ("trying to send the message (ml = %d)\n"), 01320 this->id (), total_length)); 01321 } 01322 01323 // @@ I don't think we want to hold the mutex here, however if 01324 // we release it we need to recheck the status of the transport 01325 // after we return... once I understand the final form for this 01326 // code I will re-visit this decision 01327 n = this->send_message_block_chain_i (message_block, 01328 byte_count, 01329 max_wait_time); 01330 01331 if (n == -1) 01332 { 01333 // ... if this is just an EWOULDBLOCK we must schedule the 01334 // message for later, if it is ETIME we still have to send 01335 // the complete message, because cutting off the message at 01336 // this point will destroy the synchronization with the 01337 // server ... 
01338 if (errno != EWOULDBLOCK && errno != ETIME) 01339 { 01340 if (TAO_debug_level > 0) 01341 { 01342 ACE_ERROR ((LM_ERROR, 01343 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ") 01344 ACE_TEXT ("fatal error in ") 01345 ACE_TEXT ("send_message_block_chain_i - %m\n"), 01346 this->id ())); 01347 } 01348 return -1; 01349 } 01350 } 01351 01352 // ... let's figure out if the complete message was sent ... 01353 if (total_length == byte_count) 01354 { 01355 // Done, just return. Notice that there are no allocations 01356 // or copies up to this point (though some fancy calling 01357 // back and forth). 01358 // This is the common case for the critical path, it should 01359 // be fast. 01360 return 0; 01361 } 01362 01363 if (byte_count > 0) 01364 { 01365 partially_sent = true; 01366 } 01367 01368 // If it was partially sent, then push to front of queue and don't flush 01369 if (errno == ETIME) 01370 { 01371 timeout_encountered = true; 01372 if (byte_count == 0) 01373 { 01374 //This request has timed out and none of it was sent to the transport 01375 //We can't return -1 here, since that would end up closing the tranpsort 01376 if (TAO_debug_level > 2) 01377 { 01378 ACE_DEBUG ((LM_DEBUG, 01379 ACE_TEXT ("TAO (%P|%t) - ") 01380 ACE_TEXT ("Transport[%d]::send_asynchronous_message_i, ") 01381 ACE_TEXT ("timeout encountered before any bytes sent\n"), 01382 this->id ())); 01383 } 01384 throw ::CORBA::TIMEOUT ( 01385 CORBA::SystemException::_tao_minor_code ( 01386 TAO_TIMEOUT_SEND_MINOR_CODE, 01387 ETIME), 01388 CORBA::COMPLETED_NO); 01389 } 01390 } 01391 01392 if (TAO_debug_level > 6) 01393 { 01394 ACE_DEBUG ((LM_DEBUG, 01395 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ") 01396 ACE_TEXT ("partial send %d / %d bytes\n"), 01397 this->id (), byte_count, total_length)); 01398 } 01399 01400 // ... part of the data was sent, need to figure out what piece 01401 // of the message block chain must be queued ... 
01402 while (message_block != 0 && message_block->length () == 0) 01403 { 01404 message_block = message_block->cont (); 01405 } 01406 01407 // ... at least some portion of the message block chain should 01408 // remain ... 01409 } 01410 01411 // ... either the message must be queued or we need to queue it 01412 // because it was not completely sent out ... 01413 01414 ACE_Time_Value *wait_time = (partially_sent ? 0: max_wait_time); 01415 if (this->queue_message_i (message_block, wait_time, !partially_sent) 01416 == -1) 01417 { 01418 if (TAO_debug_level > 0) 01419 { 01420 ACE_DEBUG ((LM_DEBUG, 01421 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::") 01422 ACE_TEXT ("send_asynchronous_message_i, ") 01423 ACE_TEXT ("cannot queue message for - %m\n"), 01424 this->id ())); 01425 } 01426 return -1; 01427 } 01428 01429 if (TAO_debug_level > 6) 01430 { 01431 ACE_DEBUG ((LM_DEBUG, 01432 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ") 01433 ACE_TEXT ("message is queued\n"), 01434 this->id ())); 01435 } 01436 01437 if (timeout_encountered && partially_sent) 01438 { 01439 //Must close down the transport here since we can't guarantee the 01440 //integrity of the GIOP stream (the next send may try to write to 01441 //the socket before looking at the queue). 01442 if (TAO_debug_level > 0) 01443 { 01444 ACE_DEBUG ((LM_DEBUG, 01445 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::") 01446 ACE_TEXT ("send_asynchronous_message_i, ") 01447 ACE_TEXT ("timeout after partial send, closing.\n"), 01448 this->id ())); 01449 } 01450 return -1; 01451 } 01452 else if (!timeout_encountered) 01453 { 01454 // We can't flush if we have already encountered a timeout 01455 // ... if the queue is full we need to activate the output on the 01456 // queue ... 01457 bool must_flush = false; 01458 const bool constraints_reached = 01459 this->check_buffering_constraints_i (stub, 01460 must_flush); 01461 01462 // ... but we also want to activate it if the message was partially 01463 // sent.... 
Plus, when we use the blocking flushing strategy the 01464 // queue is flushed as a side-effect of 'schedule_output()' 01465 01466 TAO_Flushing_Strategy *flushing_strategy = 01467 this->orb_core ()->flushing_strategy (); 01468 01469 if (constraints_reached || try_sending_first) 01470 { 01471 int const result = flushing_strategy->schedule_output (this); 01472 if (result == TAO_Flushing_Strategy::MUST_FLUSH) 01473 { 01474 must_flush = true; 01475 } 01476 } 01477 01478 if (must_flush) 01479 { 01480 if (TAO_debug_level > 0) 01481 { 01482 ACE_DEBUG ((LM_DEBUG, 01483 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::") 01484 ACE_TEXT ("send_asynchronous_message_i, ") 01485 ACE_TEXT ("flushing transport.\n"), 01486 this->id ())); 01487 } 01488 01489 size_t sent_byte = sent_byte_count_; 01490 int ret = 0; 01491 { 01492 typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK; 01493 TAO_REVERSE_LOCK reverse (*this->handler_lock_); 01494 ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1); 01495 ret = flushing_strategy->flush_transport (this, max_wait_time); 01496 } 01497 01498 if (ret == -1) 01499 { 01500 if (errno == ETIME) 01501 { 01502 if (sent_byte == sent_byte_count_) // if nothing was actually flushed 01503 { 01504 //This request has timed out and none of it was sent to the transport 01505 //We can't return -1 here, since that would end up closing the tranpsort 01506 if (TAO_debug_level > 2) 01507 { 01508 ACE_DEBUG ((LM_DEBUG, 01509 ACE_TEXT ("TAO (%P|%t) - ") 01510 ACE_TEXT ("Transport[%d]::send_asynchronous_message_i, ") 01511 ACE_TEXT ("2 timeout encountered before any bytes sent\n"), 01512 this->id ())); 01513 } 01514 throw ::CORBA::TIMEOUT (CORBA::SystemException::_tao_minor_code 01515 (TAO_TIMEOUT_SEND_MINOR_CODE, ETIME), 01516 CORBA::COMPLETED_NO); 01517 } 01518 } 01519 return -1; 01520 } 01521 } 01522 } 01523 return 0; 01524 }
void TAO_Transport::send_connection_closed_notifications | ( | void | ) |
Notify all the components inside a Transport when the underlying connection is closed.
Definition at line 1223 of file Transport.cpp.
References ACE_GUARD, TAO_Transport_Mux_Strategy::connection_closed(), send_connection_closed_notifications_i(), and tms().
Referenced by TAO_Connection_Handler::close_connection_eh().
01224 { 01225 { 01226 ACE_MT (ACE_GUARD (ACE_Lock, guard, *this->handler_lock_)); 01227 01228 this->send_connection_closed_notifications_i (); 01229 } 01230 01231 this->tms ()->connection_closed (); 01232 }
void TAO_Transport::send_connection_closed_notifications_i | ( | void | ) | [private] |
Assume the lock is held.
Definition at line 1235 of file Transport.cpp.
References cleanup_queue_i().
Referenced by send_connection_closed_notifications().
01236 { 01237 this->cleanup_queue_i (); 01238 }
virtual int TAO_Transport::send_message | ( | TAO_OutputCDR & | stream, | |
TAO_Stub * | stub = 0 , |
|||
TAO_Message_Semantics | message_semantics = TAO_Transport::TAO_TWOWAY_REQUEST , |
|||
ACE_Time_Value * | max_time_wait = 0 | |||
) | [pure virtual] |
Once the ORB is prepared to receive a reply (see send_request() above), and all the arguments have been marshaled the CDR stream must be 'formatted', i.e. the message_size field in the GIOP header can finally be set to the proper value.
Referenced by TAO_GIOP_Message_Base::make_send_locate_reply(), TAO_GIOP_Message_Base::process_request(), and TAO_GIOP_Message_Base::send_reply_exception().
int TAO_Transport::send_message_block_chain | ( | const ACE_Message_Block * | message_block, | |
size_t & | bytes_transferred, | |||
ACE_Time_Value * | max_wait_time = 0 | |||
) |
Send a message block chain.
Definition at line 525 of file Transport.cpp.
References ACE_GUARD_RETURN, and send_message_block_chain_i().
Referenced by TAO_GIOP_Message_Base::send_close_connection(), and TAO_GIOP_Message_Base::send_error().
00528 { 00529 ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1); 00530 00531 return this->send_message_block_chain_i (mb, 00532 bytes_transferred, 00533 max_wait_time); 00534 }
int TAO_Transport::send_message_block_chain_i | ( | const ACE_Message_Block * | message_block, | |
size_t & | bytes_transferred, | |||
ACE_Time_Value * | max_wait_time | |||
) |
Send a message block chain, assuming the lock is held.
Definition at line 537 of file Transport.cpp.
References drain_queue_i(), TAO_Synch_Queued_Message::message_length(), TAO_Queued_Message::push_back(), TAO_Queued_Message::remove_from_list(), and ACE_Message_Block::total_length().
Referenced by send_asynchronous_message_i(), and send_message_block_chain().
00540 { 00541 size_t const total_length = mb->total_length (); 00542 00543 // We are going to block, so there is no need to clone 00544 // the message block. 00545 TAO_Synch_Queued_Message synch_message (mb, this->orb_core_); 00546 00547 synch_message.push_back (this->head_, this->tail_); 00548 00549 int const n = this->drain_queue_i (max_wait_time); 00550 00551 if (n == -1) 00552 { 00553 synch_message.remove_from_list (this->head_, this->tail_); 00554 return -1; // Error while sending... 00555 } 00556 else if (n == 1) 00557 { 00558 bytes_transferred = total_length; 00559 return 1; // Empty queue, message was sent.. 00560 } 00561 00562 // Remove the temporary message from the queue... 00563 synch_message.remove_from_list (this->head_, this->tail_); 00564 00565 bytes_transferred = total_length - synch_message.message_length (); 00566 00567 return 0; 00568 }
int TAO_Transport::send_message_shared | ( | TAO_Stub * | stub, | |
TAO_Message_Semantics | message_semantics, | |||
const ACE_Message_Block * | message_block, | |||
ACE_Time_Value * | max_wait_time | |||
) | [virtual] |
Send the contents of message_block.
stub | The object reference used for this operation, useful to obtain the current policies. | |
message_semantics | If this is set to TAO_TWOWAY_REQUEST this method will block until the operation is completely written on the wire. If it is set to other values this operation could return. | |
message_block | The CDR encapsulation of the GIOP message that must be sent. The message may consist of multiple Message Blocks chained through the cont() field. | |
max_wait_time | The maximum time that the operation can block, used in the implementation of timeouts. |
Definition at line 293 of file Transport.cpp.
References ACE_GUARD_RETURN, close_connection(), and send_message_shared_i().
Referenced by TAO_IIOP_Transport::send_message().
00297 { 00298 int result = 0; 00299 00300 { 00301 ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1); 00302 00303 result = 00304 this->send_message_shared_i (stub, message_semantics, 00305 message_block, max_wait_time); 00306 } 00307 00308 if (result == -1) 00309 { 00310 // The connection needs to be closed here. 00311 // In the case of a partially written message this is the only way to cleanup 00312 // the physical connection as well as the Transport. An EOF on the remote end 00313 // will cancel the partially received message. 00314 this->close_connection (); 00315 } 00316 00317 return result; 00318 }
int TAO_Transport::send_message_shared_i | ( | TAO_Stub * | stub, | |
TAO_Message_Semantics | message_semantics, | |||
const ACE_Message_Block * | message_block, | |||
ACE_Time_Value * | max_wait_time | |||
) | [protected] |
Implement send_message_shared() assuming the handler_lock_ is held.
Definition at line 1241 of file Transport.cpp.
References ACE_Message_Block::length(), TAO::Transport::Stats::messages_sent(), send_asynchronous_message_i(), send_reply_message_i(), send_synchronous_message_i(), stats_, TAO_ONEWAY_REQUEST, TAO_REPLY, and TAO_TWOWAY_REQUEST.
Referenced by send_message_shared().
01245 { 01246 int ret = 0; 01247 01248 #if TAO_HAS_TRANSPORT_CURRENT == 1 01249 size_t const message_length = message_block->length (); 01250 #endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */ 01251 01252 switch (message_semantics) 01253 { 01254 case TAO_Transport::TAO_TWOWAY_REQUEST: 01255 ret = this->send_synchronous_message_i (message_block, max_wait_time); 01256 break; 01257 01258 case TAO_Transport::TAO_REPLY: 01259 ret = this->send_reply_message_i (message_block, max_wait_time); 01260 break; 01261 01262 case TAO_Transport::TAO_ONEWAY_REQUEST: 01263 ret = this->send_asynchronous_message_i (stub, 01264 message_block, 01265 max_wait_time); 01266 break; 01267 } 01268 01269 #if TAO_HAS_TRANSPORT_CURRENT == 1 01270 // "Count" the message, only if no error was encountered. 01271 if (ret != -1 && this->stats_ != 0) 01272 this->stats_->messages_sent (message_length); 01273 #endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */ 01274 01275 return ret; 01276 }
int TAO_Transport::send_reply_message_i | ( | const ACE_Message_Block * | message_block, | |
ACE_Time_Value * | max_wait_time | |||
) | [private] |
Send a reply message, i.e. do not block until the message is on the wire, but just return after adding it to the queue.
Definition at line 667 of file Transport.cpp.
References ACE_DEBUG, ACE_GUARD_RETURN, ACE_TEXT, TAO_Synch_Queued_Message::clone(), TAO_Queued_Message::destroy(), TAO_Flushing_Strategy::flush_message(), TAO_ORB_Core::flushing_strategy(), LM_DEBUG, TAO_Flushing_Strategy::MUST_FLUSH, orb_core(), TAO_Queued_Message::push_back(), TAO_Queued_Message::remove_from_list(), TAO_Flushing_Strategy::schedule_output(), send_synch_message_helper_i(), and TAO_debug_level.
Referenced by send_message_shared_i().
00669 { 00670 // Dont clone now.. We could be sent in one shot! 00671 TAO_Synch_Queued_Message synch_message (mb, this->orb_core_); 00672 00673 synch_message.push_back (this->head_, this->tail_); 00674 00675 int const n = 00676 this->send_synch_message_helper_i (synch_message, max_wait_time); 00677 00678 // What about partially sent messages. 00679 if (n == -1 || n == 1) 00680 { 00681 return n; 00682 } 00683 00684 if (TAO_debug_level > 3) 00685 { 00686 ACE_DEBUG ((LM_DEBUG, 00687 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_reply_message_i, ") 00688 ACE_TEXT ("preparing to add to queue before leaving\n"), 00689 this->id ())); 00690 } 00691 00692 // Till this point we shouldn't have any copying and that is the 00693 // point anyway. Now, remove the node from the list 00694 synch_message.remove_from_list (this->head_, this->tail_); 00695 00696 // Clone the node that we have. 00697 TAO_Queued_Message *msg = 00698 synch_message.clone (this->orb_core_->transport_message_buffer_allocator ()); 00699 00700 // Stick it in the queue 00701 msg->push_back (this->head_, this->tail_); 00702 00703 TAO_Flushing_Strategy *flushing_strategy = 00704 this->orb_core ()->flushing_strategy (); 00705 00706 int const result = flushing_strategy->schedule_output (this); 00707 00708 if (result == -1) 00709 { 00710 if (TAO_debug_level > 5) 00711 { 00712 ACE_DEBUG ((LM_DEBUG, "TAO (%P|%t) - Transport[%d]::send_reply_" 00713 "message_i, dequeuing msg due to schedule_output " 00714 "failure\n", this->id ())); 00715 } 00716 msg->remove_from_list (this->head_, this->tail_); 00717 msg->destroy (); 00718 } 00719 else if (result == TAO_Flushing_Strategy::MUST_FLUSH) 00720 { 00721 typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK; 00722 TAO_REVERSE_LOCK reverse (*this->handler_lock_); 00723 ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1); 00724 (void) flushing_strategy->flush_message (this, msg, 0); 00725 } 00726 00727 return 1; 00728 }
virtual int TAO_Transport::send_request | ( | TAO_Stub * | stub, | |
TAO_ORB_Core * | orb_core, | |||
TAO_OutputCDR & | stream, | |||
TAO_Message_Semantics | message_semantics, | |||
ACE_Time_Value * | max_time_wait | |||
) | [pure virtual] |
Preparing the ORB to receive the reply only once the request is completely sent opens the system to some subtle race conditions: suppose the ORB is running in a multi-threaded configuration, thread A makes a request while thread B is using the Reactor to process all incoming requests. Thread A could be implemented as follows: 1) send the request 2) set up the ORB to receive the reply 3) wait for the reply
but in this case thread B may receive the reply between step (1) and (2), and drop it as an invalid or unexpected message. Consequently the correct implementation is: 1) set up the ORB to receive the reply 2) send the request 3) wait for the reply
The following method encapsulates this idiom.
Referenced by TAO::Remote_Invocation::send_message().
int TAO_Transport::send_synch_message_helper_i | ( | TAO_Synch_Queued_Message & | s, | |
ACE_Time_Value * | max_wait_time | |||
) | [private] |
A helper method used by send_synchronous_message_i() and send_reply_message_i(). Reusable code that could be used by both the methods.
Definition at line 731 of file Transport.cpp.
References TAO_Synch_Queued_Message::all_data_sent(), drain_queue_i(), and TAO_Queued_Message::remove_from_list().
Referenced by send_reply_message_i(), and send_synchronous_message_i().
00733 { 00734 int const n = this->drain_queue_i (max_wait_time); 00735 00736 if (n == -1) 00737 { 00738 synch_message.remove_from_list (this->head_, this->tail_); 00739 return -1; // Error while sending... 00740 } 00741 else if (n == 1) 00742 { 00743 return 1; // Empty queue, message was sent.. 00744 } 00745 00746 if (synch_message.all_data_sent ()) 00747 { 00748 return 1; 00749 } 00750 00751 return 0; 00752 }
int TAO_Transport::send_synchronous_message_i | ( | const ACE_Message_Block * | message_block, | |
ACE_Time_Value * | max_wait_time | |||
) | [private] |
Send a synchronous message, i.e. block until the message is on the wire
Definition at line 571 of file Transport.cpp.
References CORBA::SystemException::_tao_minor_code(), ACE_DEBUG, ACE_ERROR, ACE_GUARD_RETURN, ACE_TEXT, CORBA::COMPLETED_NO, TAO_Flushing_Strategy::flush_message(), TAO_ORB_Core::flushing_strategy(), LM_DEBUG, LM_ERROR, TAO_Synch_Queued_Message::message_length(), orb_core(), TAO_Queued_Message::push_back(), TAO_Queued_Message::remove_from_list(), TAO_Flushing_Strategy::schedule_output(), send_synch_message_helper_i(), TAO_debug_level, TAO_TIMEOUT_SEND_MINOR_CODE, and ACE_Message_Block::total_length().
Referenced by send_message_shared_i().
00573 { 00574 // We are going to block, so there is no need to clone 00575 // the message block. 00576 size_t const total_length = mb->total_length (); 00577 TAO_Synch_Queued_Message synch_message (mb, this->orb_core_); 00578 00579 synch_message.push_back (this->head_, this->tail_); 00580 00581 int const result = this->send_synch_message_helper_i (synch_message, 00582 max_wait_time); 00583 if (result == -1 && errno == ETIME) 00584 { 00585 if (total_length == synch_message.message_length ()) //none was sent 00586 { 00587 if (TAO_debug_level > 2) 00588 { 00589 ACE_DEBUG ((LM_DEBUG, 00590 ACE_TEXT ("TAO (%P|%t) - ") 00591 ACE_TEXT ("Transport[%d]::send_synchronous_message_i, ") 00592 ACE_TEXT ("timeout encountered before any bytes sent\n"), 00593 this->id ())); 00594 } 00595 throw ::CORBA::TIMEOUT ( 00596 CORBA::SystemException::_tao_minor_code ( 00597 TAO_TIMEOUT_SEND_MINOR_CODE, 00598 ETIME), 00599 CORBA::COMPLETED_NO); 00600 } 00601 else 00602 { 00603 return -1; 00604 } 00605 } 00606 else if(result == -1 || result == 1) 00607 { 00608 return result; 00609 } 00610 00611 TAO_Flushing_Strategy *flushing_strategy = 00612 this->orb_core ()->flushing_strategy (); 00613 if (flushing_strategy->schedule_output (this) == -1) 00614 { 00615 synch_message.remove_from_list (this->head_, this->tail_); 00616 if (TAO_debug_level > 0) 00617 { 00618 ACE_ERROR ((LM_ERROR, 00619 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::") 00620 ACE_TEXT ("send_synchronous_message_i, ") 00621 ACE_TEXT ("error while scheduling flush - %m\n"), 00622 this->id ())); 00623 } 00624 return -1; 00625 } 00626 00627 // No need to check for result == TAO_Flushing_Strategy::MUST_FLUSH, 00628 // because we're always going to flush anyway. 00629 00630 // Release the mutex, other threads may modify the queue as we 00631 // block for a long time writing out data. 
00632 int flush_result; 00633 { 00634 typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK; 00635 TAO_REVERSE_LOCK reverse (*this->handler_lock_); 00636 ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1); 00637 00638 flush_result = flushing_strategy->flush_message (this, 00639 &synch_message, 00640 max_wait_time); 00641 } 00642 00643 if (flush_result == -1) 00644 { 00645 synch_message.remove_from_list (this->head_, this->tail_); 00646 00647 // We don't need to do anything special for the timeout case. 00648 // The connection is going to get closed and the Transport destroyed. 00649 // The only thing to do maybe is to empty the queue. 00650 00651 if (TAO_debug_level > 0) 00652 { 00653 ACE_ERROR ((LM_ERROR, 00654 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_synchronous_message_i, ") 00655 ACE_TEXT ("error while sending message - %m\n"), 00656 this->id ())); 00657 } 00658 00659 return -1; 00660 } 00661 00662 return 1; 00663 }
ACE_INLINE size_t TAO_Transport::sent_byte_count | ( | void | ) | const |
Accessor to sent_byte_count_.
Definition at line 193 of file Transport.inl.
References sent_byte_count_.
00194 { 00195 return this->sent_byte_count_; 00196 }
TAO::Transport::Stats* TAO_Transport::stats | ( | void | ) | const |
Transport statistics.
TAO_BEGIN_VERSIONED_NAMESPACE_DECL ACE_INLINE CORBA::ULong TAO_Transport::tag | ( | void | ) | const |
Return the protocol tag.
The OMG assigns unique tags (a 32-bit unsigned number) to each protocol. New protocol tags can be obtained free of charge from the OMG, check the documents in corbafwd.h for more details.
Definition at line 8 of file Transport.inl.
References tag_.
00009 { 00010 return this->tag_; 00011 }
int TAO_Transport::tear_listen_point_list | ( | TAO_InputCDR & | cdr | ) | [virtual] |
Extracts the list of listen points from the cdr stream. The list would have the protocol specific details of the ListenPoints
Reimplemented in TAO_IIOP_Transport.
Definition at line 287 of file Transport.cpp.
References ACE_NOTSUP_RETURN.
00288 { 00289 ACE_NOTSUP_RETURN (-1); 00290 }
ACE_INLINE TAO_Transport_Mux_Strategy * TAO_Transport::tms | ( | void | ) | const |
Get the TAO_Transport_Mux_Strategy used by this object.
The role of the TAO_Transport_Mux_Strategy is described in more detail in that class' documentation. Suffice it to say that the class is used to control how many threads can have pending requests over the same connection. Multiplexing multiple threads over the same connection conserves resources and is almost required for AMI, but having only one pending request per connection is more efficient and reduces the possibilities of priority inversions.
Definition at line 20 of file Transport.inl.
References tms_.
Referenced by idle_after_reply(), idle_after_send(), TAO::LocateRequest_Invocation::invoke(), TAO_GIOP_Message_Base::process_reply_message(), TAO::Synch_Twoway_Invocation::remote_twoway(), and send_connection_closed_notifications().
00021 { 00022 return tms_; 00023 }
TAO::Transport_Cache_Manager & TAO_Transport::transport_cache_manager | ( | void | ) | [private] |
Helper method that returns the Transport Cache Manager.
Definition at line 2546 of file Transport.cpp.
References TAO_ORB_Core::lane_resources(), orb_core_, and TAO_Thread_Lane_Resources::transport_cache().
Referenced by make_idle(), purge_entry(), recache_transport(), and update_transport().
02547 { 02548 return this->orb_core_->lane_resources ().transport_cache (); 02549 }
int TAO_Transport::update_transport | ( | void | ) |
Cache management.
Definition at line 476 of file Transport.cpp.
References transport_cache_manager().
Referenced by TAO_Connection_Handler::svc_i().
00477 { 00478 return this->transport_cache_manager ().update_entry (this->cache_map_entry_); 00479 }
ACE_INLINE TAO_Wait_Strategy * TAO_Transport::wait_strategy | ( | void | ) | const |
Return the TAO_Wait_Strategy used by this object.
The role of the TAO_Wait_Strategy is described in more detail in that class' documentation. Suffice it to say that the ORB can wait for a reply blocking on read(), using the Reactor to wait for multiple events concurrently or using the Leader/Followers protocol.
Definition at line 27 of file Transport.inl.
References ws_.
Referenced by TAO_Connection_Handler::close_connection_eh(), TAO_IIOP_Connector::complete_connection(), TAO::Transport_Cache_Manager::find_transport(), and TAO::Synch_Twoway_Invocation::wait_for_reply().
00028 { 00029 return this->ws_; 00030 }
ACE_INLINE void TAO_Transport::wchar_translator | ( | TAO_Codeset_Translator_Base * | ) |
CodeSet negotiation - Set the wchar codeset translator factory.
Definition at line 150 of file Transport.inl.
References tcs_set_, and wchar_translator_.
00151 { 00152 this->wchar_translator_ = tf; 00153 this->tcs_set_ = 1; 00154 }
ACE_INLINE TAO_Codeset_Translator_Base * TAO_Transport::wchar_translator | ( | void | ) | const |
CodeSet Negotiation - Get the wchar codeset translator factory.
Definition at line 137 of file Transport.inl.
References wchar_translator_.
00138 { 00139 return this->wchar_translator_; 00140 }
friend class TAO_Leader_Follower_Flushing_Strategy [friend] |
Definition at line 812 of file Transport.h.
friend class TAO_Reactive_Flushing_Strategy [friend] |
These classes need privileged access to:
Definition at line 811 of file Transport.h.
friend class TAO_Thread_Per_Connection_Handler [friend] |
Needs privileged access to event_handler_i ()
Definition at line 816 of file Transport.h.
int TAO_Transport::bidirectional_flag_ [protected] |
Have we sent any info on bidirectional information or have we received any info regarding making the connection served by this transport bidirectional. The flag is used as follows: + We don't want to send the bidirectional context info more than once on the connection. Why? Waste of marshalling and demarshalling time on the client. + On the server side -- once a client that has established the connection asks the server to use the connection both ways, we *don't* want the server to pack service info to the client. That is not allowed. We need a flag to prevent such things from happening.
The value of this flag will be 0 if the client sends info and 1 if the server receives the info.
Definition at line 967 of file Transport.h.
Referenced by bidirectional_flag().
TAO::Transport_Cache_Manager::HASH_MAP_ENTRY* TAO_Transport::cache_map_entry_ [protected] |
Our entry in the cache. We don't own this. It is here for our convenience. We cannot just change things around.
Definition at line 939 of file Transport.h.
Referenced by cache_map_entry().
Additional member values required to support codeset translation.
@Phil, I think it would be nice if we could think of a way to do the following. We have been trying to use the transport for marking about translator factories and such! IMHO this is a wrong encapsulation, i.e. trying to populate the transport object with these details. We should probably have a class something like TAO_Message_Property or TAO_Message_Translator or whatever (I am sure you get the idea) and encapsulate all these details. Coupling these seems odd. If I have to be more cynical we can move this to the connection_handler and it may make more sense with the DSCP stuff around there. Do you agree?
Definition at line 1045 of file Transport.h.
Referenced by assign_translators(), and char_translator().
ACE_Time_Value TAO_Transport::current_deadline_ [protected] |
The queue will start draining no later than <queeing_deadline_> *if* the deadline is
Definition at line 984 of file Transport.h.
Referenced by check_buffering_constraints_i(), and reset_flush_timer().
CORBA::Boolean TAO_Transport::first_request_ [private] |
First_request_ is true until the first request is sent or received. This is necessary since codeset context information is necessary only on the first request. After that, the translators are fixed for the life of the connection.
Definition at line 1057 of file Transport.h.
Referenced by first_request_sent().
long TAO_Transport::flush_timer_id_ [protected] |
The timer ID.
Definition at line 987 of file Transport.h.
Referenced by check_buffering_constraints_i(), flush_timer_pending(), and reset_flush_timer().
ACE_Lock* TAO_Transport::handler_lock_ [mutable, protected] |
This is an ACE_Lock that gets initialized from TAO_ORB_Core::resource_factory()->create_cached_connection_lock(). This way, one can use a lock appropriate for the type of system, i.e., a null lock for single-threaded systems, and a real lock for multi-threaded systems.
Definition at line 1001 of file Transport.h.
Referenced by ~TAO_Transport().
TAO_Queued_Message* TAO_Transport::head_ [protected] |
Implement the outgoing data queue.
Definition at line 972 of file Transport.h.
Referenced by cleanup_queue(), cleanup_queue_i(), drain_queue_helper(), drain_queue_i(), and send_asynchronous_message_i().
size_t TAO_Transport::id_ [protected] |
A unique identifier for the transport.
This never *never* changes over the lifespan, so we don't have to worry about locking it.
HINT: Protocol-specific transports that use connection handler might choose to set this to the handle for their connection.
Definition at line 1011 of file Transport.h.
Referenced by id(), and post_open().
Queue of the consolidated, incoming messages.
Definition at line 976 of file Transport.h.
Referenced by process_queue_head().
TAO::Incoming_Message_Stack TAO_Transport::incoming_message_stack_ [protected] |
Stack of incoming fragments; consolidated messages are going to be enqueued in "incoming_message_queue_".
Definition at line 980 of file Transport.h.
Referenced by handle_input_parse_data(), and handle_input_parse_extra_messages().
bool TAO_Transport::is_connected_ [protected] |
Is this transport really connected or not. In case of oneways with SYNC_NONE Policy, we don't wait until the connection is ready, and we buffer the requests in this transport until the connection is ready.
Definition at line 1025 of file Transport.h.
Referenced by is_connected(), post_open(), and pre_close().
TAO_GIOP_Message_Base* TAO_Transport::messaging_object_ [protected] |
Our messaging object.
Definition at line 1028 of file Transport.h.
Referenced by TAO_IIOP_Transport::generate_request_header(), messaging_object(), and ~TAO_Transport().
Definition at line 969 of file Transport.h.
Referenced by opened_as(), and provide_blockable_handler().
TAO_ORB_Core* const TAO_Transport::orb_core_ [protected] |
Global orbcore resource.
Definition at line 935 of file Transport.h.
Referenced by handle_input_parse_data(), orb_core(), TAO_GIOP_Message_Base::process_locate_request(), TAO_GIOP_Message_Base::process_request(), register_handler(), TAO_Transport(), and transport_cache_manager().
ACE_Message_Block* TAO_Transport::partial_message_ [private] |
Holds the partial GIOP message (if there is one).
Definition at line 1060 of file Transport.h.
Referenced by handle_input_parse_data().
unsigned long TAO_Transport::purging_order_ [protected] |
Used by the LRU, LFU and FIFO Connection Purging Strategies.
Definition at line 1014 of file Transport.h.
Referenced by purging_order().
size_t TAO_Transport::recv_buffer_size_ [protected] |
Size of the buffer received.
Definition at line 1017 of file Transport.h.
Referenced by handle_input_missing_data(), handle_input_parse_data(), and recv_buffer_size().
size_t TAO_Transport::sent_byte_count_ [protected] |
Number of bytes sent.
Definition at line 1020 of file Transport.h.
Referenced by drain_queue_helper(), drain_queue_i(), send_asynchronous_message_i(), and sent_byte_count().
TAO::Transport::Stats* TAO_Transport::stats_ [private] |
Statistics.
Definition at line 1073 of file Transport.h.
Referenced by process_parsed_messages(), send_message_shared_i(), and ~TAO_Transport().
CORBA::ULong const TAO_Transport::tag_ [protected] |
IOP protocol tag.
Definition at line 932 of file Transport.h.
Referenced by report_invalid_event_handler(), and tag().
TAO_Queued_Message* TAO_Transport::tail_ [protected] |
Definition at line 973 of file Transport.h.
CORBA::Boolean TAO_Transport::tcs_set_ [private] |
The tcs_set_ flag indicates that negotiation has occurred and so the translators are correct, since a null translator is valid if both ends are using the same codeset, whatever that codeset might be.
Definition at line 1051 of file Transport.h.
Referenced by char_translator(), is_tcs_set(), and wchar_translator().
TAO_Transport_Mux_Strategy* TAO_Transport::tms_ [protected] |
Strategy to decide whether multiple requests can be sent over the same connection or the connection is exclusive for a request.
Definition at line 943 of file Transport.h.
Referenced by TAO_Transport(), tms(), and ~TAO_Transport().
TAO_Transport_Timer TAO_Transport::transport_timer_ [protected] |
The adapter used to receive timeout callbacks from the Reactor.
Definition at line 990 of file Transport.h.
Definition at line 1046 of file Transport.h.
Referenced by assign_translators(), and wchar_translator().
TAO_Wait_Strategy* TAO_Transport::ws_ [protected] |
Strategy for waiting for the reply after sending the request.
Definition at line 946 of file Transport.h.
Referenced by register_handler(), TAO_Transport(), wait_strategy(), and ~TAO_Transport().