#include <Transport.h>
Inheritance diagram for TAO_Transport:
Template methods | |
The Transport class uses the Template Method Pattern to implement the protocol specific functionality. Implementors of a pluggable protocol should override the following methods with the semantics documented below. | |
enum | { TAO_ONEWAY_REQUEST = 0, TAO_TWOWAY_REQUEST = 1, TAO_REPLY } |
virtual ACE_Event_Handler * | event_handler_i (void)=0 |
bool | is_connected (void) const |
Is this transport really connected. | |
bool | post_open (size_t id) |
Perform all the actions when this transport get opened. | |
TAO_Connection_Handler * | connection_handler (void) |
Get the connection handler for this transport. | |
TAO_OutputCDR & | out_stream (void) |
Accessor for the output CDR stream. | |
int | generate_locate_request (TAO_Target_Specification &spec, TAO_Operation_Details &opdetails, TAO_OutputCDR &output) |
virtual int | generate_request_header (TAO_Operation_Details &opd, TAO_Target_Specification &spec, TAO_OutputCDR &msg) |
int | recache_transport (TAO_Transport_Descriptor_Interface *desc) |
Recache ourselves in the cache. | |
virtual int | handle_input (TAO_Resume_Handle &rh, ACE_Time_Value *max_wait_time=0, int block=0) |
Callback to read incoming data. | |
virtual int | send_request (TAO_Stub *stub, TAO_ORB_Core *orb_core, TAO_OutputCDR &stream, int message_semantics, ACE_Time_Value *max_time_wait)=0 |
virtual int | send_message (TAO_OutputCDR &stream, TAO_Stub *stub=0, int message_semantics=TAO_Transport::TAO_TWOWAY_REQUEST, ACE_Time_Value *max_time_wait=0)=0 |
virtual int | send_message_shared (TAO_Stub *stub, int message_semantics, const ACE_Message_Block *message_block, ACE_Time_Value *max_wait_time) |
Sent the contents of message_block. | |
int | format_queue_message (TAO_OutputCDR &stream) |
Format and queue a message for stream. | |
int | send_message_block_chain (const ACE_Message_Block *message_block, size_t &bytes_transferred, ACE_Time_Value *max_wait_time=0) |
Send a message block chain,. | |
int | send_message_block_chain_i (const ACE_Message_Block *message_block, size_t &bytes_transferred, ACE_Time_Value *max_wait_time) |
Send a message block chain, assuming the lock is held. | |
int | purge_entry (void) |
Cache management. | |
int | make_idle (void) |
Cache management. | |
int | update_transport (void) |
Cache management. | |
int | handle_timeout (const ACE_Time_Value ¤t_time, const void *act) |
size_t | recv_buffer_size (void) const |
Accessor to recv_buffer_size_. | |
size_t | sent_byte_count (void) const |
Accessor to sent_byte_count_. | |
TAO_Codeset_Translator_Base * | char_translator (void) const |
CodeSet Negotiation - Get the char codeset translator factory. | |
TAO_Codeset_Translator_Base * | wchar_translator (void) const |
CodeSet Negotiation - Get the wchar codeset translator factory. | |
void | char_translator (TAO_Codeset_Translator_Base *) |
CodeSet negotiation - Set the char codeset translator factory. | |
void | wchar_translator (TAO_Codeset_Translator_Base *) |
CodeSet negotiation - Set the wchar codeset translator factory. | |
void | assign_translators (TAO_InputCDR *, TAO_OutputCDR *) |
void | clear_translators (TAO_InputCDR *, TAO_OutputCDR *) |
CORBA::Boolean | is_tcs_set () const |
Return true if the tcs has been set. | |
void | first_request_sent () |
Set the state of the first_request_ flag to 0. | |
void | send_connection_closed_notifications (void) |
virtual TAO_Connection_Handler * | connection_handler_i (void)=0 |
int | process_parsed_messages (TAO_Queued_Data *qd, TAO_Resume_Handle &rh) |
int | send_message_shared_i (TAO_Stub *stub, int message_semantics, const ACE_Message_Block *message_block, ACE_Time_Value *max_wait_time) |
int | queue_message_i (const ACE_Message_Block *message_block) |
Queue a message for message_block. | |
CORBA::ULong const | tag_ |
IOP protocol tag. | |
TAO_ORB_Core *const | orb_core_ |
Global orbcore resource. | |
TAO::Transport_Cache_Manager::HASH_MAP_ENTRY * | cache_map_entry_ |
TAO_Transport_Mux_Strategy * | tms_ |
TAO_Wait_Strategy * | ws_ |
Strategy for waiting for the reply after sending the request. | |
int | bidirectional_flag_ |
TAO::Connection_Role | opening_connection_role_ |
TAO_Queued_Message * | head_ |
Implement the outgoing data queue. | |
TAO_Queued_Message * | tail_ |
TAO_Incoming_Message_Queue | incoming_message_queue_ |
Queue of the consolidated, incoming messages.. | |
TAO::Incoming_Message_Stack | incoming_message_stack_ |
ACE_Time_Value | current_deadline_ |
long | flush_timer_id_ |
The timer ID. | |
TAO_Transport_Timer | transport_timer_ |
The adapter used to receive timeout callbacks from the Reactor. | |
ACE_Lock * | handler_lock_ |
size_t | id_ |
A unique identifier for the transport. | |
unsigned long | purging_order_ |
Used by the LRU, LFU and FIFO Connection Purging Strategies. | |
size_t | recv_buffer_size_ |
Size of the buffer received. | |
size_t | sent_byte_count_ |
Number of bytes sent. | |
bool | is_connected_ |
TAO::Transport_Cache_Manager & | transport_cache_manager (void) |
Helper method that returns the Transport Cache Manager. | |
int | drain_queue (void) |
Send some of the data in the queue. | |
int | drain_queue_i (void) |
Implement drain_queue() assuming the lock is held. | |
int | queue_is_empty_i (void) |
Check if there are messages pending in the queue. | |
int | drain_queue_helper (int &iovcnt, iovec iov[]) |
A helper routine used in drain_queue_i(). | |
int | schedule_output_i (void) |
Schedule handle_output() callbacks. | |
int | cancel_output_i (void) |
Cancel handle_output() callbacks. | |
void | cleanup_queue (size_t byte_count) |
Cleanup the queue. | |
void | cleanup_queue_i () |
Cleanup the complete queue. | |
int | check_buffering_constraints_i (TAO_Stub *stub, bool &must_flush) |
Check if the buffering constraints have been reached. | |
int | send_synchronous_message_i (const ACE_Message_Block *message_block, ACE_Time_Value *max_wait_time) |
int | send_reply_message_i (const ACE_Message_Block *message_block, ACE_Time_Value *max_wait_time) |
int | send_asynchronous_message_i (TAO_Stub *stub, const ACE_Message_Block *message_block, ACE_Time_Value *max_wait_time) |
int | send_synch_message_helper_i (TAO_Synch_Queued_Message &s, ACE_Time_Value *max_wait_time) |
int | flush_timer_pending (void) const |
Check if the flush timer is still pending. | |
void | reset_flush_timer (void) |
void | report_invalid_event_handler (const char *caller) |
Print out error messages if the event handler is not valid. | |
int | handle_input_missing_data (TAO_Resume_Handle &rh, ACE_Time_Value *max_wait_time, TAO_Queued_Data *q_data) |
int | handle_input_parse_data (TAO_Resume_Handle &rh, ACE_Time_Value *max_wait_time) |
int | handle_input_parse_extra_messages (ACE_Message_Block &message_block) |
int | consolidate_enqueue_message (TAO_Queued_Data *qd) |
-1 error, otherwise 0 | |
int | consolidate_process_message (TAO_Queued_Data *qd, TAO_Resume_Handle &rh) |
-1 error, otherwise 0 | |
int | process_queue_head (TAO_Resume_Handle &rh) |
int | notify_reactor (void) |
void | send_connection_closed_notifications_i (void) |
Assume the lock is held. | |
void | allocate_partial_message_block (void) |
TAO_Transport (const TAO_Transport &) | |
void | operator= (const TAO_Transport &) |
TAO_Codeset_Translator_Base * | char_translator_ |
Additional member values required to support codeset translation. | |
TAO_Codeset_Translator_Base * | wchar_translator_ |
CORBA::Boolean | tcs_set_ |
CORBA::Boolean | first_request_ |
ACE_Message_Block * | partial_message_ |
Holds the partial GIOP message (if there is one). | |
class | TAO_Block_Flushing_Strategy |
class | TAO_Reactive_Flushing_Strategy |
class | TAO_Leader_Follower_Flushing_Strategy |
class | TAO_Thread_Per_Connection_Handler |
Public Types | |
Public Member Functions | |
TAO_Transport (CORBA::ULong tag, TAO_ORB_Core *orb_core) | |
Default creator, requires the tag value be supplied. | |
virtual | ~TAO_Transport (void) |
Destructor. | |
CORBA::ULong | tag (void) const |
Return the protocol tag. | |
TAO_ORB_Core * | orb_core (void) const |
Access the ORB that owns this connection. | |
TAO_Transport_Mux_Strategy * | tms (void) const |
Get the TAO_Tranport_Mux_Strategy used by this object. | |
TAO_Wait_Strategy * | wait_strategy (void) const |
Return the TAO_Wait_Strategy used by this object. | |
int | handle_output (void) |
Callback method to reactively drain the outgoing data queue. | |
int | bidirectional_flag (void) const |
Get the bidirectional flag. | |
void | bidirectional_flag (int flag) |
Set the bidirectional flag. | |
void | cache_map_entry (TAO::Transport_Cache_Manager::HASH_MAP_ENTRY *entry) |
Set the Cache Map entry. | |
TAO::Transport_Cache_Manager::HASH_MAP_ENTRY * | cache_map_entry (void) |
Get the Cache Map entry. | |
size_t | id (void) const |
Set and Get the identifier for this transport instance. | |
void | id (size_t id) |
TAO::Connection_Role | opened_as (void) const |
void | opened_as (TAO::Connection_Role) |
unsigned long | purging_order (void) const |
void | purging_order (unsigned long value) |
int | queue_is_empty (void) |
Check if there are messages pending in the queue. | |
void | provide_handler (TAO::Connection_Handler_Set &handlers) |
Added event handler to the handlers set. | |
bool | provide_blockable_handler (TAO::Connection_Handler_Set &handlers) |
virtual int | register_handler (void) |
Register the handler with the reactor. | |
virtual ssize_t | send (iovec *iov, int iovcnt, size_t &bytes_transferred, const ACE_Time_Value *timeout=0)=0 |
Write the complete Message_Block chain to the connection. | |
virtual ssize_t | recv (char *buffer, size_t len, const ACE_Time_Value *timeout=0)=0 |
Read len bytes from into buf. | |
Control connection lifecycle | |
These methods are routed through the TMS object. The TMS strategies implement them correctly. | |
bool | idle_after_send (void) |
bool | idle_after_reply (void) |
virtual void | close_connection (void) |
Call the implementation method after obtaining the lock. | |
Template methods | |
The Transport class uses the Template Method Pattern to implement the protocol specific functionality. Implementors of a pluggable protocol should override the following methods with the semantics documented below. | |
virtual int | messaging_init (CORBA::Octet major, CORBA::Octet minor)=0 |
virtual int | tear_listen_point_list (TAO_InputCDR &cdr) |
virtual bool | post_connect_hook (void) |
Hooks that can be overridden in concrete transports. | |
ACE_Event_Handler::Reference_Count | add_reference (void) |
Memory management routines. | |
ACE_Event_Handler::Reference_Count | remove_reference (void) |
virtual TAO_Pluggable_Messaging * | messaging_object (void)=0 |
The transport object is created in the Service handler constructor and deleted in the Service Handler's destructor!!
The main responsability of a Transport object is to encapsulate a connection, and provide a transport independent way to send and receive data. Since TAO is heavily based on the Reactor for all if not all its I/O the Transport class is usually implemented with a helper Connection Handler that adapts the generic Transport interface to the Reactor types.
One of the responsibilities of the TAO_Transport class is to send out GIOP messages as efficiently as possible. In most cases messages are put out in FIFO order, the transport object will put out the message using a single system call and return control to the application. However, for oneways and AMI requests it may be more efficient (or required if the SYNC_NONE policy is in effect) to queue the messages until a large enough data set is available. Another reason to queue is that some applications cannot block for I/O, yet they want to send messages so large that a single write() operation would not be able to cope with them. In such cases we need to queue the data and use the Reactor to drain the queue.
Therefore, the Transport class may need to use a queue to temporarily hold the messages, and, in some configurations, it may need to use the Reactor to concurrently drain such queues.
TAO provides explicit policies to send 'urgent' messages. Such messages may put at the head of the queue. However, they cannot be sent immediately because the transport may already be sending another message in a reactive fashion.
Consequently, the Transport must also know if the head of the queue has been partially sent. In that case new messages can only follow the head. Only once the head is completely sent we can start sending new messages.
One or more threads can be blocked waiting for the connection to completely send the message. The thread should return as soon as its message has been sent, so a per-thread condition is required. This suggest that simply using a ACE_Message_Queue would not be enough: there is a significant amount of ancillary information, to keep on each message that the Message_Block class does not provide room for.
Blocking I/O is still attractive for some applications. First, my eliminating the Reactor overhead performance is improved when sending large blocks of data. Second, using the Reactor to send out data opens the door for nested upcalls, yet some applications cannot deal with the reentrancy issues in this case.
Some or all messages could have a timeout period attached to them. The timeout source could either be some high-level policy or maybe some strategy to prevent denial of service attacks. In any case the timeouts are per-message, and later messages could have shorter timeouts. In fact, some kind of scheduling (such as EDF) could be required in a few applications.
The outgoing data path consist in several components:
The Transport object provides a single method to send request messages (send_request_message ()).
One of the main responsibilities of the transport is to read and process the incoming GIOP message as quickly and efficiently as possible. There are other forces that needs to be given due consideration. They are
The messages should be checked for validity and the right information should be sent to the higher layer for processing. The process of doing a sanity check and preparing the messages for the higher layers of the ORB are done by the messaging protocol.
To keep things as efficient as possible for medium sized requests, it would be good to minimise data copying and locking along the incoming path ie. from the time of reading the data from the handle to the application. We achieve this by creating a buffer on stack and reading the data from the handle into the buffer. We then pass the same data block (the buffer is encapsulated into a data block) to the higher layers of the ORB. The problems stem from the following (a) Data is bigger than the buffer that we have on stack (b) Transports like TCP do not guarantee availability of the whole chunk of data in one shot. Data could trickle in byte by byte. (c) Single read gives multiple messages
We solve the problems as follows
(a) First do a read with the buffer on stack. Query the underlying messaging object whether the message has any incomplete portion. If so, data will be copied into new buffer being able to hold full message and is queued; succeeding events will read data from socket and write directly into this buffer. Otherwise, if if the message in local buffer is complete, we free the handle and then send the message to the higher layers of the ORB for processing.
(b) If buffer with incomplete message has been enqueued, while trying to do the above, the reactor will call us back when the handle becomes read ready. The read-operation will copy data directly into the enqueued buffer. If the message has bee read completely the message is sent to the higher layers of the ORB for processing.
(c) If we get multiple messages (possible if the client connected to the server sends oneways or AMI requests), we parse and split the messages. Every message is put in the queue. Once the messages are queued, the thread picks up one message to send to the higher layers of the ORB. Before doing that, if it finds more messages, it sends a notify to the reactor without resuming the handle. The next thread picks up a message from the queue and processes that. Once the queue is drained the last thread resumes the handle.
We could use the outgoing path of the ORB to send replies. This would allow us to reuse most of the code in the outgoing data path. We were doing this till TAO-1.2.3. We run in to problems. When writing the reply the ORB gets flow controlled, and the ORB tries to flush the message by going into the reactor. This resulted in unnecessary nesting. The thread that gets into the Reactor could potentially handle other messages (incoming or outgoing) and the stack starts growing leading to crashes.
The solution that we (plan to) adopt is pretty straight forward. The thread sending replies will not block to send the replies but queue the replies and return to the Reactor. (Note the careful usages of the terms "blocking in the Reactor" as opposed to "return back to the Reactor".
See Also:
http://cvs.doc.wustl.edu/ace-latest.cgi/ACE_wrappers/TAO/docs/pluggable_protocols/index.html
Definition at line 237 of file Transport.h.
|
Definition at line 592 of file Transport.h.
00593 { 00594 TAO_ONEWAY_REQUEST = 0, 00595 TAO_TWOWAY_REQUEST = 1, 00596 TAO_REPLY 00597 }; |
|
Default creator, requires the tag value be supplied.
Definition at line 118 of file Transport.cpp. References TAO_ORB_Core::client_factory(), TAO_Client_Strategy_Factory::create_transport_mux_strategy(), and TAO_Client_Strategy_Factory::create_wait_strategy().
00120 : tag_ (tag) 00121 , orb_core_ (orb_core) 00122 , cache_map_entry_ (0) 00123 , bidirectional_flag_ (-1) 00124 , opening_connection_role_ (TAO::TAO_UNSPECIFIED_ROLE) 00125 , head_ (0) 00126 , tail_ (0) 00127 , incoming_message_queue_ (orb_core) 00128 , current_deadline_ (ACE_Time_Value::zero) 00129 , flush_timer_id_ (-1) 00130 , transport_timer_ (this) 00131 , handler_lock_ (orb_core->resource_factory ()->create_cached_connection_lock ()) 00132 , id_ ((size_t) this) 00133 , purging_order_ (0) 00134 , recv_buffer_size_ (0) 00135 , sent_byte_count_ (0) 00136 , is_connected_ (false) 00137 , char_translator_ (0) 00138 , wchar_translator_ (0) 00139 , tcs_set_ (0) 00140 , first_request_ (1) 00141 , partial_message_ (0) 00142 #ifdef ACE_HAS_SENDFILE 00143 // The ORB has been configured to use the MMAP allocator, meaning 00144 // we could/should use sendfile() to send data. Cast once rather 00145 // here rather than during each send. This assumes that all 00146 // TAO_OutputCDR instances are using the same TAO_MMAP_Allocator 00147 // instance as the underlying output CDR buffer allocator. 00148 , mmap_allocator_ ( 00149 dynamic_cast<TAO_MMAP_Allocator *> ( 00150 orb_core->output_cdr_buffer_allocator ())) 00151 #endif /* ACE_HAS_SENDFILE */ 00152 { 00153 TAO_Client_Strategy_Factory *cf = 00154 this->orb_core_->client_factory (); 00155 00156 // Create WS now. 00157 this->ws_ = cf->create_wait_strategy (this); 00158 00159 // Create TMS now. 00160 this->tms_ = cf->create_transport_mux_strategy (this); 00161 00162 /* 00163 * Hook to add code that initializes components that 00164 * belong to the concrete protocol implementation. 00165 * Further additions to this Transport class will 00166 * need to add code *before* this hook. 00167 */ 00168 //@@ TAO_TRANSPORT_SPL_CONSTRUCTOR_ADD_HOOK 00169 } |
|
Destructor.
Definition at line 171 of file Transport.cpp. References ACE_ASSERT, cleanup_queue_i(), handler_lock_, is_connected_, purge_entry(), and ACE_Message_Block::release().
00172 { 00173 delete this->ws_; 00174 00175 delete this->tms_; 00176 00177 delete this->handler_lock_; 00178 00179 if (!this->is_connected_) 00180 { 00181 // When we have a not connected transport we could have buffered 00182 // messages on this transport which we have to cleanup now. 00183 this->cleanup_queue_i(); 00184 00185 // Cleanup our cache entry 00186 this->purge_entry(); 00187 } 00188 00189 // Release the partial message block, however we may 00190 // have never allocated one. 00191 ACE_Message_Block::release (this->partial_message_); 00192 00193 // By the time the destructor is reached here all the connection stuff 00194 // *must* have been cleaned up. 00195 00196 // The following assert is needed for the test "Bug_2494_Regression". 00197 // See the bugzilla bug #2494 for details. 00198 ACE_ASSERT (this->head_ == 0); 00199 ACE_ASSERT (this->cache_map_entry_ == 0); 00200 00201 /* 00202 * Hook to add code that cleans up components 00203 * belong to the concrete protocol implementation. 00204 * Further additions to this Transport class will 00205 * need to add code *before* this hook. 00206 */ 00207 //@@ TAO_TRANSPORT_SPL_DESTRUCTOR_ADD_HOOK 00208 } |
|
|
|
Memory management routines.
Definition at line 2352 of file Transport.cpp. References ACE_Event_Handler::add_reference(), and event_handler_i(). Referenced by TAO::Cache_IntId::Cache_IntId(), TAO::Cache_IntId::operator=(), provide_blockable_handler(), provide_handler(), TAO::Transport_Cache_Manager::purge(), TAO_Thread_Per_Connection_Handler::TAO_Thread_Per_Connection_Handler(), and TAO_Asynch_Reply_Dispatcher_Base::transport().
02353 { 02354 return this->event_handler_i ()->add_reference (); 02355 } |
|
Allocate a partial message block and store it in our partial_message_ data member. Definition at line 2421 of file Transport.cpp. References ACE_NEW, TAO_Pluggable_Messaging::header_length(), messaging_object(), and partial_message_. Referenced by handle_input_parse_data().
02422 { 02423 if (this->partial_message_ == 0) 02424 { 02425 // This value must be at least large enough to hold a GIOP message 02426 // header plus a GIOP fragment header 02427 const size_t partial_message_size = 02428 this->messaging_object ()->header_length (); 02429 // + this->messaging_object ()->fragment_header_length (); 02430 // deprecated, conflicts with not-single_read_opt. 02431 02432 ACE_NEW (this->partial_message_, 02433 ACE_Message_Block (partial_message_size)); 02434 } 02435 } |
|
Use the Transport's codeset factories to set the translator for input and output CDRs. Definition at line 2322 of file Transport.cpp. References TAO_Codeset_Translator_Base::assign(). Referenced by TAO::LocateRequest_Invocation::check_reply(), TAO::Synch_Twoway_Invocation::check_reply_status(), TAO_ServerRequest::init_reply(), TAO_GIOP_Message_Lite::process_request(), TAO_GIOP_Message_Base::process_request(), TAO_GIOP_Message_Lite::process_request_message(), TAO_GIOP_Message_Base::process_request_message(), TAO_ServerRequest::send_cached_reply(), TAO_GIOP_Message_Lite::send_reply_exception(), TAO_ServerRequest::tao_send_reply_exception(), and TAO::Remote_Invocation::write_header().
02323 { 02324 if (this->char_translator_) 02325 { 02326 this->char_translator_->assign (inp); 02327 this->char_translator_->assign (outp); 02328 } 02329 if (this->wchar_translator_) 02330 { 02331 this->wchar_translator_->assign (inp); 02332 this->wchar_translator_->assign (outp); 02333 } 02334 } |
|
Set the bidirectional flag.
Definition at line 39 of file Transport.inl. References bidirectional_flag_.
00040 { 00041 this->bidirectional_flag_ = flag; 00042 } |
|
Get the bidirectional flag.
Definition at line 33 of file Transport.inl. References bidirectional_flag_. Referenced by TAO::Wait_On_LF_No_Upcall::can_process_upcalls(), TAO_IIOP_Transport::generate_request_header(), TAO_Muxed_TMS::request_id(), TAO_Exclusive_TMS::request_id(), and TAO_IIOP_Transport::tear_listen_point_list().
00034 { 00035 return this->bidirectional_flag_; 00036 } |
|
Get the Cache Map entry.
Definition at line 57 of file Transport.inl. References cache_map_entry_.
00058 { 00059 return this->cache_map_entry_; 00060 } |
|
Set the Cache Map entry.
Definition at line 63 of file Transport.inl. References cache_map_entry_, and TAO::Transport_Cache_Manager::HASH_MAP_ENTRY. Referenced by TAO::Transport_Cache_Manager::bind_i().
00065 { 00066 this->cache_map_entry_ = entry; 00067 } |
|
Cancel handle_output() callbacks.
Definition at line 759 of file Transport.cpp. References ACE_DEBUG, ACE_TEXT, ACE_Reactor::cancel_wakeup(), event_handler_i(), LM_DEBUG, ACE_Event_Handler::reactor(), and TAO_debug_level. Referenced by TAO_Reactive_Flushing_Strategy::cancel_output(), and TAO_Leader_Follower_Flushing_Strategy::cancel_output().
00760 { 00761 ACE_Event_Handler * const eh = this->event_handler_i (); 00762 ACE_Reactor *const reactor = eh->reactor (); 00763 00764 if (TAO_debug_level > 3) 00765 { 00766 ACE_DEBUG ((LM_DEBUG, 00767 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cancel_output_i\n"), 00768 this->id ())); 00769 } 00770 00771 return reactor->cancel_wakeup (eh, ACE_Event_Handler::WRITE_MASK); 00772 } |
|
CodeSet negotiation - Set the char codeset translator factory.
Definition at line 137 of file Transport.inl. References tcs_set_.
00138 { 00139 this->char_translator_ = tf; 00140 this->tcs_set_ = 1; 00141 } |
|
CodeSet Negotiation - Get the char codeset translator factory.
Definition at line 125 of file Transport.inl.
00126 { 00127 return this->char_translator_; 00128 } |
|
Check if the buffering constraints have been reached.
Definition at line 1056 of file Transport.cpp. References ACE_Reactor::cancel_timer(), current_deadline_, event_handler_i(), flush_timer_id_, flush_timer_pending(), ACE_OS::gettimeofday(), TAO_Queued_Message::message_length(), TAO_Queued_Message::next(), ACE_Event_Handler::reactor(), ACE_Reactor::schedule_timer(), and TAO_Stub::transport_queueing_strategy(). Referenced by send_asynchronous_message_i().
01058 { 01059 // First let's compute the size of the queue: 01060 size_t msg_count = 0; 01061 size_t total_bytes = 0; 01062 01063 for (TAO_Queued_Message *i = this->head_; i != 0; i = i->next ()) 01064 { 01065 ++msg_count; 01066 total_bytes += i->message_length (); 01067 } 01068 01069 bool set_timer; 01070 ACE_Time_Value new_deadline; 01071 01072 bool constraints_reached = 01073 stub->transport_queueing_strategy (). 01074 buffering_constraints_reached (stub, 01075 msg_count, 01076 total_bytes, 01077 must_flush, 01078 this->current_deadline_, 01079 set_timer, 01080 new_deadline); 01081 01082 // ... set the new timer, also cancel any previous timers ... 01083 if (set_timer) 01084 { 01085 ACE_Event_Handler *eh = this->event_handler_i (); 01086 ACE_Reactor *reactor = eh->reactor (); 01087 this->current_deadline_ = new_deadline; 01088 ACE_Time_Value delay = 01089 new_deadline - ACE_OS::gettimeofday (); 01090 01091 if (this->flush_timer_pending ()) 01092 { 01093 reactor->cancel_timer (this->flush_timer_id_); 01094 } 01095 01096 this->flush_timer_id_ = 01097 reactor->schedule_timer (&this->transport_timer_, 01098 &this->current_deadline_, 01099 delay); 01100 } 01101 01102 return constraints_reached; 01103 } |
|
Cleanup the queue. Exactly byte_count bytes have been sent, the queue must be cleaned up as potentially several messages have been completely sent out. It leaves on head_ the next message to send out. Definition at line 1019 of file Transport.cpp. References ACE_DEBUG, ACE_TEXT, TAO_Queued_Message::all_data_sent(), TAO_Queued_Message::bytes_transferred(), TAO_Queued_Message::destroy(), LM_DEBUG, TAO_Queued_Message::message_length(), TAO_Queued_Message::remove_from_list(), and TAO_debug_level. Referenced by drain_queue_helper().
01020 { 01021 while (this->head_ != 0 && byte_count > 0) 01022 { 01023 TAO_Queued_Message *i = this->head_; 01024 01025 if (TAO_debug_level > 4) 01026 { 01027 ACE_DEBUG ((LM_DEBUG, 01028 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue, ") 01029 ACE_TEXT ("byte_count = %d\n"), 01030 this->id (), byte_count)); 01031 } 01032 01033 // Update the state of the first message 01034 i->bytes_transferred (byte_count); 01035 01036 if (TAO_debug_level > 4) 01037 { 01038 ACE_DEBUG ((LM_DEBUG, 01039 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue, ") 01040 ACE_TEXT ("after transfer, bc = %d, all_sent = %d, ml = %d\n"), 01041 this->id (), byte_count, i->all_data_sent (), 01042 i->message_length ())); 01043 } 01044 01045 // ... if all the data was sent the message must be removed from 01046 // the queue... 01047 if (i->all_data_sent ()) 01048 { 01049 i->remove_from_list (this->head_, this->tail_); 01050 i->destroy (); 01051 } 01052 } 01053 } |
|
Cleanup the complete queue.
Definition at line 992 of file Transport.cpp. References ACE_DEBUG, ACE_TEXT, TAO_Queued_Message::destroy(), LM_DEBUG, TAO_Queued_Message::remove_from_list(), TAO_LF_Event::state_changed(), and TAO_debug_level. Referenced by send_connection_closed_notifications_i(), and ~TAO_Transport().
00993 { 00994 if (TAO_debug_level > 4) 00995 { 00996 ACE_DEBUG ((LM_DEBUG, 00997 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue_i, ") 00998 ACE_TEXT ("cleaning up complete queue\n"), 00999 this->id ())); 01000 } 01001 01002 // Cleanup all messages 01003 while (this->head_ != 0) 01004 { 01005 TAO_Queued_Message *i = this->head_; 01006 01007 // @@ This is a good point to insert a flag to indicate that a 01008 // CloseConnection message was successfully received. 01009 i->state_changed (TAO_LF_Event::LFS_CONNECTION_CLOSED, 01010 this->orb_core_->leader_follower ()); 01011 01012 i->remove_from_list (this->head_, this->tail_); 01013 01014 i->destroy (); 01015 } 01016 } |
|
It is necessary to clear the codeset translator when a CDR stream is used for more than one GIOP message. This is required since the header must not be translated, whereas the body must be. Definition at line 2337 of file Transport.cpp. References ACE_OutputCDR::char_translator(), ACE_InputCDR::char_translator(), ACE_OutputCDR::wchar_translator(), and ACE_InputCDR::wchar_translator(). Referenced by TAO::Remote_Invocation::write_header().
02338 { 02339 if (inp) 02340 { 02341 inp->char_translator (0); 02342 inp->wchar_translator (0); 02343 } 02344 if (outp) 02345 { 02346 outp->char_translator (0); 02347 outp->wchar_translator (0); 02348 } 02349 } |
|
Call the implementation method after obtaining the lock.
Definition at line 293 of file Transport.cpp. References TAO_Connection_Handler::close_connection(), and connection_handler_i(). Referenced by TAO_IIOP_Connector::complete_connection(), TAO_Connector::connect(), post_open(), TAO::Transport_Cache_Manager::purge(), TAO_GIOP_Message_Base::send_close_connection(), TAO::Remote_Invocation::send_message(), send_message_shared(), TAO_IIOP_Transport::send_message_shared(), and TAO_Wait_On_Read::wait().
00294 { 00295 this->connection_handler_i ()->close_connection (); 00296 } |
|
Get the connection handler for this transport.
Definition at line 175 of file Transport.inl. References connection_handler_i(). Referenced by TAO_IIOP_Connector::complete_connection(), TAO::Remote_Invocation::send_message(), TAO_Connect_Strategy::wait(), and TAO_Connector::wait_for_connection_completion().
00176 { 00177 return this->connection_handler_i(); 00178 } |
|
Implemented in TAO_IIOP_Transport. Referenced by close_connection(), and connection_handler(). |
|
-1 error, otherwise 0
Definition at line 1491 of file Transport.cpp. References ACE_ERROR, ACE_TEXT, TAO_Pluggable_Messaging::consolidate_fragmented_message(), TAO_Incoming_Message_Queue::enqueue_tail(), incoming_message_queue_, LM_ERROR, messaging_object(), TAO_Queued_Data::missing_data_, TAO_Queued_Data::more_fragments_, TAO_Queued_Data::msg_type_, TAO_Queued_Data::release(), TAO_debug_level, and TAO_PLUGGABLE_MESSAGE_FRAGMENT. Referenced by handle_input_parse_data(), and handle_input_parse_extra_messages().
01492 { 01493 // consolidate message on top of stack, only for fragmented messages 01494 01495 // paranoid check 01496 if (q_data->missing_data_ != 0) 01497 { 01498 return -1; 01499 } 01500 01501 if (q_data->more_fragments_ || 01502 q_data->msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT) 01503 { 01504 TAO_Queued_Data *new_q_data = 0; 01505 01506 switch (this->messaging_object()->consolidate_fragmented_message (q_data, new_q_data)) 01507 { 01508 case -1: // error 01509 return -1; 01510 01511 case 0: // returning consolidated message in new_q_data 01512 if (!new_q_data) 01513 { 01514 if (TAO_debug_level > 0) 01515 { 01516 ACE_ERROR ((LM_ERROR, 01517 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_enqueue_message, ") 01518 ACE_TEXT ("error, consolidated message is NULL\n"), 01519 this->id ())); 01520 } 01521 return -1; 01522 } 01523 01524 if (this->incoming_message_queue_.enqueue_tail (new_q_data) != 0) 01525 { 01526 TAO_Queued_Data::release (new_q_data); 01527 return -1; 01528 } 01529 break; 01530 01531 case 1: // fragment has been stored in messaging_oject() 01532 break; 01533 } 01534 } 01535 else 01536 { 01537 if (this->incoming_message_queue_.enqueue_tail (q_data) != 0) 01538 { 01539 TAO_Queued_Data::release (q_data); 01540 return -1; 01541 } 01542 } 01543 01544 return 0; // success 01545 } |
|
-1 error, otherwise 0
Definition at line 1404 of file Transport.cpp. References ACE_ERROR, ACE_TEXT, TAO_Pluggable_Messaging::consolidate_fragmented_message(), LM_ERROR, messaging_object(), TAO_Queued_Data::missing_data_, TAO_Queued_Data::more_fragments_, TAO_Queued_Data::msg_type_, process_parsed_messages(), TAO_Queued_Data::release(), TAO_debug_level, and TAO_PLUGGABLE_MESSAGE_FRAGMENT. Referenced by handle_input_missing_data().
01406 { 01407 // paranoid check 01408 if (q_data->missing_data_ != 0) 01409 { 01410 if (TAO_debug_level > 0) 01411 { 01412 ACE_ERROR ((LM_ERROR, 01413 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ") 01414 ACE_TEXT ("missing data\n"), 01415 this->id ())); 01416 } 01417 return -1; 01418 } 01419 01420 if (q_data->more_fragments_ || 01421 q_data->msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT) 01422 { 01423 // consolidate message on top of stack, only for fragmented messages 01424 TAO_Queued_Data *new_q_data = 0; 01425 01426 switch (this->messaging_object()->consolidate_fragmented_message (q_data, new_q_data)) 01427 { 01428 case -1: // error 01429 return -1; 01430 01431 case 0: // returning consolidated message in q_data 01432 if (!new_q_data) 01433 { 01434 if (TAO_debug_level > 0) 01435 { 01436 ACE_ERROR ((LM_ERROR, 01437 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ") 01438 ACE_TEXT ("error, consolidated message is NULL\n"), 01439 this->id ())); 01440 } 01441 return -1; 01442 } 01443 01444 01445 if (this->process_parsed_messages (new_q_data, rh) == -1) 01446 { 01447 TAO_Queued_Data::release (new_q_data); 01448 01449 if (TAO_debug_level > 0) 01450 { 01451 ACE_ERROR ((LM_ERROR, 01452 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ") 01453 ACE_TEXT ("error processing consolidated message\n"), 01454 this->id ())); 01455 } 01456 return -1; 01457 } 01458 01459 TAO_Queued_Data::release (new_q_data); 01460 01461 break; 01462 01463 case 1: // fragment has been stored in messaging_oject() 01464 break; 01465 } 01466 } 01467 else 01468 { 01469 if (this->process_parsed_messages (q_data, rh) == -1) 01470 { 01471 TAO_Queued_Data::release (q_data); 01472 01473 if (TAO_debug_level > 0) 01474 { 01475 ACE_ERROR ((LM_ERROR, 01476 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ") 01477 ACE_TEXT ("error processing message\n"), 01478 this->id ())); 01479 } 01480 return -1; 01481 } 01482 01483 TAO_Queued_Data::release (q_data); 01484 01485 } 01486 01487 return 0; 01488 } |
|
Send some of the data in the queue. As the outgoing data is drained this method is invoked to send as much of the current message as possible. Returns 0 if there is more data to send, -1 if there was an error and 1 if the message was completely sent. Definition at line 807 of file Transport.cpp. References ACE_GUARD_RETURN, TAO_Flushing_Strategy::cancel_output(), drain_queue_i(), TAO_ORB_Core::flushing_strategy(), and orb_core(). Referenced by handle_output().
00808 { 00809 ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1); 00810 int const retval = this->drain_queue_i (); 00811 00812 if (retval == 1) 00813 { 00814 // ... there is no current message or it was completely 00815 // sent, cancel output... 00816 TAO_Flushing_Strategy *flushing_strategy = 00817 this->orb_core ()->flushing_strategy (); 00818 00819 flushing_strategy->cancel_output (this); 00820 00821 return 0; 00822 } 00823 00824 return retval; 00825 } |
|
A helper routine used in drain_queue_i().
Definition at line 828 of file Transport.cpp. References ACE_DEBUG, ACE_TEXT, cleanup_queue(), dump_iov(), EWOULDBLOCK, LM_DEBUG, send(), sent_byte_count_, ssize_t, and TAO_debug_level. Referenced by drain_queue_i().
00829 { 00830 size_t byte_count = 0; 00831 00832 // ... send the message ... 00833 ssize_t retval = -1; 00834 00835 #ifdef ACE_HAS_SENDFILE 00836 if (this->mmap_allocator_) 00837 retval = this->sendfile (this->mmap_allocator_, 00838 iov, 00839 iovcnt, 00840 byte_count); 00841 else 00842 #endif /* ACE_HAS_SENDFILE */ 00843 retval = this->send (iov, iovcnt, byte_count); 00844 00845 if (TAO_debug_level == 5) 00846 { 00847 dump_iov (iov, iovcnt, this->id (), 00848 byte_count, "drain_queue_helper"); 00849 } 00850 00851 // ... now we need to update the queue, removing elements 00852 // that have been sent, and updating the last element if it 00853 // was only partially sent ... 00854 this->cleanup_queue (byte_count); 00855 iovcnt = 0; 00856 00857 if (retval == 0) 00858 { 00859 if (TAO_debug_level > 4) 00860 { 00861 ACE_DEBUG ((LM_DEBUG, 00862 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_helper, ") 00863 ACE_TEXT ("send() returns 0\n"), 00864 this->id ())); 00865 } 00866 return -1; 00867 } 00868 else if (retval == -1) 00869 { 00870 if (TAO_debug_level > 4) 00871 { 00872 ACE_DEBUG ((LM_DEBUG, 00873 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_helper, ") 00874 ACE_TEXT ("error during %p\n"), 00875 this->id (), ACE_TEXT ("send()"))); 00876 } 00877 00878 if (errno == EWOULDBLOCK || errno == EAGAIN) 00879 { 00880 return 0; 00881 } 00882 00883 return -1; 00884 } 00885 00886 // ... start over, how do we guarantee progress? Because if 00887 // no bytes are sent send() can only return 0 or -1 00888 00889 // Total no. of bytes sent for a send call 00890 this->sent_byte_count_ += byte_count; 00891 00892 if (TAO_debug_level > 4) 00893 { 00894 ACE_DEBUG ((LM_DEBUG, 00895 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_helper, ") 00896 ACE_TEXT ("byte_count = %d, head_is_empty = %d\n"), 00897 this->id(), byte_count, (this->head_ == 0))); 00898 } 00899 00900 return 1; 00901 } |
|
Implement drain_queue() assuming the lock is held.
Definition at line 904 of file Transport.cpp. References ACE_DEBUG, ACE_IOV_MAX, ACE_TEXT, ACE_Reactor::cancel_timer(), drain_queue_helper(), event_handler_i(), TAO_Queued_Message::fill_iov(), flush_timer_pending(), LM_DEBUG, TAO_Queued_Message::next(), ACE_Event_Handler::reactor(), reset_flush_timer(), sent_byte_count_, and TAO_debug_level. Referenced by drain_queue(), TAO_Block_Flushing_Strategy::schedule_output(), send_message_block_chain_i(), and send_synch_message_helper_i().
00905 { 00906 // This is the vector used to send data, it must be declared outside 00907 // the loop because after the loop there may still be data to be 00908 // sent 00909 int iovcnt = 0; 00910 #if defined (ACE_INITIALIZE_MEMORY_BEFORE_USE) 00911 iovec iov[ACE_IOV_MAX] = { 0 , 0 }; 00912 #else 00913 iovec iov[ACE_IOV_MAX]; 00914 #endif /* ACE_INITIALIZE_MEMORY_BEFORE_USE */ 00915 00916 // We loop over all the elements in the queue ... 00917 TAO_Queued_Message *i = this->head_; 00918 00919 // Reset the value so that the counting is done for each new send 00920 // call. 00921 this->sent_byte_count_ = 0; 00922 00923 while (i != 0) 00924 { 00925 // ... each element fills the iovector ... 00926 i->fill_iov (ACE_IOV_MAX, iovcnt, iov); 00927 00928 // ... the vector is full, no choice but to send some data out. 00929 // We need to loop because a single message can span multiple 00930 // IOV_MAX elements ... 00931 if (iovcnt == ACE_IOV_MAX) 00932 { 00933 int const retval = 00934 this->drain_queue_helper (iovcnt, iov); 00935 00936 if (TAO_debug_level > 4) 00937 { 00938 ACE_DEBUG ((LM_DEBUG, 00939 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_i, ") 00940 ACE_TEXT ("helper retval = %d\n"), 00941 this->id (), retval)); 00942 } 00943 00944 if (retval != 1) 00945 { 00946 return retval; 00947 } 00948 00949 i = this->head_; 00950 continue; 00951 } 00952 // ... notice that this line is only reached if there is still 00953 // room in the iovector ... 00954 i = i->next (); 00955 } 00956 00957 if (iovcnt != 0) 00958 { 00959 int const retval = this->drain_queue_helper (iovcnt, iov); 00960 00961 if (TAO_debug_level > 4) 00962 { 00963 ACE_DEBUG ((LM_DEBUG, 00964 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_i, ") 00965 ACE_TEXT ("helper retval = %d\n"), 00966 this->id (), retval)); 00967 } 00968 00969 if (retval != 1) 00970 { 00971 return retval; 00972 } 00973 } 00974 00975 if (this->head_ == 0) 00976 { 00977 if (this->flush_timer_pending ()) 00978 { 00979 ACE_Event_Handler *eh = this->event_handler_i (); 00980 ACE_Reactor * const reactor = eh->reactor (); 00981 reactor->cancel_timer (this->flush_timer_id_); 00982 this->reset_flush_timer (); 00983 } 00984 00985 return 1; 00986 } 00987 00988 return 0; 00989 } |
|
Normally a concrete TAO_Transport object has-a ACE_Event_Handler member that functions as an adapter between the ACE_Reactor framework and the TAO pluggable protocol framework. In all the protocols implemented so far this role is fullfilled by an instance of ACE_Svc_Handler.
Implemented in TAO_IIOP_Transport. Referenced by add_reference(), cancel_output_i(), check_buffering_constraints_i(), drain_queue_i(), TAO::Transport_Cache_Manager::find_transport(), notify_reactor(), register_handler(), remove_reference(), schedule_output_i(), TAO_Connection_Handler::transport(), and TAO_Wait_On_Read::wait(). |
|
Set the state of the first_request_ flag to 0.
Definition at line 158 of file Transport.inl. References first_request_. Referenced by TAO_IIOP_Transport::send_request().
00159 { 00160 this->first_request_ = 0; 00161 } |
|
Check if the flush timer is still pending.
Definition at line 108 of file Transport.inl. References flush_timer_id_. Referenced by check_buffering_constraints_i(), drain_queue_i(), and handle_timeout().
00109 { 00110 return this->flush_timer_id_ != -1; 00111 } |
|
Format and queue a message for stream.
Definition at line 481 of file Transport.cpp. References ACE_OutputCDR::begin(), TAO_Pluggable_Messaging::format_message(), messaging_object(), and queue_message_i(). Referenced by TAO::Synch_Oneway_Invocation::remote_oneway().
00482 { 00483 if (this->messaging_object ()->format_message (stream) != 0) 00484 return -1; 00485 00486 return this->queue_message_i (stream.begin()); 00487 } |
|
This is a request for the transport object to write a LocateRequest header before it is sent out. Definition at line 352 of file Transport.cpp. References ACE_DEBUG, ACE_TEXT, TAO_Pluggable_Messaging::generate_locate_request_header(), LM_DEBUG, messaging_object(), and TAO_debug_level. Referenced by TAO::LocateRequest_Invocation::invoke().
00356 { 00357 if (this->messaging_object ()->generate_locate_request_header (opdetails, 00358 spec, 00359 output) 00360 == -1) 00361 { 00362 if (TAO_debug_level > 0) 00363 { 00364 ACE_DEBUG ((LM_DEBUG, 00365 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::generate_locate_request, ") 00366 ACE_TEXT ("error while marshalling the LocateRequest header\n"), 00367 this->id ())); 00368 } 00369 00370 return -1; 00371 } 00372 00373 return 0; 00374 } |
|
This is a request for the transport object to write a request header before it sends out the request Reimplemented in TAO_IIOP_Transport. Definition at line 377 of file Transport.cpp. References ACE_DEBUG, ACE_TEXT, TAO_ORB_Core::codeset_manager(), first_request_, TAO_Pluggable_Messaging::generate_request_header(), TAO_Codeset_Manager::generate_service_context(), LM_DEBUG, messaging_object(), orb_core(), and TAO_debug_level. Referenced by TAO_IIOP_Transport::generate_request_header(), and TAO::Remote_Invocation::write_header().
00381 { 00382 // codeset service context is only supposed to be sent in the first request 00383 // on a particular connection. 00384 if (this->first_request_) 00385 { 00386 TAO_Codeset_Manager * const csm = this->orb_core ()->codeset_manager (); 00387 if (csm) 00388 csm->generate_service_context (opdetails,*this); 00389 } 00390 00391 if (this->messaging_object ()->generate_request_header (opdetails, 00392 spec, 00393 output) == -1) 00394 { 00395 if (TAO_debug_level > 0) 00396 { 00397 ACE_DEBUG ((LM_DEBUG, 00398 ACE_TEXT ("(%P|%t) - Transport[%d]::generate_request_header, ") 00399 ACE_TEXT ("error while marshalling the Request header\n"), 00400 this->id())); 00401 } 00402 00403 return -1; 00404 } 00405 00406 return 0; 00407 } |
|
Callback to read incoming data. The ACE_Event_Handler adapter invokes this method as part of its handle_input() operation.
Definition at line 1329 of file Transport.cpp. References ACE_DEBUG, ACE_ERROR, ACE_TEXT, handle_input_missing_data(), handle_input_parse_data(), incoming_message_stack_, LM_DEBUG, LM_ERROR, TAO_Queued_Data::missing_data_, process_queue_head(), TAO_debug_level, TAO_MISSING_DATA_UNDEFINED, and TAO::Incoming_Message_Stack::top(). Referenced by TAO_Connection_Handler::handle_input_internal(), TAO_Connection_Handler::svc_i(), and TAO_Wait_On_Read::wait().
01332 { 01333 if (TAO_debug_level > 3) 01334 { 01335 ACE_DEBUG ((LM_DEBUG, 01336 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input\n"), 01337 this->id ())); 01338 } 01339 01340 // First try to process messages of the head of the incoming queue. 01341 int const retval = this->process_queue_head (rh); 01342 01343 if (retval <= 0) 01344 { 01345 if (retval == -1) 01346 { 01347 if (TAO_debug_level > 2) 01348 { 01349 ACE_DEBUG ((LM_DEBUG, 01350 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input, ") 01351 ACE_TEXT ("error while parsing the head of the queue\n"), 01352 this->id())); 01353 01354 } 01355 return -1; 01356 } 01357 else 01358 { 01359 // retval == 0 01360 01361 // Processed a message in queue successfully. This 01362 // thread must return to thread-pool now. 01363 return 0; 01364 } 01365 } 01366 01367 TAO_Queued_Data *q_data = 0; 01368 01369 if (this->incoming_message_stack_.top (q_data) != -1 01370 && q_data->missing_data_ != TAO_MISSING_DATA_UNDEFINED) 01371 { 01372 /* PRE: q_data->missing_data_ > 0 as all QD on stack must be incomplete */ 01373 if (this->handle_input_missing_data (rh, max_wait_time, q_data) == -1) 01374 { 01375 if (TAO_debug_level > 0) 01376 { 01377 ACE_ERROR ((LM_ERROR, 01378 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input, ") 01379 ACE_TEXT ("error consolidating incoming message\n"), 01380 this->id ())); 01381 } 01382 return -1; 01383 } 01384 } 01385 else 01386 { 01387 if (this->handle_input_parse_data (rh, max_wait_time) == -1) 01388 { 01389 if (TAO_debug_level > 0) 01390 { 01391 ACE_ERROR ((LM_ERROR, 01392 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input, ") 01393 ACE_TEXT ("error parsing incoming message\n"), 01394 this->id ())); 01395 } 01396 return -1; 01397 } 01398 } 01399 01400 return 0; 01401 } |
|
Is invoked by handle_input operation. It consolidate message on top of incoming_message_stack. The amount of missing data is known and recv operation copies data directly into message buffer, as much as a single recv-invocation provides. Definition at line 1548 of file Transport.cpp. References ACE_DEBUG, ACE_TEXT, consolidate_process_message(), ACE_CDR::grow(), incoming_message_stack_, ACE_Message_Block::length(), LM_DEBUG, TAO_Queued_Data::missing_data_, TAO_Queued_Data::msg_block_, TAO::Incoming_Message_Stack::pop(), recv(), ACE_Message_Block::space(), ssize_t, TAO_debug_level, and ACE_Message_Block::wr_ptr(). Referenced by handle_input().
01551 { 01552 // paranoid check 01553 if (q_data == 0) 01554 { 01555 return -1; 01556 } 01557 01558 if (TAO_debug_level > 3) 01559 { 01560 ACE_DEBUG ((LM_DEBUG, 01561 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_missing_data_message, ") 01562 ACE_TEXT ("enter (missing data == %d)\n"), 01563 this->id (), q_data->missing_data_)); 01564 } 01565 01566 size_t const recv_size = q_data->missing_data_; 01567 01568 // make sure the message_block has enough space 01569 size_t const message_size = recv_size 01570 + q_data->msg_block_->length(); 01571 01572 if (q_data->msg_block_->space() < recv_size) 01573 { 01574 if (ACE_CDR::grow (q_data->msg_block_, message_size) == -1) 01575 { 01576 return -1; 01577 } 01578 } 01579 01580 // Saving the size of the received buffer in case any one needs to 01581 // get the size of the message thats received in the 01582 // context. Obviously the value will be changed for each recv call 01583 // and the user is supposed to invoke the accessor only in the 01584 // invocation context to get meaningful information. 01585 this->recv_buffer_size_ = recv_size; 01586 01587 // Read the message into the existing message block on heap 01588 ssize_t const n = this->recv (q_data->msg_block_->wr_ptr(), 01589 recv_size, 01590 max_wait_time); 01591 01592 01593 if (n <= 0) 01594 { 01595 return n; 01596 } 01597 01598 if (TAO_debug_level > 3) 01599 { 01600 ACE_DEBUG ((LM_DEBUG, 01601 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_missing_data_message, ") 01602 ACE_TEXT ("read bytes %d\n"), 01603 this->id (), n)); 01604 } 01605 01606 q_data->msg_block_->wr_ptr(n); 01607 q_data->missing_data_ -= n; 01608 01609 if (q_data->missing_data_ == 0) 01610 { 01611 // paranoid check 01612 if (this->incoming_message_stack_.pop (q_data) == -1) 01613 { 01614 return -1; 01615 } 01616 01617 if (this->consolidate_process_message (q_data, rh) == -1) 01618 { 01619 return -1; 01620 } 01621 } 01622 01623 return 0; 01624 } |
|
Is invoked by handle_input operation. It parses new messages from input stream or consolidates messages whose header has been partially read, the message size being unknown so far. It parses as much data as a single recv-invocation provides. Definition at line 1671 of file Transport.cpp. References ACE_DEBUG, ACE_ERROR, ACE_TEXT, allocate_partial_message_block(), consolidate_enqueue_message(), TAO_Pluggable_Messaging::consolidate_node(), ACE_Message_Block::copy(), TAO_Queued_Data::duplicate(), ACE_CDR::grow(), handle_input_parse_extra_messages(), TAO_Pluggable_Messaging::header_length(), incoming_message_queue_, incoming_message_stack_, TAO_ORB_Core::input_cdr_dblock_allocator(), ACE_Message_Block::length(), LM_DEBUG, LM_ERROR, TAO_ORB_Core::locking_strategy(), ACE_CDR::mb_align(), ACE_OS::memset(), messaging_object(), TAO_Queued_Data::missing_data_, TAO_Queued_Data::more_fragments_, TAO_Queued_Data::msg_block_, TAO_Queued_Data::msg_type_, notify_reactor(), TAO_ORB_Core::orb_params(), TAO_Pluggable_Messaging::parse_next_message(), partial_message_, TAO::Incoming_Message_Stack::pop(), process_parsed_messages(), process_queue_head(), TAO::Incoming_Message_Stack::push(), TAO_Incoming_Message_Queue::queue_length(), ACE_Message_Block::rd_ptr(), recv(), ACE_Message_Block::reset(), TAO_Resume_Handle::set_flag(), TAO_ORB_Parameters::single_read_optimization(), ACE_Message_Block::space(), ssize_t, TAO_debug_level, TAO_MAXBUFSIZE, TAO_MISSING_DATA_UNDEFINED, TAO_PLUGGABLE_MESSAGE_FRAGMENT, TAO::Incoming_Message_Stack::top(), and ACE_Message_Block::wr_ptr(). Referenced by handle_input().
01673 { 01674 01675 if (TAO_debug_level > 3) 01676 { 01677 ACE_DEBUG ((LM_DEBUG, 01678 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ") 01679 ACE_TEXT ("enter\n"), 01680 this->id ())); 01681 } 01682 01683 01684 // The buffer on the stack which will be used to hold the input 01685 // messages, ACE_CDR::MAX_ALIGNMENT compensates the 01686 // memory-alignment. This improves performance with SUN-Java-ORB-1.4 01687 // and higher that sends fragmented requests of size 1024 bytes. 01688 char buf [TAO_MAXBUFSIZE + ACE_CDR::MAX_ALIGNMENT]; 01689 01690 #if defined (ACE_INITIALIZE_MEMORY_BEFORE_USE) 01691 (void) ACE_OS::memset (buf, 01692 '\0', 01693 sizeof buf); 01694 #endif /* ACE_INITIALIZE_MEMORY_BEFORE_USE */ 01695 01696 // Create a data block 01697 ACE_Data_Block db (sizeof (buf), 01698 ACE_Message_Block::MB_DATA, 01699 buf, 01700 this->orb_core_->input_cdr_buffer_allocator (), 01701 this->orb_core_->locking_strategy (), 01702 ACE_Message_Block::DONT_DELETE, 01703 this->orb_core_->input_cdr_dblock_allocator ()); 01704 01705 // Create a message block 01706 ACE_Message_Block message_block (&db, 01707 ACE_Message_Block::DONT_DELETE, 01708 this->orb_core_->input_cdr_msgblock_allocator ()); 01709 01710 01711 // Align the message block 01712 ACE_CDR::mb_align (&message_block); 01713 01714 size_t recv_size = 0; // Note: unsigned integer 01715 01716 // Pointer to newly parsed message 01717 TAO_Queued_Data *q_data = 0; 01718 01719 // optimizing access of constants 01720 const size_t header_length = 01721 this->messaging_object ()->header_length (); 01722 01723 // paranoid check 01724 if (header_length > message_block.space ()) 01725 { 01726 return -1; 01727 } 01728 01729 if (this->orb_core_->orb_params ()->single_read_optimization ()) 01730 { 01731 recv_size = 01732 message_block.space (); 01733 } 01734 else 01735 { 01736 // Single read optimization has been de-activated. That means 01737 // that we need to read from transport the GIOP header first 01738 // before the payload. This codes first checks the incoming 01739 // stack for partial messages which needs to be 01740 // consolidated. Otherwise we are in new cycle, reading complete 01741 // GIOP header of new incoming message. 01742 if (this->incoming_message_stack_.top (q_data) != -1 01743 && q_data->missing_data_ == TAO_MISSING_DATA_UNDEFINED) 01744 { 01745 // There is a partial message on incoming_message_stack_ 01746 // whose length is unknown so far. We need to consolidate 01747 // the GIOP header to get to know the payload size, 01748 recv_size = header_length - q_data->msg_block_->length (); 01749 } 01750 else 01751 { 01752 // Read amount of data forming GIOP header of new incoming 01753 // message. 01754 recv_size = header_length; 01755 } 01756 // POST: 0 <= recv_size <= header_length 01757 } 01758 // POST: 0 <= recv_size <= message_block->space () 01759 01760 // If we have a partial message, copy it into our message block and 01761 // clear out the partial message. 01762 if (this->partial_message_ != 0 && this->partial_message_->length () > 0) 01763 { 01764 // (*) Copy back the partial message into current read-buffer, 01765 // verify that the read-strategy of "recv_size" bytes is not 01766 // exceeded. The latter check guarantees that recv_size does not 01767 // roll-over and keeps in range 01768 // 0<=recv_size<=message_block->space() 01769 if (this->partial_message_->length () <= recv_size && 01770 message_block.copy (this->partial_message_->rd_ptr (), 01771 this->partial_message_->length ()) == 0) 01772 { 01773 01774 recv_size -= this->partial_message_->length (); 01775 this->partial_message_->reset (); 01776 } 01777 else 01778 { 01779 return -1; 01780 } 01781 } 01782 // POST: 0 <= recv_size <= buffer_space 01783 01784 if (0 >= recv_size) // paranoid: the check above (*) guarantees recv_size>=0 01785 { 01786 // This event would cause endless looping, trying frequently to 01787 // read zero bytes from stream. This might happen, if TAOs 01788 // protocol implementation is not correct and tries to read data 01789 // beyond header without "single_read_optimazation" being 01790 // activated. 01791 if (TAO_debug_level > 0) 01792 { 01793 ACE_ERROR ((LM_ERROR, 01794 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ") 01795 ACE_TEXT ("Error - endless loop detection, closing connection"), 01796 this->id ())); 01797 } 01798 return -1; 01799 } 01800 01801 // Saving the size of the received buffer in case any one needs to 01802 // get the size of the message thats received in the 01803 // context. Obviously the value will be changed for each recv call 01804 // and the user is supposed to invoke the accessor only in the 01805 // invocation context to get meaningful information. 01806 this->recv_buffer_size_ = recv_size; 01807 01808 // Read the message into the message block that we have created on 01809 // the stack. 01810 const ssize_t n = this->recv (message_block.wr_ptr (), 01811 recv_size, 01812 max_wait_time); 01813 01814 // If there is an error return to the reactor.. 01815 if (n <= 0) 01816 { 01817 return n; 01818 } 01819 01820 if (TAO_debug_level > 3) 01821 { 01822 ACE_DEBUG ((LM_DEBUG, 01823 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ") 01824 ACE_TEXT ("read %d bytes\n"), 01825 this->id (), n)); 01826 } 01827 01828 // Set the write pointer in the stack buffer 01829 message_block.wr_ptr (n); 01830 01831 // 01832 // STACK PROCESSING OR MESSAGE CONSOLIDATION 01833 // 01834 01835 // PRE: data in buffer is aligned && message_block.length() > 0 01836 01837 if (this->incoming_message_stack_.top (q_data) != -1 01838 && q_data->missing_data_ == TAO_MISSING_DATA_UNDEFINED) 01839 { 01840 // 01841 // MESSAGE CONSOLIDATION 01842 // 01843 01844 // Partial message on incoming_message_stack_ needs to be 01845 // consolidated. The message header could not be parsed so far 01846 // and therefor the message size is unknown yet. Consolidating 01847 // the message destroys the memory alignment of succeeding 01848 // messages sharing the buffer, for that reason consolidation 01849 // and stack based processing are mutial exclusive. 01850 if (this->messaging_object ()->consolidate_node (q_data, 01851 message_block) == -1) 01852 { 01853 if (TAO_debug_level > 0) 01854 { 01855 ACE_ERROR ((LM_ERROR, 01856 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ") 01857 ACE_TEXT ("error consolidating message from input buffer\n"), 01858 this->id () )); 01859 } 01860 return -1; 01861 } 01862 01863 // Complete message are to be enqueued and later processed 01864 if (q_data->missing_data_ == 0) 01865 { 01866 if (this->incoming_message_stack_.pop (q_data) == -1) 01867 { 01868 return -1; 01869 } 01870 01871 if (this->consolidate_enqueue_message (q_data) == -1) 01872 { 01873 return -1; 01874 } 01875 } 01876 01877 if (message_block.length () > 0 01878 && this->handle_input_parse_extra_messages (message_block) == -1) 01879 { 01880 return -1; 01881 } 01882 01883 // In any case try to process the enqueued messages 01884 if (this->process_queue_head (rh) == -1) 01885 { 01886 return -1; 01887 } 01888 } 01889 else 01890 { 01891 // 01892 // STACK PROCESSING (critical path) 01893 // 01894 01895 // Process the first message in buffer on stack 01896 01897 // (PRE: first message resides in aligned memory) Make a node of 01898 // the message-block.. 01899 01900 TAO_Queued_Data qd (&message_block, 01901 this->orb_core_->transport_message_buffer_allocator ()); 01902 01903 size_t mesg_length = 0; 01904 01905 if (this->messaging_object ()->parse_next_message (message_block, 01906 qd, 01907 mesg_length) == -1 01908 || (qd.missing_data_ == 0 01909 && mesg_length > message_block.length ()) ) 01910 { 01911 // extracting message failed 01912 return -1; 01913 } 01914 // POST: qd.missing_data_ == 0 --> mesg_length <= message_block.length() 01915 // This prevents seeking rd_ptr behind the wr_ptr 01916 01917 if (qd.missing_data_ != 0 || 01918 qd.more_fragments_ || 01919 qd.msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT) 01920 { 01921 if (qd.missing_data_ == 0) 01922 { 01923 // Dealing with a fragment 01924 TAO_Queued_Data *nqd = 01925 TAO_Queued_Data::duplicate (qd); 01926 01927 if (nqd == 0) 01928 { 01929 return -1; 01930 } 01931 01932 // mark the end of message in new buffer 01933 char* end_mark = nqd->msg_block_->rd_ptr () 01934 + mesg_length; 01935 nqd->msg_block_->wr_ptr (end_mark); 01936 01937 // move the read pointer forward in old buffer 01938 message_block.rd_ptr (mesg_length); 01939 01940 // enqueue the message 01941 if (this->consolidate_enqueue_message (nqd) == -1) 01942 { 01943 return -1; 01944 } 01945 01946 if (message_block.length () > 0 01947 && this->handle_input_parse_extra_messages (message_block) == -1) 01948 { 01949 return -1; 01950 } 01951 01952 // In any case try to process the enqueued messages 01953 if (this->process_queue_head (rh) == -1) 01954 { 01955 return -1; 01956 } 01957 } 01958 else if (qd.missing_data_ != TAO_MISSING_DATA_UNDEFINED) 01959 { 01960 // Incomplete message, must be the last one in buffer 01961 01962 if (qd.missing_data_ != TAO_MISSING_DATA_UNDEFINED && 01963 qd.missing_data_ > message_block.space ()) 01964 { 01965 // Re-Allocate correct size on heap 01966 if (ACE_CDR::grow (qd.msg_block_, 01967 message_block.length () 01968 + qd.missing_data_) == -1) 01969 { 01970 return -1; 01971 } 01972 } 01973 01974 TAO_Queued_Data *nqd = 01975 TAO_Queued_Data::duplicate (qd); 01976 01977 if (nqd == 0) 01978 { 01979 return -1; 01980 } 01981 01982 // move read-pointer to end of buffer 01983 message_block.rd_ptr (message_block.length()); 01984 01985 this->incoming_message_stack_.push (nqd); 01986 } 01987 } 01988 else 01989 { 01990 // 01991 // critical path 01992 // 01993 01994 // We cant process the message on stack right now. First we 01995 // have got to parse extra messages from message_block, 01996 // putting them into queue. When this is done we can return 01997 // to process this message, and notifying other threads to 01998 // process the messages in queue. 01999 02000 char * end_marker = message_block.rd_ptr () 02001 + mesg_length; 02002 02003 if (message_block.length () > mesg_length) 02004 { 02005 // There are more message in data stream to be parsed. 02006 // Safe the rd_ptr to restore later. 02007 char *rd_ptr_stack_mesg = message_block.rd_ptr (); 02008 02009 // Skip parsed message, jump to next message in buffer 02010 // PRE: mesg_length <= message_block.length () 02011 message_block.rd_ptr (mesg_length); 02012 02013 // Extract remaining messages and enqueue them for later 02014 // heap processing 02015 if (this->handle_input_parse_extra_messages (message_block) == -1) 02016 { 02017 return -1; 02018 } 02019 02020 // correct the end_marker 02021 end_marker = message_block.rd_ptr (); 02022 02023 // Restore rd_ptr 02024 message_block.rd_ptr (rd_ptr_stack_mesg); 02025 } 02026 02027 // The following if-else has been copied from 02028 // process_queue_head(). While process_queue_head() 02029 // processes message on heap, here we will process a message 02030 // on stack. 02031 02032 // Now that we have one message on stack to be processed, 02033 // check whether we have one more message in the queue... 02034 if (this->incoming_message_queue_.queue_length () > 0) 02035 { 02036 if (TAO_debug_level > 0) 02037 { 02038 ACE_DEBUG ((LM_DEBUG, 02039 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ") 02040 ACE_TEXT ("notify reactor\n"), 02041 this->id ())); 02042 02043 } 02044 02045 const int retval = this->notify_reactor (); 02046 02047 if (retval == 1) 02048 { 02049 // Let the class know that it doesn't need to resume the 02050 // handle.. 02051 rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_LEAVE_SUSPENDED); 02052 } 02053 else if (retval < 0) 02054 return -1; 02055 } 02056 else 02057 { 02058 // As there are no further messages in queue just resume 02059 // the handle. Set the flag incase someone had reset the flag.. 02060 rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_RESUMABLE); 02061 } 02062 02063 // PRE: incoming_message_queue is empty 02064 if (this->process_parsed_messages (&qd, 02065 rh) == -1) 02066 { 02067 return -1; 02068 } 02069 02070 // move the rd_ptr tp position of end_marker 02071 message_block.rd_ptr (end_marker); 02072 } 02073 } 02074 02075 // Now that all cases have been processed, there might be kept some data 02076 // in buffer that needs to be safed for next "handle_input" invocations. 02077 if (message_block.length () > 0) 02078 { 02079 if (this->partial_message_ == 0) 02080 { 02081 this->allocate_partial_message_block (); 02082 } 02083 02084 if (this->partial_message_ != 0 && 02085 this->partial_message_->copy (message_block.rd_ptr (), 02086 message_block.length ()) == 0) 02087 { 02088 message_block.rd_ptr (message_block.length ()); 02089 } 02090 else 02091 { 02092 return -1; 02093 } 02094 } 02095 02096 return 0; 02097 } |
|
Is invoked by handle_input_parse_data. Parses all messages remaining in message_block. Definition at line 1628 of file Transport.cpp. References consolidate_enqueue_message(), TAO_Pluggable_Messaging::extract_next_message(), incoming_message_stack_, ACE_Message_Block::length(), messaging_object(), TAO_Queued_Data::missing_data_, and TAO::Incoming_Message_Stack::push(). Referenced by handle_input_parse_data().
01629 { 01630 01631 // store buffer status of last extraction: -1 parse error, 0 01632 // incomplete message header in buffer, 1 complete messages header 01633 // parsed 01634 int buf_status = 0; 01635 01636 TAO_Queued_Data *q_data = 0; // init 01637 01638 // parse buffer until all messages have been extracted, consolidate 01639 // and enqueue complete messages, if the last message being parsed 01640 // has missin data, it is stays on top of incoming_message_stack. 01641 while (message_block.length () > 0 && 01642 (buf_status = this->messaging_object ()->extract_next_message 01643 (message_block, q_data)) != -1 && 01644 q_data != 0) // paranoid check 01645 { 01646 if (q_data->missing_data_ == 0) 01647 { 01648 if (this->consolidate_enqueue_message (q_data) == -1) 01649 { 01650 return -1; 01651 } 01652 } 01653 else // incomplete message read, probably the last message in buffer 01654 { 01655 // can not fail 01656 this->incoming_message_stack_.push (q_data); 01657 } 01658 01659 q_data = 0; // reset 01660 } // while 01661 01662 if (buf_status == -1) 01663 { 01664 return -1; 01665 } 01666 01667 return 0; 01668 } |
|
Callback method to reactively drain the outgoing data queue.
Definition at line 453 of file Transport.cpp. References ACE_DEBUG, ACE_TEXT, drain_queue(), LM_DEBUG, and TAO_debug_level. Referenced by TAO_Block_Flushing_Strategy::flush_message(), TAO_Block_Flushing_Strategy::flush_transport(), and TAO_Connection_Handler::handle_output_eh().
00454 { 00455 if (TAO_debug_level > 3) 00456 { 00457 ACE_DEBUG ((LM_DEBUG, 00458 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_output\n"), 00459 this->id ())); 00460 } 00461 00462 // The flushing strategy (potentially via the Reactor) wants to send 00463 // more data, first check if there is a current message that needs 00464 // more sending... 00465 int const retval = this->drain_queue (); 00466 00467 if (TAO_debug_level > 3) 00468 { 00469 ACE_DEBUG ((LM_DEBUG, 00470 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_output, ") 00471 ACE_TEXT ("drain_queue returns %d/%d\n"), 00472 this->id (), 00473 retval, errno)); 00474 } 00475 00476 // Any errors are returned directly to the Reactor 00477 return retval; 00478 } |
|
Definition at line 775 of file Transport.cpp. References ACE_DEBUG, ACE_TEXT, current_deadline_, flush_timer_pending(), TAO_ORB_Core::flushing_strategy(), LM_DEBUG, orb_core(), reset_flush_timer(), TAO_Flushing_Strategy::schedule_output(), and TAO_debug_level. Referenced by TAO_Transport_Timer::handle_timeout().
00777 { 00778 if (TAO_debug_level > 6) 00779 { 00780 ACE_DEBUG ((LM_DEBUG, 00781 ACE_TEXT ("TAO (%P|%t) - TAO_Transport[%d]::handle_timeout, ") 00782 ACE_TEXT ("timer expired\n"), 00783 this->id ())); 00784 } 00785 00786 /// This is the only legal ACT in the current configuration.... 00787 if (act != &this->current_deadline_) 00788 { 00789 return -1; 00790 } 00791 00792 if (this->flush_timer_pending ()) 00793 { 00794 // The timer is always a oneshot timer, so mark is as not 00795 // pending. 00796 this->reset_flush_timer (); 00797 00798 TAO_Flushing_Strategy *flushing_strategy = 00799 this->orb_core ()->flushing_strategy (); 00800 (void) flushing_strategy->schedule_output (this); 00801 } 00802 00803 return 0; 00804 } |
|
Definition at line 92 of file Transport.inl.
00093 { 00094 this->id_ = id; 00095 } |
|
Set and Get the identifier for this transport instance.
If not set, this will return an integer representation of the Definition at line 86 of file Transport.inl. Referenced by TAO::Transport_Cache_Manager::bind_i(), TAO_Connection_Handler::close_connection_eh(), TAO_IIOP_Connector::complete_connection(), TAO_Connector::connect(), TAO::Transport_Cache_Manager::find_transport(), TAO_Connection_Handler::handle_input_internal(), TAO_Connector::parallel_connect(), TAO_GIOP_Message_Lite::parse_reply(), TAO_GIOP_Message_Base::process_reply_message(), TAO::Transport_Cache_Manager::purge(), TAO_GIOP_Utils::read_buffer(), TAO_GIOP_Message_Base::send_close_connection(), TAO_GIOP_Message_Lite::send_error(), TAO_GIOP_Message_Base::send_error(), TAO_GIOP_Message_Lite::send_reply_exception(), TAO_Connector::wait_for_connection_completion(), and TAO_Leader_Follower::wait_for_event().
00087 { 00088 return this->id_; 00089 } |
|
Request is sent and the reply is received. Idle the transport now. Definition at line 239 of file Transport.cpp. References TAO_Transport_Mux_Strategy::idle_after_reply(), and tms().
00240 { 00241 return this->tms ()->idle_after_reply (); 00242 } |
|
Request has been just sent, but the reply is not received. Idle the transport now. Definition at line 233 of file Transport.cpp. References TAO_Transport_Mux_Strategy::idle_after_send(), and tms().
00234 { 00235 return this->tms ()->idle_after_send (); 00236 } |
|
Is this transport really connected.
Definition at line 164 of file Transport.inl. References ACE_GUARD_RETURN, and is_connected_. Referenced by TAO_IIOP_Connector::complete_connection(), TAO_Connector::connect(), TAO::Synch_Oneway_Invocation::remote_oneway(), and TAO_Connector::wait_for_connection_completion().
00165 { 00166 ACE_GUARD_RETURN (ACE_Lock, 00167 ace_mon, 00168 *this->handler_lock_, 00169 false); 00170 00171 return this->is_connected_; 00172 } |
|
Return true if the tcs has been set.
Definition at line 152 of file Transport.inl. References tcs_set_.
00153 { 00154 return tcs_set_; 00155 } |
|
Cache management.
Definition at line 429 of file Transport.cpp. References ACE_DEBUG, ACE_TEXT, LM_DEBUG, TAO::Transport_Cache_Manager::make_idle(), TAO_debug_level, and transport_cache_manager(). Referenced by TAO_Exclusive_TMS::idle_after_reply(), TAO_Muxed_TMS::idle_after_send(), TAO_IIOP_Connection_Handler::process_listen_point_list(), and TAO::Profile_Transport_Resolver::~Profile_Transport_Resolver().
00430 { 00431 if (TAO_debug_level > 3) 00432 { 00433 ACE_DEBUG ((LM_DEBUG, 00434 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::make_idle\n"), 00435 this->id ())); 00436 } 00437 00438 return this->transport_cache_manager ().make_idle (this->cache_map_entry_); 00439 } |
|
Initialising the messaging object. This would be used by the connector side. On the acceptor side the connection handler would take care of the messaging objects. Implemented in TAO_IIOP_Transport. |
|
Return the messaging object that is used to format the data that needs to be sent. Implemented in TAO_IIOP_Transport. Referenced by allocate_partial_message_block(), consolidate_enqueue_message(), consolidate_process_message(), format_queue_message(), TAO_On_Demand_Fragmentation_Strategy::fragment(), generate_locate_request(), generate_request_header(), handle_input_parse_data(), handle_input_parse_extra_messages(), out_stream(), process_parsed_messages(), and send_connection_closed_notifications_i(). |
|
Definition at line 2277 of file Transport.cpp. References ACE_DEBUG, ACE_TEXT, event_handler_i(), TAO_Wait_Strategy::is_registered(), LM_DEBUG, ACE_Reactor::notify(), orb_core(), TAO_ORB_Core::reactor(), and TAO_debug_level. Referenced by handle_input_parse_data(), and process_queue_head().
02278 { 02279 if (!this->ws_->is_registered ()) 02280 { 02281 return 0; 02282 } 02283 02284 ACE_Event_Handler *eh = this->event_handler_i (); 02285 02286 // Get the reactor associated with the event handler 02287 ACE_Reactor *reactor = this->orb_core ()->reactor (); 02288 02289 if (TAO_debug_level > 0) 02290 { 02291 ACE_DEBUG ((LM_DEBUG, 02292 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::notify_reactor, ") 02293 ACE_TEXT ("notify to Reactor\n"), 02294 this->id ())); 02295 } 02296 02297 02298 // Send a notification to the reactor... 02299 const int retval = reactor->notify (eh, 02300 ACE_Event_Handler::READ_MASK); 02301 02302 if (retval < 0 && TAO_debug_level > 2) 02303 { 02304 // @@todo: need to think about what is the action that 02305 // we can take when we get here. 02306 ACE_DEBUG ((LM_DEBUG, 02307 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::notify_reactor, ") 02308 ACE_TEXT ("notify to the reactor failed..\n"), 02309 this->id ())); 02310 } 02311 02312 return 1; 02313 } |
|
Definition at line 51 of file Transport.inl. References opening_connection_role_.
00052 { 00053 this->opening_connection_role_ = role; 00054 } |
|
Methods dealing with the role of the connection, e.g., CLIENT or SERVER. See CORBA 2.6 Specification, Section 15.5.1 for origin of definitions. Definition at line 45 of file Transport.inl. References opening_connection_role_. Referenced by TAO::Wait_On_LF_No_Upcall::can_process_upcalls(), TAO_Connector::connect(), and TAO_IIOP_Connection_Handler::open().
00046 { 00047 return this->opening_connection_role_; 00048 } |
|
|
|
|
Accessor for the output CDR stream.
Definition at line 2364 of file Transport.cpp. References messaging_object(), and TAO_Pluggable_Messaging::out_stream(). Referenced by TAO::LocateRequest_Invocation::invoke(), and TAO::Synch_Oneway_Invocation::remote_oneway().
02365 { 02366 return this->messaging_object ()->out_stream (); 02367 } |
|
Hooks that can be overridden in concrete transports. These hooks are invoked just after connection establishment (or after a connection is fetched from cache). The return value signifies whether the invoker should proceed with post connection establishment activities. Protocols like SSLIOP need this to verify whether connections already established have valid certificates. There are no pre_connect_hooks () since the transport doesn't exist before a connection establishment. :-)
Definition at line 287 of file Transport.cpp. Referenced by TAO_Connector::connect().
00288 { 00289 return true; 00290 } |
|
Perform all the actions when this transport get opened.
Definition at line 2370 of file Transport.cpp. References ACE_ERROR, ACE_GUARD_RETURN, ACE_TEXT, close_connection(), TAO_ORB_Core::flushing_strategy(), is_connected_, LM_ERROR, orb_core(), purge_entry(), queue_is_empty_i(), TAO_Wait_Strategy::register_handler(), TAO_Flushing_Strategy::schedule_output(), TAO_debug_level, and wait_strategy(). Referenced by TAO_IIOP_Connection_Handler::open().
02371 { 02372 this->id_ = id; 02373 02374 { 02375 ACE_GUARD_RETURN (ACE_Lock, 02376 ace_mon, 02377 *this->handler_lock_, 02378 false); 02379 this->is_connected_ = true; 02380 } 02381 02382 // When we have data in our outgoing queue schedule ourselves 02383 // for output 02384 if (this->queue_is_empty_i ()) 02385 return true; 02386 02387 // If the wait strategy wants us to be registered with the reactor 02388 // then we do so. If registeration is required and it succeeds, 02389 // #REFCOUNT# becomes two. 02390 if (this->wait_strategy ()->register_handler () == 0) 02391 { 02392 TAO_Flushing_Strategy *flushing_strategy = 02393 this->orb_core ()->flushing_strategy (); 02394 (void) flushing_strategy->schedule_output (this); 02395 } 02396 else 02397 { 02398 // Registration failures. 02399 02400 // Purge from the connection cache, if we are not in the cache, this 02401 // just does nothing. 02402 (void) this->purge_entry (); 02403 02404 // Close the handler. 02405 (void) this->close_connection (); 02406 02407 if (TAO_debug_level > 0) 02408 ACE_ERROR ((LM_ERROR, 02409 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::post_connect , ") 02410 ACE_TEXT ("could not register the transport ") 02411 ACE_TEXT ("in the reactor.\n"), 02412 this->id ())); 02413 02414 return false; 02415 } 02416 02417 return true; 02418 } |
|
Process the message by sending it to the higher layers of the ORB. Definition at line 2101 of file Transport.cpp. References ACE_DEBUG, ACE_ERROR, ACE_TEXT, TAO_Pluggable_Messaging::discard_fragmented_message(), LM_DEBUG, LM_ERROR, messaging_object(), TAO_Queued_Data::missing_data_, TAO_Queued_Data::msg_type_, TAO_Pluggable_Messaging::process_reply_message(), TAO_Pluggable_Messaging::process_request_message(), TAO_Resume_Handle::resume_handle(), TAO_debug_level, TAO_PLUGGABLE_MESSAGE_CANCELREQUEST, TAO_PLUGGABLE_MESSAGE_CLOSECONNECTION, TAO_PLUGGABLE_MESSAGE_LOCATEREPLY, TAO_PLUGGABLE_MESSAGE_LOCATEREQUEST, TAO_PLUGGABLE_MESSAGE_MESSAGERROR, TAO_PLUGGABLE_MESSAGE_REPLY, TAO_PLUGGABLE_MESSAGE_REQUEST, and TAO_Pluggable_Message_Type. Referenced by consolidate_process_message(), handle_input_parse_data(), and process_queue_head().
02103 { 02104 if (TAO_debug_level > 7) 02105 { 02106 ACE_DEBUG ((LM_DEBUG, 02107 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ") 02108 ACE_TEXT ("entering (missing data == %d)\n"), 02109 this->id(), qd->missing_data_)); 02110 } 02111 02112 // Get the <message_type> that we have received 02113 const TAO_Pluggable_Message_Type t = qd->msg_type_; 02114 02115 // int result = 0; 02116 02117 if (t == TAO_PLUGGABLE_MESSAGE_CLOSECONNECTION) 02118 { 02119 if (TAO_debug_level > 0) 02120 ACE_DEBUG ((LM_DEBUG, 02121 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ") 02122 ACE_TEXT ("received CloseConnection message - %m\n"), 02123 this->id())); 02124 02125 // Return a "-1" so that the next stage can take care of 02126 // closing connection and the necessary memory management. 02127 return -1; 02128 } 02129 else if (t == TAO_PLUGGABLE_MESSAGE_REQUEST || 02130 t == TAO_PLUGGABLE_MESSAGE_LOCATEREQUEST) 02131 { 02132 // Let us resume the handle before we go ahead to process the 02133 // request. This will open up the handle for other threads. 02134 rh.resume_handle (); 02135 02136 if (this->messaging_object ()->process_request_message ( 02137 this, 02138 qd) == -1) 02139 { 02140 // Return a "-1" so that the next stage can take care of 02141 // closing connection and the necessary memory management. 02142 return -1; 02143 } 02144 } 02145 else if (t == TAO_PLUGGABLE_MESSAGE_REPLY || 02146 t == TAO_PLUGGABLE_MESSAGE_LOCATEREPLY) 02147 { 02148 rh.resume_handle (); 02149 02150 TAO_Pluggable_Reply_Params params (this); 02151 02152 if (this->messaging_object ()->process_reply_message (params, 02153 qd) == -1) 02154 { 02155 if (TAO_debug_level > 0) 02156 ACE_DEBUG ((LM_DEBUG, 02157 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ") 02158 ACE_TEXT ("error in process_reply_message - %m\n"), 02159 this->id ())); 02160 02161 return -1; 02162 } 02163 02164 } 02165 else if (t == TAO_PLUGGABLE_MESSAGE_CANCELREQUEST) 02166 { 02167 // The associated request might be incomplpete residing 02168 // fragmented in messaging object. We must make sure the 02169 // resources allocated by fragments are released. 02170 02171 if (this->messaging_object ()->discard_fragmented_message (qd) == -1) 02172 { 02173 if (TAO_debug_level > 0) 02174 { 02175 ACE_ERROR ((LM_ERROR, 02176 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ") 02177 ACE_TEXT ("error processing CancelRequest\n"), 02178 this->id ())); 02179 } 02180 } 02181 02182 // We are not able to cancel requests being processed already; 02183 // this is declared as optional feature by CORBA, and TAO does 02184 // not support this currently. 02185 02186 // Just continue processing, CancelRequest does not mean to cut 02187 // off the connection. 02188 } 02189 else if (t == TAO_PLUGGABLE_MESSAGE_MESSAGERROR) 02190 { 02191 if (TAO_debug_level > 0) 02192 { 02193 ACE_ERROR ((LM_ERROR, 02194 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ") 02195 ACE_TEXT ("received MessageError, closing connection\n"), 02196 this->id ())); 02197 } 02198 return -1; 02199 } 02200 02201 // If not, just return back.. 02202 return 0; 02203 } |
|
Definition at line 2206 of file Transport.cpp. References ACE_DEBUG, ACE_TEXT, TAO_Incoming_Message_Queue::dequeue_head(), incoming_message_queue_, LM_DEBUG, notify_reactor(), process_parsed_messages(), TAO_Incoming_Message_Queue::queue_length(), TAO_Queued_Data::release(), TAO_Resume_Handle::set_flag(), and TAO_debug_level. Referenced by handle_input(), and handle_input_parse_data().
02207 { 02208 if (TAO_debug_level > 3) 02209 { 02210 ACE_DEBUG ((LM_DEBUG, 02211 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_queue_head, %d enqueued\n"), 02212 this->id (), this->incoming_message_queue_.queue_length () )); 02213 } 02214 02215 // See if message in queue ... 02216 if (this->incoming_message_queue_.queue_length () > 0) 02217 { 02218 // Get the message on the head of the queue.. 02219 TAO_Queued_Data *qd = 02220 this->incoming_message_queue_.dequeue_head (); 02221 02222 if (TAO_debug_level > 3) 02223 { 02224 ACE_DEBUG ((LM_DEBUG, 02225 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_queue_head, ") 02226 ACE_TEXT ("the size of the queue is [%d]\n"), 02227 this->id (), 02228 this->incoming_message_queue_.queue_length())); 02229 } 02230 // Now that we have pulled out out one message out of the queue, 02231 // check whether we have one more message in the queue... 02232 if (this->incoming_message_queue_.queue_length () > 0) 02233 { 02234 if (TAO_debug_level > 0) 02235 { 02236 ACE_DEBUG ((LM_DEBUG, 02237 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_queue_head, ") 02238 ACE_TEXT ("notify reactor\n"), 02239 this->id ())); 02240 02241 } 02242 02243 const int retval = this->notify_reactor (); 02244 02245 if (retval == 1) 02246 { 02247 // Let the class know that it doesn't need to resume the 02248 // handle.. 02249 rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_LEAVE_SUSPENDED); 02250 } 02251 else if (retval < 0) 02252 return -1; 02253 } 02254 else 02255 { 02256 // As we are ready to process the last message just resume 02257 // the handle. Set the flag incase someone had reset the flag.. 02258 rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_RESUMABLE); 02259 } 02260 02261 // Process the message... 02262 if (this->process_parsed_messages (qd, rh) == -1) 02263 { 02264 return -1; 02265 } 02266 02267 // Delete the Queued_Data.. 02268 TAO_Queued_Data::release (qd); 02269 02270 return 0; 02271 } 02272 02273 return 1; 02274 } |
|
Called by the cache when the ORB is shuting down.
Definition at line 219 of file Transport.cpp. References add_reference(), TAO::Connection_Handler_Set, ACE_Unbounded_Set< T >::insert(), TAO_Wait_Strategy::non_blocking(), and opening_connection_role_.
00220 { 00221 if (this->ws_->non_blocking () || 00222 this->opening_connection_role_ == TAO::TAO_SERVER_ROLE) 00223 return false; 00224 00225 (void) this->add_reference (); 00226 00227 h.insert (this->connection_handler_i ()); 00228 00229 return true; 00230 } |
|
Added event handler to the handlers set. Called by the cache when the cache is closing.
Definition at line 211 of file Transport.cpp. References add_reference(), TAO::Connection_Handler_Set, and ACE_Unbounded_Set< T >::insert().
00212 { 00213 (void) this->add_reference (); 00214 00215 handlers.insert (this->connection_handler_i ()); 00216 } |
|
Cache management.
Definition at line 423 of file Transport.cpp. References TAO::Transport_Cache_Manager::purge_entry(), and transport_cache_manager(). Referenced by TAO_Connection_Handler::close_connection_eh(), TAO_IIOP_Connector::complete_connection(), TAO_Connector::connect(), post_open(), recache_transport(), and ~TAO_Transport().
00424 { 00425 return this->transport_cache_manager ().purge_entry (this->cache_map_entry_); 00426 } |
|
Definition at line 76 of file Transport.inl. References purging_order_.
00077 { 00078 // This should only be called by the Transport Cache Manager when 00079 // it is holding it's lock. 00080 // The transport should still be here since the cache manager still 00081 // has a reference to it. 00082 this->purging_order_ = value; 00083 } |
|
Get and Set the purging order. The purging strategy uses the set version to set the purging order. Definition at line 70 of file Transport.inl. References purging_order_. Referenced by TAO_LRU_Connection_Purging_Strategy::update_item().
00071 { 00072 return this->purging_order_; 00073 } |
|
Check if there are messages pending in the queue.
Definition at line 98 of file Transport.inl. References ACE_GUARD_RETURN, and queue_is_empty_i(). Referenced by TAO_Reactive_Flushing_Strategy::flush_transport(), TAO_Leader_Follower_Flushing_Strategy::flush_transport(), and TAO_Block_Flushing_Strategy::flush_transport().
00099 { 00100 ACE_GUARD_RETURN (ACE_Lock, 00101 ace_mon, 00102 *this->handler_lock_, 00103 -1); 00104 return this->queue_is_empty_i (); 00105 } |
|
Check if there are messages pending in the queue. This version assumes that the lock is already held. Use with care!
Definition at line 715 of file Transport.cpp. Referenced by post_open(), queue_is_empty(), and TAO_Block_Flushing_Strategy::schedule_output().
00716 { 00717 return (this->head_ == 0); 00718 } |
|
Queue a message for message_block.
Definition at line 1308 of file Transport.cpp. References ACE_NEW_RETURN, and TAO_Queued_Message::push_back(). Referenced by format_queue_message(), and send_asynchronous_message_i().
01309 { 01310 TAO_Queued_Message *queued_message = 0; 01311 ACE_NEW_RETURN (queued_message, 01312 TAO_Asynch_Queued_Message (message_block, 01313 this->orb_core_, 01314 0, 01315 1), 01316 -1); 01317 queued_message->push_back (this->head_, this->tail_); 01318 01319 return 0; 01320 } |
|
Recache ourselves in the cache.
Definition at line 412 of file Transport.cpp. References TAO::Transport_Cache_Manager::cache_transport(), purge_entry(), and transport_cache_manager(). Referenced by TAO_IIOP_Connection_Handler::process_listen_point_list().
00413 { 00414 // First purge our entry 00415 this->purge_entry (); 00416 00417 // Then add ourselves to the cache 00418 return this->transport_cache_manager ().cache_transport (desc, 00419 this); 00420 } |
|
Read len bytes from into buf. This method serializes on handler_lock_, guaranteeing that only thread can execute it on the same instance concurrently.
Implemented in TAO_IIOP_Transport. Referenced by handle_input_missing_data(), handle_input_parse_data(), TAO_GIOP_Utils::read_buffer(), and TAO_GIOP_Utils::read_bytes_input(). |
|
Accessor to recv_buffer_size_.
Definition at line 181 of file Transport.inl.
00182 { 00183 return this->recv_buffer_size_; 00184 } |
|
Register the handler with the reactor. Register the handler with the reactor. This method is used by the Wait_On_Reactor strategy. The transport must register its event handler with the ORB's Reactor.
Definition at line 299 of file Transport.cpp. References ACE_DEBUG, ACE_GUARD_RETURN, ACE_TEXT, event_handler_i(), TAO_Wait_Strategy::is_registered(), LM_DEBUG, ACE_Event_Handler::reactor(), TAO_ORB_Core::reactor(), ACE_Reactor::register_handler(), and TAO_debug_level. Referenced by TAO_Wait_On_Reactor::register_handler(), TAO_Wait_On_Leader_Follower::register_handler(), and TAO_Wait_On_Leader_Follower::sending_request().
00300 { 00301 if (TAO_debug_level > 4) 00302 { 00303 ACE_DEBUG ((LM_DEBUG, 00304 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::register_handler\n"), 00305 this->id ())); 00306 } 00307 00308 ACE_Reactor * const r = this->orb_core_->reactor (); 00309 00310 // @@note: This should be okay since the register handler call will 00311 // not make a nested call into the transport. 00312 ACE_GUARD_RETURN (ACE_Lock, 00313 ace_mon, 00314 *this->handler_lock_, 00315 false); 00316 00317 if (r == this->event_handler_i ()->reactor ()) 00318 { 00319 return 0; 00320 } 00321 00322 // Set the flag in the Connection Handler and in the Wait Strategy 00323 // @@Maybe we should set these flags after registering with the 00324 // reactor. What if the registration fails??? 00325 this->ws_->is_registered (true); 00326 00327 // Register the handler with the reactor 00328 return r->register_handler (this->event_handler_i (), 00329 ACE_Event_Handler::READ_MASK); 00330 } |
|
Initialising the messaging object. This would be used by the connector side. On the acceptor side the connection handler would take care of the messaging objects. Definition at line 2358 of file Transport.cpp. References event_handler_i(), and ACE_Event_Handler::remove_reference(). Referenced by TAO_Connection_Handler::close_handler(), TAO::Cache_IntId::operator=(), TAO::Transport_Cache_Manager::purge(), TAO_Asynch_Reply_Dispatcher_Base::transport(), TAO::Cache_IntId::~Cache_IntId(), TAO::Profile_Transport_Resolver::~Profile_Transport_Resolver(), TAO_Asynch_Reply_Dispatcher_Base::~TAO_Asynch_Reply_Dispatcher_Base(), and TAO_Thread_Per_Connection_Handler::~TAO_Thread_Per_Connection_Handler().
02359 { 02360 return this->event_handler_i ()->remove_reference (); 02361 } |
|
Print out error messages if the event handler is not valid.
Definition at line 1106 of file Transport.cpp. References ACE_DEBUG, ACE_TEXT, ACE_TEXT_CHAR_TO_TCHAR, LM_DEBUG, and TAO_debug_level.
01107 { 01108 if (TAO_debug_level > 0) 01109 { 01110 ACE_DEBUG ((LM_DEBUG, 01111 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::report_invalid_event_handler") 01112 ACE_TEXT ("(%s) no longer associated with handler [tag=%d]\n"), 01113 this->id (), ACE_TEXT_CHAR_TO_TCHAR (caller), this->tag_)); 01114 } 01115 } |
|
The flush timer expired or was explicitly cancelled, mark it as not pending Definition at line 114 of file Transport.inl. References current_deadline_, and flush_timer_id_. Referenced by drain_queue_i(), and handle_timeout().
00115 { 00116 this->flush_timer_id_ = -1; 00117 this->current_deadline_ = ACE_Time_Value::zero; 00118 } |
|
Schedule handle_output() callbacks.
Definition at line 722 of file Transport.cpp. References ACE_DEBUG, ACE_TEXT, event_handler_i(), ACE_Reactor::find_handler(), ACE_Event_Handler::get_handle(), LM_DEBUG, ACE_Event_Handler::reactor(), ACE_Event_Handler::remove_reference(), ACE_Reactor::schedule_wakeup(), and TAO_debug_level. Referenced by TAO_Reactive_Flushing_Strategy::schedule_output(), and TAO_Leader_Follower_Flushing_Strategy::schedule_output().
00723 { 00724 ACE_Event_Handler *eh = this->event_handler_i (); 00725 ACE_Reactor *reactor = eh->reactor (); 00726 00727 // Check to see if our event handler is still registered with the 00728 // reactor. It's possible for another thread to have run close_connection() 00729 // since we last used the event handler. 00730 ACE_Event_Handler * const found = reactor->find_handler (eh->get_handle ()); 00731 if (found != eh) 00732 { 00733 if(TAO_debug_level > 3) 00734 { 00735 ACE_DEBUG ((LM_DEBUG, 00736 "TAO (%P|%t) - Transport[%d]::schedule_output_i " 00737 "event handler not found in reactor, returning -1\n", 00738 this->id ())); 00739 } 00740 if (found) 00741 { 00742 found->remove_reference (); 00743 } 00744 return -1; 00745 } 00746 found->remove_reference (); 00747 00748 if (TAO_debug_level > 3) 00749 { 00750 ACE_DEBUG ((LM_DEBUG, 00751 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::schedule_output_i\n"), 00752 this->id ())); 00753 } 00754 00755 return reactor->schedule_wakeup (eh, ACE_Event_Handler::WRITE_MASK); 00756 } |
|
Write the complete Message_Block chain to the connection. This method serializes on handler_lock_, guaranteeing that only thread can execute it on the same instance concurrently. Often the implementation simply forwards the arguments to the underlying ACE_Svc_Handler class. Using the code factored out into ACE. Be careful with protocols that perform non-trivial transformations of the data, such as SSLIOP or protocols that compress the stream.
ENOENT .
Implemented in TAO_IIOP_Transport. Referenced by drain_queue_helper(). |
|
Send an asynchronous message, i.e. do not block until the message is on the wire Definition at line 1161 of file Transport.cpp. References ACE_DEBUG, ACE_ERROR, ACE_GUARD_RETURN, ACE_TEXT, check_buffering_constraints_i(), ACE_Message_Block::cont(), ETIME, EWOULDBLOCK, TAO_Flushing_Strategy::flush_transport(), TAO_ORB_Core::flushing_strategy(), ACE_Message_Block::length(), LM_DEBUG, LM_ERROR, TAO::Transport_Queueing_Strategy::must_queue(), orb_core(), queue_message_i(), TAO_Flushing_Strategy::schedule_output(), send_message_block_chain_i(), ssize_t, TAO_debug_level, ACE_Message_Block::total_length(), and TAO_Stub::transport_queueing_strategy(). Referenced by send_message_shared_i().
01164 { 01165 // Let's figure out if the message should be queued without trying 01166 // to send first: 01167 bool try_sending_first = true; 01168 01169 const bool queue_empty = (this->head_ == 0); 01170 01171 if (!queue_empty) 01172 { 01173 try_sending_first = false; 01174 } 01175 else if (stub->transport_queueing_strategy ().must_queue (queue_empty)) 01176 { 01177 try_sending_first = false; 01178 } 01179 01180 if (try_sending_first) 01181 { 01182 ssize_t n = 0; 01183 size_t byte_count = 0; 01184 // ... in this case we must try to send the message first ... 01185 01186 const size_t total_length = message_block->total_length (); 01187 01188 if (TAO_debug_level > 6) 01189 { 01190 ACE_DEBUG ((LM_DEBUG, 01191 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ") 01192 ACE_TEXT ("trying to send the message (ml = %d)\n"), 01193 this->id (), total_length)); 01194 } 01195 01196 // @@ I don't think we want to hold the mutex here, however if 01197 // we release it we need to recheck the status of the transport 01198 // after we return... once I understand the final form for this 01199 // code I will re-visit this decision 01200 n = this->send_message_block_chain_i (message_block, 01201 byte_count, 01202 max_wait_time); 01203 if (n == -1) 01204 { 01205 // ... if this is just an EWOULDBLOCK we must schedule the 01206 // message for later, if it is ETIME we still have to send 01207 // the complete message, because cutting off the message at 01208 // this point will destroy the synchronization with the 01209 // server ... 01210 if (errno != EWOULDBLOCK && errno != ETIME) 01211 { 01212 if (TAO_debug_level > 0) 01213 { 01214 ACE_ERROR ((LM_ERROR, 01215 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ") 01216 ACE_TEXT ("fatal error in ") 01217 ACE_TEXT ("send_message_block_chain_i - %m\n"), 01218 this->id ())); 01219 } 01220 return -1; 01221 } 01222 } 01223 01224 // ... let's figure out if the complete message was sent ... 01225 if (total_length == byte_count) 01226 { 01227 // Done, just return. Notice that there are no allocations 01228 // or copies up to this point (though some fancy calling 01229 // back and forth). 01230 // This is the common case for the critical path, it should 01231 // be fast. 01232 return 0; 01233 } 01234 01235 if (TAO_debug_level > 6) 01236 { 01237 ACE_DEBUG ((LM_DEBUG, 01238 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ") 01239 ACE_TEXT ("partial send %d / %d bytes\n"), 01240 this->id (), byte_count, total_length)); 01241 } 01242 01243 // ... part of the data was sent, need to figure out what piece 01244 // of the message block chain must be queued ... 01245 while (message_block != 0 && message_block->length () == 0) 01246 { 01247 message_block = message_block->cont (); 01248 } 01249 01250 // ... at least some portion of the message block chain should 01251 // remain ... 01252 } 01253 01254 // ... either the message must be queued or we need to queue it 01255 // because it was not completely sent out ... 01256 01257 if (TAO_debug_level > 6) 01258 { 01259 ACE_DEBUG ((LM_DEBUG, 01260 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ") 01261 ACE_TEXT ("message is queued\n"), 01262 this->id ())); 01263 } 01264 01265 if (this->queue_message_i(message_block) == -1) 01266 { 01267 if (TAO_debug_level > 0) 01268 ACE_DEBUG ((LM_DEBUG, 01269 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ") 01270 ACE_TEXT ("cannot queue message for ") 01271 ACE_TEXT (" - %m\n"), 01272 this->id ())); 01273 return -1; 01274 } 01275 01276 // ... if the queue is full we need to activate the output on the 01277 // queue ... 01278 bool must_flush = false; 01279 const bool constraints_reached = 01280 this->check_buffering_constraints_i (stub, 01281 must_flush); 01282 01283 // ... but we also want to activate it if the message was partially 01284 // sent.... Plus, when we use the blocking flushing strategy the 01285 // queue is flushed as a side-effect of 'schedule_output()' 01286 01287 TAO_Flushing_Strategy *flushing_strategy = 01288 this->orb_core ()->flushing_strategy (); 01289 01290 if (constraints_reached || try_sending_first) 01291 { 01292 (void) flushing_strategy->schedule_output (this); 01293 } 01294 01295 if (must_flush) 01296 { 01297 typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK; 01298 TAO_REVERSE_LOCK reverse (*this->handler_lock_); 01299 ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1); 01300 01301 (void) flushing_strategy->flush_transport (this); 01302 } 01303 01304 return 0; 01305 } |
|
Notify all the components inside a Transport when the underlying connection is closed. Definition at line 1118 of file Transport.cpp. References ACE_GUARD, TAO_Transport_Mux_Strategy::connection_closed(), send_connection_closed_notifications_i(), and tms(). Referenced by TAO_Connection_Handler::close_connection_eh().
01119 { 01120 { 01121 ACE_MT (ACE_GUARD (ACE_Lock, guard, *this->handler_lock_)); 01122 01123 this->send_connection_closed_notifications_i (); 01124 } 01125 01126 this->tms ()->connection_closed (); 01127 } |
|
Assume the lock is held.
Definition at line 1130 of file Transport.cpp. References cleanup_queue_i(), messaging_object(), and TAO_Pluggable_Messaging::reset(). Referenced by send_connection_closed_notifications().
01131 { 01132 this->cleanup_queue_i (); 01133 01134 this->messaging_object ()->reset (); 01135 } |
|
Once the ORB is prepared to receive a reply (see send_request() above), and all the arguments have been marshaled the CDR stream must be 'formatted', i.e. the message_size field in the GIOP header can finally be set to the proper value. Implemented in TAO_IIOP_Transport. Referenced by TAO_On_Demand_Fragmentation_Strategy::fragment(), TAO_GIOP_Message_Lite::make_send_locate_reply(), TAO_GIOP_Message_Base::make_send_locate_reply(), TAO_GIOP_Message_Lite::process_request(), TAO_GIOP_Message_Base::process_request(), TAO_ServerRequest::send_cached_reply(), TAO_ServerRequest::send_no_exception_reply(), TAO_GIOP_Message_Lite::send_reply_exception(), TAO_GIOP_Message_Base::send_reply_exception(), TAO_ServerRequest::tao_send_reply(), and TAO_ServerRequest::tao_send_reply_exception(). |
|
Send a message block chain,.
Definition at line 490 of file Transport.cpp. References ACE_GUARD_RETURN, and send_message_block_chain_i(). Referenced by TAO_GIOP_Message_Base::send_close_connection(), TAO_GIOP_Message_Lite::send_error(), and TAO_GIOP_Message_Base::send_error().
00493 { 00494 ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1); 00495 00496 return this->send_message_block_chain_i (mb, 00497 bytes_transferred, 00498 max_wait_time); 00499 } |
|
Send a message block chain, assuming the lock is held.
Definition at line 502 of file Transport.cpp. References drain_queue_i(), TAO_Synch_Queued_Message::message_length(), TAO_Queued_Message::push_back(), TAO_Queued_Message::remove_from_list(), and ACE_Message_Block::total_length(). Referenced by send_asynchronous_message_i(), and send_message_block_chain().
00505 { 00506 size_t const total_length = mb->total_length (); 00507 00508 // We are going to block, so there is no need to clone 00509 // the message block. 00510 TAO_Synch_Queued_Message synch_message (mb, 00511 this->orb_core_); 00512 00513 synch_message.push_back (this->head_, this->tail_); 00514 00515 int const n = this->drain_queue_i (); 00516 00517 if (n == -1) 00518 { 00519 synch_message.remove_from_list (this->head_, this->tail_); 00520 return -1; // Error while sending... 00521 } 00522 else if (n == 1) 00523 { 00524 bytes_transferred = total_length; 00525 return 1; // Empty queue, message was sent.. 00526 } 00527 00528 // Remove the temporary message from the queue... 00529 synch_message.remove_from_list (this->head_, this->tail_); 00530 00531 bytes_transferred = 00532 total_length - synch_message.message_length (); 00533 00534 return 0; 00535 } |
|
Sent the contents of message_block.
Reimplemented in TAO_IIOP_Transport. Definition at line 261 of file Transport.cpp. References ACE_GUARD_RETURN, close_connection(), and send_message_shared_i().
00265 { 00266 int result = 0; 00267 00268 { 00269 ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1); 00270 00271 result = 00272 this->send_message_shared_i (stub, message_semantics, 00273 message_block, max_wait_time); 00274 } 00275 00276 if (result == -1) 00277 { 00278 this->close_connection (); 00279 } 00280 00281 return result; 00282 } |
|
Implement send_message_shared() assuming the handler_lock_ is held. Definition at line 1138 of file Transport.cpp. References send_asynchronous_message_i(), send_reply_message_i(), send_synchronous_message_i(), TAO_ONEWAY_REQUEST, TAO_REPLY, and TAO_TWOWAY_REQUEST. Referenced by send_message_shared(), and TAO_IIOP_Transport::send_message_shared().
01142 { 01143 switch (message_semantics) 01144 { 01145 case TAO_Transport::TAO_TWOWAY_REQUEST: 01146 return this->send_synchronous_message_i (message_block, 01147 max_wait_time); 01148 case TAO_Transport::TAO_REPLY: 01149 return this->send_reply_message_i (message_block, 01150 max_wait_time); 01151 case TAO_Transport::TAO_ONEWAY_REQUEST: 01152 return this->send_asynchronous_message_i (stub, 01153 message_block, 01154 max_wait_time); 01155 } 01156 01157 return -1; 01158 } |
|
Send a reply message, i.e. do not block until the message is on the wire, but just return after adding them to the queue. Definition at line 630 of file Transport.cpp. References ACE_DEBUG, ACE_TEXT, TAO_Synch_Queued_Message::clone(), TAO_Queued_Message::destroy(), TAO_ORB_Core::flushing_strategy(), LM_DEBUG, orb_core(), TAO_Queued_Message::push_back(), TAO_Queued_Message::remove_from_list(), TAO_Flushing_Strategy::schedule_output(), send_synch_message_helper_i(), and TAO_debug_level. Referenced by send_message_shared_i().
00632 { 00633 // Dont clone now.. We could be sent in one shot! 00634 TAO_Synch_Queued_Message synch_message (mb, this->orb_core_); 00635 00636 synch_message.push_back (this->head_, 00637 this->tail_); 00638 00639 int const n = 00640 this->send_synch_message_helper_i (synch_message, 00641 max_wait_time); 00642 00643 if (n == -1 || n == 1) 00644 { 00645 return n; 00646 } 00647 00648 if (TAO_debug_level > 3) 00649 { 00650 ACE_DEBUG ((LM_DEBUG, 00651 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_reply_message_i, ") 00652 ACE_TEXT ("preparing to add to queue before leaving \n"), 00653 this->id ())); 00654 } 00655 00656 // Till this point we shouldn't have any copying and that is the 00657 // point anyway. Now, remove the node from the list 00658 synch_message.remove_from_list (this->head_, 00659 this->tail_); 00660 00661 // Clone the node that we have. 00662 TAO_Queued_Message *msg = 00663 synch_message.clone (this->orb_core_->transport_message_buffer_allocator ()); 00664 00665 // Stick it in the queue 00666 msg->push_back (this->head_, 00667 this->tail_); 00668 00669 TAO_Flushing_Strategy *flushing_strategy = 00670 this->orb_core ()->flushing_strategy (); 00671 00672 int result = flushing_strategy->schedule_output (this); 00673 00674 if (result == -1) 00675 { 00676 if (TAO_debug_level > 5) 00677 { 00678 ACE_DEBUG ((LM_DEBUG, "TAO (%P|%t) - Transport[%d]::send_reply_" 00679 "message_i dequeuing msg due to schedule_output " 00680 "failure\n", this->id ())); 00681 } 00682 msg->remove_from_list (this->head_, this->tail_); 00683 msg->destroy (); 00684 } 00685 00686 return 1; 00687 } |
|
Preparing the ORB to receive the reply only once the request is completely sent opens the system to some subtle race conditions: suppose the ORB is running in a multi-threaded configuration, thread A makes a request while thread B is using the Reactor to process all incoming requests. Thread A could be implemented as follows: 1) send the request 2) setup the ORB to receive the reply 3) wait for the request but in this case thread B may receive the reply between step (1) and (2), and drop it as an invalid or unexpected message. Consequently the correct implementation is: 1) setup the ORB to receive the reply 2) send the request 3) wait for the reply The following method encapsulates this idiom.
Implemented in TAO_IIOP_Transport. Referenced by TAO::Remote_Invocation::send_message(). |
|
A helper method used by send_synchronous_message_i() and send_reply_message_i(). Reusable code that could be used by both the methods. Definition at line 690 of file Transport.cpp. References TAO_Synch_Queued_Message::all_data_sent(), drain_queue_i(), and TAO_Queued_Message::remove_from_list(). Referenced by send_reply_message_i(), and send_synchronous_message_i().
00692 { 00693 // @@todo: Need to send timeouts for writing.. 00694 int const n = this->drain_queue_i (); 00695 00696 if (n == -1) 00697 { 00698 synch_message.remove_from_list (this->head_, this->tail_); 00699 return -1; // Error while sending... 00700 } 00701 else if (n == 1) 00702 { 00703 return 1; // Empty queue, message was sent.. 00704 } 00705 00706 if (synch_message.all_data_sent ()) 00707 { 00708 return 1; 00709 } 00710 00711 return 0; 00712 } |
|
Send a synchronous message, i.e. block until the message is on the wire Definition at line 538 of file Transport.cpp. References ACE_ERROR, ACE_GUARD_RETURN, ACE_NEW_RETURN, ACE_TEXT, TAO_Synch_Queued_Message::current_block(), ETIME, TAO_Flushing_Strategy::flush_message(), TAO_ORB_Core::flushing_strategy(), LM_ERROR, orb_core(), TAO_Queued_Message::push_back(), TAO_Queued_Message::push_front(), TAO_Queued_Message::remove_from_list(), TAO_Flushing_Strategy::schedule_output(), send_synch_message_helper_i(), and TAO_debug_level. Referenced by send_message_shared_i().
00540 { 00541 // We are going to block, so there is no need to clone 00542 // the message block. 00543 TAO_Synch_Queued_Message synch_message (mb, this->orb_core_); 00544 00545 // Push synch_message on to the back of the queue. 00546 synch_message.push_back (this->head_, this->tail_); 00547 00548 int const n = 00549 this->send_synch_message_helper_i (synch_message, 00550 max_wait_time); 00551 00552 if (n == -1 || n == 1) 00553 { 00554 return n; 00555 } 00556 00557 // @todo: Check for timeouts! 00558 // if (max_wait_time != 0 && errno == ETIME) return -1; 00559 TAO_Flushing_Strategy *flushing_strategy = 00560 this->orb_core ()->flushing_strategy (); 00561 (void) flushing_strategy->schedule_output (this); 00562 00563 // Release the mutex, other threads may modify the queue as we 00564 // block for a long time writing out data. 00565 int result; 00566 { 00567 typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK; 00568 TAO_REVERSE_LOCK reverse (*this->handler_lock_); 00569 ACE_GUARD_RETURN (TAO_REVERSE_LOCK, 00570 ace_mon, 00571 reverse, 00572 -1); 00573 00574 result = flushing_strategy->flush_message (this, 00575 &synch_message, 00576 max_wait_time); 00577 } 00578 00579 if (result == -1) 00580 { 00581 synch_message.remove_from_list (this->head_, this->tail_); 00582 00583 if (errno == ETIME) 00584 { 00585 if (this->head_ == &synch_message) 00586 { 00587 // This is a timeout, there is only one nasty case: the 00588 // message has been partially sent! We simply cannot take 00589 // the message out of the queue, because that would corrupt 00590 // the connection. 00591 // 00592 // What we do is replace the queued message with an 00593 // asynchronous message, that contains only what remains of 00594 // the timed out request. If you think about sending 00595 // CancelRequests in this case: there is no much point in 00596 // doing that: the receiving ORB would probably ignore it, 00597 // and figuring out the request ID would be a bit of a 00598 // nightmare. 00599 // 00600 00601 synch_message.remove_from_list (this->head_, this->tail_); 00602 TAO_Queued_Message *queued_message = 0; 00603 ACE_NEW_RETURN (queued_message, 00604 TAO_Asynch_Queued_Message ( 00605 synch_message.current_block (), 00606 this->orb_core_, 00607 0, 00608 1), 00609 -1); 00610 queued_message->push_front (this->head_, this->tail_); 00611 } 00612 } 00613 00614 if (TAO_debug_level > 0) 00615 { 00616 ACE_ERROR ((LM_ERROR, 00617 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_synchronous_message_i, ") 00618 ACE_TEXT ("error while flushing message - %m\n"), 00619 this->id ())); 00620 } 00621 00622 return -1; 00623 } 00624 00625 return 1; 00626 } |
|
Accessor to sent_byte_count_.
Definition at line 187 of file Transport.inl. References sent_byte_count_.
00188 { 00189 return this->sent_byte_count_; 00190 } |
|
Return the protocol tag. The OMG assigns unique tags (a 32-bit unsigned number) to each protocol. New protocol tags can be obtained free of charge from the OMG, check the documents in corbafwd.h for more details. Definition at line 8 of file Transport.inl.
00009 { 00010 return this->tag_; 00011 } |
|
Extracts the list of listen points from the cdr stream. The list would have the protocol specific details of the ListenPoints Reimplemented in TAO_IIOP_Transport. Definition at line 255 of file Transport.cpp. References ACE_NOTSUP_RETURN. Referenced by TAO_GIOP_Message_Generator_Parser_12::process_bidir_context().
00256 { 00257 ACE_NOTSUP_RETURN (-1); 00258 } |
|
Get the TAO_Tranport_Mux_Strategy used by this object. The role of the TAO_Transport_Mux_Strategy is described in more detail in that class' documentation. Enough is to say that the class is used to control how many threads can have pending requests over the same connection. Multiplexing multiple threads over the same connection conserves resources and is almost required for AMI, but having only one pending request per connection is more efficient and reduces the possibilities of priority inversions. Definition at line 20 of file Transport.inl. Referenced by idle_after_reply(), idle_after_send(), TAO::LocateRequest_Invocation_Adapter::invoke(), TAO::Invocation_Adapter::invoke_remote_i(), TAO_GIOP_Message_Lite::parse_reply(), TAO_GIOP_Message_Base::process_reply_message(), and send_connection_closed_notifications().
00021 { 00022 return tms_; 00023 } |
|
Helper method that returns the Transport Cache Manager.
Definition at line 2316 of file Transport.cpp. References TAO_ORB_Core::lane_resources(), and TAO_Thread_Lane_Resources::transport_cache(). Referenced by make_idle(), purge_entry(), recache_transport(), and update_transport().
02317 { 02318 return this->orb_core_->lane_resources ().transport_cache (); 02319 } |
|
Cache management.
Definition at line 442 of file Transport.cpp. References transport_cache_manager(), and TAO::Transport_Cache_Manager::update_entry(). Referenced by TAO_Connection_Handler::handle_input_internal(), TAO_Connection_Handler::handle_output_eh(), and TAO_Connection_Handler::svc_i().
00443 { 00444 return this->transport_cache_manager ().update_entry (this->cache_map_entry_); 00445 } |
|
Return the TAO_Wait_Strategy used by this object. The role of the TAO_Wait_Strategy is described in more detail in that class' documentation. Enough is to say that the ORB can wait for a reply blocking on read(), using the Reactor to wait for multiple events concurrently or using the Leader/Followers protocol. Definition at line 27 of file Transport.inl. Referenced by TAO_Connection_Handler::close_connection_eh(), TAO_IIOP_Connector::complete_connection(), TAO_Connector::connect(), TAO::Transport_Cache_Manager::find_transport(), TAO_Connection_Handler::handle_input_eh(), TAO_IIOP_Connection_Handler::open(), and post_open().
00028 { 00029 return this->ws_; 00030 } |
|
CodeSet negotiation - Set the wchar codeset translator factory.
Definition at line 144 of file Transport.inl. References tcs_set_.
00145 { 00146 this->wchar_translator_ = tf; 00147 this->tcs_set_ = 1; 00148 } |
|
CodeSet Negotiation - Get the wchar codeset translator factory.
Definition at line 131 of file Transport.inl.
00132 { 00133 return this->wchar_translator_; 00134 } |
|
This class needs priviledged access to Definition at line 785 of file Transport.h. |
|
Definition at line 803 of file Transport.h. |
|
These classes need privileged access to: Definition at line 802 of file Transport.h. |
|
Needs priveleged access to event_handler_i () Definition at line 807 of file Transport.h. |
|
Have we sent any info on bidirectional information or have we received any info regarding making the connection served by this transport bidirectional. The flag is used as follows: + We dont want to send the bidirectional context info more than once on the connection. Why? Waste of marshalling and demarshalling time on the client. + On the server side -- once a client that has established the connection asks the server to use the connection both ways, we *dont* want the server to pack service info to the client. That is not allowed. We need a flag to prevent such a things from happening. The value of this flag will be 0 if the client sends info and 1 if the server receives the info. Definition at line 958 of file Transport.h. Referenced by bidirectional_flag(). |
|
Our entry in the cache. We don't own this. It is here for our convenience. We cannot just change things around. Definition at line 930 of file Transport.h. Referenced by cache_map_entry(). |
|
Additional member values required to support codeset translation. @Phil, I think it would be nice if we could think of a way to do the following. We have been trying to use the transport for marking about translator factories and such! IMHO this is a wrong encapulation ie. trying to populate the transport object with these details. We should probably have a class something like TAO_Message_Property or TAO_Message_Translator or whatever (I am sure you get the idea) and encapsulate all these details. Coupling these seems odd. if I have to be more cynical we can move this to the connection_handler and it may more sense with the DSCP stuff around there. Do you agree? Definition at line 1033 of file Transport.h. |
|
The queue will start draining no later than if* the deadline is Definition at line 975 of file Transport.h. Referenced by check_buffering_constraints_i(), handle_timeout(), and reset_flush_timer(). |
|
First_request_ is true until the first request is sent or received. This is necessary since codeset context information is necessary only on the first request. After that, the translators are fixed for the life of the connection. Definition at line 1045 of file Transport.h. Referenced by first_request_sent(), and generate_request_header(). |
|
The timer ID.
Definition at line 978 of file Transport.h. Referenced by check_buffering_constraints_i(), flush_timer_pending(), and reset_flush_timer(). |
|
This is an Definition at line 992 of file Transport.h. Referenced by ~TAO_Transport(). |
|
Implement the outgoing data queue.
Definition at line 963 of file Transport.h. |
|
A unique identifier for the transport. This never *never* changes over the lifespan, so we don't have to worry about locking it. HINT: Protocol-specific transports that use connection handler might choose to set this to the handle for their connection. Definition at line 1002 of file Transport.h. |
|
Queue of the consolidated, incoming messages..
Definition at line 967 of file Transport.h. Referenced by consolidate_enqueue_message(), handle_input_parse_data(), and process_queue_head(). |
|
Stack of incoming fragments, consolidated messages are going to be enqueued in "incoming_message_queue_" Definition at line 971 of file Transport.h. Referenced by handle_input(), handle_input_missing_data(), handle_input_parse_data(), and handle_input_parse_extra_messages(). |
|
Is this transport really connected or not. In case of oneways with SYNC_NONE Policy we don't wait until the connection is ready and we buffer the requests in this transport until the connection is ready Definition at line 1016 of file Transport.h. Referenced by is_connected(), post_open(), and ~TAO_Transport(). |
|
Definition at line 960 of file Transport.h. Referenced by opened_as(), and provide_blockable_handler(). |
|
Global orbcore resource.
Definition at line 926 of file Transport.h. Referenced by TAO_GIOP_Message_Lite::process_locate_request(), TAO_GIOP_Message_Base::process_locate_request(), TAO_GIOP_Message_Lite::process_request(), and TAO_GIOP_Message_Base::process_request(). |
|
Holds the partial GIOP message (if there is one).
Definition at line 1048 of file Transport.h. Referenced by allocate_partial_message_block(), and handle_input_parse_data(). |
|
Used by the LRU, LFU and FIFO Connection Purging Strategies.
Definition at line 1005 of file Transport.h. Referenced by purging_order(). |
|
Size of the buffer received.
Definition at line 1008 of file Transport.h. |
|
Number of bytes sent.
Definition at line 1011 of file Transport.h. Referenced by drain_queue_helper(), drain_queue_i(), and sent_byte_count(). |
|
IOP protocol tag.
Definition at line 923 of file Transport.h. |
|
Definition at line 964 of file Transport.h. |
|
The tcs_set_ flag indicates that negotiation has occured and so the translators are correct, since a null translator is valid if both ends are using the same codeset, whatever that codeset might be. Definition at line 1039 of file Transport.h. Referenced by char_translator(), is_tcs_set(), and wchar_translator(). |
|
Strategy to decide whether multiple requests can be sent over the same connection or the connection is exclusive for a request. Definition at line 934 of file Transport.h. |
|
The adapter used to receive timeout callbacks from the Reactor.
Definition at line 981 of file Transport.h. |
|
Definition at line 1034 of file Transport.h. |
|
Strategy for waiting for the reply after sending the request.
Definition at line 937 of file Transport.h. |