#include <Transport.h>
Inheritance diagram for TAO_Transport:
Template methods | |
The Transport class uses the Template Method Pattern to implement the protocol specific functionality. Implementors of a pluggable protocol should override the following methods with the semantics documented below. | |
enum | { TAO_ONEWAY_REQUEST = 0, TAO_TWOWAY_REQUEST = 1, TAO_REPLY } |
virtual ACE_Event_Handler * | event_handler_i (void)=0 |
bool | is_connected (void) const |
Is this transport really connected. | |
bool | post_open (size_t id) |
Perform all the actions when this transport gets opened. | |
void | pre_close (void) |
do what needs to be done when closing the transport | |
TAO_Connection_Handler * | connection_handler (void) |
Get the connection handler for this transport. | |
TAO_OutputCDR & | out_stream (void) |
Accessor for the output CDR stream. | |
int | generate_locate_request (TAO_Target_Specification &spec, TAO_Operation_Details &opdetails, TAO_OutputCDR &output) |
virtual int | generate_request_header (TAO_Operation_Details &opd, TAO_Target_Specification &spec, TAO_OutputCDR &msg) |
int | recache_transport (TAO_Transport_Descriptor_Interface *desc) |
Recache ourselves in the cache. | |
virtual int | handle_input (TAO_Resume_Handle &rh, ACE_Time_Value *max_wait_time=0) |
Callback to read incoming data. | |
virtual int | send_request (TAO_Stub *stub, TAO_ORB_Core *orb_core, TAO_OutputCDR &stream, int message_semantics, ACE_Time_Value *max_time_wait)=0 |
virtual int | send_message (TAO_OutputCDR &stream, TAO_Stub *stub=0, int message_semantics=TAO_Transport::TAO_TWOWAY_REQUEST, ACE_Time_Value *max_time_wait=0)=0 |
virtual int | send_message_shared (TAO_Stub *stub, int message_semantics, const ACE_Message_Block *message_block, ACE_Time_Value *max_wait_time) |
Send the contents of message_block. | |
int | format_queue_message (TAO_OutputCDR &stream, ACE_Time_Value *max_wait_time) |
int | send_message_block_chain (const ACE_Message_Block *message_block, size_t &bytes_transferred, ACE_Time_Value *max_wait_time=0) |
Send a message block chain. | |
int | send_message_block_chain_i (const ACE_Message_Block *message_block, size_t &bytes_transferred, ACE_Time_Value *max_wait_time) |
Send a message block chain, assuming the lock is held. | |
int | purge_entry (void) |
Cache management. | |
int | make_idle (void) |
Cache management. | |
int | update_transport (void) |
Cache management. | |
int | handle_timeout (const ACE_Time_Value &current_time, const void *act) |
size_t | recv_buffer_size (void) const |
Accessor to recv_buffer_size_. | |
size_t | sent_byte_count (void) const |
Accessor to sent_byte_count_. | |
TAO_Codeset_Translator_Base * | char_translator (void) const |
CodeSet Negotiation - Get the char codeset translator factory. | |
TAO_Codeset_Translator_Base * | wchar_translator (void) const |
CodeSet Negotiation - Get the wchar codeset translator factory. | |
void | char_translator (TAO_Codeset_Translator_Base *) |
CodeSet negotiation - Set the char codeset translator factory. | |
void | wchar_translator (TAO_Codeset_Translator_Base *) |
CodeSet negotiation - Set the wchar codeset translator factory. | |
void | assign_translators (TAO_InputCDR *, TAO_OutputCDR *) |
void | clear_translators (TAO_InputCDR *, TAO_OutputCDR *) |
CORBA::Boolean | is_tcs_set () const |
Return true if the tcs has been set. | |
void | first_request_sent () |
Set the state of the first_request_ flag to 0. | |
void | send_connection_closed_notifications (void) |
TAO::Transport::Stats * | stats (void) const |
Transport statistics. | |
virtual TAO_Connection_Handler * | connection_handler_i (void)=0 |
int | process_parsed_messages (TAO_Queued_Data *qd, TAO_Resume_Handle &rh) |
int | send_message_shared_i (TAO_Stub *stub, int message_semantics, const ACE_Message_Block *message_block, ACE_Time_Value *max_wait_time) |
int | queue_message_i (const ACE_Message_Block *message_block, ACE_Time_Value *max_wait_time) |
CORBA::ULong const | tag_ |
IOP protocol tag. | |
TAO_ORB_Core *const | orb_core_ |
Global orbcore resource. | |
TAO::Transport_Cache_Manager::HASH_MAP_ENTRY * | cache_map_entry_ |
TAO_Transport_Mux_Strategy * | tms_ |
TAO_Wait_Strategy * | ws_ |
Strategy for waiting for the reply after sending the request. | |
int | bidirectional_flag_ |
TAO::Connection_Role | opening_connection_role_ |
TAO_Queued_Message * | head_ |
Implement the outgoing data queue. | |
TAO_Queued_Message * | tail_ |
TAO_Incoming_Message_Queue | incoming_message_queue_ |
Queue of the consolidated, incoming messages. | |
TAO::Incoming_Message_Stack | incoming_message_stack_ |
ACE_Time_Value | current_deadline_ |
long | flush_timer_id_ |
The timer ID. | |
TAO_Transport_Timer | transport_timer_ |
The adapter used to receive timeout callbacks from the Reactor. | |
ACE_Lock * | handler_lock_ |
size_t | id_ |
A unique identifier for the transport. | |
unsigned long | purging_order_ |
Used by the LRU, LFU and FIFO Connection Purging Strategies. | |
size_t | recv_buffer_size_ |
Size of the buffer received. | |
size_t | sent_byte_count_ |
Number of bytes sent. | |
bool | is_connected_ |
TAO::Transport_Cache_Manager & | transport_cache_manager (void) |
Helper method that returns the Transport Cache Manager. | |
int | drain_queue (void) |
Send some of the data in the queue. | |
int | drain_queue_i (void) |
Implement drain_queue() assuming the lock is held. | |
bool | queue_is_empty_i (void) |
Check if there are messages pending in the queue. | |
int | drain_queue_helper (int &iovcnt, iovec iov[]) |
A helper routine used in drain_queue_i(). | |
int | schedule_output_i (void) |
Schedule handle_output() callbacks. | |
int | cancel_output_i (void) |
Cancel handle_output() callbacks. | |
void | cleanup_queue (size_t byte_count) |
Cleanup the queue. | |
void | cleanup_queue_i () |
Cleanup the complete queue. | |
int | check_buffering_constraints_i (TAO_Stub *stub, bool &must_flush) |
Check if the buffering constraints have been reached. | |
int | send_synchronous_message_i (const ACE_Message_Block *message_block, ACE_Time_Value *max_wait_time) |
int | send_reply_message_i (const ACE_Message_Block *message_block, ACE_Time_Value *max_wait_time) |
int | send_asynchronous_message_i (TAO_Stub *stub, const ACE_Message_Block *message_block, ACE_Time_Value *max_wait_time) |
int | send_synch_message_helper_i (TAO_Synch_Queued_Message &s, ACE_Time_Value *max_wait_time) |
int | flush_timer_pending (void) const |
Check if the flush timer is still pending. | |
void | reset_flush_timer (void) |
void | report_invalid_event_handler (const char *caller) |
Print out error messages if the event handler is not valid. | |
int | handle_input_missing_data (TAO_Resume_Handle &rh, ACE_Time_Value *max_wait_time, TAO_Queued_Data *q_data) |
int | handle_input_parse_data (TAO_Resume_Handle &rh, ACE_Time_Value *max_wait_time) |
int | handle_input_parse_extra_messages (ACE_Message_Block &message_block) |
int | consolidate_enqueue_message (TAO_Queued_Data *qd) |
-1 error, otherwise 0 | |
int | consolidate_process_message (TAO_Queued_Data *qd, TAO_Resume_Handle &rh) |
-1 error, otherwise 0 | |
int | process_queue_head (TAO_Resume_Handle &rh) |
int | notify_reactor (void) |
void | send_connection_closed_notifications_i (void) |
Assume the lock is held. | |
void | allocate_partial_message_block (void) |
TAO_Transport (const TAO_Transport &) | |
void | operator= (const TAO_Transport &) |
TAO_Codeset_Translator_Base * | char_translator_ |
Additional member values required to support codeset translation. | |
TAO_Codeset_Translator_Base * | wchar_translator_ |
CORBA::Boolean | tcs_set_ |
CORBA::Boolean | first_request_ |
ACE_Message_Block * | partial_message_ |
Holds the partial GIOP message (if there is one). | |
TAO::Transport::Stats * | stats_ |
Statistics. | |
class | TAO_Reactive_Flushing_Strategy |
class | TAO_Leader_Follower_Flushing_Strategy |
class | TAO_Thread_Per_Connection_Handler |
Public Types | |
Public Member Functions | |
TAO_Transport (CORBA::ULong tag, TAO_ORB_Core *orb_core) | |
Default creator, requires the tag value be supplied. | |
virtual | ~TAO_Transport (void) |
Destructor. | |
CORBA::ULong | tag (void) const |
Return the protocol tag. | |
TAO_ORB_Core * | orb_core (void) const |
Access the ORB that owns this connection. | |
TAO_Transport_Mux_Strategy * | tms (void) const |
Get the TAO_Transport_Mux_Strategy used by this object. | |
TAO_Wait_Strategy * | wait_strategy (void) const |
Return the TAO_Wait_Strategy used by this object. | |
int | handle_output (void) |
Callback method to reactively drain the outgoing data queue. | |
int | bidirectional_flag (void) const |
Get the bidirectional flag. | |
void | bidirectional_flag (int flag) |
Set the bidirectional flag. | |
void | cache_map_entry (TAO::Transport_Cache_Manager::HASH_MAP_ENTRY *entry) |
Set the Cache Map entry. | |
TAO::Transport_Cache_Manager::HASH_MAP_ENTRY * | cache_map_entry (void) |
Get the Cache Map entry. | |
size_t | id (void) const |
Set and Get the identifier for this transport instance. | |
void | id (size_t id) |
TAO::Connection_Role | opened_as (void) const |
void | opened_as (TAO::Connection_Role) |
unsigned long | purging_order (void) const |
void | purging_order (unsigned long value) |
bool | queue_is_empty (void) |
Check if there are messages pending in the queue. | |
void | provide_handler (TAO::Connection_Handler_Set &handlers) |
Add event handler to the handlers set. | |
bool | provide_blockable_handler (TAO::Connection_Handler_Set &handlers) |
virtual int | register_handler (void) |
Register the handler with the reactor. | |
virtual ssize_t | send (iovec *iov, int iovcnt, size_t &bytes_transferred, const ACE_Time_Value *timeout=0)=0 |
Write the complete Message_Block chain to the connection. | |
virtual ssize_t | recv (char *buffer, size_t len, const ACE_Time_Value *timeout=0)=0 |
Read len bytes into buffer. | |
Control connection lifecycle | |
These methods are routed through the TMS object. The TMS strategies implement them correctly. | |
bool | idle_after_send (void) |
bool | idle_after_reply (void) |
virtual void | close_connection (void) |
Call the implementation method after obtaining the lock. | |
Template methods | |
The Transport class uses the Template Method Pattern to implement the protocol specific functionality. Implementors of a pluggable protocol should override the following methods with the semantics documented below. | |
virtual int | messaging_init (CORBA::Octet major, CORBA::Octet minor)=0 |
virtual int | tear_listen_point_list (TAO_InputCDR &cdr) |
virtual bool | post_connect_hook (void) |
Hooks that can be overridden in concrete transports. | |
ACE_Event_Handler::Reference_Count | add_reference (void) |
Memory management routines. | |
ACE_Event_Handler::Reference_Count | remove_reference (void) |
virtual TAO_Pluggable_Messaging * | messaging_object (void)=0 |
The transport object is created in the Service handler constructor and deleted in the Service Handler's destructor!!
The main responsibility of a Transport object is to encapsulate a connection, and provide a transport independent way to send and receive data. Since TAO is heavily based on the Reactor for most if not all of its I/O, the Transport class is usually implemented with a helper Connection Handler that adapts the generic Transport interface to the Reactor types.
One of the responsibilities of the TAO_Transport class is to send out GIOP messages as efficiently as possible. In most cases messages are put out in FIFO order, the transport object will put out the message using a single system call and return control to the application. However, for oneways and AMI requests it may be more efficient (or required if the SYNC_NONE policy is in effect) to queue the messages until a large enough data set is available. Another reason to queue is that some applications cannot block for I/O, yet they want to send messages so large that a single write() operation would not be able to cope with them. In such cases we need to queue the data and use the Reactor to drain the queue.
Therefore, the Transport class may need to use a queue to temporarily hold the messages, and, in some configurations, it may need to use the Reactor to concurrently drain such queues.
TAO provides explicit policies to send 'urgent' messages. Such messages may be put at the head of the queue. However, they cannot be sent immediately because the transport may already be sending another message in a reactive fashion.
Consequently, the Transport must also know if the head of the queue has been partially sent. In that case new messages can only follow the head. Only once the head is completely sent we can start sending new messages.
One or more threads can be blocked waiting for the connection to completely send the message. The thread should return as soon as its message has been sent, so a per-thread condition is required. This suggests that simply using an ACE_Message_Queue would not be enough: there is a significant amount of ancillary information to keep on each message that the Message_Block class does not provide room for.
Blocking I/O is still attractive for some applications. First, by eliminating the Reactor overhead, performance is improved when sending large blocks of data. Second, using the Reactor to send out data opens the door for nested upcalls, yet some applications cannot deal with the reentrancy issues in this case.
Some or all messages could have a timeout period attached to them. The timeout source could either be some high-level policy or maybe some strategy to prevent denial of service attacks. In any case the timeouts are per-message, and later messages could have shorter timeouts. In fact, some kind of scheduling (such as EDF) could be required in a few applications.
The outgoing data path consists of several components:
The Transport object provides a single method to send request messages (send_request_message ()).
One of the main responsibilities of the transport is to read and process the incoming GIOP message as quickly and efficiently as possible. There are other forces that need to be given due consideration. They are
The messages should be checked for validity and the right information should be sent to the higher layer for processing. The process of doing a sanity check and preparing the messages for the higher layers of the ORB are done by the messaging protocol.
To keep things as efficient as possible for medium sized requests, it would be good to minimise data copying and locking along the incoming path, i.e. from the time of reading the data from the handle to the application. We achieve this by creating a buffer on stack and reading the data from the handle into the buffer. We then pass the same data block (the buffer is encapsulated into a data block) to the higher layers of the ORB. The problems stem from the following (a) Data is bigger than the buffer that we have on stack (b) Transports like TCP do not guarantee availability of the whole chunk of data in one shot. Data could trickle in byte by byte. (c) Single read gives multiple messages.
We solve the problems as follows
(a) First do a read with the buffer on stack. Query the underlying messaging object whether the message has any incomplete portion. If so, data will be copied into a new buffer able to hold the full message and is queued; succeeding events will read data from socket and write directly into this buffer. Otherwise, if the message in local buffer is complete, we free the handle and then send the message to the higher layers of the ORB for processing.
(b) If buffer with incomplete message has been enqueued, while trying to do the above, the reactor will call us back when the handle becomes read ready. The read-operation will copy data directly into the enqueued buffer. If the message has been read completely the message is sent to the higher layers of the ORB for processing.
(c) If we get multiple messages (possible if the client connected to the server sends oneways or AMI requests), we parse and split the messages. Every message is put in the queue. Once the messages are queued, the thread picks up one message to send to the higher layers of the ORB. Before doing that, if it finds more messages, it sends a notify to the reactor without resuming the handle. The next thread picks up a message from the queue and processes that. Once the queue is drained the last thread resumes the handle.
We could use the outgoing path of the ORB to send replies. This would allow us to reuse most of the code in the outgoing data path. We were doing this till TAO-1.2.3. We ran into problems. When writing the reply the ORB gets flow controlled, and the ORB tries to flush the message by going into the reactor. This resulted in unnecessary nesting. The thread that gets into the Reactor could potentially handle other messages (incoming or outgoing) and the stack starts growing leading to crashes.
The solution that we (plan to) adopt is pretty straightforward. The thread sending replies will not block to send the replies but queue the replies and return to the Reactor. (Note the careful usage of the terms "blocking in the Reactor" as opposed to "return back to the Reactor".)
See Also:
Definition at line 245 of file Transport.h.
|
Definition at line 597 of file Transport.h.
00598 { 00599 TAO_ONEWAY_REQUEST = 0, 00600 TAO_TWOWAY_REQUEST = 1, 00601 TAO_REPLY 00602 }; |
|
Default creator, requires the tag value be supplied.
Definition at line 126 of file Transport.cpp. References ACE_NEW_THROW_EX, TAO_ORB_Core::client_factory(), TAO_Client_Strategy_Factory::create_transport_mux_strategy(), TAO_Client_Strategy_Factory::create_wait_strategy(), TAO_HAS_SENDFILE, and ws_.
00128 : tag_ (tag) 00129 , orb_core_ (orb_core) 00130 , cache_map_entry_ (0) 00131 , bidirectional_flag_ (-1) 00132 , opening_connection_role_ (TAO::TAO_UNSPECIFIED_ROLE) 00133 , head_ (0) 00134 , tail_ (0) 00135 , incoming_message_queue_ (orb_core) 00136 , current_deadline_ (ACE_Time_Value::zero) 00137 , flush_timer_id_ (-1) 00138 , transport_timer_ (this) 00139 , handler_lock_ (orb_core->resource_factory ()->create_cached_connection_lock ()) 00140 , id_ ((size_t) this) 00141 , purging_order_ (0) 00142 , recv_buffer_size_ (0) 00143 , sent_byte_count_ (0) 00144 , is_connected_ (false) 00145 , char_translator_ (0) 00146 , wchar_translator_ (0) 00147 , tcs_set_ (0) 00148 , first_request_ (1) 00149 , partial_message_ (0) 00150 #if TAO_HAS_SENDFILE == 1 00151 // The ORB has been configured to use the MMAP allocator, meaning 00152 // we could/should use sendfile() to send data. Cast once rather 00153 // here rather than during each send. This assumes that all 00154 // TAO_OutputCDR instances are using the same TAO_MMAP_Allocator 00155 // instance as the underlying output CDR buffer allocator. 00156 , mmap_allocator_ ( 00157 dynamic_cast<TAO_MMAP_Allocator *> ( 00158 orb_core->output_cdr_buffer_allocator ())) 00159 #endif /* TAO_HAS_SENDFILE==1 */ 00160 { 00161 TAO_Client_Strategy_Factory *cf = 00162 this->orb_core_->client_factory (); 00163 00164 // Create WS now. 00165 this->ws_ = cf->create_wait_strategy (this); 00166 00167 // Create TMS now. 00168 this->tms_ = cf->create_transport_mux_strategy (this); 00169 00170 #if TAO_HAS_TRANSPORT_CURRENT == 1 00171 // Allocate stats 00172 ACE_NEW_THROW_EX (this->stats_, 00173 TAO::Transport::Stats, 00174 CORBA::NO_MEMORY ()); 00175 #endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */ 00176 00177 /* 00178 * Hook to add code that initializes components that 00179 * belong to the concrete protocol implementation. 00180 * Further additions to this Transport class will 00181 * need to add code *before* this hook. 
00182 */ 00183 //@@ TAO_TRANSPORT_SPL_CONSTRUCTOR_ADD_HOOK 00184 } |
|
Destructor.
Definition at line 186 of file Transport.cpp. References ACE_ASSERT, cleanup_queue_i(), handler_lock_, purge_entry(), ACE_Message_Block::release(), stats_, and ws_.
00187 { 00188 delete this->ws_; 00189 00190 delete this->tms_; 00191 00192 delete this->handler_lock_; 00193 00194 if (!this->is_connected_) 00195 { 00196 // When we have a not connected transport we could have buffered 00197 // messages on this transport which we have to cleanup now. 00198 this->cleanup_queue_i(); 00199 00200 // Cleanup our cache entry 00201 this->purge_entry(); 00202 } 00203 00204 // Release the partial message block, however we may 00205 // have never allocated one. 00206 ACE_Message_Block::release (this->partial_message_); 00207 00208 // By the time the destructor is reached here all the connection stuff 00209 // *must* have been cleaned up. 00210 00211 // The following assert is needed for the test "Bug_2494_Regression". 00212 // See the bugzilla bug #2494 for details. 00213 ACE_ASSERT (this->head_ == 0); 00214 ACE_ASSERT (this->cache_map_entry_ == 0); 00215 00216 #if TAO_HAS_TRANSPORT_CURRENT == 1 00217 delete this->stats_; 00218 #endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */ 00219 00220 /* 00221 * Hook to add code that cleans up components 00222 * belong to the concrete protocol implementation. 00223 * Further additions to this Transport class will 00224 * need to add code *before* this hook. 00225 */ 00226 //@@ TAO_TRANSPORT_SPL_DESTRUCTOR_ADD_HOOK 00227 } |
|
|
|
Memory management routines.
Definition at line 2484 of file Transport.cpp. References ACE_Event_Handler::add_reference(), and event_handler_i(). Referenced by TAO::Cache_IntId::Cache_IntId(), TAO_Connection_Handler::connection_pending(), TAO::Cache_IntId::operator=(), provide_blockable_handler(), provide_handler(), TAO::Transport_Cache_Manager::purge(), TAO_Thread_Per_Connection_Handler::TAO_Thread_Per_Connection_Handler(), and TAO_Asynch_Reply_Dispatcher_Base::transport().
02485 { 02486 return this->event_handler_i ()->add_reference (); 02487 } |
|
Allocate a partial message block and store it in our partial_message_ data member. Definition at line 2558 of file Transport.cpp. References ACE_NEW, TAO_Pluggable_Messaging::header_length(), messaging_object(), and partial_message_. Referenced by handle_input_parse_data().
02559 { 02560 if (this->partial_message_ == 0) 02561 { 02562 // This value must be at least large enough to hold a GIOP message 02563 // header plus a GIOP fragment header 02564 size_t const partial_message_size = 02565 this->messaging_object ()->header_length (); 02566 // + this->messaging_object ()->fragment_header_length (); 02567 // deprecated, conflicts with not-single_read_opt. 02568 02569 ACE_NEW (this->partial_message_, 02570 ACE_Message_Block (partial_message_size)); 02571 } 02572 } |
|
Use the Transport's codeset factories to set the translator for input and output CDRs. Definition at line 2454 of file Transport.cpp. References TAO_Codeset_Translator_Base::assign(). Referenced by TAO::LocateRequest_Invocation::check_reply(), TAO::Synch_Twoway_Invocation::check_reply_status(), TAO_GIOP_Message_Base::process_reply_message(), TAO_GIOP_Message_Base::process_request(), TAO_GIOP_Message_Base::process_request_message(), and TAO::Remote_Invocation::write_header().
02455 { 02456 if (this->char_translator_) 02457 { 02458 this->char_translator_->assign (inp); 02459 this->char_translator_->assign (outp); 02460 } 02461 if (this->wchar_translator_) 02462 { 02463 this->wchar_translator_->assign (inp); 02464 this->wchar_translator_->assign (outp); 02465 } 02466 } |
|
Set the bidirectional flag.
Definition at line 39 of file Transport.inl. References bidirectional_flag_.
00040 { 00041 this->bidirectional_flag_ = flag; 00042 } |
|
Get the bidirectional flag.
Definition at line 33 of file Transport.inl. References bidirectional_flag_. Referenced by TAO::Wait_On_LF_No_Upcall::can_process_upcalls(), TAO_IIOP_Transport::generate_request_header(), TAO_Muxed_TMS::request_id(), TAO_Exclusive_TMS::request_id(), and TAO_IIOP_Transport::tear_listen_point_list().
00034 { 00035 return this->bidirectional_flag_; 00036 } |
|
Get the Cache Map entry.
Definition at line 57 of file Transport.inl. References cache_map_entry_.
00058 { 00059 return this->cache_map_entry_; 00060 } |
|
Set the Cache Map entry.
Definition at line 63 of file Transport.inl. References cache_map_entry_, and TAO::Transport_Cache_Manager::HASH_MAP_ENTRY. Referenced by TAO::Transport_Cache_Manager::bind_i().
00065 { 00066 this->cache_map_entry_ = entry; 00067 } |
|
Cancel handle_output() callbacks.
Definition at line 794 of file Transport.cpp. References ACE_DEBUG, ACE_TEXT, ACE_Reactor::cancel_wakeup(), event_handler_i(), LM_DEBUG, ACE_Event_Handler::reactor(), and TAO_debug_level. Referenced by TAO_Reactive_Flushing_Strategy::cancel_output(), and TAO_Leader_Follower_Flushing_Strategy::cancel_output().
00795 { 00796 ACE_Event_Handler * const eh = this->event_handler_i (); 00797 ACE_Reactor *const reactor = eh->reactor (); 00798 00799 if (TAO_debug_level > 3) 00800 { 00801 ACE_DEBUG ((LM_DEBUG, 00802 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cancel_output_i\n"), 00803 this->id ())); 00804 } 00805 00806 return reactor->cancel_wakeup (eh, ACE_Event_Handler::WRITE_MASK); 00807 } |
|
CodeSet negotiation - Set the char codeset translator factory.
Definition at line 137 of file Transport.inl. References tcs_set_.
00138 { 00139 this->char_translator_ = tf; 00140 this->tcs_set_ = 1; 00141 } |
|
CodeSet Negotiation - Get the char codeset translator factory.
Definition at line 125 of file Transport.inl.
00126 { 00127 return this->char_translator_; 00128 } |
|
Check if the buffering constraints have been reached.
Definition at line 1138 of file Transport.cpp. References TAO::Transport_Queueing_Strategy::buffering_constraints_reached(), ACE_Reactor::cancel_timer(), current_deadline_, event_handler_i(), flush_timer_id_, flush_timer_pending(), ACE_OS::gettimeofday(), TAO_Queued_Message::message_length(), TAO_Queued_Message::next(), ACE_Event_Handler::reactor(), ACE_Reactor::schedule_timer(), and TAO_Stub::transport_queueing_strategy(). Referenced by send_asynchronous_message_i().
01139 { 01140 // First let's compute the size of the queue: 01141 size_t msg_count = 0; 01142 size_t total_bytes = 0; 01143 01144 for (TAO_Queued_Message *i = this->head_; i != 0; i = i->next ()) 01145 { 01146 ++msg_count; 01147 total_bytes += i->message_length (); 01148 } 01149 01150 bool set_timer = false; 01151 ACE_Time_Value new_deadline; 01152 01153 TAO::Transport_Queueing_Strategy *queue_strategy = 01154 stub->transport_queueing_strategy (); 01155 01156 bool constraints_reached = true; 01157 01158 if (queue_strategy) 01159 { 01160 constraints_reached = 01161 queue_strategy->buffering_constraints_reached (stub, 01162 msg_count, 01163 total_bytes, 01164 must_flush, 01165 this->current_deadline_, 01166 set_timer, 01167 new_deadline); 01168 } 01169 else 01170 { 01171 must_flush = false; 01172 } 01173 01174 // ... set the new timer, also cancel any previous timers ... 01175 if (set_timer) 01176 { 01177 ACE_Event_Handler *eh = this->event_handler_i (); 01178 ACE_Reactor * const reactor = eh->reactor (); 01179 this->current_deadline_ = new_deadline; 01180 ACE_Time_Value delay = new_deadline - ACE_OS::gettimeofday (); 01181 01182 if (this->flush_timer_pending ()) 01183 { 01184 reactor->cancel_timer (this->flush_timer_id_); 01185 } 01186 01187 this->flush_timer_id_ = 01188 reactor->schedule_timer (&this->transport_timer_, 01189 &this->current_deadline_, 01190 delay); 01191 } 01192 01193 return constraints_reached; 01194 } |
|
Cleanup the queue. Exactly byte_count bytes have been sent, the queue must be cleaned up as potentially several messages have been completely sent out. It leaves on head_ the next message to send out. Definition at line 1101 of file Transport.cpp. References ACE_DEBUG, ACE_TEXT, TAO_Queued_Message::all_data_sent(), TAO_Queued_Message::bytes_transferred(), TAO_Queued_Message::destroy(), LM_DEBUG, TAO_Queued_Message::message_length(), TAO_Queued_Message::remove_from_list(), and TAO_debug_level. Referenced by drain_queue_helper().
01102 { 01103 while (this->head_ != 0 && byte_count > 0) 01104 { 01105 TAO_Queued_Message *i = this->head_; 01106 01107 if (TAO_debug_level > 4) 01108 { 01109 ACE_DEBUG ((LM_DEBUG, 01110 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue, ") 01111 ACE_TEXT ("byte_count = %d\n"), 01112 this->id (), byte_count)); 01113 } 01114 01115 // Update the state of the first message 01116 i->bytes_transferred (byte_count); 01117 01118 if (TAO_debug_level > 4) 01119 { 01120 ACE_DEBUG ((LM_DEBUG, 01121 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue, ") 01122 ACE_TEXT ("after transfer, bc = %d, all_sent = %d, ml = %d\n"), 01123 this->id (), byte_count, i->all_data_sent (), 01124 i->message_length ())); 01125 } 01126 01127 // ... if all the data was sent the message must be removed from 01128 // the queue... 01129 if (i->all_data_sent ()) 01130 { 01131 i->remove_from_list (this->head_, this->tail_); 01132 i->destroy (); 01133 } 01134 } 01135 } |
|
Cleanup the complete queue.
Definition at line 1058 of file Transport.cpp. References ACE_DEBUG, ACE_TEXT, TAO_Queued_Message::destroy(), LM_DEBUG, TAO_Queued_Message::message_length(), TAO_Queued_Message::remove_from_list(), TAO_LF_Event::state_changed(), and TAO_debug_level. Referenced by pre_close(), send_connection_closed_notifications_i(), and ~TAO_Transport().
01059 { 01060 if (TAO_debug_level > 4) 01061 { 01062 ACE_DEBUG ((LM_DEBUG, 01063 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue_i, ") 01064 ACE_TEXT ("cleaning up complete queue\n"), 01065 this->id ())); 01066 } 01067 01068 size_t byte_count = 0; 01069 int msg_count = 0; 01070 01071 // Cleanup all messages 01072 while (this->head_ != 0) 01073 { 01074 TAO_Queued_Message *i = this->head_; 01075 01076 if (TAO_debug_level > 4) 01077 { 01078 byte_count += i->message_length(); 01079 ++msg_count; 01080 } 01081 // @@ This is a good point to insert a flag to indicate that a 01082 // CloseConnection message was successfully received. 01083 i->state_changed (TAO_LF_Event::LFS_CONNECTION_CLOSED, 01084 this->orb_core_->leader_follower ()); 01085 01086 i->remove_from_list (this->head_, this->tail_); 01087 01088 i->destroy (); 01089 } 01090 01091 if (TAO_debug_level > 4) 01092 { 01093 ACE_DEBUG ((LM_DEBUG, 01094 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue_i, ") 01095 ACE_TEXT ("discarded %d messages, %u bytes.\n"), 01096 this->id (), msg_count, byte_count)); 01097 } 01098 } |
|
It is necessary to clear the codeset translator when a CDR stream is used for more than one GIOP message. This is required since the header must not be translated, whereas the body must be. Definition at line 2469 of file Transport.cpp. References ACE_OutputCDR::char_translator(), ACE_InputCDR::char_translator(), ACE_OutputCDR::wchar_translator(), and ACE_InputCDR::wchar_translator(). Referenced by TAO::Remote_Invocation::write_header().
02470 { 02471 if (inp) 02472 { 02473 inp->char_translator (0); 02474 inp->wchar_translator (0); 02475 } 02476 if (outp) 02477 { 02478 outp->char_translator (0); 02479 outp->wchar_translator (0); 02480 } 02481 } |
|
Call the implementation method after obtaining the lock.
Definition at line 312 of file Transport.cpp. References TAO_Connection_Handler::close_connection(), and connection_handler_i(). Referenced by TAO_IIOP_Connector::complete_connection(), TAO::LocateRequest_Invocation::invoke(), post_open(), TAO::Transport_Cache_Manager::purge(), TAO::Synch_Twoway_Invocation::remote_twoway(), TAO_GIOP_Message_Base::send_close_connection(), TAO::Remote_Invocation::send_message(), send_message_shared(), TAO_IIOP_Transport::send_message_shared(), TAO_Wait_On_Read::wait(), TAO::Synch_Twoway_Invocation::wait_for_reply(), and TransportCleanupGuard::~TransportCleanupGuard().
00313 { 00314 this->connection_handler_i ()->close_connection (); 00315 } |
|
Get the connection handler for this transport.
Definition at line 175 of file Transport.inl. References connection_handler_i(). Referenced by TAO_IIOP_Connector::complete_connection(), TAO::Remote_Invocation::send_message(), TAO_Connect_Strategy::wait(), and TAO_Connector::wait_for_connection_completion().
00176 { 00177 return this->connection_handler_i(); 00178 } |
|
Implemented in TAO_IIOP_Transport. Referenced by close_connection(), and connection_handler(). |
|
Returns -1 on error, otherwise 0.
Definition at line 1616 of file Transport.cpp. References ACE_ERROR, ACE_TEXT, TAO_Pluggable_Messaging::consolidate_fragmented_message(), TAO_Incoming_Message_Queue::enqueue_tail(), incoming_message_queue_, LM_ERROR, messaging_object(), TAO_Queued_Data::missing_data(), TAO_Queued_Data::more_fragments(), TAO_Queued_Data::msg_type(), TAO_Queued_Data::release(), TAO_debug_level, and TAO_PLUGGABLE_MESSAGE_FRAGMENT. Referenced by handle_input_parse_data(), and handle_input_parse_extra_messages().
01617 { 01618 // consolidate message on top of stack, only for fragmented messages 01619 01620 // paranoid check 01621 if (q_data->missing_data () != 0) 01622 { 01623 return -1; 01624 } 01625 01626 if (q_data->more_fragments () || 01627 q_data->msg_type () == TAO_PLUGGABLE_MESSAGE_FRAGMENT) 01628 { 01629 TAO_Queued_Data *new_q_data = 0; 01630 01631 switch (this->messaging_object()->consolidate_fragmented_message (q_data, new_q_data)) 01632 { 01633 case -1: // error 01634 return -1; 01635 01636 case 0: // returning consolidated message in new_q_data 01637 if (!new_q_data) 01638 { 01639 if (TAO_debug_level > 0) 01640 { 01641 ACE_ERROR ((LM_ERROR, 01642 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_enqueue_message, ") 01643 ACE_TEXT ("error, consolidated message is NULL\n"), 01644 this->id ())); 01645 } 01646 return -1; 01647 } 01648 01649 if (this->incoming_message_queue_.enqueue_tail (new_q_data) != 0) 01650 { 01651 TAO_Queued_Data::release (new_q_data); 01652 return -1; 01653 } 01654 break; 01655 01656 case 1: // fragment has been stored in messaging_oject() 01657 break; 01658 } 01659 } 01660 else 01661 { 01662 if (this->incoming_message_queue_.enqueue_tail (q_data) != 0) 01663 { 01664 TAO_Queued_Data::release (q_data); 01665 return -1; 01666 } 01667 } 01668 01669 return 0; // success 01670 } |
|
Returns -1 on error, otherwise 0.
Definition at line 1529 of file Transport.cpp. References ACE_ERROR, ACE_TEXT, TAO_Pluggable_Messaging::consolidate_fragmented_message(), LM_ERROR, messaging_object(), TAO_Queued_Data::missing_data(), TAO_Queued_Data::more_fragments(), TAO_Queued_Data::msg_type(), process_parsed_messages(), TAO_Queued_Data::release(), TAO_debug_level, and TAO_PLUGGABLE_MESSAGE_FRAGMENT. Referenced by handle_input_missing_data().
01531 { 01532 // paranoid check 01533 if (q_data->missing_data () != 0) 01534 { 01535 if (TAO_debug_level > 0) 01536 { 01537 ACE_ERROR ((LM_ERROR, 01538 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ") 01539 ACE_TEXT ("missing data\n"), 01540 this->id ())); 01541 } 01542 return -1; 01543 } 01544 01545 if (q_data->more_fragments () || 01546 q_data->msg_type () == TAO_PLUGGABLE_MESSAGE_FRAGMENT) 01547 { 01548 // consolidate message on top of stack, only for fragmented messages 01549 TAO_Queued_Data *new_q_data = 0; 01550 01551 switch (this->messaging_object()->consolidate_fragmented_message (q_data, new_q_data)) 01552 { 01553 case -1: // error 01554 return -1; 01555 01556 case 0: // returning consolidated message in q_data 01557 if (!new_q_data) 01558 { 01559 if (TAO_debug_level > 0) 01560 { 01561 ACE_ERROR ((LM_ERROR, 01562 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ") 01563 ACE_TEXT ("error, consolidated message is NULL\n"), 01564 this->id ())); 01565 } 01566 return -1; 01567 } 01568 01569 01570 if (this->process_parsed_messages (new_q_data, rh) == -1) 01571 { 01572 TAO_Queued_Data::release (new_q_data); 01573 01574 if (TAO_debug_level > 0) 01575 { 01576 ACE_ERROR ((LM_ERROR, 01577 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ") 01578 ACE_TEXT ("error processing consolidated message\n"), 01579 this->id ())); 01580 } 01581 return -1; 01582 } 01583 01584 TAO_Queued_Data::release (new_q_data); 01585 01586 break; 01587 01588 case 1: // fragment has been stored in messaging_oject() 01589 break; 01590 } 01591 } 01592 else 01593 { 01594 if (this->process_parsed_messages (q_data, rh) == -1) 01595 { 01596 TAO_Queued_Data::release (q_data); 01597 01598 if (TAO_debug_level > 0) 01599 { 01600 ACE_ERROR ((LM_ERROR, 01601 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ") 01602 ACE_TEXT ("error processing message\n"), 01603 this->id ())); 01604 } 01605 return -1; 01606 } 01607 01608 
TAO_Queued_Data::release (q_data); 01609 01610 } 01611 01612 return 0; 01613 } |
|
Send some of the data in the queue. As the outgoing data is drained this method is invoked to send as much of the current message as possible. Returns -1 if there was an error and 0 otherwise: either there is more data to send, or the queue was completely drained, in which case output is cancelled on the flushing strategy before returning (the value 1 reported by drain_queue_i() is mapped to 0). Definition at line 849 of file Transport.cpp. References ACE_GUARD_RETURN, TAO_Flushing_Strategy::cancel_output(), drain_queue_i(), TAO_ORB_Core::flushing_strategy(), and orb_core(). Referenced by handle_output().
00850 { 00851 ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1); 00852 int const retval = this->drain_queue_i (); 00853 00854 if (retval == 1) 00855 { 00856 // ... there is no current message or it was completely 00857 // sent, cancel output... 00858 TAO_Flushing_Strategy *flushing_strategy = 00859 this->orb_core ()->flushing_strategy (); 00860 00861 flushing_strategy->cancel_output (this); 00862 00863 return 0; 00864 } 00865 00866 return retval; 00867 } |
|
A helper routine used in drain_queue_i().
Definition at line 870 of file Transport.cpp. References ACE_DEBUG, ACE_TEXT, cleanup_queue(), dump_iov(), EWOULDBLOCK, LM_DEBUG, send(), sent_byte_count_, ssize_t, and TAO_debug_level. Referenced by drain_queue_i().
00871 { 00872 size_t byte_count = 0; 00873 00874 // ... send the message ... 00875 ssize_t retval = -1; 00876 00877 #if TAO_HAS_SENDFILE == 1 00878 if (this->mmap_allocator_) 00879 retval = this->sendfile (this->mmap_allocator_, 00880 iov, 00881 iovcnt, 00882 byte_count); 00883 else 00884 #endif /* TAO_HAS_SENDFILE==1 */ 00885 retval = this->send (iov, iovcnt, byte_count); 00886 00887 if (TAO_debug_level == 5) 00888 { 00889 dump_iov (iov, iovcnt, this->id (), 00890 byte_count, "drain_queue_helper"); 00891 } 00892 00893 // ... now we need to update the queue, removing elements 00894 // that have been sent, and updating the last element if it 00895 // was only partially sent ... 00896 this->cleanup_queue (byte_count); 00897 iovcnt = 0; 00898 00899 if (retval == 0) 00900 { 00901 if (TAO_debug_level > 4) 00902 { 00903 ACE_DEBUG ((LM_DEBUG, 00904 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_helper, ") 00905 ACE_TEXT ("send() returns 0\n"), 00906 this->id ())); 00907 } 00908 return -1; 00909 } 00910 else if (retval == -1) 00911 { 00912 if (TAO_debug_level > 4) 00913 { 00914 ACE_DEBUG ((LM_DEBUG, 00915 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_helper, ") 00916 ACE_TEXT ("error during %p\n"), 00917 this->id (), ACE_TEXT ("send()"))); 00918 } 00919 00920 if (errno == EWOULDBLOCK || errno == EAGAIN) 00921 { 00922 return 0; 00923 } 00924 00925 return -1; 00926 } 00927 00928 // ... start over, how do we guarantee progress? Because if 00929 // no bytes are sent send() can only return 0 or -1 00930 00931 // Total no. of bytes sent for a send call 00932 this->sent_byte_count_ += byte_count; 00933 00934 if (TAO_debug_level > 4) 00935 { 00936 ACE_DEBUG ((LM_DEBUG, 00937 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_helper, ") 00938 ACE_TEXT ("byte_count = %d, head_is_empty = %d\n"), 00939 this->id(), byte_count, (this->head_ == 0))); 00940 } 00941 00942 return 1; 00943 } |
|
Implement drain_queue() assuming the lock is held.
Definition at line 946 of file Transport.cpp. References ACE_DEBUG, ACE_IOV_MAX, ACE_TEXT, ACE_Reactor::cancel_timer(), TAO_Queued_Message::destroy(), drain_queue_helper(), event_handler_i(), TAO_Queued_Message::fill_iov(), flush_timer_pending(), ACE_High_Res_Timer::gettimeofday_hr(), TAO_Queued_Message::is_expired(), LM_DEBUG, TAO_Queued_Message::next(), ACE_Event_Handler::reactor(), TAO_Queued_Message::remove_from_list(), reset_flush_timer(), sent_byte_count_, TAO_LF_Event::state_changed(), and TAO_debug_level. Referenced by drain_queue(), send_message_block_chain_i(), and send_synch_message_helper_i().
00947 { 00948 // This is the vector used to send data, it must be declared outside 00949 // the loop because after the loop there may still be data to be 00950 // sent 00951 int iovcnt = 0; 00952 #if defined (ACE_INITIALIZE_MEMORY_BEFORE_USE) 00953 iovec iov[ACE_IOV_MAX] = { 0 , 0 }; 00954 #else 00955 iovec iov[ACE_IOV_MAX]; 00956 #endif /* ACE_INITIALIZE_MEMORY_BEFORE_USE */ 00957 00958 // We loop over all the elements in the queue ... 00959 TAO_Queued_Message *i = this->head_; 00960 00961 // Reset the value so that the counting is done for each new send 00962 // call. 00963 this->sent_byte_count_ = 0; 00964 00965 // Avoid calling this expensive function each time through the loop. Instead 00966 // we'll assume that the time is unlikely to change much during the loop. 00967 // If we are forced to send in the loop then we'll recompute the time. 00968 ACE_Time_Value now = ACE_High_Res_Timer::gettimeofday_hr (); 00969 00970 while (i != 0) 00971 { 00972 if (i->is_expired (now)) 00973 { 00974 if (TAO_debug_level > 3) 00975 { 00976 ACE_DEBUG ((LM_DEBUG, 00977 ACE_TEXT ("TAO (%P|%t - Transport[%d]::drain_queue_i, ") 00978 ACE_TEXT ("Discarding expired queued message.\n"), 00979 this->id ())); 00980 } 00981 TAO_Queued_Message *next = i->next (); 00982 i->state_changed (TAO_LF_Event::LFS_TIMEOUT, 00983 this->orb_core_->leader_follower ()); 00984 i->remove_from_list (this->head_, this->tail_); 00985 i->destroy (); 00986 i = next; 00987 continue; 00988 } 00989 // ... each element fills the iovector ... 00990 i->fill_iov (ACE_IOV_MAX, iovcnt, iov); 00991 00992 // ... the vector is full, no choice but to send some data out. 00993 // We need to loop because a single message can span multiple 00994 // IOV_MAX elements ... 
00995 if (iovcnt == ACE_IOV_MAX) 00996 { 00997 int const retval = 00998 this->drain_queue_helper (iovcnt, iov); 00999 01000 now = ACE_High_Res_Timer::gettimeofday_hr (); 01001 01002 if (TAO_debug_level > 4) 01003 { 01004 ACE_DEBUG ((LM_DEBUG, 01005 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_i, ") 01006 ACE_TEXT ("helper retval = %d\n"), 01007 this->id (), retval)); 01008 } 01009 01010 if (retval != 1) 01011 { 01012 return retval; 01013 } 01014 01015 i = this->head_; 01016 continue; 01017 } 01018 // ... notice that this line is only reached if there is still 01019 // room in the iovector ... 01020 i = i->next (); 01021 } 01022 01023 if (iovcnt != 0) 01024 { 01025 int const retval = this->drain_queue_helper (iovcnt, iov); 01026 01027 if (TAO_debug_level > 4) 01028 { 01029 ACE_DEBUG ((LM_DEBUG, 01030 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_i, ") 01031 ACE_TEXT ("helper retval = %d\n"), 01032 this->id (), retval)); 01033 } 01034 01035 if (retval != 1) 01036 { 01037 return retval; 01038 } 01039 } 01040 01041 if (this->head_ == 0) 01042 { 01043 if (this->flush_timer_pending ()) 01044 { 01045 ACE_Event_Handler *eh = this->event_handler_i (); 01046 ACE_Reactor * const reactor = eh->reactor (); 01047 reactor->cancel_timer (this->flush_timer_id_); 01048 this->reset_flush_timer (); 01049 } 01050 01051 return 1; 01052 } 01053 01054 return 0; 01055 } |
|
Normally a concrete TAO_Transport object has-a ACE_Event_Handler member that functions as an adapter between the ACE_Reactor framework and the TAO pluggable protocol framework. In all the protocols implemented so far this role is fulfilled by an instance of ACE_Svc_Handler.
Implemented in TAO_IIOP_Transport. Referenced by add_reference(), cancel_output_i(), check_buffering_constraints_i(), drain_queue_i(), TAO::Transport_Cache_Manager::find_transport(), notify_reactor(), register_handler(), remove_reference(), schedule_output_i(), TAO_Connection_Handler::transport(), and TAO_Wait_On_Read::wait(). |
|
Set the state of the first_request_ flag to 0.
Definition at line 158 of file Transport.inl. References first_request_. Referenced by TAO_IIOP_Transport::send_request().
00159 { 00160 this->first_request_ = 0; 00161 } |
|
Check if the flush timer is still pending.
Definition at line 108 of file Transport.inl. References flush_timer_id_. Referenced by check_buffering_constraints_i(), drain_queue_i(), and handle_timeout().
00109 { 00110 return this->flush_timer_id_ != -1; 00111 } |
|
Format and queue a message for stream
Definition at line 499 of file Transport.cpp. References ACE_OutputCDR::begin(), TAO_Pluggable_Messaging::format_message(), messaging_object(), and queue_message_i(). Referenced by TAO::Synch_Oneway_Invocation::remote_oneway().
00501 { 00502 if (this->messaging_object ()->format_message (stream) != 0) 00503 return -1; 00504 00505 return this->queue_message_i (stream.begin (), max_wait_time); 00506 } |
|
This is a request for the transport object to write a LocateRequest header before it is sent out. Definition at line 371 of file Transport.cpp. References ACE_DEBUG, ACE_TEXT, TAO_Pluggable_Messaging::generate_locate_request_header(), LM_DEBUG, messaging_object(), and TAO_debug_level. Referenced by TAO::LocateRequest_Invocation::invoke().
00375 { 00376 if (this->messaging_object ()->generate_locate_request_header (opdetails, 00377 spec, 00378 output) == -1) 00379 { 00380 if (TAO_debug_level > 0) 00381 { 00382 ACE_DEBUG ((LM_DEBUG, 00383 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::generate_locate_request, ") 00384 ACE_TEXT ("error while marshalling the LocateRequest header\n"), 00385 this->id ())); 00386 } 00387 00388 return -1; 00389 } 00390 00391 return 0; 00392 } |
|
This is a request for the transport object to write a request header before it sends out the request. Reimplemented in TAO_IIOP_Transport. Definition at line 395 of file Transport.cpp. References ACE_DEBUG, ACE_TEXT, TAO_ORB_Core::codeset_manager(), first_request_, TAO_Pluggable_Messaging::generate_request_header(), TAO_Codeset_Manager::generate_service_context(), LM_DEBUG, messaging_object(), orb_core(), and TAO_debug_level. Referenced by TAO_IIOP_Transport::generate_request_header(), and TAO::Remote_Invocation::write_header().
00399 { 00400 // codeset service context is only supposed to be sent in the first request 00401 // on a particular connection. 00402 if (this->first_request_) 00403 { 00404 TAO_Codeset_Manager * const csm = this->orb_core ()->codeset_manager (); 00405 if (csm) 00406 csm->generate_service_context (opdetails,*this); 00407 } 00408 00409 if (this->messaging_object ()->generate_request_header (opdetails, 00410 spec, 00411 output) == -1) 00412 { 00413 if (TAO_debug_level > 0) 00414 { 00415 ACE_DEBUG ((LM_DEBUG, 00416 ACE_TEXT ("(%P|%t) - Transport[%d]::generate_request_header, ") 00417 ACE_TEXT ("error while marshalling the Request header\n"), 00418 this->id())); 00419 } 00420 00421 return -1; 00422 } 00423 00424 return 0; 00425 } |
|
Callback to read incoming data. The ACE_Event_Handler adapter invokes this method as part of its handle_input() operation.
Definition at line 1455 of file Transport.cpp. References ACE_DEBUG, ACE_ERROR, ACE_TEXT, handle_input_missing_data(), handle_input_parse_data(), incoming_message_stack_, LM_DEBUG, LM_ERROR, TAO_Queued_Data::missing_data(), process_queue_head(), TAO_debug_level, TAO_MISSING_DATA_UNDEFINED, and TAO::Incoming_Message_Stack::top(). Referenced by TAO_Connection_Handler::handle_input_internal(), TAO_Connection_Handler::svc_i(), and TAO_Wait_On_Read::wait().
01457 { 01458 if (TAO_debug_level > 3) 01459 { 01460 ACE_DEBUG ((LM_DEBUG, 01461 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input\n"), 01462 this->id ())); 01463 } 01464 01465 // First try to process messages of the head of the incoming queue. 01466 int const retval = this->process_queue_head (rh); 01467 01468 if (retval <= 0) 01469 { 01470 if (retval == -1) 01471 { 01472 if (TAO_debug_level > 2) 01473 { 01474 ACE_DEBUG ((LM_DEBUG, 01475 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input, ") 01476 ACE_TEXT ("error while parsing the head of the queue\n"), 01477 this->id())); 01478 01479 } 01480 return -1; 01481 } 01482 else 01483 { 01484 // retval == 0 01485 01486 // Processed a message in queue successfully. This 01487 // thread must return to thread-pool now. 01488 return 0; 01489 } 01490 } 01491 01492 TAO_Queued_Data *q_data = 0; 01493 01494 if (this->incoming_message_stack_.top (q_data) != -1 01495 && q_data->missing_data () != TAO_MISSING_DATA_UNDEFINED) 01496 { 01497 /* PRE: q_data->missing_data_ > 0 as all QD on stack must be incomplete */ 01498 if (this->handle_input_missing_data (rh, max_wait_time, q_data) == -1) 01499 { 01500 if (TAO_debug_level > 0) 01501 { 01502 ACE_ERROR ((LM_ERROR, 01503 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input, ") 01504 ACE_TEXT ("error consolidating incoming message\n"), 01505 this->id ())); 01506 } 01507 return -1; 01508 } 01509 } 01510 else 01511 { 01512 if (this->handle_input_parse_data (rh, max_wait_time) == -1) 01513 { 01514 if (TAO_debug_level > 0) 01515 { 01516 ACE_ERROR ((LM_ERROR, 01517 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input, ") 01518 ACE_TEXT ("error parsing incoming message\n"), 01519 this->id ())); 01520 } 01521 return -1; 01522 } 01523 } 01524 01525 return 0; 01526 } |
|
Is invoked by the handle_input operation. It consolidates the message on top of incoming_message_stack. The amount of missing data is known and the recv operation copies data directly into the message buffer, as much as a single recv-invocation provides. Definition at line 1673 of file Transport.cpp. References ACE_DEBUG, ACE_TEXT, consolidate_process_message(), ACE_CDR::grow(), incoming_message_stack_, ACE_Message_Block::length(), LM_DEBUG, TAO_Queued_Data::missing_data(), TAO_Queued_Data::msg_block(), TAO::Incoming_Message_Stack::pop(), recv(), ACE_Message_Block::space(), ssize_t, TAO_debug_level, and ACE_Message_Block::wr_ptr(). Referenced by handle_input().
01676 { 01677 // paranoid check 01678 if (q_data == 0) 01679 { 01680 return -1; 01681 } 01682 01683 if (TAO_debug_level > 3) 01684 { 01685 ACE_DEBUG ((LM_DEBUG, 01686 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_missing_data_message, ") 01687 ACE_TEXT ("enter (missing data == %d)\n"), 01688 this->id (), q_data->missing_data ())); 01689 } 01690 01691 size_t const recv_size = q_data->missing_data (); 01692 01693 if (q_data->msg_block ()->space() < recv_size) 01694 { 01695 // make sure the message_block has enough space 01696 size_t const message_size = recv_size + q_data->msg_block ()->length(); 01697 01698 if (ACE_CDR::grow (q_data->msg_block (), message_size) == -1) 01699 { 01700 return -1; 01701 } 01702 } 01703 01704 // Saving the size of the received buffer in case any one needs to 01705 // get the size of the message thats received in the 01706 // context. Obviously the value will be changed for each recv call 01707 // and the user is supposed to invoke the accessor only in the 01708 // invocation context to get meaningful information. 01709 this->recv_buffer_size_ = recv_size; 01710 01711 // Read the message into the existing message block on heap 01712 ssize_t const n = this->recv (q_data->msg_block ()->wr_ptr(), 01713 recv_size, 01714 max_wait_time); 01715 01716 01717 if (n <= 0) 01718 { 01719 return n; 01720 } 01721 01722 if (TAO_debug_level > 3) 01723 { 01724 ACE_DEBUG ((LM_DEBUG, 01725 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_missing_data_message, ") 01726 ACE_TEXT ("read bytes %d\n"), 01727 this->id (), n)); 01728 } 01729 01730 q_data->msg_block ()->wr_ptr(n); 01731 q_data->missing_data (q_data->missing_data () - n); 01732 01733 if (q_data->missing_data () == 0) 01734 { 01735 // paranoid check 01736 if (this->incoming_message_stack_.pop (q_data) == -1) 01737 { 01738 return -1; 01739 } 01740 01741 if (this->consolidate_process_message (q_data, rh) == -1) 01742 { 01743 return -1; 01744 } 01745 } 01746 01747 return 0; 01748 } |
|
Is invoked by handle_input operation. It parses new messages from input stream or consolidates messages whose header has been partially read, the message size being unknown so far. It parses as much data as a single recv-invocation provides. Definition at line 1795 of file Transport.cpp. References ACE_DEBUG, ACE_ERROR, ACE_TEXT, allocate_partial_message_block(), consolidate_enqueue_message(), TAO_Pluggable_Messaging::consolidate_node(), ACE_Message_Block::copy(), TAO_Queued_Data::duplicate(), ACE_CDR::grow(), handle_input_parse_extra_messages(), TAO_Pluggable_Messaging::header_length(), incoming_message_queue_, incoming_message_stack_, TAO_ORB_Core::input_cdr_dblock_allocator(), ACE_Message_Block::length(), LM_DEBUG, LM_ERROR, TAO_ORB_Core::locking_strategy(), ACE_CDR::mb_align(), ACE_OS::memset(), messaging_object(), TAO_Queued_Data::missing_data(), TAO_Queued_Data::more_fragments(), TAO_Queued_Data::msg_block(), TAO_Queued_Data::msg_type(), notify_reactor(), TAO_ORB_Core::orb_params(), TAO_Pluggable_Messaging::parse_next_message(), partial_message_, TAO::Incoming_Message_Stack::pop(), process_parsed_messages(), process_queue_head(), TAO::Incoming_Message_Stack::push(), TAO_Incoming_Message_Queue::queue_length(), ACE_Message_Block::rd_ptr(), recv(), ACE_Message_Block::reset(), TAO_Resume_Handle::set_flag(), TAO_ORB_Parameters::single_read_optimization(), ACE_Message_Block::space(), ssize_t, TAO_debug_level, TAO_MAXBUFSIZE, TAO_MISSING_DATA_UNDEFINED, TAO_PLUGGABLE_MESSAGE_FRAGMENT, TAO::Incoming_Message_Stack::top(), and ACE_Message_Block::wr_ptr(). Referenced by handle_input().
01797 { 01798 01799 if (TAO_debug_level > 3) 01800 { 01801 ACE_DEBUG ((LM_DEBUG, 01802 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ") 01803 ACE_TEXT ("enter\n"), 01804 this->id ())); 01805 } 01806 01807 01808 // The buffer on the stack which will be used to hold the input 01809 // messages, ACE_CDR::MAX_ALIGNMENT compensates the 01810 // memory-alignment. This improves performance with SUN-Java-ORB-1.4 01811 // and higher that sends fragmented requests of size 1024 bytes. 01812 char buf [TAO_MAXBUFSIZE + ACE_CDR::MAX_ALIGNMENT]; 01813 01814 #if defined (ACE_INITIALIZE_MEMORY_BEFORE_USE) 01815 (void) ACE_OS::memset (buf, 01816 '\0', 01817 sizeof buf); 01818 #endif /* ACE_INITIALIZE_MEMORY_BEFORE_USE */ 01819 01820 // Create a data block 01821 ACE_Data_Block db (sizeof (buf), 01822 ACE_Message_Block::MB_DATA, 01823 buf, 01824 this->orb_core_->input_cdr_buffer_allocator (), 01825 this->orb_core_->locking_strategy (), 01826 ACE_Message_Block::DONT_DELETE, 01827 this->orb_core_->input_cdr_dblock_allocator ()); 01828 01829 // Create a message block 01830 ACE_Message_Block message_block (&db, 01831 ACE_Message_Block::DONT_DELETE, 01832 this->orb_core_->input_cdr_msgblock_allocator ()); 01833 01834 01835 // Align the message block 01836 ACE_CDR::mb_align (&message_block); 01837 01838 size_t recv_size = 0; // Note: unsigned integer 01839 01840 // Pointer to newly parsed message 01841 TAO_Queued_Data *q_data = 0; 01842 01843 // optimizing access of constants 01844 size_t const header_length = this->messaging_object ()->header_length (); 01845 01846 // paranoid check 01847 if (header_length > message_block.space ()) 01848 { 01849 return -1; 01850 } 01851 01852 if (this->orb_core_->orb_params ()->single_read_optimization ()) 01853 { 01854 recv_size = message_block.space (); 01855 } 01856 else 01857 { 01858 // Single read optimization has been de-activated. 
That means 01859 // that we need to read from transport the GIOP header first 01860 // before the payload. This codes first checks the incoming 01861 // stack for partial messages which needs to be 01862 // consolidated. Otherwise we are in new cycle, reading complete 01863 // GIOP header of new incoming message. 01864 if (this->incoming_message_stack_.top (q_data) != -1 01865 && q_data->missing_data () == TAO_MISSING_DATA_UNDEFINED) 01866 { 01867 // There is a partial message on incoming_message_stack_ 01868 // whose length is unknown so far. We need to consolidate 01869 // the GIOP header to get to know the payload size, 01870 recv_size = header_length - q_data->msg_block ()->length (); 01871 } 01872 else 01873 { 01874 // Read amount of data forming GIOP header of new incoming 01875 // message. 01876 recv_size = header_length; 01877 } 01878 // POST: 0 <= recv_size <= header_length 01879 } 01880 // POST: 0 <= recv_size <= message_block->space () 01881 01882 // If we have a partial message, copy it into our message block and 01883 // clear out the partial message. 01884 if (this->partial_message_ != 0 && this->partial_message_->length () > 0) 01885 { 01886 // (*) Copy back the partial message into current read-buffer, 01887 // verify that the read-strategy of "recv_size" bytes is not 01888 // exceeded. 
The latter check guarantees that recv_size does not 01889 // roll-over and keeps in range 01890 // 0<=recv_size<=message_block->space() 01891 if (this->partial_message_->length () <= recv_size && 01892 message_block.copy (this->partial_message_->rd_ptr (), 01893 this->partial_message_->length ()) == 0) 01894 { 01895 01896 recv_size -= this->partial_message_->length (); 01897 this->partial_message_->reset (); 01898 } 01899 else 01900 { 01901 return -1; 01902 } 01903 } 01904 // POST: 0 <= recv_size <= buffer_space 01905 01906 if (0 >= recv_size) // paranoid: the check above (*) guarantees recv_size>=0 01907 { 01908 // This event would cause endless looping, trying frequently to 01909 // read zero bytes from stream. This might happen, if TAOs 01910 // protocol implementation is not correct and tries to read data 01911 // beyond header without "single_read_optimazation" being 01912 // activated. 01913 if (TAO_debug_level > 0) 01914 { 01915 ACE_ERROR ((LM_ERROR, 01916 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ") 01917 ACE_TEXT ("Error - endless loop detection, closing connection"), 01918 this->id ())); 01919 } 01920 return -1; 01921 } 01922 01923 // Saving the size of the received buffer in case any one needs to 01924 // get the size of the message thats received in the 01925 // context. Obviously the value will be changed for each recv call 01926 // and the user is supposed to invoke the accessor only in the 01927 // invocation context to get meaningful information. 01928 this->recv_buffer_size_ = recv_size; 01929 01930 // Read the message into the message block that we have created on 01931 // the stack. 01932 ssize_t const n = this->recv (message_block.wr_ptr (), 01933 recv_size, 01934 max_wait_time); 01935 01936 // If there is an error return to the reactor.. 
01937 if (n <= 0) 01938 { 01939 return n; 01940 } 01941 01942 if (TAO_debug_level > 3) 01943 { 01944 ACE_DEBUG ((LM_DEBUG, 01945 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ") 01946 ACE_TEXT ("read %d bytes\n"), 01947 this->id (), n)); 01948 } 01949 01950 // Set the write pointer in the stack buffer 01951 message_block.wr_ptr (n); 01952 01953 // 01954 // STACK PROCESSING OR MESSAGE CONSOLIDATION 01955 // 01956 01957 // PRE: data in buffer is aligned && message_block.length() > 0 01958 01959 if (this->incoming_message_stack_.top (q_data) != -1 01960 && q_data->missing_data () == TAO_MISSING_DATA_UNDEFINED) 01961 { 01962 // 01963 // MESSAGE CONSOLIDATION 01964 // 01965 01966 // Partial message on incoming_message_stack_ needs to be 01967 // consolidated. The message header could not be parsed so far 01968 // and therefor the message size is unknown yet. Consolidating 01969 // the message destroys the memory alignment of succeeding 01970 // messages sharing the buffer, for that reason consolidation 01971 // and stack based processing are mutial exclusive. 
01972 if (this->messaging_object ()->consolidate_node (q_data, 01973 message_block) == -1) 01974 { 01975 if (TAO_debug_level > 0) 01976 { 01977 ACE_ERROR ((LM_ERROR, 01978 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ") 01979 ACE_TEXT ("error consolidating message from input buffer\n"), 01980 this->id () )); 01981 } 01982 return -1; 01983 } 01984 01985 // Complete message are to be enqueued and later processed 01986 if (q_data->missing_data () == 0) 01987 { 01988 if (this->incoming_message_stack_.pop (q_data) == -1) 01989 { 01990 return -1; 01991 } 01992 01993 if (this->consolidate_enqueue_message (q_data) == -1) 01994 { 01995 return -1; 01996 } 01997 } 01998 01999 if (message_block.length () > 0 02000 && this->handle_input_parse_extra_messages (message_block) == -1) 02001 { 02002 return -1; 02003 } 02004 02005 // In any case try to process the enqueued messages 02006 if (this->process_queue_head (rh) == -1) 02007 { 02008 return -1; 02009 } 02010 } 02011 else 02012 { 02013 // 02014 // STACK PROCESSING (critical path) 02015 // 02016 02017 // Process the first message in buffer on stack 02018 02019 // (PRE: first message resides in aligned memory) Make a node of 02020 // the message-block.. 
02021 02022 TAO_Queued_Data qd (&message_block, 02023 this->orb_core_->transport_message_buffer_allocator ()); 02024 02025 size_t mesg_length = 0; 02026 02027 if (this->messaging_object ()->parse_next_message (qd, 02028 mesg_length) == -1 02029 || (qd.missing_data () == 0 02030 && mesg_length > message_block.length ()) ) 02031 { 02032 // extracting message failed 02033 return -1; 02034 } 02035 // POST: qd.missing_data_ == 0 --> mesg_length <= message_block.length() 02036 // This prevents seeking rd_ptr behind the wr_ptr 02037 02038 if (qd.missing_data () != 0 || 02039 qd.more_fragments () || 02040 qd.msg_type () == TAO_PLUGGABLE_MESSAGE_FRAGMENT) 02041 { 02042 if (qd.missing_data () == 0) 02043 { 02044 // Dealing with a fragment 02045 TAO_Queued_Data *nqd = 02046 TAO_Queued_Data::duplicate (qd); 02047 02048 if (nqd == 0) 02049 { 02050 return -1; 02051 } 02052 02053 // mark the end of message in new buffer 02054 char* end_mark = nqd->msg_block ()->rd_ptr () 02055 + mesg_length; 02056 nqd->msg_block ()->wr_ptr (end_mark); 02057 02058 // move the read pointer forward in old buffer 02059 message_block.rd_ptr (mesg_length); 02060 02061 // enqueue the message 02062 if (this->consolidate_enqueue_message (nqd) == -1) 02063 { 02064 return -1; 02065 } 02066 02067 if (message_block.length () > 0 02068 && this->handle_input_parse_extra_messages (message_block) == -1) 02069 { 02070 return -1; 02071 } 02072 02073 // In any case try to process the enqueued messages 02074 if (this->process_queue_head (rh) == -1) 02075 { 02076 return -1; 02077 } 02078 } 02079 else if (qd.missing_data () != TAO_MISSING_DATA_UNDEFINED) 02080 { 02081 // Incomplete message, must be the last one in buffer 02082 02083 if (qd.missing_data () != TAO_MISSING_DATA_UNDEFINED && 02084 qd.missing_data () > message_block.space ()) 02085 { 02086 // Re-Allocate correct size on heap 02087 if (ACE_CDR::grow (qd.msg_block (), 02088 message_block.length () 02089 + qd.missing_data ()) == -1) 02090 { 02091 return -1; 
02092 } 02093 } 02094 02095 TAO_Queued_Data *nqd = 02096 TAO_Queued_Data::duplicate (qd); 02097 02098 if (nqd == 0) 02099 { 02100 return -1; 02101 } 02102 02103 // move read-pointer to end of buffer 02104 message_block.rd_ptr (message_block.length()); 02105 02106 this->incoming_message_stack_.push (nqd); 02107 } 02108 } 02109 else 02110 { 02111 // 02112 // critical path 02113 // 02114 02115 // We cant process the message on stack right now. First we 02116 // have got to parse extra messages from message_block, 02117 // putting them into queue. When this is done we can return 02118 // to process this message, and notifying other threads to 02119 // process the messages in queue. 02120 02121 char * end_marker = message_block.rd_ptr () 02122 + mesg_length; 02123 02124 if (message_block.length () > mesg_length) 02125 { 02126 // There are more message in data stream to be parsed. 02127 // Safe the rd_ptr to restore later. 02128 char *rd_ptr_stack_mesg = message_block.rd_ptr (); 02129 02130 // Skip parsed message, jump to next message in buffer 02131 // PRE: mesg_length <= message_block.length () 02132 message_block.rd_ptr (mesg_length); 02133 02134 // Extract remaining messages and enqueue them for later 02135 // heap processing 02136 if (this->handle_input_parse_extra_messages (message_block) == -1) 02137 { 02138 return -1; 02139 } 02140 02141 // correct the end_marker 02142 end_marker = message_block.rd_ptr (); 02143 02144 // Restore rd_ptr 02145 message_block.rd_ptr (rd_ptr_stack_mesg); 02146 } 02147 02148 // The following if-else has been copied from 02149 // process_queue_head(). While process_queue_head() 02150 // processes message on heap, here we will process a message 02151 // on stack. 02152 02153 // Now that we have one message on stack to be processed, 02154 // check whether we have one more message in the queue... 
02155 if (this->incoming_message_queue_.queue_length () > 0) 02156 { 02157 if (TAO_debug_level > 0) 02158 { 02159 ACE_DEBUG ((LM_DEBUG, 02160 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ") 02161 ACE_TEXT ("notify reactor\n"), 02162 this->id ())); 02163 02164 } 02165 02166 const int retval = this->notify_reactor (); 02167 02168 if (retval == 1) 02169 { 02170 // Let the class know that it doesn't need to resume the 02171 // handle.. 02172 rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_LEAVE_SUSPENDED); 02173 } 02174 else if (retval < 0) 02175 return -1; 02176 } 02177 else 02178 { 02179 // As there are no further messages in queue just resume 02180 // the handle. Set the flag incase someone had reset the flag.. 02181 rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_RESUMABLE); 02182 } 02183 02184 // PRE: incoming_message_queue is empty 02185 if (this->process_parsed_messages (&qd, 02186 rh) == -1) 02187 { 02188 return -1; 02189 } 02190 02191 // move the rd_ptr tp position of end_marker 02192 message_block.rd_ptr (end_marker); 02193 } 02194 } 02195 02196 // Now that all cases have been processed, there might be kept some data 02197 // in buffer that needs to be safed for next "handle_input" invocations. 02198 if (message_block.length () > 0) 02199 { 02200 if (this->partial_message_ == 0) 02201 { 02202 this->allocate_partial_message_block (); 02203 } 02204 02205 if (this->partial_message_ != 0 && 02206 this->partial_message_->copy (message_block.rd_ptr (), 02207 message_block.length ()) == 0) 02208 { 02209 message_block.rd_ptr (message_block.length ()); 02210 } 02211 else 02212 { 02213 return -1; 02214 } 02215 } 02216 02217 return 0; 02218 } |
|
Is invoked by handle_input_parse_data. Parses all messages remaining in message_block. Definition at line 1752 of file Transport.cpp. References consolidate_enqueue_message(), TAO_Pluggable_Messaging::extract_next_message(), incoming_message_stack_, ACE_Message_Block::length(), messaging_object(), TAO_Queued_Data::missing_data(), and TAO::Incoming_Message_Stack::push(). Referenced by handle_input_parse_data().
01753 { 01754 01755 // store buffer status of last extraction: -1 parse error, 0 01756 // incomplete message header in buffer, 1 complete messages header 01757 // parsed 01758 int buf_status = 0; 01759 01760 TAO_Queued_Data *q_data = 0; // init 01761 01762 // parse buffer until all messages have been extracted, consolidate 01763 // and enqueue complete messages, if the last message being parsed 01764 // has missin data, it is stays on top of incoming_message_stack. 01765 while (message_block.length () > 0 && 01766 (buf_status = this->messaging_object ()->extract_next_message 01767 (message_block, q_data)) != -1 && 01768 q_data != 0) // paranoid check 01769 { 01770 if (q_data->missing_data () == 0) 01771 { 01772 if (this->consolidate_enqueue_message (q_data) == -1) 01773 { 01774 return -1; 01775 } 01776 } 01777 else // incomplete message read, probably the last message in buffer 01778 { 01779 // can not fail 01780 this->incoming_message_stack_.push (q_data); 01781 } 01782 01783 q_data = 0; // reset 01784 } // while 01785 01786 if (buf_status == -1) 01787 { 01788 return -1; 01789 } 01790 01791 return 0; 01792 } |
|
Callback method to reactively drain the outgoing data queue.
Definition at line 471 of file Transport.cpp. References ACE_DEBUG, ACE_TEXT, drain_queue(), LM_DEBUG, and TAO_debug_level. Referenced by TAO_Block_Flushing_Strategy::flush_message(), TAO_Block_Flushing_Strategy::flush_transport(), and TAO_Connection_Handler::handle_output_eh().
00472 { 00473 if (TAO_debug_level > 3) 00474 { 00475 ACE_DEBUG ((LM_DEBUG, 00476 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_output\n"), 00477 this->id ())); 00478 } 00479 00480 // The flushing strategy (potentially via the Reactor) wants to send 00481 // more data, first check if there is a current message that needs 00482 // more sending... 00483 int const retval = this->drain_queue (); 00484 00485 if (TAO_debug_level > 3) 00486 { 00487 ACE_DEBUG ((LM_DEBUG, 00488 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_output, ") 00489 ACE_TEXT ("drain_queue returns %d/%d\n"), 00490 this->id (), 00491 retval, errno)); 00492 } 00493 00494 // Any errors are returned directly to the Reactor 00495 return retval; 00496 } |
|
Definition at line 810 of file Transport.cpp. References ACE_DEBUG, ACE_GUARD_RETURN, ACE_TEXT, current_deadline_, flush_timer_pending(), TAO_Flushing_Strategy::flush_transport(), TAO_ORB_Core::flushing_strategy(), LM_DEBUG, orb_core(), reset_flush_timer(), TAO_Flushing_Strategy::schedule_output(), and TAO_debug_level. Referenced by TAO_Transport_Timer::handle_timeout().
00812 { 00813 if (TAO_debug_level > 6) 00814 { 00815 ACE_DEBUG ((LM_DEBUG, 00816 ACE_TEXT ("TAO (%P|%t) - TAO_Transport[%d]::handle_timeout, ") 00817 ACE_TEXT ("timer expired\n"), 00818 this->id ())); 00819 } 00820 00821 /// This is the only legal ACT in the current configuration.... 00822 if (act != &this->current_deadline_) 00823 { 00824 return -1; 00825 } 00826 00827 if (this->flush_timer_pending ()) 00828 { 00829 // The timer is always a oneshot timer, so mark is as not 00830 // pending. 00831 this->reset_flush_timer (); 00832 00833 TAO_Flushing_Strategy *flushing_strategy = 00834 this->orb_core ()->flushing_strategy (); 00835 int const result = flushing_strategy->schedule_output (this); 00836 if (result == TAO_Flushing_Strategy::MUST_FLUSH) 00837 { 00838 typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK; 00839 TAO_REVERSE_LOCK reverse (*this->handler_lock_); 00840 ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1); 00841 (void) flushing_strategy->flush_transport (this); 00842 } 00843 } 00844 00845 return 0; 00846 } |
|
Definition at line 92 of file Transport.inl.
00093 { 00094 this->id_ = id; 00095 } |
|
Set and Get the identifier for this transport instance.
If not set, this will return an integer representation of the `this` pointer for the instance on which it is called. Definition at line 86 of file Transport.inl. Referenced by TAO::Transport_Cache_Manager::bind_i(), TAO_Connection_Handler::close_connection_eh(), TAO_IIOP_Connector::complete_connection(), TAO_Connector::connect(), TAO::Transport_Cache_Manager::find_transport(), TAO_Connection_Handler::handle_input_internal(), TAO_Connector::parallel_connect(), TAO_GIOP_Message_Base::process_reply_message(), TAO::Transport_Cache_Manager::purge(), TAO_GIOP_Message_Base::send_close_connection(), TAO_GIOP_Message_Base::send_error(), TAO_Connector::wait_for_connection_completion(), and TAO_Leader_Follower::wait_for_event().
00087 { 00088 return this->id_; 00089 } |
|
Request is sent and the reply is received. Idle the transport now. Definition at line 258 of file Transport.cpp. References TAO_Transport_Mux_Strategy::idle_after_reply(), and tms(). Referenced by TAO::LocateRequest_Invocation::invoke(), and TAO::Synch_Twoway_Invocation::remote_twoway().
00259 { 00260 return this->tms ()->idle_after_reply (); 00261 } |
|
Request has been just sent, but the reply is not received. Idle the transport now. Definition at line 252 of file Transport.cpp. References TAO_Transport_Mux_Strategy::idle_after_send(), and tms(). Referenced by TAO::LocateRequest_Invocation::invoke(), and TAO::Synch_Twoway_Invocation::remote_twoway().
00253 { 00254 return this->tms ()->idle_after_send (); 00255 } |
|
Is this transport really connected.
Definition at line 164 of file Transport.inl. References ACE_GUARD_RETURN. Referenced by TAO::Cache_IntId::Cache_IntId(), TAO_IIOP_Connector::complete_connection(), TAO_Connector::connect(), TAO::Synch_Oneway_Invocation::remote_oneway(), and TAO_Connector::wait_for_connection_completion().
00165 { 00166 ACE_GUARD_RETURN (ACE_Lock, 00167 ace_mon, 00168 *this->handler_lock_, 00169 false); 00170 00171 return this->is_connected_; 00172 } |
|
Return true if the tcs has been set.
Definition at line 152 of file Transport.inl. References tcs_set_.
00153 { 00154 return tcs_set_; 00155 } |
|
Cache management.
Definition at line 447 of file Transport.cpp. References ACE_DEBUG, ACE_TEXT, LM_DEBUG, TAO::Transport_Cache_Manager::make_idle(), TAO_debug_level, and transport_cache_manager(). Referenced by TAO_Exclusive_TMS::idle_after_reply(), TAO_Muxed_TMS::idle_after_send(), and TAO_IIOP_Connection_Handler::process_listen_point_list().
00448 { 00449 if (TAO_debug_level > 3) 00450 { 00451 ACE_DEBUG ((LM_DEBUG, 00452 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::make_idle\n"), 00453 this->id ())); 00454 } 00455 00456 return this->transport_cache_manager ().make_idle (this->cache_map_entry_); 00457 } |
|
Initialising the messaging object. This would be used by the connector side. On the acceptor side the connection handler would take care of the messaging objects. Implemented in TAO_IIOP_Transport. |
|
Return the messaging object that is used to format the data that needs to be sent. Implemented in TAO_IIOP_Transport. Referenced by allocate_partial_message_block(), consolidate_enqueue_message(), consolidate_process_message(), format_queue_message(), TAO_On_Demand_Fragmentation_Strategy::fragment(), generate_locate_request(), generate_request_header(), handle_input_parse_data(), handle_input_parse_extra_messages(), out_stream(), process_parsed_messages(), and send_connection_closed_notifications_i(). |
|
Definition at line 2410 of file Transport.cpp. References ACE_DEBUG, ACE_TEXT, event_handler_i(), TAO_Wait_Strategy::is_registered(), LM_DEBUG, ACE_Reactor::notify(), orb_core(), TAO_ORB_Core::reactor(), TAO_debug_level, and ws_. Referenced by handle_input_parse_data(), and process_queue_head().
02411 { 02412 if (!this->ws_->is_registered ()) 02413 { 02414 return 0; 02415 } 02416 02417 ACE_Event_Handler *eh = this->event_handler_i (); 02418 02419 // Get the reactor associated with the event handler 02420 ACE_Reactor *reactor = this->orb_core ()->reactor (); 02421 02422 if (TAO_debug_level > 0) 02423 { 02424 ACE_DEBUG ((LM_DEBUG, 02425 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::notify_reactor, ") 02426 ACE_TEXT ("notify to Reactor\n"), 02427 this->id ())); 02428 } 02429 02430 02431 // Send a notification to the reactor... 02432 int const retval = reactor->notify (eh, ACE_Event_Handler::READ_MASK); 02433 02434 if (retval < 0 && TAO_debug_level > 2) 02435 { 02436 // @todo: need to think about what is the action that 02437 // we can take when we get here. 02438 ACE_DEBUG ((LM_DEBUG, 02439 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::notify_reactor, ") 02440 ACE_TEXT ("notify to the reactor failed..\n"), 02441 this->id ())); 02442 } 02443 02444 return 1; 02445 } |
|
Definition at line 51 of file Transport.inl. References opening_connection_role_.
00052 { 00053 this->opening_connection_role_ = role; 00054 } |
|
Methods dealing with the role of the connection, e.g., CLIENT or SERVER. See CORBA 2.6 Specification, Section 15.5.1 for origin of definitions. Definition at line 45 of file Transport.inl. References opening_connection_role_. Referenced by TAO::Wait_On_LF_No_Upcall::can_process_upcalls(), TAO_Connector::connect(), and TAO_IIOP_Connection_Handler::open().
00046 { 00047 return this->opening_connection_role_; 00048 } |
|
|
|
|
Accessor for the output CDR stream.
Definition at line 2496 of file Transport.cpp. References messaging_object(), and TAO_Pluggable_Messaging::out_stream(). Referenced by TAO::LocateRequest_Invocation::invoke(), TAO::Synch_Oneway_Invocation::remote_oneway(), and TAO::Synch_Twoway_Invocation::remote_twoway().
02497 { 02498 return this->messaging_object ()->out_stream (); 02499 } |
|
Hooks that can be overridden in concrete transports. These hooks are invoked just after connection establishment (or after a connection is fetched from cache). The return value signifies whether the invoker should proceed with post connection establishment activities. Protocols like SSLIOP need this to verify whether connections already established have valid certificates. There are no pre_connect_hooks () since the transport doesn't exist before a connection establishment. :-)
Definition at line 306 of file Transport.cpp. Referenced by TAO_Connector::connect().
00307 { 00308 return true; 00309 } |
|
Perform all the actions required when this transport gets opened.
Definition at line 2513 of file Transport.cpp. References ACE_ERROR, ACE_GUARD_RETURN, ACE_TEXT, close_connection(), LM_ERROR, purge_entry(), queue_is_empty_i(), TAO_Wait_Strategy::register_handler(), TAO_debug_level, and wait_strategy(). Referenced by TAO_IIOP_Connection_Handler::open().
02514 { 02515 this->id_ = id; 02516 02517 { 02518 ACE_GUARD_RETURN (ACE_Lock, 02519 ace_mon, 02520 *this->handler_lock_, 02521 false); 02522 this->is_connected_ = true; 02523 } 02524 02525 // When we have data in our outgoing queue schedule ourselves 02526 // for output 02527 if (this->queue_is_empty_i ()) 02528 return true; 02529 02530 // If the wait strategy wants us to be registered with the reactor 02531 // then we do so. If registeration is required and it succeeds, 02532 // #REFCOUNT# becomes two. 02533 if (this->wait_strategy ()->register_handler () != 0) 02534 { 02535 // Registration failures. 02536 02537 // Purge from the connection cache, if we are not in the cache, this 02538 // just does nothing. 02539 (void) this->purge_entry (); 02540 02541 // Close the handler. 02542 (void) this->close_connection (); 02543 02544 if (TAO_debug_level > 0) 02545 ACE_ERROR ((LM_ERROR, 02546 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::post_connect , ") 02547 ACE_TEXT ("could not register the transport ") 02548 ACE_TEXT ("in the reactor.\n"), 02549 this->id ())); 02550 02551 return false; 02552 } 02553 02554 return true; 02555 } |
|
Do what needs to be done when closing the transport.
Definition at line 2502 of file Transport.cpp. References ACE_GUARD, cleanup_queue_i(), and purge_entry(). Referenced by TAO_Connection_Handler::close_connection_eh().
02503 { 02504 this->is_connected_ = false; 02505 this->purge_entry (); 02506 { 02507 ACE_MT (ACE_GUARD (ACE_Lock, guard, *this->handler_lock_)); 02508 this->cleanup_queue_i (); 02509 } 02510 } |
|
Process the message by sending it to the higher layers of the ORB. Definition at line 2222 of file Transport.cpp. References ACE_DEBUG, ACE_ERROR, ACE_TEXT, TAO_Pluggable_Messaging::discard_fragmented_message(), ACE_Message_Block::length(), LM_DEBUG, LM_ERROR, TAO::Transport::Stats::messages_received(), messaging_object(), TAO_Queued_Data::missing_data(), TAO_Queued_Data::msg_block(), TAO_Queued_Data::msg_type(), TAO_Pluggable_Messaging::process_reply_message(), TAO_Pluggable_Messaging::process_request_message(), TAO_Resume_Handle::resume_handle(), stats_, TAO_debug_level, TAO_PLUGGABLE_MESSAGE_CANCELREQUEST, TAO_PLUGGABLE_MESSAGE_CLOSECONNECTION, TAO_PLUGGABLE_MESSAGE_FRAGMENT, TAO_PLUGGABLE_MESSAGE_LOCATEREPLY, TAO_PLUGGABLE_MESSAGE_LOCATEREQUEST, TAO_PLUGGABLE_MESSAGE_MESSAGERROR, TAO_PLUGGABLE_MESSAGE_REPLY, and TAO_PLUGGABLE_MESSAGE_REQUEST. Referenced by consolidate_process_message(), handle_input_parse_data(), and process_queue_head().
02224 { 02225 if (TAO_debug_level > 7) 02226 { 02227 ACE_DEBUG ((LM_DEBUG, 02228 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ") 02229 ACE_TEXT ("entering (missing data == %d)\n"), 02230 this->id(), qd->missing_data ())); 02231 } 02232 02233 #if TAO_HAS_TRANSPORT_CURRENT == 1 02234 // Update stats, if any 02235 if (this->stats_ != 0) 02236 this->stats_->messages_received (qd->msg_block ()->length ()); 02237 #endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */ 02238 02239 switch (qd->msg_type ()) 02240 { 02241 case TAO_PLUGGABLE_MESSAGE_CLOSECONNECTION: 02242 { 02243 if (TAO_debug_level > 0) 02244 ACE_DEBUG ((LM_DEBUG, 02245 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ") 02246 ACE_TEXT ("received CloseConnection message - %m\n"), 02247 this->id())); 02248 02249 // Return a "-1" so that the next stage can take care of 02250 // closing connection and the necessary memory management. 02251 return -1; 02252 } 02253 break; 02254 case TAO_PLUGGABLE_MESSAGE_REQUEST: 02255 case TAO_PLUGGABLE_MESSAGE_LOCATEREQUEST: 02256 { 02257 // Let us resume the handle before we go ahead to process the 02258 // request. This will open up the handle for other threads. 02259 rh.resume_handle (); 02260 02261 if (this->messaging_object ()->process_request_message ( 02262 this, 02263 qd) == -1) 02264 { 02265 // Return a "-1" so that the next stage can take care of 02266 // closing connection and the necessary memory management. 
02267 return -1; 02268 } 02269 } 02270 break; 02271 case TAO_PLUGGABLE_MESSAGE_REPLY: 02272 case TAO_PLUGGABLE_MESSAGE_LOCATEREPLY: 02273 { 02274 rh.resume_handle (); 02275 02276 TAO_Pluggable_Reply_Params params (this); 02277 02278 if (this->messaging_object ()->process_reply_message (params, 02279 qd) == -1) 02280 { 02281 if (TAO_debug_level > 0) 02282 ACE_DEBUG ((LM_DEBUG, 02283 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ") 02284 ACE_TEXT ("error in process_reply_message - %m\n"), 02285 this->id ())); 02286 02287 return -1; 02288 } 02289 02290 } 02291 break; 02292 case TAO_PLUGGABLE_MESSAGE_CANCELREQUEST: 02293 { 02294 // The associated request might be incomplete residing 02295 // fragmented in messaging object. We must make sure the 02296 // resources allocated by fragments are released. 02297 if (this->messaging_object ()->discard_fragmented_message (qd) == -1) 02298 { 02299 if (TAO_debug_level > 0) 02300 { 02301 ACE_ERROR ((LM_ERROR, 02302 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ") 02303 ACE_TEXT ("error processing CancelRequest\n"), 02304 this->id ())); 02305 } 02306 } 02307 02308 // We are not able to cancel requests being processed already; 02309 // this is declared as optional feature by CORBA, and TAO does 02310 // not support this currently. 02311 02312 // Just continue processing, CancelRequest does not mean to cut 02313 // off the connection. 02314 } 02315 break; 02316 case TAO_PLUGGABLE_MESSAGE_MESSAGERROR: 02317 { 02318 if (TAO_debug_level > 0) 02319 { 02320 ACE_ERROR ((LM_ERROR, 02321 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ") 02322 ACE_TEXT ("received MessageError, closing connection\n"), 02323 this->id ())); 02324 } 02325 return -1; 02326 } 02327 break; 02328 case TAO_PLUGGABLE_MESSAGE_FRAGMENT: 02329 { 02330 // Nothing to be done. 02331 } 02332 break; 02333 } 02334 02335 // If not, just return back.. 02336 return 0; 02337 } |
|
Definition at line 2340 of file Transport.cpp. References ACE_DEBUG, ACE_TEXT, TAO_Incoming_Message_Queue::dequeue_head(), incoming_message_queue_, LM_DEBUG, notify_reactor(), process_parsed_messages(), TAO_Incoming_Message_Queue::queue_length(), TAO_Queued_Data::release(), TAO_Resume_Handle::set_flag(), and TAO_debug_level. Referenced by handle_input(), and handle_input_parse_data().
02341 { 02342 if (TAO_debug_level > 3) 02343 { 02344 ACE_DEBUG ((LM_DEBUG, 02345 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_queue_head, %d enqueued\n"), 02346 this->id (), this->incoming_message_queue_.queue_length () )); 02347 } 02348 02349 // See if message in queue ... 02350 if (this->incoming_message_queue_.queue_length () > 0) 02351 { 02352 // Get the message on the head of the queue.. 02353 TAO_Queued_Data *qd = 02354 this->incoming_message_queue_.dequeue_head (); 02355 02356 if (TAO_debug_level > 3) 02357 { 02358 ACE_DEBUG ((LM_DEBUG, 02359 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_queue_head, ") 02360 ACE_TEXT ("the size of the queue is [%d]\n"), 02361 this->id (), 02362 this->incoming_message_queue_.queue_length())); 02363 } 02364 // Now that we have pulled out out one message out of the queue, 02365 // check whether we have one more message in the queue... 02366 if (this->incoming_message_queue_.queue_length () > 0) 02367 { 02368 if (TAO_debug_level > 0) 02369 { 02370 ACE_DEBUG ((LM_DEBUG, 02371 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_queue_head, ") 02372 ACE_TEXT ("notify reactor\n"), 02373 this->id ())); 02374 } 02375 02376 int const retval = this->notify_reactor (); 02377 02378 if (retval == 1) 02379 { 02380 // Let the class know that it doesn't need to resume the 02381 // handle.. 02382 rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_LEAVE_SUSPENDED); 02383 } 02384 else if (retval < 0) 02385 return -1; 02386 } 02387 else 02388 { 02389 // As we are ready to process the last message just resume 02390 // the handle. Set the flag incase someone had reset the flag.. 02391 rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_RESUMABLE); 02392 } 02393 02394 // Process the message... 02395 if (this->process_parsed_messages (qd, rh) == -1) 02396 { 02397 return -1; 02398 } 02399 02400 // Delete the Queued_Data.. 02401 TAO_Queued_Data::release (qd); 02402 02403 return 0; 02404 } 02405 02406 return 1; 02407 } |
|
Called by the cache when the ORB is shutting down.
Definition at line 238 of file Transport.cpp. References add_reference(), TAO::Connection_Handler_Set, ACE_Unbounded_Set< T >::insert(), TAO_Wait_Strategy::non_blocking(), opening_connection_role_, and ws_.
00239 { 00240 if (this->ws_->non_blocking () || 00241 this->opening_connection_role_ == TAO::TAO_SERVER_ROLE) 00242 return false; 00243 00244 (void) this->add_reference (); 00245 00246 h.insert (this->connection_handler_i ()); 00247 00248 return true; 00249 } |
|
Add the event handler to the handlers set. Called by the cache when the cache is closing.
Definition at line 230 of file Transport.cpp. References add_reference(), TAO::Connection_Handler_Set, and ACE_Unbounded_Set< T >::insert().
00231 { 00232 (void) this->add_reference (); 00233 00234 handlers.insert (this->connection_handler_i ()); 00235 } |
|
Cache management.
Definition at line 441 of file Transport.cpp. References TAO::Transport_Cache_Manager::purge_entry(), and transport_cache_manager(). Referenced by TAO_Connection_Handler::close_handler(), TAO_IIOP_Connector::complete_connection(), TAO_Connector::connect(), post_open(), pre_close(), recache_transport(), ~TAO_Transport(), and TransportCleanupGuard::~TransportCleanupGuard().
00442 { 00443 return this->transport_cache_manager ().purge_entry (this->cache_map_entry_); 00444 } |
|
Definition at line 76 of file Transport.inl. References purging_order_.
00077 { 00078 // This should only be called by the Transport Cache Manager when 00079 // it is holding it's lock. 00080 // The transport should still be here since the cache manager still 00081 // has a reference to it. 00082 this->purging_order_ = value; 00083 } |
|
Get and Set the purging order. The purging strategy uses the set version to set the purging order. Definition at line 70 of file Transport.inl. References purging_order_. Referenced by TAO_LRU_Connection_Purging_Strategy::update_item().
00071 { 00072 return this->purging_order_; 00073 } |
|
Check if there are messages pending in the queue.
Definition at line 98 of file Transport.inl. References ACE_GUARD_RETURN, and queue_is_empty_i(). Referenced by TAO_Reactive_Flushing_Strategy::flush_transport(), TAO_Leader_Follower_Flushing_Strategy::flush_transport(), and TAO_Block_Flushing_Strategy::flush_transport().
00099 { 00100 ACE_GUARD_RETURN (ACE_Lock, 00101 ace_mon, 00102 *this->handler_lock_, 00103 false); 00104 return this->queue_is_empty_i (); 00105 } |
|
Check if there are messages pending in the queue. This version assumes that the lock is already held. Use with care!
Definition at line 747 of file Transport.cpp. Referenced by post_open(), and queue_is_empty().
00748 { 00749 return (this->head_ == 0); 00750 } |
|
Queue a message for message_block
Definition at line 1432 of file Transport.cpp. References ACE_NEW_RETURN, and TAO_Queued_Message::push_back(). Referenced by format_queue_message(), and send_asynchronous_message_i().
01434 { 01435 TAO_Queued_Message *queued_message = 0; 01436 ACE_NEW_RETURN (queued_message, 01437 TAO_Asynch_Queued_Message (message_block, 01438 this->orb_core_, 01439 max_wait_time, 01440 0, 01441 true), 01442 -1); 01443 queued_message->push_back (this->head_, this->tail_); 01444 01445 return 0; 01446 } |
|
Recache ourselves in the cache.
Definition at line 430 of file Transport.cpp. References TAO::Transport_Cache_Manager::cache_transport(), purge_entry(), and transport_cache_manager(). Referenced by TAO_IIOP_Connection_Handler::process_listen_point_list().
00431 { 00432 // First purge our entry 00433 this->purge_entry (); 00434 00435 // Then add ourselves to the cache 00436 return this->transport_cache_manager ().cache_transport (desc, 00437 this); 00438 } |
|
Read len bytes into buf. This method serializes on handler_lock_, guaranteeing that only one thread at a time can execute it on the same instance.
Implemented in TAO_IIOP_Transport. Referenced by handle_input_missing_data(), and handle_input_parse_data(). |
|
Accessor to recv_buffer_size_.
Definition at line 181 of file Transport.inl.
00182 { 00183 return this->recv_buffer_size_; 00184 } |
|
Register the handler with the reactor. This method is used by the Wait_On_Reactor strategy. The transport must register its event handler with the ORB's Reactor.
Definition at line 318 of file Transport.cpp. References ACE_DEBUG, ACE_GUARD_RETURN, ACE_TEXT, event_handler_i(), TAO_Wait_Strategy::is_registered(), LM_DEBUG, ACE_Event_Handler::reactor(), TAO_ORB_Core::reactor(), ACE_Reactor::register_handler(), TAO_debug_level, and ws_. Referenced by TAO_Wait_On_Reactor::register_handler(), TAO_Wait_On_Leader_Follower::register_handler(), and TAO_Wait_On_Leader_Follower::sending_request().
00319 { 00320 if (TAO_debug_level > 4) 00321 { 00322 ACE_DEBUG ((LM_DEBUG, 00323 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::register_handler\n"), 00324 this->id ())); 00325 } 00326 00327 ACE_Reactor * const r = this->orb_core_->reactor (); 00328 00329 // @@note: This should be okay since the register handler call will 00330 // not make a nested call into the transport. 00331 ACE_GUARD_RETURN (ACE_Lock, 00332 ace_mon, 00333 *this->handler_lock_, 00334 false); 00335 00336 if (r == this->event_handler_i ()->reactor ()) 00337 { 00338 return 0; 00339 } 00340 00341 // Set the flag in the Connection Handler and in the Wait Strategy 00342 // @@Maybe we should set these flags after registering with the 00343 // reactor. What if the registration fails??? 00344 this->ws_->is_registered (true); 00345 00346 // Register the handler with the reactor 00347 return r->register_handler (this->event_handler_i (), 00348 ACE_Event_Handler::READ_MASK); 00349 } |
|
Initialising the messaging object. This would be used by the connector side. On the acceptor side the connection handler would take care of the messaging objects. Definition at line 2490 of file Transport.cpp. References event_handler_i(), and ACE_Event_Handler::remove_reference(). Referenced by TAO_Connection_Handler::cancel_pending_connection(), TAO_Connection_Handler::close_handler(), TAO::Cache_IntId::operator=(), TAO::Transport_Cache_Manager::purge(), TAO_Asynch_Reply_Dispatcher_Base::transport(), TAO::Cache_IntId::~Cache_IntId(), TAO_Asynch_Reply_Dispatcher_Base::~TAO_Asynch_Reply_Dispatcher_Base(), TAO_Thread_Per_Connection_Handler::~TAO_Thread_Per_Connection_Handler(), and TransportCleanupGuard::~TransportCleanupGuard().
02491 { 02492 return this->event_handler_i ()->remove_reference (); 02493 } |
|
Print out error messages if the event handler is not valid.
Definition at line 1197 of file Transport.cpp. References ACE_DEBUG, ACE_TEXT, ACE_TEXT_CHAR_TO_TCHAR, LM_DEBUG, and TAO_debug_level.
01198 { 01199 if (TAO_debug_level > 0) 01200 { 01201 ACE_DEBUG ((LM_DEBUG, 01202 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::report_invalid_event_handler") 01203 ACE_TEXT ("(%s) no longer associated with handler [tag=%d]\n"), 01204 this->id (), ACE_TEXT_CHAR_TO_TCHAR (caller), this->tag_)); 01205 } 01206 } |
|
The flush timer expired or was explicitly cancelled; mark it as not pending. Definition at line 114 of file Transport.inl. References current_deadline_, and flush_timer_id_. Referenced by drain_queue_i(), and handle_timeout().
00115 { 00116 this->flush_timer_id_ = -1; 00117 this->current_deadline_ = ACE_Time_Value::zero; 00118 } |
|
Schedule handle_output() callbacks.
Definition at line 754 of file Transport.cpp. References ACE_DEBUG, ACE_TEXT, event_handler_i(), ACE_Reactor::find_handler(), ACE_Event_Handler::get_handle(), LM_DEBUG, ACE_Event_Handler::reactor(), ACE_Event_Handler::remove_reference(), ACE_Reactor::schedule_wakeup(), and TAO_debug_level. Referenced by TAO_Reactive_Flushing_Strategy::schedule_output(), and TAO_Leader_Follower_Flushing_Strategy::schedule_output().
00755 { 00756 ACE_Event_Handler * const eh = this->event_handler_i (); 00757 ACE_Reactor * const reactor = eh->reactor (); 00758 00759 if (reactor == 0) 00760 return -1; 00761 00762 // Check to see if our event handler is still registered with the 00763 // reactor. It's possible for another thread to have run close_connection() 00764 // since we last used the event handler. 00765 ACE_Event_Handler * const found = reactor->find_handler (eh->get_handle ()); 00766 if (found != eh) 00767 { 00768 if(TAO_debug_level > 3) 00769 { 00770 ACE_DEBUG ((LM_DEBUG, 00771 "TAO (%P|%t) - Transport[%d]::schedule_output_i " 00772 "event handler not found in reactor, returning -1\n", 00773 this->id ())); 00774 } 00775 if (found) 00776 { 00777 found->remove_reference (); 00778 } 00779 return -1; 00780 } 00781 found->remove_reference (); 00782 00783 if (TAO_debug_level > 3) 00784 { 00785 ACE_DEBUG ((LM_DEBUG, 00786 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::schedule_output_i\n"), 00787 this->id ())); 00788 } 00789 00790 return reactor->schedule_wakeup (eh, ACE_Event_Handler::WRITE_MASK); 00791 } |
|
Write the complete Message_Block chain to the connection. This method serializes on handler_lock_, guaranteeing that only one thread at a time can execute it on the same instance. Often the implementation simply forwards the arguments to the underlying ACE_Svc_Handler class, using the code factored out into ACE. Be careful with protocols that perform non-trivial transformations of the data, such as SSLIOP or protocols that compress the stream.
ENOENT .
Implemented in TAO_IIOP_Transport. Referenced by drain_queue_helper(). |
|
Send an asynchronous message, i.e., do not block until the message is on the wire. Definition at line 1269 of file Transport.cpp. References ACE_DEBUG, ACE_ERROR, ACE_GUARD_RETURN, ACE_TEXT, check_buffering_constraints_i(), ACE_Message_Block::cont(), ETIME, EWOULDBLOCK, TAO_Flushing_Strategy::flush_transport(), TAO_ORB_Core::flushing_strategy(), ACE_Message_Block::length(), LM_DEBUG, LM_ERROR, TAO::Transport_Queueing_Strategy::must_queue(), orb_core(), queue_message_i(), TAO_Flushing_Strategy::schedule_output(), send_message_block_chain_i(), ssize_t, TAO_debug_level, ACE_Message_Block::total_length(), and TAO_Stub::transport_queueing_strategy(). Referenced by send_message_shared_i().
01272 { 01273 // Let's figure out if the message should be queued without trying 01274 // to send first: 01275 bool try_sending_first = true; 01276 01277 bool const queue_empty = (this->head_ == 0); 01278 01279 TAO::Transport_Queueing_Strategy *queue_strategy = 01280 stub->transport_queueing_strategy (); 01281 01282 if (!queue_empty) 01283 { 01284 try_sending_first = false; 01285 } 01286 else if (queue_strategy) 01287 { 01288 if (queue_strategy->must_queue (queue_empty)) 01289 { 01290 try_sending_first = false; 01291 } 01292 } 01293 01294 if (try_sending_first) 01295 { 01296 ssize_t n = 0; 01297 size_t byte_count = 0; 01298 // ... in this case we must try to send the message first ... 01299 01300 size_t const total_length = message_block->total_length (); 01301 01302 if (TAO_debug_level > 6) 01303 { 01304 ACE_DEBUG ((LM_DEBUG, 01305 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ") 01306 ACE_TEXT ("trying to send the message (ml = %d)\n"), 01307 this->id (), total_length)); 01308 } 01309 01310 // @@ I don't think we want to hold the mutex here, however if 01311 // we release it we need to recheck the status of the transport 01312 // after we return... once I understand the final form for this 01313 // code I will re-visit this decision 01314 n = this->send_message_block_chain_i (message_block, 01315 byte_count, 01316 max_wait_time); 01317 if (n == -1) 01318 { 01319 // ... if this is just an EWOULDBLOCK we must schedule the 01320 // message for later, if it is ETIME we still have to send 01321 // the complete message, because cutting off the message at 01322 // this point will destroy the synchronization with the 01323 // server ... 
01324 if (errno != EWOULDBLOCK && errno != ETIME) 01325 { 01326 if (TAO_debug_level > 0) 01327 { 01328 ACE_ERROR ((LM_ERROR, 01329 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ") 01330 ACE_TEXT ("fatal error in ") 01331 ACE_TEXT ("send_message_block_chain_i - %m\n"), 01332 this->id ())); 01333 } 01334 return -1; 01335 } 01336 } 01337 01338 // ... let's figure out if the complete message was sent ... 01339 if (total_length == byte_count) 01340 { 01341 // Done, just return. Notice that there are no allocations 01342 // or copies up to this point (though some fancy calling 01343 // back and forth). 01344 // This is the common case for the critical path, it should 01345 // be fast. 01346 return 0; 01347 } 01348 01349 // If it was partially sent, then we can't allow a timeout 01350 if (byte_count > 0) 01351 max_wait_time = 0; 01352 01353 if (TAO_debug_level > 6) 01354 { 01355 ACE_DEBUG ((LM_DEBUG, 01356 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ") 01357 ACE_TEXT ("partial send %d / %d bytes\n"), 01358 this->id (), byte_count, total_length)); 01359 } 01360 01361 // ... part of the data was sent, need to figure out what piece 01362 // of the message block chain must be queued ... 01363 while (message_block != 0 && message_block->length () == 0) 01364 { 01365 message_block = message_block->cont (); 01366 } 01367 01368 // ... at least some portion of the message block chain should 01369 // remain ... 01370 } 01371 01372 // ... either the message must be queued or we need to queue it 01373 // because it was not completely sent out ... 
01374 01375 if (TAO_debug_level > 6) 01376 { 01377 ACE_DEBUG ((LM_DEBUG, 01378 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ") 01379 ACE_TEXT ("message is queued\n"), 01380 this->id ())); 01381 } 01382 01383 if (this->queue_message_i (message_block, max_wait_time) == -1) 01384 { 01385 if (TAO_debug_level > 0) 01386 { 01387 ACE_DEBUG ((LM_DEBUG, 01388 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::") 01389 ACE_TEXT ("send_asynchronous_message_i, ") 01390 ACE_TEXT ("cannot queue message for - %m\n"), 01391 this->id ())); 01392 } 01393 return -1; 01394 } 01395 01396 // ... if the queue is full we need to activate the output on the 01397 // queue ... 01398 bool must_flush = false; 01399 const bool constraints_reached = 01400 this->check_buffering_constraints_i (stub, 01401 must_flush); 01402 01403 // ... but we also want to activate it if the message was partially 01404 // sent.... Plus, when we use the blocking flushing strategy the 01405 // queue is flushed as a side-effect of 'schedule_output()' 01406 01407 TAO_Flushing_Strategy *flushing_strategy = 01408 this->orb_core ()->flushing_strategy (); 01409 01410 if (constraints_reached || try_sending_first) 01411 { 01412 int const result = flushing_strategy->schedule_output (this); 01413 if (result == TAO_Flushing_Strategy::MUST_FLUSH) 01414 { 01415 must_flush = true; 01416 } 01417 } 01418 01419 if (must_flush) 01420 { 01421 typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK; 01422 TAO_REVERSE_LOCK reverse (*this->handler_lock_); 01423 ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1); 01424 01425 (void) flushing_strategy->flush_transport (this); 01426 } 01427 01428 return 0; 01429 } |
|
Notify all the components inside a Transport when the underlying connection is closed. Definition at line 1209 of file Transport.cpp. References ACE_GUARD, TAO_Transport_Mux_Strategy::connection_closed(), send_connection_closed_notifications_i(), and tms(). Referenced by TAO_Connection_Handler::close_connection_eh().
01210 { 01211 { 01212 ACE_MT (ACE_GUARD (ACE_Lock, guard, *this->handler_lock_)); 01213 01214 this->send_connection_closed_notifications_i (); 01215 } 01216 01217 this->tms ()->connection_closed (); 01218 } |
|
Assume the lock is held.
Definition at line 1221 of file Transport.cpp. References cleanup_queue_i(), messaging_object(), and TAO_Pluggable_Messaging::reset(). Referenced by send_connection_closed_notifications().
01222 { 01223 this->cleanup_queue_i (); 01224 01225 this->messaging_object ()->reset (); 01226 } |
|
Once the ORB is prepared to receive a reply (see send_request() above), and all the arguments have been marshaled the CDR stream must be 'formatted', i.e. the message_size field in the GIOP header can finally be set to the proper value. Implemented in TAO_IIOP_Transport. Referenced by TAO_On_Demand_Fragmentation_Strategy::fragment(), TAO_GIOP_Message_Base::make_send_locate_reply(), TAO_GIOP_Message_Base::process_request(), and TAO_GIOP_Message_Base::send_reply_exception(). |
|
Send a message block chain.
Definition at line 509 of file Transport.cpp. References ACE_GUARD_RETURN, and send_message_block_chain_i(). Referenced by TAO_GIOP_Message_Base::send_close_connection(), and TAO_GIOP_Message_Base::send_error().
00512 { 00513 ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1); 00514 00515 return this->send_message_block_chain_i (mb, 00516 bytes_transferred, 00517 max_wait_time); 00518 } |
|
Send a message block chain, assuming the lock is held.
Definition at line 521 of file Transport.cpp. References drain_queue_i(), TAO_Synch_Queued_Message::message_length(), TAO_Queued_Message::push_back(), TAO_Queued_Message::remove_from_list(), and ACE_Message_Block::total_length(). Referenced by send_asynchronous_message_i(), and send_message_block_chain().
00524 { 00525 size_t const total_length = mb->total_length (); 00526 00527 // We are going to block, so there is no need to clone 00528 // the message block. 00529 TAO_Synch_Queued_Message synch_message (mb, this->orb_core_); 00530 00531 synch_message.push_back (this->head_, this->tail_); 00532 00533 int const n = this->drain_queue_i (); 00534 00535 if (n == -1) 00536 { 00537 synch_message.remove_from_list (this->head_, this->tail_); 00538 return -1; // Error while sending... 00539 } 00540 else if (n == 1) 00541 { 00542 bytes_transferred = total_length; 00543 return 1; // Empty queue, message was sent.. 00544 } 00545 00546 // Remove the temporary message from the queue... 00547 synch_message.remove_from_list (this->head_, this->tail_); 00548 00549 bytes_transferred = total_length - synch_message.message_length (); 00550 00551 return 0; 00552 } |
|
Send the contents of message_block.
Reimplemented in TAO_IIOP_Transport. Definition at line 280 of file Transport.cpp. References ACE_GUARD_RETURN, close_connection(), and send_message_shared_i().
00284 { 00285 int result = 0; 00286 00287 { 00288 ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1); 00289 00290 result = 00291 this->send_message_shared_i (stub, message_semantics, 00292 message_block, max_wait_time); 00293 } 00294 00295 if (result == -1) 00296 { 00297 this->close_connection (); 00298 } 00299 00300 return result; 00301 } |
|
Implement send_message_shared() assuming the handler_lock_ is held. Definition at line 1229 of file Transport.cpp. References ACE_Message_Block::length(), TAO::Transport::Stats::messages_sent(), send_asynchronous_message_i(), send_reply_message_i(), send_synchronous_message_i(), stats_, TAO_ONEWAY_REQUEST, TAO_REPLY, and TAO_TWOWAY_REQUEST. Referenced by send_message_shared(), and TAO_IIOP_Transport::send_message_shared().
01233 { 01234 int ret = 0; 01235 01236 #if TAO_HAS_TRANSPORT_CURRENT == 1 01237 size_t const message_length = message_block->length (); 01238 #endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */ 01239 01240 switch (message_semantics) 01241 { 01242 case TAO_Transport::TAO_TWOWAY_REQUEST: 01243 ret = this->send_synchronous_message_i (message_block, 01244 max_wait_time); 01245 break; 01246 01247 case TAO_Transport::TAO_REPLY: 01248 ret = this->send_reply_message_i (message_block, 01249 max_wait_time); 01250 break; 01251 01252 case TAO_Transport::TAO_ONEWAY_REQUEST: 01253 ret = this->send_asynchronous_message_i (stub, 01254 message_block, 01255 max_wait_time); 01256 break; 01257 } 01258 01259 #if TAO_HAS_TRANSPORT_CURRENT == 1 01260 // "Count" the message, only if no error was encountered. 01261 if (ret != -1 && this->stats_ != 0) 01262 this->stats_->messages_sent (message_length); 01263 #endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */ 01264 01265 return ret; 01266 } |
|
Send a reply message, i.e. do not block until the message is on the wire, but just return after adding them to the queue. Definition at line 657 of file Transport.cpp. References ACE_DEBUG, ACE_GUARD_RETURN, ACE_TEXT, TAO_Synch_Queued_Message::clone(), TAO_Queued_Message::destroy(), TAO_Flushing_Strategy::flush_message(), TAO_ORB_Core::flushing_strategy(), LM_DEBUG, orb_core(), TAO_Queued_Message::push_back(), TAO_Queued_Message::remove_from_list(), TAO_Flushing_Strategy::schedule_output(), send_synch_message_helper_i(), and TAO_debug_level. Referenced by send_message_shared_i().
00659 { 00660 // Dont clone now.. We could be sent in one shot! 00661 TAO_Synch_Queued_Message synch_message (mb, this->orb_core_); 00662 00663 synch_message.push_back (this->head_, 00664 this->tail_); 00665 00666 int const n = 00667 this->send_synch_message_helper_i (synch_message, 00668 max_wait_time); 00669 00670 if (n == -1 || n == 1) 00671 { 00672 return n; 00673 } 00674 00675 if (TAO_debug_level > 3) 00676 { 00677 ACE_DEBUG ((LM_DEBUG, 00678 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_reply_message_i, ") 00679 ACE_TEXT ("preparing to add to queue before leaving\n"), 00680 this->id ())); 00681 } 00682 00683 // Till this point we shouldn't have any copying and that is the 00684 // point anyway. Now, remove the node from the list 00685 synch_message.remove_from_list (this->head_, this->tail_); 00686 00687 // Clone the node that we have. 00688 TAO_Queued_Message *msg = 00689 synch_message.clone (this->orb_core_->transport_message_buffer_allocator ()); 00690 00691 // Stick it in the queue 00692 msg->push_back (this->head_, this->tail_); 00693 00694 TAO_Flushing_Strategy *flushing_strategy = 00695 this->orb_core ()->flushing_strategy (); 00696 00697 int const result = flushing_strategy->schedule_output (this); 00698 00699 if (result == -1) 00700 { 00701 if (TAO_debug_level > 5) 00702 { 00703 ACE_DEBUG ((LM_DEBUG, "TAO (%P|%t) - Transport[%d]::send_reply_" 00704 "message_i, dequeuing msg due to schedule_output " 00705 "failure\n", this->id ())); 00706 } 00707 msg->remove_from_list (this->head_, this->tail_); 00708 msg->destroy (); 00709 } 00710 else if (result == TAO_Flushing_Strategy::MUST_FLUSH) 00711 { 00712 typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK; 00713 TAO_REVERSE_LOCK reverse (*this->handler_lock_); 00714 ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1); 00715 (void) flushing_strategy->flush_message(this, msg, 0); 00716 } 00717 00718 return 1; 00719 } |
|
Preparing the ORB to receive the reply only once the request is completely sent opens the system to some subtle race conditions. Suppose the ORB is running in a multi-threaded configuration: thread A makes a request while thread B is using the Reactor to process all incoming requests. Thread A could be implemented as follows: 1) send the request 2) set up the ORB to receive the reply 3) wait for the reply. But in this case thread B may receive the reply between steps (1) and (2), and drop it as an invalid or unexpected message. Consequently the correct implementation is: 1) set up the ORB to receive the reply 2) send the request 3) wait for the reply. The following method encapsulates this idiom.
Implemented in TAO_IIOP_Transport. Referenced by TAO::Remote_Invocation::send_message(). |
|
A helper method used by send_synchronous_message_i() and send_reply_message_i(). Reusable code that could be used by both the methods. Definition at line 722 of file Transport.cpp. References TAO_Synch_Queued_Message::all_data_sent(), drain_queue_i(), and TAO_Queued_Message::remove_from_list(). Referenced by send_reply_message_i(), and send_synchronous_message_i().
00724 { 00725 // @todo: Need to send timeouts for writing.. 00726 int const n = this->drain_queue_i (); 00727 00728 if (n == -1) 00729 { 00730 synch_message.remove_from_list (this->head_, this->tail_); 00731 return -1; // Error while sending... 00732 } 00733 else if (n == 1) 00734 { 00735 return 1; // Empty queue, message was sent.. 00736 } 00737 00738 if (synch_message.all_data_sent ()) 00739 { 00740 return 1; 00741 } 00742 00743 return 0; 00744 } |
|
Send a synchronous message, i.e. block until the message is on the wire. Definition at line 555 of file Transport.cpp. References ACE_ERROR, ACE_GUARD_RETURN, ACE_NEW_RETURN, ACE_TEXT, TAO_Synch_Queued_Message::current_block(), ETIME, TAO_Flushing_Strategy::flush_message(), TAO_ORB_Core::flushing_strategy(), LM_ERROR, TAO_Synch_Queued_Message::message_length(), orb_core(), TAO_Queued_Message::push_back(), TAO_Queued_Message::push_front(), TAO_Queued_Message::remove_from_list(), TAO_Flushing_Strategy::schedule_output(), send_synch_message_helper_i(), and TAO_debug_level. Referenced by send_message_shared_i().
00557 { 00558 // We are going to block, so there is no need to clone 00559 // the message block. 00560 TAO_Synch_Queued_Message synch_message (mb, this->orb_core_); 00561 size_t const message_length = synch_message.message_length (); 00562 00563 synch_message.push_back (this->head_, this->tail_); 00564 00565 int const n = this->send_synch_message_helper_i (synch_message, 00566 0 /*ignored*/); 00567 if (n == -1 || n == 1) 00568 { 00569 return n; 00570 } 00571 00572 // @todo: Check for timeouts! 00573 // if (max_wait_time != 0 && errno == ETIME) return -1; 00574 TAO_Flushing_Strategy *flushing_strategy = 00575 this->orb_core ()->flushing_strategy (); 00576 int result = flushing_strategy->schedule_output (this); 00577 if (result == -1) 00578 { 00579 synch_message.remove_from_list (this->head_, this->tail_); 00580 if (TAO_debug_level > 0) 00581 { 00582 ACE_ERROR ((LM_ERROR, 00583 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::") 00584 ACE_TEXT ("send_synchronous_message_i, ") 00585 ACE_TEXT ("error while scheduling flush - %m\n"), 00586 this->id ())); 00587 } 00588 return -1; 00589 } 00590 00591 // No need to check for result == TAO_Flushing_Strategy::MUST_FLUSH, 00592 // because we're always going to flush anyway. 00593 00594 // Release the mutex, other threads may modify the queue as we 00595 // block for a long time writing out data. 00596 { 00597 typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK; 00598 TAO_REVERSE_LOCK reverse (*this->handler_lock_); 00599 ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1); 00600 00601 result = flushing_strategy->flush_message (this, 00602 &synch_message, 00603 max_wait_time); 00604 } 00605 00606 if (result == -1) 00607 { 00608 synch_message.remove_from_list (this->head_, this->tail_); 00609 00610 if (errno == ETIME) 00611 { 00612 // If partially sent, then we must queue the remainder. 
00613 if (message_length != synch_message.message_length ()) 00614 { 00615 // This is a timeout, there is only one nasty case: the 00616 // message has been partially sent! We simply cannot take 00617 // the message out of the queue, because that would corrupt 00618 // the connection. 00619 // 00620 // What we do is replace the queued message with an 00621 // asynchronous message, that contains only what remains of 00622 // the timed out request. If you think about sending 00623 // CancelRequests in this case: there is no much point in 00624 // doing that: the receiving ORB would probably ignore it, 00625 // and figuring out the request ID would be a bit of a 00626 // nightmare. 00627 // 00628 TAO_Queued_Message *queued_message = 0; 00629 ACE_NEW_RETURN (queued_message, 00630 TAO_Asynch_Queued_Message ( 00631 synch_message.current_block (), 00632 this->orb_core_, 00633 0, // no timeout 00634 0, 00635 true), 00636 -1); 00637 queued_message->push_front (this->head_, this->tail_); 00638 } 00639 } 00640 00641 if (TAO_debug_level > 0) 00642 { 00643 ACE_ERROR ((LM_ERROR, 00644 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_synchronous_message_i, ") 00645 ACE_TEXT ("error while flushing message - %m\n"), 00646 this->id ())); 00647 } 00648 00649 return -1; 00650 } 00651 00652 return 1; 00653 } |
|
Accessor to sent_byte_count_.
Definition at line 187 of file Transport.inl. References sent_byte_count_.
00188 { 00189 return this->sent_byte_count_; 00190 } |
|
Transport statistics.
|
|
Return the protocol tag. The OMG assigns unique tags (a 32-bit unsigned number) to each protocol. New protocol tags can be obtained free of charge from the OMG, check the documents in corbafwd.h for more details. Definition at line 8 of file Transport.inl.
00009 { 00010 return this->tag_; 00011 } |
|
Extracts the list of listen points from the cdr stream. The list would have the protocol specific details of the ListenPoints Reimplemented in TAO_IIOP_Transport. Definition at line 274 of file Transport.cpp. References ACE_NOTSUP_RETURN. Referenced by TAO_GIOP_Message_Generator_Parser_12::process_bidir_context().
00275 { 00276 ACE_NOTSUP_RETURN (-1); 00277 } |
|
Get the TAO_Transport_Mux_Strategy used by this object. The role of the TAO_Transport_Mux_Strategy is described in more detail in that class' documentation. Suffice it to say that the class is used to control how many threads can have pending requests over the same connection. Multiplexing multiple threads over the same connection conserves resources and is almost required for AMI, but having only one pending request per connection is more efficient and reduces the possibilities of priority inversions. Definition at line 20 of file Transport.inl. Referenced by idle_after_reply(), idle_after_send(), TAO::LocateRequest_Invocation_Adapter::invoke(), TAO::LocateRequest_Invocation::invoke(), TAO::Invocation_Adapter::invoke_remote_i(), TAO_GIOP_Message_Base::process_reply_message(), TAO::Synch_Twoway_Invocation::remote_twoway(), and send_connection_closed_notifications().
00021 { 00022 return tms_; 00023 } |
|
Helper method that returns the Transport Cache Manager.
Definition at line 2448 of file Transport.cpp. References TAO_ORB_Core::lane_resources(), and TAO_Thread_Lane_Resources::transport_cache(). Referenced by make_idle(), purge_entry(), recache_transport(), and update_transport().
02449 { 02450 return this->orb_core_->lane_resources ().transport_cache (); 02451 } |
|
Cache management.
Definition at line 460 of file Transport.cpp. References transport_cache_manager(), and TAO::Transport_Cache_Manager::update_entry(). Referenced by TAO_Connection_Handler::handle_input_internal(), TAO_Connection_Handler::handle_output_eh(), and TAO_Connection_Handler::svc_i().
00461 { 00462 return this->transport_cache_manager ().update_entry (this->cache_map_entry_); 00463 } |
|
Return the TAO_Wait_Strategy used by this object. The role of the TAO_Wait_Strategy is described in more detail in that class' documentation. Suffice it to say that the ORB can wait for a reply blocking on read(), using the Reactor to wait for multiple events concurrently or using the Leader/Followers protocol. Definition at line 27 of file Transport.inl. References ws_. Referenced by TAO_Connection_Handler::close_connection_eh(), TAO_IIOP_Connector::complete_connection(), TAO_Connector::connect(), TAO::Transport_Cache_Manager::find_transport(), TAO_Connection_Handler::handle_input_eh(), TAO_IIOP_Connection_Handler::open(), post_open(), and TAO::Synch_Twoway_Invocation::wait_for_reply().
00028 { 00029 return this->ws_; 00030 } |
|
CodeSet negotiation - Set the wchar codeset translator factory.
Definition at line 144 of file Transport.inl. References tcs_set_.
00145 { 00146 this->wchar_translator_ = tf; 00147 this->tcs_set_ = 1; 00148 } |
|
CodeSet Negotiation - Get the wchar codeset translator factory.
Definition at line 131 of file Transport.inl.
00132 { 00133 return this->wchar_translator_; 00134 } |
|
Definition at line 808 of file Transport.h. |
|
These classes need privileged access to: Definition at line 807 of file Transport.h. |
|
Needs privileged access to event_handler_i () Definition at line 812 of file Transport.h. |
|
Have we sent any info on bidirectional information or have we received any info regarding making the connection served by this transport bidirectional. The flag is used as follows: + We don't want to send the bidirectional context info more than once on the connection. Why? Waste of marshalling and demarshalling time on the client. + On the server side -- once a client that has established the connection asks the server to use the connection both ways, we *don't* want the server to pack service info to the client. That is not allowed. We need a flag to prevent such a thing from happening. The value of this flag will be 0 if the client sends info and 1 if the server receives the info. Definition at line 963 of file Transport.h. Referenced by bidirectional_flag(). |
|
Our entry in the cache. We don't own this. It is here for our convenience. We cannot just change things around. Definition at line 935 of file Transport.h. Referenced by cache_map_entry(). |
|
Additional member values required to support codeset translation. @Phil, I think it would be nice if we could think of a way to do the following. We have been trying to use the transport for marking about translator factories and such! IMHO this is a wrong encapsulation, i.e. trying to populate the transport object with these details. We should probably have a class something like TAO_Message_Property or TAO_Message_Translator or whatever (I am sure you get the idea) and encapsulate all these details. Coupling these seems odd. If I have to be more cynical we can move this to the connection_handler and it may make more sense with the DSCP stuff around there. Do you agree? Definition at line 1038 of file Transport.h. |
|
The queue will start draining no later than if* the deadline is Definition at line 980 of file Transport.h. Referenced by check_buffering_constraints_i(), handle_timeout(), and reset_flush_timer(). |
|
First_request_ is true until the first request is sent or received. This is necessary since codeset context information is necessary only on the first request. After that, the translators are fixed for the life of the connection. Definition at line 1050 of file Transport.h. Referenced by first_request_sent(), and generate_request_header(). |
|
The timer ID.
Definition at line 983 of file Transport.h. Referenced by check_buffering_constraints_i(), flush_timer_pending(), and reset_flush_timer(). |
|
This is an Definition at line 997 of file Transport.h. Referenced by ~TAO_Transport(). |
|
Implement the outgoing data queue.
Definition at line 968 of file Transport.h. |
|
A unique identifier for the transport. This never *never* changes over the lifespan, so we don't have to worry about locking it. HINT: Protocol-specific transports that use connection handler might choose to set this to the handle for their connection. Definition at line 1007 of file Transport.h. |
|
Queue of the consolidated, incoming messages.
Definition at line 972 of file Transport.h. Referenced by consolidate_enqueue_message(), handle_input_parse_data(), and process_queue_head(). |
|
Stack of incoming fragments, consolidated messages are going to be enqueued in "incoming_message_queue_" Definition at line 976 of file Transport.h. Referenced by handle_input(), handle_input_missing_data(), handle_input_parse_data(), and handle_input_parse_extra_messages(). |
|
Is this transport really connected or not. In case of oneways with SYNC_NONE Policy we don't wait until the connection is ready and we buffer the requests in this transport until the connection is ready Definition at line 1021 of file Transport.h. |
|
Definition at line 965 of file Transport.h. Referenced by opened_as(), and provide_blockable_handler(). |
|
Global orbcore resource.
Definition at line 931 of file Transport.h. Referenced by TAO_GIOP_Message_Base::process_locate_request(), and TAO_GIOP_Message_Base::process_request(). |
|
Holds the partial GIOP message (if there is one).
Definition at line 1053 of file Transport.h. Referenced by allocate_partial_message_block(), and handle_input_parse_data(). |
|
Used by the LRU, LFU and FIFO Connection Purging Strategies.
Definition at line 1010 of file Transport.h. Referenced by purging_order(). |
|
Size of the buffer received.
Definition at line 1013 of file Transport.h. |
|
Number of bytes sent.
Definition at line 1016 of file Transport.h. Referenced by drain_queue_helper(), drain_queue_i(), and sent_byte_count(). |
|
Statistics.
Definition at line 1066 of file Transport.h. Referenced by process_parsed_messages(), send_message_shared_i(), and ~TAO_Transport(). |
|
IOP protocol tag.
Definition at line 928 of file Transport.h. |
|
Definition at line 969 of file Transport.h. |
|
The tcs_set_ flag indicates that negotiation has occurred and so the translators are correct, since a null translator is valid if both ends are using the same codeset, whatever that codeset might be. Definition at line 1044 of file Transport.h. Referenced by char_translator(), is_tcs_set(), and wchar_translator(). |
|
Strategy to decide whether multiple requests can be sent over the same connection or the connection is exclusive for a request. Definition at line 939 of file Transport.h. |
|
The adapter used to receive timeout callbacks from the Reactor.
Definition at line 986 of file Transport.h. |
|
Definition at line 1039 of file Transport.h. |
|
Strategy for waiting for the reply after sending the request.
Definition at line 942 of file Transport.h. Referenced by notify_reactor(), provide_blockable_handler(), register_handler(), TAO_Transport(), wait_strategy(), and ~TAO_Transport(). |