#include <Transport.h>
Inheritance diagram for TAO_Transport:


Template methods | |
| The Transport class uses the Template Method Pattern to implement the protocol specific functionality. Implementors of a pluggable protocol should override the following methods with the semantics documented below. | |
| enum | { TAO_ONEWAY_REQUEST = 0, TAO_TWOWAY_REQUEST = 1, TAO_REPLY } |
| virtual ACE_Event_Handler * | event_handler_i (void)=0 |
| bool | is_connected (void) const |
| Is this transport really connected. | |
| bool | post_open (size_t id) |
| Perform all the actions when this transport get opened. | |
| TAO_Connection_Handler * | connection_handler (void) |
| Get the connection handler for this transport. | |
| TAO_OutputCDR & | out_stream (void) |
| Accessor for the output CDR stream. | |
| int | generate_locate_request (TAO_Target_Specification &spec, TAO_Operation_Details &opdetails, TAO_OutputCDR &output) |
| virtual int | generate_request_header (TAO_Operation_Details &opd, TAO_Target_Specification &spec, TAO_OutputCDR &msg) |
| int | recache_transport (TAO_Transport_Descriptor_Interface *desc) |
| Recache ourselves in the cache. | |
| virtual int | handle_input (TAO_Resume_Handle &rh, ACE_Time_Value *max_wait_time=0, int block=0) |
| Callback to read incoming data. | |
| virtual int | send_request (TAO_Stub *stub, TAO_ORB_Core *orb_core, TAO_OutputCDR &stream, int message_semantics, ACE_Time_Value *max_time_wait)=0 |
| virtual int | send_message (TAO_OutputCDR &stream, TAO_Stub *stub=0, int message_semantics=TAO_Transport::TAO_TWOWAY_REQUEST, ACE_Time_Value *max_time_wait=0)=0 |
| virtual int | send_message_shared (TAO_Stub *stub, int message_semantics, const ACE_Message_Block *message_block, ACE_Time_Value *max_wait_time) |
| Sent the contents of message_block. | |
| int | format_queue_message (TAO_OutputCDR &stream) |
| Format and queue a message for stream. | |
| int | send_message_block_chain (const ACE_Message_Block *message_block, size_t &bytes_transferred, ACE_Time_Value *max_wait_time=0) |
| Send a message block chain,. | |
| int | send_message_block_chain_i (const ACE_Message_Block *message_block, size_t &bytes_transferred, ACE_Time_Value *max_wait_time) |
| Send a message block chain, assuming the lock is held. | |
| int | purge_entry (void) |
| Cache management. | |
| int | make_idle (void) |
| Cache management. | |
| int | update_transport (void) |
| Cache management. | |
| int | handle_timeout (const ACE_Time_Value ¤t_time, const void *act) |
| size_t | recv_buffer_size (void) const |
| Accessor to recv_buffer_size_. | |
| size_t | sent_byte_count (void) const |
| Accessor to sent_byte_count_. | |
| TAO_Codeset_Translator_Base * | char_translator (void) const |
| CodeSet Negotiation - Get the char codeset translator factory. | |
| TAO_Codeset_Translator_Base * | wchar_translator (void) const |
| CodeSet Negotiation - Get the wchar codeset translator factory. | |
| void | char_translator (TAO_Codeset_Translator_Base *) |
| CodeSet negotiation - Set the char codeset translator factory. | |
| void | wchar_translator (TAO_Codeset_Translator_Base *) |
| CodeSet negotiation - Set the wchar codeset translator factory. | |
| void | assign_translators (TAO_InputCDR *, TAO_OutputCDR *) |
| void | clear_translators (TAO_InputCDR *, TAO_OutputCDR *) |
| CORBA::Boolean | is_tcs_set () const |
| Return true if the tcs has been set. | |
| void | first_request_sent () |
| Set the state of the first_request_ flag to 0. | |
| void | send_connection_closed_notifications (void) |
| virtual TAO_Connection_Handler * | connection_handler_i (void)=0 |
| int | process_parsed_messages (TAO_Queued_Data *qd, TAO_Resume_Handle &rh) |
| int | send_message_shared_i (TAO_Stub *stub, int message_semantics, const ACE_Message_Block *message_block, ACE_Time_Value *max_wait_time) |
| int | queue_message_i (const ACE_Message_Block *message_block) |
| Queue a message for message_block. | |
| CORBA::ULong const | tag_ |
| IOP protocol tag. | |
| TAO_ORB_Core *const | orb_core_ |
| Global orbcore resource. | |
| TAO::Transport_Cache_Manager::HASH_MAP_ENTRY * | cache_map_entry_ |
| TAO_Transport_Mux_Strategy * | tms_ |
| TAO_Wait_Strategy * | ws_ |
| Strategy for waiting for the reply after sending the request. | |
| int | bidirectional_flag_ |
| TAO::Connection_Role | opening_connection_role_ |
| TAO_Queued_Message * | head_ |
| Implement the outgoing data queue. | |
| TAO_Queued_Message * | tail_ |
| TAO_Incoming_Message_Queue | incoming_message_queue_ |
| Queue of the consolidated, incoming messages.. | |
| TAO::Incoming_Message_Stack | incoming_message_stack_ |
| ACE_Time_Value | current_deadline_ |
| long | flush_timer_id_ |
| The timer ID. | |
| TAO_Transport_Timer | transport_timer_ |
| The adapter used to receive timeout callbacks from the Reactor. | |
| ACE_Lock * | handler_lock_ |
| size_t | id_ |
| A unique identifier for the transport. | |
| unsigned long | purging_order_ |
| Used by the LRU, LFU and FIFO Connection Purging Strategies. | |
| size_t | recv_buffer_size_ |
| Size of the buffer received. | |
| size_t | sent_byte_count_ |
| Number of bytes sent. | |
| bool | is_connected_ |
| TAO::Transport_Cache_Manager & | transport_cache_manager (void) |
| Helper method that returns the Transport Cache Manager. | |
| int | drain_queue (void) |
| Send some of the data in the queue. | |
| int | drain_queue_i (void) |
| Implement drain_queue() assuming the lock is held. | |
| int | queue_is_empty_i (void) |
| Check if there are messages pending in the queue. | |
| int | drain_queue_helper (int &iovcnt, iovec iov[]) |
| A helper routine used in drain_queue_i(). | |
| int | schedule_output_i (void) |
| Schedule handle_output() callbacks. | |
| int | cancel_output_i (void) |
| Cancel handle_output() callbacks. | |
| void | cleanup_queue (size_t byte_count) |
| Cleanup the queue. | |
| void | cleanup_queue_i () |
| Cleanup the complete queue. | |
| int | check_buffering_constraints_i (TAO_Stub *stub, bool &must_flush) |
| Check if the buffering constraints have been reached. | |
| int | send_synchronous_message_i (const ACE_Message_Block *message_block, ACE_Time_Value *max_wait_time) |
| int | send_reply_message_i (const ACE_Message_Block *message_block, ACE_Time_Value *max_wait_time) |
| int | send_asynchronous_message_i (TAO_Stub *stub, const ACE_Message_Block *message_block, ACE_Time_Value *max_wait_time) |
| int | send_synch_message_helper_i (TAO_Synch_Queued_Message &s, ACE_Time_Value *max_wait_time) |
| int | flush_timer_pending (void) const |
| Check if the flush timer is still pending. | |
| void | reset_flush_timer (void) |
| void | report_invalid_event_handler (const char *caller) |
| Print out error messages if the event handler is not valid. | |
| int | handle_input_missing_data (TAO_Resume_Handle &rh, ACE_Time_Value *max_wait_time, TAO_Queued_Data *q_data) |
| int | handle_input_parse_data (TAO_Resume_Handle &rh, ACE_Time_Value *max_wait_time) |
| int | handle_input_parse_extra_messages (ACE_Message_Block &message_block) |
| int | consolidate_enqueue_message (TAO_Queued_Data *qd) |
| -1 error, otherwise 0 | |
| int | consolidate_process_message (TAO_Queued_Data *qd, TAO_Resume_Handle &rh) |
| -1 error, otherwise 0 | |
| int | process_queue_head (TAO_Resume_Handle &rh) |
| int | notify_reactor (void) |
| void | send_connection_closed_notifications_i (void) |
| Assume the lock is held. | |
| void | allocate_partial_message_block (void) |
| TAO_Transport (const TAO_Transport &) | |
| void | operator= (const TAO_Transport &) |
| TAO_Codeset_Translator_Base * | char_translator_ |
| Additional member values required to support codeset translation. | |
| TAO_Codeset_Translator_Base * | wchar_translator_ |
| CORBA::Boolean | tcs_set_ |
| CORBA::Boolean | first_request_ |
| ACE_Message_Block * | partial_message_ |
| Holds the partial GIOP message (if there is one). | |
| class | TAO_Block_Flushing_Strategy |
| class | TAO_Reactive_Flushing_Strategy |
| class | TAO_Leader_Follower_Flushing_Strategy |
| class | TAO_Thread_Per_Connection_Handler |
Public Types | |
Public Member Functions | |
| TAO_Transport (CORBA::ULong tag, TAO_ORB_Core *orb_core) | |
| Default creator, requires the tag value be supplied. | |
| virtual | ~TAO_Transport (void) |
| Destructor. | |
| CORBA::ULong | tag (void) const |
| Return the protocol tag. | |
| TAO_ORB_Core * | orb_core (void) const |
| Access the ORB that owns this connection. | |
| TAO_Transport_Mux_Strategy * | tms (void) const |
| Get the TAO_Tranport_Mux_Strategy used by this object. | |
| TAO_Wait_Strategy * | wait_strategy (void) const |
| Return the TAO_Wait_Strategy used by this object. | |
| int | handle_output (void) |
| Callback method to reactively drain the outgoing data queue. | |
| int | bidirectional_flag (void) const |
| Get the bidirectional flag. | |
| void | bidirectional_flag (int flag) |
| Set the bidirectional flag. | |
| void | cache_map_entry (TAO::Transport_Cache_Manager::HASH_MAP_ENTRY *entry) |
| Set the Cache Map entry. | |
| TAO::Transport_Cache_Manager::HASH_MAP_ENTRY * | cache_map_entry (void) |
| Get the Cache Map entry. | |
| size_t | id (void) const |
| Set and Get the identifier for this transport instance. | |
| void | id (size_t id) |
| TAO::Connection_Role | opened_as (void) const |
| void | opened_as (TAO::Connection_Role) |
| unsigned long | purging_order (void) const |
| void | purging_order (unsigned long value) |
| int | queue_is_empty (void) |
| Check if there are messages pending in the queue. | |
| void | provide_handler (TAO::Connection_Handler_Set &handlers) |
| Added event handler to the handlers set. | |
| bool | provide_blockable_handler (TAO::Connection_Handler_Set &handlers) |
| virtual int | register_handler (void) |
| Register the handler with the reactor. | |
| virtual ssize_t | send (iovec *iov, int iovcnt, size_t &bytes_transferred, const ACE_Time_Value *timeout=0)=0 |
| Write the complete Message_Block chain to the connection. | |
| virtual ssize_t | recv (char *buffer, size_t len, const ACE_Time_Value *timeout=0)=0 |
| Read len bytes from into buf. | |
Control connection lifecycle | |
These methods are routed through the TMS object. The TMS strategies implement them correctly. | |
| bool | idle_after_send (void) |
| bool | idle_after_reply (void) |
| virtual void | close_connection (void) |
| Call the implementation method after obtaining the lock. | |
Template methods | |
The Transport class uses the Template Method Pattern to implement the protocol specific functionality. Implementors of a pluggable protocol should override the following methods with the semantics documented below. | |
| virtual int | messaging_init (CORBA::Octet major, CORBA::Octet minor)=0 |
| virtual int | tear_listen_point_list (TAO_InputCDR &cdr) |
| virtual bool | post_connect_hook (void) |
| Hooks that can be overridden in concrete transports. | |
| ACE_Event_Handler::Reference_Count | add_reference (void) |
| Memory management routines. | |
| ACE_Event_Handler::Reference_Count | remove_reference (void) |
| virtual TAO_Pluggable_Messaging * | messaging_object (void)=0 |
The transport object is created in the Service handler constructor and deleted in the Service Handler's destructor!!
The main responsability of a Transport object is to encapsulate a connection, and provide a transport independent way to send and receive data. Since TAO is heavily based on the Reactor for all if not all its I/O the Transport class is usually implemented with a helper Connection Handler that adapts the generic Transport interface to the Reactor types.
One of the responsibilities of the TAO_Transport class is to send out GIOP messages as efficiently as possible. In most cases messages are put out in FIFO order, the transport object will put out the message using a single system call and return control to the application. However, for oneways and AMI requests it may be more efficient (or required if the SYNC_NONE policy is in effect) to queue the messages until a large enough data set is available. Another reason to queue is that some applications cannot block for I/O, yet they want to send messages so large that a single write() operation would not be able to cope with them. In such cases we need to queue the data and use the Reactor to drain the queue.
Therefore, the Transport class may need to use a queue to temporarily hold the messages, and, in some configurations, it may need to use the Reactor to concurrently drain such queues.
TAO provides explicit policies to send 'urgent' messages. Such messages may put at the head of the queue. However, they cannot be sent immediately because the transport may already be sending another message in a reactive fashion.
Consequently, the Transport must also know if the head of the queue has been partially sent. In that case new messages can only follow the head. Only once the head is completely sent we can start sending new messages.
One or more threads can be blocked waiting for the connection to completely send the message. The thread should return as soon as its message has been sent, so a per-thread condition is required. This suggest that simply using a ACE_Message_Queue would not be enough: there is a significant amount of ancillary information, to keep on each message that the Message_Block class does not provide room for.
Blocking I/O is still attractive for some applications. First, my eliminating the Reactor overhead performance is improved when sending large blocks of data. Second, using the Reactor to send out data opens the door for nested upcalls, yet some applications cannot deal with the reentrancy issues in this case.
Some or all messages could have a timeout period attached to them. The timeout source could either be some high-level policy or maybe some strategy to prevent denial of service attacks. In any case the timeouts are per-message, and later messages could have shorter timeouts. In fact, some kind of scheduling (such as EDF) could be required in a few applications.
The outgoing data path consist in several components:
The Transport object provides a single method to send request messages (send_request_message ()).
One of the main responsibilities of the transport is to read and process the incoming GIOP message as quickly and efficiently as possible. There are other forces that needs to be given due consideration. They are
The messages should be checked for validity and the right information should be sent to the higher layer for processing. The process of doing a sanity check and preparing the messages for the higher layers of the ORB are done by the messaging protocol.
To keep things as efficient as possible for medium sized requests, it would be good to minimise data copying and locking along the incoming path ie. from the time of reading the data from the handle to the application. We achieve this by creating a buffer on stack and reading the data from the handle into the buffer. We then pass the same data block (the buffer is encapsulated into a data block) to the higher layers of the ORB. The problems stem from the following (a) Data is bigger than the buffer that we have on stack (b) Transports like TCP do not guarantee availability of the whole chunk of data in one shot. Data could trickle in byte by byte. (c) Single read gives multiple messages
We solve the problems as follows
(a) First do a read with the buffer on stack. Query the underlying messaging object whether the message has any incomplete portion. If so, data will be copied into new buffer being able to hold full message and is queued; succeeding events will read data from socket and write directly into this buffer. Otherwise, if if the message in local buffer is complete, we free the handle and then send the message to the higher layers of the ORB for processing.
(b) If buffer with incomplete message has been enqueued, while trying to do the above, the reactor will call us back when the handle becomes read ready. The read-operation will copy data directly into the enqueued buffer. If the message has bee read completely the message is sent to the higher layers of the ORB for processing.
(c) If we get multiple messages (possible if the client connected to the server sends oneways or AMI requests), we parse and split the messages. Every message is put in the queue. Once the messages are queued, the thread picks up one message to send to the higher layers of the ORB. Before doing that, if it finds more messages, it sends a notify to the reactor without resuming the handle. The next thread picks up a message from the queue and processes that. Once the queue is drained the last thread resumes the handle.
We could use the outgoing path of the ORB to send replies. This would allow us to reuse most of the code in the outgoing data path. We were doing this till TAO-1.2.3. We run in to problems. When writing the reply the ORB gets flow controlled, and the ORB tries to flush the message by going into the reactor. This resulted in unnecessary nesting. The thread that gets into the Reactor could potentially handle other messages (incoming or outgoing) and the stack starts growing leading to crashes.
The solution that we (plan to) adopt is pretty straight forward. The thread sending replies will not block to send the replies but queue the replies and return to the Reactor. (Note the careful usages of the terms "blocking in the Reactor" as opposed to "return back to the Reactor".
See Also:
http://cvs.doc.wustl.edu/ace-latest.cgi/ACE_wrappers/TAO/docs/pluggable_protocols/index.html
Definition at line 237 of file Transport.h.
|
|
Definition at line 592 of file Transport.h.
00593 {
00594 TAO_ONEWAY_REQUEST = 0,
00595 TAO_TWOWAY_REQUEST = 1,
00596 TAO_REPLY
00597 };
|
|
||||||||||||
|
Default creator, requires the tag value be supplied.
Definition at line 118 of file Transport.cpp. References TAO_ORB_Core::client_factory(), TAO_Client_Strategy_Factory::create_transport_mux_strategy(), and TAO_Client_Strategy_Factory::create_wait_strategy().
00120 : tag_ (tag) 00121 , orb_core_ (orb_core) 00122 , cache_map_entry_ (0) 00123 , bidirectional_flag_ (-1) 00124 , opening_connection_role_ (TAO::TAO_UNSPECIFIED_ROLE) 00125 , head_ (0) 00126 , tail_ (0) 00127 , incoming_message_queue_ (orb_core) 00128 , current_deadline_ (ACE_Time_Value::zero) 00129 , flush_timer_id_ (-1) 00130 , transport_timer_ (this) 00131 , handler_lock_ (orb_core->resource_factory ()->create_cached_connection_lock ()) 00132 , id_ ((size_t) this) 00133 , purging_order_ (0) 00134 , recv_buffer_size_ (0) 00135 , sent_byte_count_ (0) 00136 , is_connected_ (false) 00137 , char_translator_ (0) 00138 , wchar_translator_ (0) 00139 , tcs_set_ (0) 00140 , first_request_ (1) 00141 , partial_message_ (0) 00142 #ifdef ACE_HAS_SENDFILE 00143 // The ORB has been configured to use the MMAP allocator, meaning 00144 // we could/should use sendfile() to send data. Cast once rather 00145 // here rather than during each send. This assumes that all 00146 // TAO_OutputCDR instances are using the same TAO_MMAP_Allocator 00147 // instance as the underlying output CDR buffer allocator. 00148 , mmap_allocator_ ( 00149 dynamic_cast<TAO_MMAP_Allocator *> ( 00150 orb_core->output_cdr_buffer_allocator ())) 00151 #endif /* ACE_HAS_SENDFILE */ 00152 { 00153 TAO_Client_Strategy_Factory *cf = 00154 this->orb_core_->client_factory (); 00155 00156 // Create WS now. 00157 this->ws_ = cf->create_wait_strategy (this); 00158 00159 // Create TMS now. 00160 this->tms_ = cf->create_transport_mux_strategy (this); 00161 00162 /* 00163 * Hook to add code that initializes components that 00164 * belong to the concrete protocol implementation. 00165 * Further additions to this Transport class will 00166 * need to add code *before* this hook. 00167 */ 00168 //@@ TAO_TRANSPORT_SPL_CONSTRUCTOR_ADD_HOOK 00169 } |
|
|
Destructor.
Definition at line 171 of file Transport.cpp. References ACE_ASSERT, cleanup_queue_i(), handler_lock_, is_connected_, purge_entry(), and ACE_Message_Block::release().
00172 {
00173 delete this->ws_;
00174
00175 delete this->tms_;
00176
00177 delete this->handler_lock_;
00178
00179 if (!this->is_connected_)
00180 {
00181 // When we have a not connected transport we could have buffered
00182 // messages on this transport which we have to cleanup now.
00183 this->cleanup_queue_i();
00184
00185 // Cleanup our cache entry
00186 this->purge_entry();
00187 }
00188
00189 // Release the partial message block, however we may
00190 // have never allocated one.
00191 ACE_Message_Block::release (this->partial_message_);
00192
00193 // By the time the destructor is reached here all the connection stuff
00194 // *must* have been cleaned up.
00195
00196 // The following assert is needed for the test "Bug_2494_Regression".
00197 // See the bugzilla bug #2494 for details.
00198 ACE_ASSERT (this->head_ == 0);
00199 ACE_ASSERT (this->cache_map_entry_ == 0);
00200
00201 /*
00202 * Hook to add code that cleans up components
00203 * belong to the concrete protocol implementation.
00204 * Further additions to this Transport class will
00205 * need to add code *before* this hook.
00206 */
00207 //@@ TAO_TRANSPORT_SPL_DESTRUCTOR_ADD_HOOK
00208 }
|
|
|
|
|
|
Memory management routines.
Definition at line 2352 of file Transport.cpp. References ACE_Event_Handler::add_reference(), and event_handler_i(). Referenced by TAO::Cache_IntId::Cache_IntId(), TAO::Cache_IntId::operator=(), provide_blockable_handler(), provide_handler(), TAO::Transport_Cache_Manager::purge(), TAO_Thread_Per_Connection_Handler::TAO_Thread_Per_Connection_Handler(), and TAO_Asynch_Reply_Dispatcher_Base::transport().
02353 {
02354 return this->event_handler_i ()->add_reference ();
02355 }
|
|
|
Allocate a partial message block and store it in our partial_message_ data member. Definition at line 2421 of file Transport.cpp. References ACE_NEW, TAO_Pluggable_Messaging::header_length(), messaging_object(), and partial_message_. Referenced by handle_input_parse_data().
02422 {
02423 if (this->partial_message_ == 0)
02424 {
02425 // This value must be at least large enough to hold a GIOP message
02426 // header plus a GIOP fragment header
02427 const size_t partial_message_size =
02428 this->messaging_object ()->header_length ();
02429 // + this->messaging_object ()->fragment_header_length ();
02430 // deprecated, conflicts with not-single_read_opt.
02431
02432 ACE_NEW (this->partial_message_,
02433 ACE_Message_Block (partial_message_size));
02434 }
02435 }
|
|
||||||||||||
|
Use the Transport's codeset factories to set the translator for input and output CDRs. Definition at line 2322 of file Transport.cpp. References TAO_Codeset_Translator_Base::assign(). Referenced by TAO::LocateRequest_Invocation::check_reply(), TAO::Synch_Twoway_Invocation::check_reply_status(), TAO_ServerRequest::init_reply(), TAO_GIOP_Message_Lite::process_request(), TAO_GIOP_Message_Base::process_request(), TAO_GIOP_Message_Lite::process_request_message(), TAO_GIOP_Message_Base::process_request_message(), TAO_ServerRequest::send_cached_reply(), TAO_GIOP_Message_Lite::send_reply_exception(), TAO_ServerRequest::tao_send_reply_exception(), and TAO::Remote_Invocation::write_header().
02323 {
02324 if (this->char_translator_)
02325 {
02326 this->char_translator_->assign (inp);
02327 this->char_translator_->assign (outp);
02328 }
02329 if (this->wchar_translator_)
02330 {
02331 this->wchar_translator_->assign (inp);
02332 this->wchar_translator_->assign (outp);
02333 }
02334 }
|
|
|
Set the bidirectional flag.
Definition at line 39 of file Transport.inl. References bidirectional_flag_.
00040 {
00041 this->bidirectional_flag_ = flag;
00042 }
|
|
|
Get the bidirectional flag.
Definition at line 33 of file Transport.inl. References bidirectional_flag_. Referenced by TAO::Wait_On_LF_No_Upcall::can_process_upcalls(), TAO_IIOP_Transport::generate_request_header(), TAO_Muxed_TMS::request_id(), TAO_Exclusive_TMS::request_id(), and TAO_IIOP_Transport::tear_listen_point_list().
00034 {
00035 return this->bidirectional_flag_;
00036 }
|
|
|
Get the Cache Map entry.
Definition at line 57 of file Transport.inl. References cache_map_entry_.
00058 {
00059 return this->cache_map_entry_;
00060 }
|
|
|
Set the Cache Map entry.
Definition at line 63 of file Transport.inl. References cache_map_entry_, and TAO::Transport_Cache_Manager::HASH_MAP_ENTRY. Referenced by TAO::Transport_Cache_Manager::bind_i().
00065 {
00066 this->cache_map_entry_ = entry;
00067 }
|
|
|
Cancel handle_output() callbacks.
Definition at line 759 of file Transport.cpp. References ACE_DEBUG, ACE_TEXT, ACE_Reactor::cancel_wakeup(), event_handler_i(), LM_DEBUG, ACE_Event_Handler::reactor(), and TAO_debug_level. Referenced by TAO_Reactive_Flushing_Strategy::cancel_output(), and TAO_Leader_Follower_Flushing_Strategy::cancel_output().
00760 {
00761 ACE_Event_Handler * const eh = this->event_handler_i ();
00762 ACE_Reactor *const reactor = eh->reactor ();
00763
00764 if (TAO_debug_level > 3)
00765 {
00766 ACE_DEBUG ((LM_DEBUG,
00767 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cancel_output_i\n"),
00768 this->id ()));
00769 }
00770
00771 return reactor->cancel_wakeup (eh, ACE_Event_Handler::WRITE_MASK);
00772 }
|
|
|
CodeSet negotiation - Set the char codeset translator factory.
Definition at line 137 of file Transport.inl. References tcs_set_.
00138 {
00139 this->char_translator_ = tf;
00140 this->tcs_set_ = 1;
00141 }
|
|
|
CodeSet Negotiation - Get the char codeset translator factory.
Definition at line 125 of file Transport.inl.
00126 {
00127 return this->char_translator_;
00128 }
|
|
||||||||||||
|
Check if the buffering constraints have been reached.
Definition at line 1056 of file Transport.cpp. References ACE_Reactor::cancel_timer(), current_deadline_, event_handler_i(), flush_timer_id_, flush_timer_pending(), ACE_OS::gettimeofday(), TAO_Queued_Message::message_length(), TAO_Queued_Message::next(), ACE_Event_Handler::reactor(), ACE_Reactor::schedule_timer(), and TAO_Stub::transport_queueing_strategy(). Referenced by send_asynchronous_message_i().
01058 {
01059 // First let's compute the size of the queue:
01060 size_t msg_count = 0;
01061 size_t total_bytes = 0;
01062
01063 for (TAO_Queued_Message *i = this->head_; i != 0; i = i->next ())
01064 {
01065 ++msg_count;
01066 total_bytes += i->message_length ();
01067 }
01068
01069 bool set_timer;
01070 ACE_Time_Value new_deadline;
01071
01072 bool constraints_reached =
01073 stub->transport_queueing_strategy ().
01074 buffering_constraints_reached (stub,
01075 msg_count,
01076 total_bytes,
01077 must_flush,
01078 this->current_deadline_,
01079 set_timer,
01080 new_deadline);
01081
01082 // ... set the new timer, also cancel any previous timers ...
01083 if (set_timer)
01084 {
01085 ACE_Event_Handler *eh = this->event_handler_i ();
01086 ACE_Reactor *reactor = eh->reactor ();
01087 this->current_deadline_ = new_deadline;
01088 ACE_Time_Value delay =
01089 new_deadline - ACE_OS::gettimeofday ();
01090
01091 if (this->flush_timer_pending ())
01092 {
01093 reactor->cancel_timer (this->flush_timer_id_);
01094 }
01095
01096 this->flush_timer_id_ =
01097 reactor->schedule_timer (&this->transport_timer_,
01098 &this->current_deadline_,
01099 delay);
01100 }
01101
01102 return constraints_reached;
01103 }
|
|
|
Cleanup the queue. Exactly byte_count bytes have been sent, the queue must be cleaned up as potentially several messages have been completely sent out. It leaves on head_ the next message to send out. Definition at line 1019 of file Transport.cpp. References ACE_DEBUG, ACE_TEXT, TAO_Queued_Message::all_data_sent(), TAO_Queued_Message::bytes_transferred(), TAO_Queued_Message::destroy(), LM_DEBUG, TAO_Queued_Message::message_length(), TAO_Queued_Message::remove_from_list(), and TAO_debug_level. Referenced by drain_queue_helper().
01020 {
01021 while (this->head_ != 0 && byte_count > 0)
01022 {
01023 TAO_Queued_Message *i = this->head_;
01024
01025 if (TAO_debug_level > 4)
01026 {
01027 ACE_DEBUG ((LM_DEBUG,
01028 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue, ")
01029 ACE_TEXT ("byte_count = %d\n"),
01030 this->id (), byte_count));
01031 }
01032
01033 // Update the state of the first message
01034 i->bytes_transferred (byte_count);
01035
01036 if (TAO_debug_level > 4)
01037 {
01038 ACE_DEBUG ((LM_DEBUG,
01039 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue, ")
01040 ACE_TEXT ("after transfer, bc = %d, all_sent = %d, ml = %d\n"),
01041 this->id (), byte_count, i->all_data_sent (),
01042 i->message_length ()));
01043 }
01044
01045 // ... if all the data was sent the message must be removed from
01046 // the queue...
01047 if (i->all_data_sent ())
01048 {
01049 i->remove_from_list (this->head_, this->tail_);
01050 i->destroy ();
01051 }
01052 }
01053 }
|
|
|
Cleanup the complete queue.
Definition at line 992 of file Transport.cpp. References ACE_DEBUG, ACE_TEXT, TAO_Queued_Message::destroy(), LM_DEBUG, TAO_Queued_Message::remove_from_list(), TAO_LF_Event::state_changed(), and TAO_debug_level. Referenced by send_connection_closed_notifications_i(), and ~TAO_Transport().
00993 {
00994 if (TAO_debug_level > 4)
00995 {
00996 ACE_DEBUG ((LM_DEBUG,
00997 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue_i, ")
00998 ACE_TEXT ("cleaning up complete queue\n"),
00999 this->id ()));
01000 }
01001
01002 // Cleanup all messages
01003 while (this->head_ != 0)
01004 {
01005 TAO_Queued_Message *i = this->head_;
01006
01007 // @@ This is a good point to insert a flag to indicate that a
01008 // CloseConnection message was successfully received.
01009 i->state_changed (TAO_LF_Event::LFS_CONNECTION_CLOSED,
01010 this->orb_core_->leader_follower ());
01011
01012 i->remove_from_list (this->head_, this->tail_);
01013
01014 i->destroy ();
01015 }
01016 }
|
|
||||||||||||
|
It is necessary to clear the codeset translator when a CDR stream is used for more than one GIOP message. This is required since the header must not be translated, whereas the body must be. Definition at line 2337 of file Transport.cpp. References ACE_OutputCDR::char_translator(), ACE_InputCDR::char_translator(), ACE_OutputCDR::wchar_translator(), and ACE_InputCDR::wchar_translator(). Referenced by TAO::Remote_Invocation::write_header().
02338 {
02339 if (inp)
02340 {
02341 inp->char_translator (0);
02342 inp->wchar_translator (0);
02343 }
02344 if (outp)
02345 {
02346 outp->char_translator (0);
02347 outp->wchar_translator (0);
02348 }
02349 }
|
|
|
Call the implementation method after obtaining the lock.
Definition at line 293 of file Transport.cpp. References TAO_Connection_Handler::close_connection(), and connection_handler_i(). Referenced by TAO_IIOP_Connector::complete_connection(), TAO_Connector::connect(), post_open(), TAO::Transport_Cache_Manager::purge(), TAO_GIOP_Message_Base::send_close_connection(), TAO::Remote_Invocation::send_message(), send_message_shared(), TAO_IIOP_Transport::send_message_shared(), and TAO_Wait_On_Read::wait().
00294 {
00295 this->connection_handler_i ()->close_connection ();
00296 }
|
|
|
Get the connection handler for this transport.
Definition at line 175 of file Transport.inl. References connection_handler_i(). Referenced by TAO_IIOP_Connector::complete_connection(), TAO::Remote_Invocation::send_message(), TAO_Connect_Strategy::wait(), and TAO_Connector::wait_for_connection_completion().
00176 {
00177 return this->connection_handler_i();
00178 }
|
|
|
Implemented in TAO_IIOP_Transport. Referenced by close_connection(), and connection_handler(). |
|
|
-1 error, otherwise 0
Definition at line 1491 of file Transport.cpp. References ACE_ERROR, ACE_TEXT, TAO_Pluggable_Messaging::consolidate_fragmented_message(), TAO_Incoming_Message_Queue::enqueue_tail(), incoming_message_queue_, LM_ERROR, messaging_object(), TAO_Queued_Data::missing_data_, TAO_Queued_Data::more_fragments_, TAO_Queued_Data::msg_type_, TAO_Queued_Data::release(), TAO_debug_level, and TAO_PLUGGABLE_MESSAGE_FRAGMENT. Referenced by handle_input_parse_data(), and handle_input_parse_extra_messages().
01492 {
01493 // consolidate message on top of stack, only for fragmented messages
01494
01495 // paranoid check
01496 if (q_data->missing_data_ != 0)
01497 {
01498 return -1;
01499 }
01500
01501 if (q_data->more_fragments_ ||
01502 q_data->msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT)
01503 {
01504 TAO_Queued_Data *new_q_data = 0;
01505
01506 switch (this->messaging_object()->consolidate_fragmented_message (q_data, new_q_data))
01507 {
01508 case -1: // error
01509 return -1;
01510
01511 case 0: // returning consolidated message in new_q_data
01512 if (!new_q_data)
01513 {
01514 if (TAO_debug_level > 0)
01515 {
01516 ACE_ERROR ((LM_ERROR,
01517 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_enqueue_message, ")
01518 ACE_TEXT ("error, consolidated message is NULL\n"),
01519 this->id ()));
01520 }
01521 return -1;
01522 }
01523
01524 if (this->incoming_message_queue_.enqueue_tail (new_q_data) != 0)
01525 {
01526 TAO_Queued_Data::release (new_q_data);
01527 return -1;
01528 }
01529 break;
01530
01531 case 1: // fragment has been stored in messaging_oject()
01532 break;
01533 }
01534 }
01535 else
01536 {
01537 if (this->incoming_message_queue_.enqueue_tail (q_data) != 0)
01538 {
01539 TAO_Queued_Data::release (q_data);
01540 return -1;
01541 }
01542 }
01543
01544 return 0; // success
01545 }
|
|
||||||||||||
|
-1 error, otherwise 0
Definition at line 1404 of file Transport.cpp. References ACE_ERROR, ACE_TEXT, TAO_Pluggable_Messaging::consolidate_fragmented_message(), LM_ERROR, messaging_object(), TAO_Queued_Data::missing_data_, TAO_Queued_Data::more_fragments_, TAO_Queued_Data::msg_type_, process_parsed_messages(), TAO_Queued_Data::release(), TAO_debug_level, and TAO_PLUGGABLE_MESSAGE_FRAGMENT. Referenced by handle_input_missing_data().
01406 {
01407 // paranoid check
01408 if (q_data->missing_data_ != 0)
01409 {
01410 if (TAO_debug_level > 0)
01411 {
01412 ACE_ERROR ((LM_ERROR,
01413 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ")
01414 ACE_TEXT ("missing data\n"),
01415 this->id ()));
01416 }
01417 return -1;
01418 }
01419
01420 if (q_data->more_fragments_ ||
01421 q_data->msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT)
01422 {
01423 // consolidate message on top of stack, only for fragmented messages
01424 TAO_Queued_Data *new_q_data = 0;
01425
01426 switch (this->messaging_object()->consolidate_fragmented_message (q_data, new_q_data))
01427 {
01428 case -1: // error
01429 return -1;
01430
01431 case 0: // returning consolidated message in q_data
01432 if (!new_q_data)
01433 {
01434 if (TAO_debug_level > 0)
01435 {
01436 ACE_ERROR ((LM_ERROR,
01437 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ")
01438 ACE_TEXT ("error, consolidated message is NULL\n"),
01439 this->id ()));
01440 }
01441 return -1;
01442 }
01443
01444
01445 if (this->process_parsed_messages (new_q_data, rh) == -1)
01446 {
01447 TAO_Queued_Data::release (new_q_data);
01448
01449 if (TAO_debug_level > 0)
01450 {
01451 ACE_ERROR ((LM_ERROR,
01452 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ")
01453 ACE_TEXT ("error processing consolidated message\n"),
01454 this->id ()));
01455 }
01456 return -1;
01457 }
01458
01459 TAO_Queued_Data::release (new_q_data);
01460
01461 break;
01462
01463 case 1: // fragment has been stored in messaging_oject()
01464 break;
01465 }
01466 }
01467 else
01468 {
01469 if (this->process_parsed_messages (q_data, rh) == -1)
01470 {
01471 TAO_Queued_Data::release (q_data);
01472
01473 if (TAO_debug_level > 0)
01474 {
01475 ACE_ERROR ((LM_ERROR,
01476 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ")
01477 ACE_TEXT ("error processing message\n"),
01478 this->id ()));
01479 }
01480 return -1;
01481 }
01482
01483 TAO_Queued_Data::release (q_data);
01484
01485 }
01486
01487 return 0;
01488 }
|
|
|
Send some of the data in the queue. As the outgoing data is drained this method is invoked to send as much of the current message as possible. Returns 0 if there is more data to send, -1 if there was an error and 1 if the message was completely sent. Definition at line 807 of file Transport.cpp. References ACE_GUARD_RETURN, TAO_Flushing_Strategy::cancel_output(), drain_queue_i(), TAO_ORB_Core::flushing_strategy(), and orb_core(). Referenced by handle_output().
00808 {
00809 ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1);
00810 int const retval = this->drain_queue_i ();
00811
00812 if (retval == 1)
00813 {
00814 // ... there is no current message or it was completely
00815 // sent, cancel output...
00816 TAO_Flushing_Strategy *flushing_strategy =
00817 this->orb_core ()->flushing_strategy ();
00818
00819 flushing_strategy->cancel_output (this);
00820
00821 return 0;
00822 }
00823
00824 return retval;
00825 }
|
|
||||||||||||
|
A helper routine used in drain_queue_i().
Definition at line 828 of file Transport.cpp. References ACE_DEBUG, ACE_TEXT, cleanup_queue(), dump_iov(), EWOULDBLOCK, LM_DEBUG, send(), sent_byte_count_, ssize_t, and TAO_debug_level. Referenced by drain_queue_i().
00829 {
00830 size_t byte_count = 0;
00831
00832 // ... send the message ...
00833 ssize_t retval = -1;
00834
00835 #ifdef ACE_HAS_SENDFILE
00836 if (this->mmap_allocator_)
00837 retval = this->sendfile (this->mmap_allocator_,
00838 iov,
00839 iovcnt,
00840 byte_count);
00841 else
00842 #endif /* ACE_HAS_SENDFILE */
00843 retval = this->send (iov, iovcnt, byte_count);
00844
00845 if (TAO_debug_level == 5)
00846 {
00847 dump_iov (iov, iovcnt, this->id (),
00848 byte_count, "drain_queue_helper");
00849 }
00850
00851 // ... now we need to update the queue, removing elements
00852 // that have been sent, and updating the last element if it
00853 // was only partially sent ...
00854 this->cleanup_queue (byte_count);
00855 iovcnt = 0;
00856
00857 if (retval == 0)
00858 {
00859 if (TAO_debug_level > 4)
00860 {
00861 ACE_DEBUG ((LM_DEBUG,
00862 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_helper, ")
00863 ACE_TEXT ("send() returns 0\n"),
00864 this->id ()));
00865 }
00866 return -1;
00867 }
00868 else if (retval == -1)
00869 {
00870 if (TAO_debug_level > 4)
00871 {
00872 ACE_DEBUG ((LM_DEBUG,
00873 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_helper, ")
00874 ACE_TEXT ("error during %p\n"),
00875 this->id (), ACE_TEXT ("send()")));
00876 }
00877
00878 if (errno == EWOULDBLOCK || errno == EAGAIN)
00879 {
00880 return 0;
00881 }
00882
00883 return -1;
00884 }
00885
00886 // ... start over, how do we guarantee progress? Because if
00887 // no bytes are sent send() can only return 0 or -1
00888
00889 // Total no. of bytes sent for a send call
00890 this->sent_byte_count_ += byte_count;
00891
00892 if (TAO_debug_level > 4)
00893 {
00894 ACE_DEBUG ((LM_DEBUG,
00895 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_helper, ")
00896 ACE_TEXT ("byte_count = %d, head_is_empty = %d\n"),
00897 this->id(), byte_count, (this->head_ == 0)));
00898 }
00899
00900 return 1;
00901 }
|
|
|
Implement drain_queue() assuming the lock is held.
Definition at line 904 of file Transport.cpp. References ACE_DEBUG, ACE_IOV_MAX, ACE_TEXT, ACE_Reactor::cancel_timer(), drain_queue_helper(), event_handler_i(), TAO_Queued_Message::fill_iov(), flush_timer_pending(), LM_DEBUG, TAO_Queued_Message::next(), ACE_Event_Handler::reactor(), reset_flush_timer(), sent_byte_count_, and TAO_debug_level. Referenced by drain_queue(), TAO_Block_Flushing_Strategy::schedule_output(), send_message_block_chain_i(), and send_synch_message_helper_i().
00905 {
00906 // This is the vector used to send data, it must be declared outside
00907 // the loop because after the loop there may still be data to be
00908 // sent
00909 int iovcnt = 0;
00910 #if defined (ACE_INITIALIZE_MEMORY_BEFORE_USE)
00911 iovec iov[ACE_IOV_MAX] = { 0 , 0 };
00912 #else
00913 iovec iov[ACE_IOV_MAX];
00914 #endif /* ACE_INITIALIZE_MEMORY_BEFORE_USE */
00915
00916 // We loop over all the elements in the queue ...
00917 TAO_Queued_Message *i = this->head_;
00918
00919 // Reset the value so that the counting is done for each new send
00920 // call.
00921 this->sent_byte_count_ = 0;
00922
00923 while (i != 0)
00924 {
00925 // ... each element fills the iovector ...
00926 i->fill_iov (ACE_IOV_MAX, iovcnt, iov);
00927
00928 // ... the vector is full, no choice but to send some data out.
00929 // We need to loop because a single message can span multiple
00930 // IOV_MAX elements ...
00931 if (iovcnt == ACE_IOV_MAX)
00932 {
00933 int const retval =
00934 this->drain_queue_helper (iovcnt, iov);
00935
00936 if (TAO_debug_level > 4)
00937 {
00938 ACE_DEBUG ((LM_DEBUG,
00939 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_i, ")
00940 ACE_TEXT ("helper retval = %d\n"),
00941 this->id (), retval));
00942 }
00943
00944 if (retval != 1)
00945 {
00946 return retval;
00947 }
00948
00949 i = this->head_;
00950 continue;
00951 }
00952 // ... notice that this line is only reached if there is still
00953 // room in the iovector ...
00954 i = i->next ();
00955 }
00956
00957 if (iovcnt != 0)
00958 {
00959 int const retval = this->drain_queue_helper (iovcnt, iov);
00960
00961 if (TAO_debug_level > 4)
00962 {
00963 ACE_DEBUG ((LM_DEBUG,
00964 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_i, ")
00965 ACE_TEXT ("helper retval = %d\n"),
00966 this->id (), retval));
00967 }
00968
00969 if (retval != 1)
00970 {
00971 return retval;
00972 }
00973 }
00974
00975 if (this->head_ == 0)
00976 {
00977 if (this->flush_timer_pending ())
00978 {
00979 ACE_Event_Handler *eh = this->event_handler_i ();
00980 ACE_Reactor * const reactor = eh->reactor ();
00981 reactor->cancel_timer (this->flush_timer_id_);
00982 this->reset_flush_timer ();
00983 }
00984
00985 return 1;
00986 }
00987
00988 return 0;
00989 }
|
|
|
Normally a concrete TAO_Transport object has-a ACE_Event_Handler member that functions as an adapter between the ACE_Reactor framework and the TAO pluggable protocol framework. In all the protocols implemented so far this role is fullfilled by an instance of ACE_Svc_Handler.
Implemented in TAO_IIOP_Transport. Referenced by add_reference(), cancel_output_i(), check_buffering_constraints_i(), drain_queue_i(), TAO::Transport_Cache_Manager::find_transport(), notify_reactor(), register_handler(), remove_reference(), schedule_output_i(), TAO_Connection_Handler::transport(), and TAO_Wait_On_Read::wait(). |
|
|
Set the state of the first_request_ flag to 0.
Definition at line 158 of file Transport.inl. References first_request_. Referenced by TAO_IIOP_Transport::send_request().
00159 {
00160 this->first_request_ = 0;
00161 }
|
|
|
Check if the flush timer is still pending.
Definition at line 108 of file Transport.inl. References flush_timer_id_. Referenced by check_buffering_constraints_i(), drain_queue_i(), and handle_timeout().
00109 {
00110 return this->flush_timer_id_ != -1;
00111 }
|
|
|
Format and queue a message for stream.
Definition at line 481 of file Transport.cpp. References ACE_OutputCDR::begin(), TAO_Pluggable_Messaging::format_message(), messaging_object(), and queue_message_i(). Referenced by TAO::Synch_Oneway_Invocation::remote_oneway().
00482 {
00483 if (this->messaging_object ()->format_message (stream) != 0)
00484 return -1;
00485
00486 return this->queue_message_i (stream.begin());
00487 }
|
|
||||||||||||||||
|
This is a request for the transport object to write a LocateRequest header before it is sent out. Definition at line 352 of file Transport.cpp. References ACE_DEBUG, ACE_TEXT, TAO_Pluggable_Messaging::generate_locate_request_header(), LM_DEBUG, messaging_object(), and TAO_debug_level. Referenced by TAO::LocateRequest_Invocation::invoke().
00356 {
00357 if (this->messaging_object ()->generate_locate_request_header (opdetails,
00358 spec,
00359 output)
00360 == -1)
00361 {
00362 if (TAO_debug_level > 0)
00363 {
00364 ACE_DEBUG ((LM_DEBUG,
00365 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::generate_locate_request, ")
00366 ACE_TEXT ("error while marshalling the LocateRequest header\n"),
00367 this->id ()));
00368 }
00369
00370 return -1;
00371 }
00372
00373 return 0;
00374 }
|
|
||||||||||||||||
|
This is a request for the transport object to write a request header before it sends out the request Reimplemented in TAO_IIOP_Transport. Definition at line 377 of file Transport.cpp. References ACE_DEBUG, ACE_TEXT, TAO_ORB_Core::codeset_manager(), first_request_, TAO_Pluggable_Messaging::generate_request_header(), TAO_Codeset_Manager::generate_service_context(), LM_DEBUG, messaging_object(), orb_core(), and TAO_debug_level. Referenced by TAO_IIOP_Transport::generate_request_header(), and TAO::Remote_Invocation::write_header().
00381 {
00382 // codeset service context is only supposed to be sent in the first request
00383 // on a particular connection.
00384 if (this->first_request_)
00385 {
00386 TAO_Codeset_Manager * const csm = this->orb_core ()->codeset_manager ();
00387 if (csm)
00388 csm->generate_service_context (opdetails,*this);
00389 }
00390
00391 if (this->messaging_object ()->generate_request_header (opdetails,
00392 spec,
00393 output) == -1)
00394 {
00395 if (TAO_debug_level > 0)
00396 {
00397 ACE_DEBUG ((LM_DEBUG,
00398 ACE_TEXT ("(%P|%t) - Transport[%d]::generate_request_header, ")
00399 ACE_TEXT ("error while marshalling the Request header\n"),
00400 this->id()));
00401 }
00402
00403 return -1;
00404 }
00405
00406 return 0;
00407 }
|
|
||||||||||||||||
|
Callback to read incoming data. The ACE_Event_Handler adapter invokes this method as part of its handle_input() operation.
Definition at line 1329 of file Transport.cpp. References ACE_DEBUG, ACE_ERROR, ACE_TEXT, handle_input_missing_data(), handle_input_parse_data(), incoming_message_stack_, LM_DEBUG, LM_ERROR, TAO_Queued_Data::missing_data_, process_queue_head(), TAO_debug_level, TAO_MISSING_DATA_UNDEFINED, and TAO::Incoming_Message_Stack::top(). Referenced by TAO_Connection_Handler::handle_input_internal(), TAO_Connection_Handler::svc_i(), and TAO_Wait_On_Read::wait().
01332 {
01333 if (TAO_debug_level > 3)
01334 {
01335 ACE_DEBUG ((LM_DEBUG,
01336 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input\n"),
01337 this->id ()));
01338 }
01339
01340 // First try to process messages of the head of the incoming queue.
01341 int const retval = this->process_queue_head (rh);
01342
01343 if (retval <= 0)
01344 {
01345 if (retval == -1)
01346 {
01347 if (TAO_debug_level > 2)
01348 {
01349 ACE_DEBUG ((LM_DEBUG,
01350 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input, ")
01351 ACE_TEXT ("error while parsing the head of the queue\n"),
01352 this->id()));
01353
01354 }
01355 return -1;
01356 }
01357 else
01358 {
01359 // retval == 0
01360
01361 // Processed a message in queue successfully. This
01362 // thread must return to thread-pool now.
01363 return 0;
01364 }
01365 }
01366
01367 TAO_Queued_Data *q_data = 0;
01368
01369 if (this->incoming_message_stack_.top (q_data) != -1
01370 && q_data->missing_data_ != TAO_MISSING_DATA_UNDEFINED)
01371 {
01372 /* PRE: q_data->missing_data_ > 0 as all QD on stack must be incomplete */
01373 if (this->handle_input_missing_data (rh, max_wait_time, q_data) == -1)
01374 {
01375 if (TAO_debug_level > 0)
01376 {
01377 ACE_ERROR ((LM_ERROR,
01378 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input, ")
01379 ACE_TEXT ("error consolidating incoming message\n"),
01380 this->id ()));
01381 }
01382 return -1;
01383 }
01384 }
01385 else
01386 {
01387 if (this->handle_input_parse_data (rh, max_wait_time) == -1)
01388 {
01389 if (TAO_debug_level > 0)
01390 {
01391 ACE_ERROR ((LM_ERROR,
01392 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input, ")
01393 ACE_TEXT ("error parsing incoming message\n"),
01394 this->id ()));
01395 }
01396 return -1;
01397 }
01398 }
01399
01400 return 0;
01401 }
|
|
||||||||||||||||
|
Is invoked by handle_input operation. It consolidate message on top of incoming_message_stack. The amount of missing data is known and recv operation copies data directly into message buffer, as much as a single recv-invocation provides. Definition at line 1548 of file Transport.cpp. References ACE_DEBUG, ACE_TEXT, consolidate_process_message(), ACE_CDR::grow(), incoming_message_stack_, ACE_Message_Block::length(), LM_DEBUG, TAO_Queued_Data::missing_data_, TAO_Queued_Data::msg_block_, TAO::Incoming_Message_Stack::pop(), recv(), ACE_Message_Block::space(), ssize_t, TAO_debug_level, and ACE_Message_Block::wr_ptr(). Referenced by handle_input().
01551 {
01552 // paranoid check
01553 if (q_data == 0)
01554 {
01555 return -1;
01556 }
01557
01558 if (TAO_debug_level > 3)
01559 {
01560 ACE_DEBUG ((LM_DEBUG,
01561 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_missing_data_message, ")
01562 ACE_TEXT ("enter (missing data == %d)\n"),
01563 this->id (), q_data->missing_data_));
01564 }
01565
01566 size_t const recv_size = q_data->missing_data_;
01567
01568 // make sure the message_block has enough space
01569 size_t const message_size = recv_size
01570 + q_data->msg_block_->length();
01571
01572 if (q_data->msg_block_->space() < recv_size)
01573 {
01574 if (ACE_CDR::grow (q_data->msg_block_, message_size) == -1)
01575 {
01576 return -1;
01577 }
01578 }
01579
01580 // Saving the size of the received buffer in case any one needs to
01581 // get the size of the message thats received in the
01582 // context. Obviously the value will be changed for each recv call
01583 // and the user is supposed to invoke the accessor only in the
01584 // invocation context to get meaningful information.
01585 this->recv_buffer_size_ = recv_size;
01586
01587 // Read the message into the existing message block on heap
01588 ssize_t const n = this->recv (q_data->msg_block_->wr_ptr(),
01589 recv_size,
01590 max_wait_time);
01591
01592
01593 if (n <= 0)
01594 {
01595 return n;
01596 }
01597
01598 if (TAO_debug_level > 3)
01599 {
01600 ACE_DEBUG ((LM_DEBUG,
01601 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_missing_data_message, ")
01602 ACE_TEXT ("read bytes %d\n"),
01603 this->id (), n));
01604 }
01605
01606 q_data->msg_block_->wr_ptr(n);
01607 q_data->missing_data_ -= n;
01608
01609 if (q_data->missing_data_ == 0)
01610 {
01611 // paranoid check
01612 if (this->incoming_message_stack_.pop (q_data) == -1)
01613 {
01614 return -1;
01615 }
01616
01617 if (this->consolidate_process_message (q_data, rh) == -1)
01618 {
01619 return -1;
01620 }
01621 }
01622
01623 return 0;
01624 }
|
|
||||||||||||
|
Is invoked by handle_input operation. It parses new messages from input stream or consolidates messages whose header has been partially read, the message size being unknown so far. It parses as much data as a single recv-invocation provides. Definition at line 1671 of file Transport.cpp. References ACE_DEBUG, ACE_ERROR, ACE_TEXT, allocate_partial_message_block(), consolidate_enqueue_message(), TAO_Pluggable_Messaging::consolidate_node(), ACE_Message_Block::copy(), TAO_Queued_Data::duplicate(), ACE_CDR::grow(), handle_input_parse_extra_messages(), TAO_Pluggable_Messaging::header_length(), incoming_message_queue_, incoming_message_stack_, TAO_ORB_Core::input_cdr_dblock_allocator(), ACE_Message_Block::length(), LM_DEBUG, LM_ERROR, TAO_ORB_Core::locking_strategy(), ACE_CDR::mb_align(), ACE_OS::memset(), messaging_object(), TAO_Queued_Data::missing_data_, TAO_Queued_Data::more_fragments_, TAO_Queued_Data::msg_block_, TAO_Queued_Data::msg_type_, notify_reactor(), TAO_ORB_Core::orb_params(), TAO_Pluggable_Messaging::parse_next_message(), partial_message_, TAO::Incoming_Message_Stack::pop(), process_parsed_messages(), process_queue_head(), TAO::Incoming_Message_Stack::push(), TAO_Incoming_Message_Queue::queue_length(), ACE_Message_Block::rd_ptr(), recv(), ACE_Message_Block::reset(), TAO_Resume_Handle::set_flag(), TAO_ORB_Parameters::single_read_optimization(), ACE_Message_Block::space(), ssize_t, TAO_debug_level, TAO_MAXBUFSIZE, TAO_MISSING_DATA_UNDEFINED, TAO_PLUGGABLE_MESSAGE_FRAGMENT, TAO::Incoming_Message_Stack::top(), and ACE_Message_Block::wr_ptr(). Referenced by handle_input().
01673 {
01674
01675 if (TAO_debug_level > 3)
01676 {
01677 ACE_DEBUG ((LM_DEBUG,
01678 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ")
01679 ACE_TEXT ("enter\n"),
01680 this->id ()));
01681 }
01682
01683
01684 // The buffer on the stack which will be used to hold the input
01685 // messages, ACE_CDR::MAX_ALIGNMENT compensates the
01686 // memory-alignment. This improves performance with SUN-Java-ORB-1.4
01687 // and higher that sends fragmented requests of size 1024 bytes.
01688 char buf [TAO_MAXBUFSIZE + ACE_CDR::MAX_ALIGNMENT];
01689
01690 #if defined (ACE_INITIALIZE_MEMORY_BEFORE_USE)
01691 (void) ACE_OS::memset (buf,
01692 '\0',
01693 sizeof buf);
01694 #endif /* ACE_INITIALIZE_MEMORY_BEFORE_USE */
01695
01696 // Create a data block
01697 ACE_Data_Block db (sizeof (buf),
01698 ACE_Message_Block::MB_DATA,
01699 buf,
01700 this->orb_core_->input_cdr_buffer_allocator (),
01701 this->orb_core_->locking_strategy (),
01702 ACE_Message_Block::DONT_DELETE,
01703 this->orb_core_->input_cdr_dblock_allocator ());
01704
01705 // Create a message block
01706 ACE_Message_Block message_block (&db,
01707 ACE_Message_Block::DONT_DELETE,
01708 this->orb_core_->input_cdr_msgblock_allocator ());
01709
01710
01711 // Align the message block
01712 ACE_CDR::mb_align (&message_block);
01713
01714 size_t recv_size = 0; // Note: unsigned integer
01715
01716 // Pointer to newly parsed message
01717 TAO_Queued_Data *q_data = 0;
01718
01719 // optimizing access of constants
01720 const size_t header_length =
01721 this->messaging_object ()->header_length ();
01722
01723 // paranoid check
01724 if (header_length > message_block.space ())
01725 {
01726 return -1;
01727 }
01728
01729 if (this->orb_core_->orb_params ()->single_read_optimization ())
01730 {
01731 recv_size =
01732 message_block.space ();
01733 }
01734 else
01735 {
01736 // Single read optimization has been de-activated. That means
01737 // that we need to read from transport the GIOP header first
01738 // before the payload. This codes first checks the incoming
01739 // stack for partial messages which needs to be
01740 // consolidated. Otherwise we are in new cycle, reading complete
01741 // GIOP header of new incoming message.
01742 if (this->incoming_message_stack_.top (q_data) != -1
01743 && q_data->missing_data_ == TAO_MISSING_DATA_UNDEFINED)
01744 {
01745 // There is a partial message on incoming_message_stack_
01746 // whose length is unknown so far. We need to consolidate
01747 // the GIOP header to get to know the payload size,
01748 recv_size = header_length - q_data->msg_block_->length ();
01749 }
01750 else
01751 {
01752 // Read amount of data forming GIOP header of new incoming
01753 // message.
01754 recv_size = header_length;
01755 }
01756 // POST: 0 <= recv_size <= header_length
01757 }
01758 // POST: 0 <= recv_size <= message_block->space ()
01759
01760 // If we have a partial message, copy it into our message block and
01761 // clear out the partial message.
01762 if (this->partial_message_ != 0 && this->partial_message_->length () > 0)
01763 {
01764 // (*) Copy back the partial message into current read-buffer,
01765 // verify that the read-strategy of "recv_size" bytes is not
01766 // exceeded. The latter check guarantees that recv_size does not
01767 // roll-over and keeps in range
01768 // 0<=recv_size<=message_block->space()
01769 if (this->partial_message_->length () <= recv_size &&
01770 message_block.copy (this->partial_message_->rd_ptr (),
01771 this->partial_message_->length ()) == 0)
01772 {
01773
01774 recv_size -= this->partial_message_->length ();
01775 this->partial_message_->reset ();
01776 }
01777 else
01778 {
01779 return -1;
01780 }
01781 }
01782 // POST: 0 <= recv_size <= buffer_space
01783
01784 if (0 >= recv_size) // paranoid: the check above (*) guarantees recv_size>=0
01785 {
01786 // This event would cause endless looping, trying frequently to
01787 // read zero bytes from stream. This might happen, if TAOs
01788 // protocol implementation is not correct and tries to read data
01789 // beyond header without "single_read_optimazation" being
01790 // activated.
01791 if (TAO_debug_level > 0)
01792 {
01793 ACE_ERROR ((LM_ERROR,
01794 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ")
01795 ACE_TEXT ("Error - endless loop detection, closing connection"),
01796 this->id ()));
01797 }
01798 return -1;
01799 }
01800
01801 // Saving the size of the received buffer in case any one needs to
01802 // get the size of the message thats received in the
01803 // context. Obviously the value will be changed for each recv call
01804 // and the user is supposed to invoke the accessor only in the
01805 // invocation context to get meaningful information.
01806 this->recv_buffer_size_ = recv_size;
01807
01808 // Read the message into the message block that we have created on
01809 // the stack.
01810 const ssize_t n = this->recv (message_block.wr_ptr (),
01811 recv_size,
01812 max_wait_time);
01813
01814 // If there is an error return to the reactor..
01815 if (n <= 0)
01816 {
01817 return n;
01818 }
01819
01820 if (TAO_debug_level > 3)
01821 {
01822 ACE_DEBUG ((LM_DEBUG,
01823 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ")
01824 ACE_TEXT ("read %d bytes\n"),
01825 this->id (), n));
01826 }
01827
01828 // Set the write pointer in the stack buffer
01829 message_block.wr_ptr (n);
01830
01831 //
01832 // STACK PROCESSING OR MESSAGE CONSOLIDATION
01833 //
01834
01835 // PRE: data in buffer is aligned && message_block.length() > 0
01836
01837 if (this->incoming_message_stack_.top (q_data) != -1
01838 && q_data->missing_data_ == TAO_MISSING_DATA_UNDEFINED)
01839 {
01840 //
01841 // MESSAGE CONSOLIDATION
01842 //
01843
01844 // Partial message on incoming_message_stack_ needs to be
01845 // consolidated. The message header could not be parsed so far
01846 // and therefor the message size is unknown yet. Consolidating
01847 // the message destroys the memory alignment of succeeding
01848 // messages sharing the buffer, for that reason consolidation
01849 // and stack based processing are mutial exclusive.
01850 if (this->messaging_object ()->consolidate_node (q_data,
01851 message_block) == -1)
01852 {
01853 if (TAO_debug_level > 0)
01854 {
01855 ACE_ERROR ((LM_ERROR,
01856 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ")
01857 ACE_TEXT ("error consolidating message from input buffer\n"),
01858 this->id () ));
01859 }
01860 return -1;
01861 }
01862
01863 // Complete message are to be enqueued and later processed
01864 if (q_data->missing_data_ == 0)
01865 {
01866 if (this->incoming_message_stack_.pop (q_data) == -1)
01867 {
01868 return -1;
01869 }
01870
01871 if (this->consolidate_enqueue_message (q_data) == -1)
01872 {
01873 return -1;
01874 }
01875 }
01876
01877 if (message_block.length () > 0
01878 && this->handle_input_parse_extra_messages (message_block) == -1)
01879 {
01880 return -1;
01881 }
01882
01883 // In any case try to process the enqueued messages
01884 if (this->process_queue_head (rh) == -1)
01885 {
01886 return -1;
01887 }
01888 }
01889 else
01890 {
01891 //
01892 // STACK PROCESSING (critical path)
01893 //
01894
01895 // Process the first message in buffer on stack
01896
01897 // (PRE: first message resides in aligned memory) Make a node of
01898 // the message-block..
01899
01900 TAO_Queued_Data qd (&message_block,
01901 this->orb_core_->transport_message_buffer_allocator ());
01902
01903 size_t mesg_length = 0;
01904
01905 if (this->messaging_object ()->parse_next_message (message_block,
01906 qd,
01907 mesg_length) == -1
01908 || (qd.missing_data_ == 0
01909 && mesg_length > message_block.length ()) )
01910 {
01911 // extracting message failed
01912 return -1;
01913 }
01914 // POST: qd.missing_data_ == 0 --> mesg_length <= message_block.length()
01915 // This prevents seeking rd_ptr behind the wr_ptr
01916
01917 if (qd.missing_data_ != 0 ||
01918 qd.more_fragments_ ||
01919 qd.msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT)
01920 {
01921 if (qd.missing_data_ == 0)
01922 {
01923 // Dealing with a fragment
01924 TAO_Queued_Data *nqd =
01925 TAO_Queued_Data::duplicate (qd);
01926
01927 if (nqd == 0)
01928 {
01929 return -1;
01930 }
01931
01932 // mark the end of message in new buffer
01933 char* end_mark = nqd->msg_block_->rd_ptr ()
01934 + mesg_length;
01935 nqd->msg_block_->wr_ptr (end_mark);
01936
01937 // move the read pointer forward in old buffer
01938 message_block.rd_ptr (mesg_length);
01939
01940 // enqueue the message
01941 if (this->consolidate_enqueue_message (nqd) == -1)
01942 {
01943 return -1;
01944 }
01945
01946 if (message_block.length () > 0
01947 && this->handle_input_parse_extra_messages (message_block) == -1)
01948 {
01949 return -1;
01950 }
01951
01952 // In any case try to process the enqueued messages
01953 if (this->process_queue_head (rh) == -1)
01954 {
01955 return -1;
01956 }
01957 }
01958 else if (qd.missing_data_ != TAO_MISSING_DATA_UNDEFINED)
01959 {
01960 // Incomplete message, must be the last one in buffer
01961
01962 if (qd.missing_data_ != TAO_MISSING_DATA_UNDEFINED &&
01963 qd.missing_data_ > message_block.space ())
01964 {
01965 // Re-Allocate correct size on heap
01966 if (ACE_CDR::grow (qd.msg_block_,
01967 message_block.length ()
01968 + qd.missing_data_) == -1)
01969 {
01970 return -1;
01971 }
01972 }
01973
01974 TAO_Queued_Data *nqd =
01975 TAO_Queued_Data::duplicate (qd);
01976
01977 if (nqd == 0)
01978 {
01979 return -1;
01980 }
01981
01982 // move read-pointer to end of buffer
01983 message_block.rd_ptr (message_block.length());
01984
01985 this->incoming_message_stack_.push (nqd);
01986 }
01987 }
01988 else
01989 {
01990 //
01991 // critical path
01992 //
01993
01994 // We cant process the message on stack right now. First we
01995 // have got to parse extra messages from message_block,
01996 // putting them into queue. When this is done we can return
01997 // to process this message, and notifying other threads to
01998 // process the messages in queue.
01999
02000 char * end_marker = message_block.rd_ptr ()
02001 + mesg_length;
02002
02003 if (message_block.length () > mesg_length)
02004 {
02005 // There are more message in data stream to be parsed.
02006 // Safe the rd_ptr to restore later.
02007 char *rd_ptr_stack_mesg = message_block.rd_ptr ();
02008
02009 // Skip parsed message, jump to next message in buffer
02010 // PRE: mesg_length <= message_block.length ()
02011 message_block.rd_ptr (mesg_length);
02012
02013 // Extract remaining messages and enqueue them for later
02014 // heap processing
02015 if (this->handle_input_parse_extra_messages (message_block) == -1)
02016 {
02017 return -1;
02018 }
02019
02020 // correct the end_marker
02021 end_marker = message_block.rd_ptr ();
02022
02023 // Restore rd_ptr
02024 message_block.rd_ptr (rd_ptr_stack_mesg);
02025 }
02026
02027 // The following if-else has been copied from
02028 // process_queue_head(). While process_queue_head()
02029 // processes message on heap, here we will process a message
02030 // on stack.
02031
02032 // Now that we have one message on stack to be processed,
02033 // check whether we have one more message in the queue...
02034 if (this->incoming_message_queue_.queue_length () > 0)
02035 {
02036 if (TAO_debug_level > 0)
02037 {
02038 ACE_DEBUG ((LM_DEBUG,
02039 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ")
02040 ACE_TEXT ("notify reactor\n"),
02041 this->id ()));
02042
02043 }
02044
02045 const int retval = this->notify_reactor ();
02046
02047 if (retval == 1)
02048 {
02049 // Let the class know that it doesn't need to resume the
02050 // handle..
02051 rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_LEAVE_SUSPENDED);
02052 }
02053 else if (retval < 0)
02054 return -1;
02055 }
02056 else
02057 {
02058 // As there are no further messages in queue just resume
02059 // the handle. Set the flag incase someone had reset the flag..
02060 rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_RESUMABLE);
02061 }
02062
02063 // PRE: incoming_message_queue is empty
02064 if (this->process_parsed_messages (&qd,
02065 rh) == -1)
02066 {
02067 return -1;
02068 }
02069
02070 // move the rd_ptr tp position of end_marker
02071 message_block.rd_ptr (end_marker);
02072 }
02073 }
02074
02075 // Now that all cases have been processed, there might be kept some data
02076 // in buffer that needs to be safed for next "handle_input" invocations.
02077 if (message_block.length () > 0)
02078 {
02079 if (this->partial_message_ == 0)
02080 {
02081 this->allocate_partial_message_block ();
02082 }
02083
02084 if (this->partial_message_ != 0 &&
02085 this->partial_message_->copy (message_block.rd_ptr (),
02086 message_block.length ()) == 0)
02087 {
02088 message_block.rd_ptr (message_block.length ());
02089 }
02090 else
02091 {
02092 return -1;
02093 }
02094 }
02095
02096 return 0;
02097 }
|
|
|
Is invoked by handle_input_parse_data. Parses all messages remaining in message_block. Definition at line 1628 of file Transport.cpp. References consolidate_enqueue_message(), TAO_Pluggable_Messaging::extract_next_message(), incoming_message_stack_, ACE_Message_Block::length(), messaging_object(), TAO_Queued_Data::missing_data_, and TAO::Incoming_Message_Stack::push(). Referenced by handle_input_parse_data().
01629 {
01630
01631 // store buffer status of last extraction: -1 parse error, 0
01632 // incomplete message header in buffer, 1 complete messages header
01633 // parsed
01634 int buf_status = 0;
01635
01636 TAO_Queued_Data *q_data = 0; // init
01637
01638 // parse buffer until all messages have been extracted, consolidate
01639 // and enqueue complete messages, if the last message being parsed
01640 // has missin data, it is stays on top of incoming_message_stack.
01641 while (message_block.length () > 0 &&
01642 (buf_status = this->messaging_object ()->extract_next_message
01643 (message_block, q_data)) != -1 &&
01644 q_data != 0) // paranoid check
01645 {
01646 if (q_data->missing_data_ == 0)
01647 {
01648 if (this->consolidate_enqueue_message (q_data) == -1)
01649 {
01650 return -1;
01651 }
01652 }
01653 else // incomplete message read, probably the last message in buffer
01654 {
01655 // can not fail
01656 this->incoming_message_stack_.push (q_data);
01657 }
01658
01659 q_data = 0; // reset
01660 } // while
01661
01662 if (buf_status == -1)
01663 {
01664 return -1;
01665 }
01666
01667 return 0;
01668 }
|
|
|
Callback method to reactively drain the outgoing data queue.
Definition at line 453 of file Transport.cpp. References ACE_DEBUG, ACE_TEXT, drain_queue(), LM_DEBUG, and TAO_debug_level. Referenced by TAO_Block_Flushing_Strategy::flush_message(), TAO_Block_Flushing_Strategy::flush_transport(), and TAO_Connection_Handler::handle_output_eh().
00454 {
00455 if (TAO_debug_level > 3)
00456 {
00457 ACE_DEBUG ((LM_DEBUG,
00458 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_output\n"),
00459 this->id ()));
00460 }
00461
00462 // The flushing strategy (potentially via the Reactor) wants to send
00463 // more data, first check if there is a current message that needs
00464 // more sending...
00465 int const retval = this->drain_queue ();
00466
00467 if (TAO_debug_level > 3)
00468 {
00469 ACE_DEBUG ((LM_DEBUG,
00470 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_output, ")
00471 ACE_TEXT ("drain_queue returns %d/%d\n"),
00472 this->id (),
00473 retval, errno));
00474 }
00475
00476 // Any errors are returned directly to the Reactor
00477 return retval;
00478 }
|
|
||||||||||||
|
Definition at line 775 of file Transport.cpp. References ACE_DEBUG, ACE_TEXT, current_deadline_, flush_timer_pending(), TAO_ORB_Core::flushing_strategy(), LM_DEBUG, orb_core(), reset_flush_timer(), TAO_Flushing_Strategy::schedule_output(), and TAO_debug_level. Referenced by TAO_Transport_Timer::handle_timeout().
00777 {
00778 if (TAO_debug_level > 6)
00779 {
00780 ACE_DEBUG ((LM_DEBUG,
00781 ACE_TEXT ("TAO (%P|%t) - TAO_Transport[%d]::handle_timeout, ")
00782 ACE_TEXT ("timer expired\n"),
00783 this->id ()));
00784 }
00785
00786 /// This is the only legal ACT in the current configuration....
00787 if (act != &this->current_deadline_)
00788 {
00789 return -1;
00790 }
00791
00792 if (this->flush_timer_pending ())
00793 {
00794 // The timer is always a oneshot timer, so mark is as not
00795 // pending.
00796 this->reset_flush_timer ();
00797
00798 TAO_Flushing_Strategy *flushing_strategy =
00799 this->orb_core ()->flushing_strategy ();
00800 (void) flushing_strategy->schedule_output (this);
00801 }
00802
00803 return 0;
00804 }
|
|
|
Definition at line 92 of file Transport.inl.
00093 {
00094 this->id_ = id;
00095 }
|
|
|
Set and Get the identifier for this transport instance.
If not set, this will return an integer representation of the Definition at line 86 of file Transport.inl. Referenced by TAO::Transport_Cache_Manager::bind_i(), TAO_Connection_Handler::close_connection_eh(), TAO_IIOP_Connector::complete_connection(), TAO_Connector::connect(), TAO::Transport_Cache_Manager::find_transport(), TAO_Connection_Handler::handle_input_internal(), TAO_Connector::parallel_connect(), TAO_GIOP_Message_Lite::parse_reply(), TAO_GIOP_Message_Base::process_reply_message(), TAO::Transport_Cache_Manager::purge(), TAO_GIOP_Utils::read_buffer(), TAO_GIOP_Message_Base::send_close_connection(), TAO_GIOP_Message_Lite::send_error(), TAO_GIOP_Message_Base::send_error(), TAO_GIOP_Message_Lite::send_reply_exception(), TAO_Connector::wait_for_connection_completion(), and TAO_Leader_Follower::wait_for_event().
00087 {
00088 return this->id_;
00089 }
|
|
|
Request is sent and the reply is received. Idle the transport now. Definition at line 239 of file Transport.cpp. References TAO_Transport_Mux_Strategy::idle_after_reply(), and tms().
00240 {
00241 return this->tms ()->idle_after_reply ();
00242 }
|
|
|
Request has been just sent, but the reply is not received. Idle the transport now. Definition at line 233 of file Transport.cpp. References TAO_Transport_Mux_Strategy::idle_after_send(), and tms().
00234 {
00235 return this->tms ()->idle_after_send ();
00236 }
|
|
|
Is this transport really connected.
Definition at line 164 of file Transport.inl. References ACE_GUARD_RETURN, and is_connected_. Referenced by TAO_IIOP_Connector::complete_connection(), TAO_Connector::connect(), TAO::Synch_Oneway_Invocation::remote_oneway(), and TAO_Connector::wait_for_connection_completion().
00165 {
00166 ACE_GUARD_RETURN (ACE_Lock,
00167 ace_mon,
00168 *this->handler_lock_,
00169 false);
00170
00171 return this->is_connected_;
00172 }
|
|
|
Return true if the tcs has been set.
Definition at line 152 of file Transport.inl. References tcs_set_.
00153 {
00154 return tcs_set_;
00155 }
|
|
|
Cache management.
Definition at line 429 of file Transport.cpp. References ACE_DEBUG, ACE_TEXT, LM_DEBUG, TAO::Transport_Cache_Manager::make_idle(), TAO_debug_level, and transport_cache_manager(). Referenced by TAO_Exclusive_TMS::idle_after_reply(), TAO_Muxed_TMS::idle_after_send(), TAO_IIOP_Connection_Handler::process_listen_point_list(), and TAO::Profile_Transport_Resolver::~Profile_Transport_Resolver().
00430 {
00431 if (TAO_debug_level > 3)
00432 {
00433 ACE_DEBUG ((LM_DEBUG,
00434 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::make_idle\n"),
00435 this->id ()));
00436 }
00437
00438 return this->transport_cache_manager ().make_idle (this->cache_map_entry_);
00439 }
|
|
||||||||||||
|
Initialising the messaging object. This would be used by the connector side. On the acceptor side the connection handler would take care of the messaging objects. Implemented in TAO_IIOP_Transport. |
|
|
Return the messaging object that is used to format the data that needs to be sent. Implemented in TAO_IIOP_Transport. Referenced by allocate_partial_message_block(), consolidate_enqueue_message(), consolidate_process_message(), format_queue_message(), TAO_On_Demand_Fragmentation_Strategy::fragment(), generate_locate_request(), generate_request_header(), handle_input_parse_data(), handle_input_parse_extra_messages(), out_stream(), process_parsed_messages(), and send_connection_closed_notifications_i(). |
|
|
Definition at line 2277 of file Transport.cpp. References ACE_DEBUG, ACE_TEXT, event_handler_i(), TAO_Wait_Strategy::is_registered(), LM_DEBUG, ACE_Reactor::notify(), orb_core(), TAO_ORB_Core::reactor(), and TAO_debug_level. Referenced by handle_input_parse_data(), and process_queue_head().
02278 {
02279 if (!this->ws_->is_registered ())
02280 {
02281 return 0;
02282 }
02283
02284 ACE_Event_Handler *eh = this->event_handler_i ();
02285
02286 // Get the reactor associated with the event handler
02287 ACE_Reactor *reactor = this->orb_core ()->reactor ();
02288
02289 if (TAO_debug_level > 0)
02290 {
02291 ACE_DEBUG ((LM_DEBUG,
02292 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::notify_reactor, ")
02293 ACE_TEXT ("notify to Reactor\n"),
02294 this->id ()));
02295 }
02296
02297
02298 // Send a notification to the reactor...
02299 const int retval = reactor->notify (eh,
02300 ACE_Event_Handler::READ_MASK);
02301
02302 if (retval < 0 && TAO_debug_level > 2)
02303 {
02304 // @@todo: need to think about what is the action that
02305 // we can take when we get here.
02306 ACE_DEBUG ((LM_DEBUG,
02307 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::notify_reactor, ")
02308 ACE_TEXT ("notify to the reactor failed..\n"),
02309 this->id ()));
02310 }
02311
02312 return 1;
02313 }
|
|
|
Definition at line 51 of file Transport.inl. References opening_connection_role_.
00052 {
00053 this->opening_connection_role_ = role;
00054 }
|
|
|
Methods dealing with the role of the connection, e.g., CLIENT or SERVER. See CORBA 2.6 Specification, Section 15.5.1 for origin of definitions. Definition at line 45 of file Transport.inl. References opening_connection_role_. Referenced by TAO::Wait_On_LF_No_Upcall::can_process_upcalls(), TAO_Connector::connect(), and TAO_IIOP_Connection_Handler::open().
00046 {
00047 return this->opening_connection_role_;
00048 }
|
|
|
|
|
|
|
Accessor for the output CDR stream.
Definition at line 2364 of file Transport.cpp. References messaging_object(), and TAO_Pluggable_Messaging::out_stream(). Referenced by TAO::LocateRequest_Invocation::invoke(), and TAO::Synch_Oneway_Invocation::remote_oneway().
02365 {
02366 return this->messaging_object ()->out_stream ();
02367 }
|
|
|
Hooks that can be overridden in concrete transports. These hooks are invoked just after connection establishment (or after a connection is fetched from cache). The return value signifies whether the invoker should proceed with post connection establishment activities. Protocols like SSLIOP need this to verify whether connections already established have valid certificates. There are no pre_connect_hooks () since the transport doesn't exist before a connection establishment. :-)
Definition at line 287 of file Transport.cpp. Referenced by TAO_Connector::connect().
00288 {
00289 return true;
00290 }
|
|
|
Perform all the actions when this transport get opened.
Definition at line 2370 of file Transport.cpp. References ACE_ERROR, ACE_GUARD_RETURN, ACE_TEXT, close_connection(), TAO_ORB_Core::flushing_strategy(), is_connected_, LM_ERROR, orb_core(), purge_entry(), queue_is_empty_i(), TAO_Wait_Strategy::register_handler(), TAO_Flushing_Strategy::schedule_output(), TAO_debug_level, and wait_strategy(). Referenced by TAO_IIOP_Connection_Handler::open().
02371 {
02372 this->id_ = id;
02373
02374 {
02375 ACE_GUARD_RETURN (ACE_Lock,
02376 ace_mon,
02377 *this->handler_lock_,
02378 false);
02379 this->is_connected_ = true;
02380 }
02381
02382 // When we have data in our outgoing queue schedule ourselves
02383 // for output
02384 if (this->queue_is_empty_i ())
02385 return true;
02386
02387 // If the wait strategy wants us to be registered with the reactor
02388 // then we do so. If registeration is required and it succeeds,
02389 // #REFCOUNT# becomes two.
02390 if (this->wait_strategy ()->register_handler () == 0)
02391 {
02392 TAO_Flushing_Strategy *flushing_strategy =
02393 this->orb_core ()->flushing_strategy ();
02394 (void) flushing_strategy->schedule_output (this);
02395 }
02396 else
02397 {
02398 // Registration failures.
02399
02400 // Purge from the connection cache, if we are not in the cache, this
02401 // just does nothing.
02402 (void) this->purge_entry ();
02403
02404 // Close the handler.
02405 (void) this->close_connection ();
02406
02407 if (TAO_debug_level > 0)
02408 ACE_ERROR ((LM_ERROR,
02409 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::post_connect , ")
02410 ACE_TEXT ("could not register the transport ")
02411 ACE_TEXT ("in the reactor.\n"),
02412 this->id ()));
02413
02414 return false;
02415 }
02416
02417 return true;
02418 }
|
|
||||||||||||
|
Process the message by sending it to the higher layers of the ORB. Definition at line 2101 of file Transport.cpp. References ACE_DEBUG, ACE_ERROR, ACE_TEXT, TAO_Pluggable_Messaging::discard_fragmented_message(), LM_DEBUG, LM_ERROR, messaging_object(), TAO_Queued_Data::missing_data_, TAO_Queued_Data::msg_type_, TAO_Pluggable_Messaging::process_reply_message(), TAO_Pluggable_Messaging::process_request_message(), TAO_Resume_Handle::resume_handle(), TAO_debug_level, TAO_PLUGGABLE_MESSAGE_CANCELREQUEST, TAO_PLUGGABLE_MESSAGE_CLOSECONNECTION, TAO_PLUGGABLE_MESSAGE_LOCATEREPLY, TAO_PLUGGABLE_MESSAGE_LOCATEREQUEST, TAO_PLUGGABLE_MESSAGE_MESSAGERROR, TAO_PLUGGABLE_MESSAGE_REPLY, TAO_PLUGGABLE_MESSAGE_REQUEST, and TAO_Pluggable_Message_Type. Referenced by consolidate_process_message(), handle_input_parse_data(), and process_queue_head().
02103 {
02104 if (TAO_debug_level > 7)
02105 {
02106 ACE_DEBUG ((LM_DEBUG,
02107 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ")
02108 ACE_TEXT ("entering (missing data == %d)\n"),
02109 this->id(), qd->missing_data_));
02110 }
02111
02112 // Get the <message_type> that we have received
02113 const TAO_Pluggable_Message_Type t = qd->msg_type_;
02114
02115 // int result = 0;
02116
02117 if (t == TAO_PLUGGABLE_MESSAGE_CLOSECONNECTION)
02118 {
02119 if (TAO_debug_level > 0)
02120 ACE_DEBUG ((LM_DEBUG,
02121 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ")
02122 ACE_TEXT ("received CloseConnection message - %m\n"),
02123 this->id()));
02124
02125 // Return a "-1" so that the next stage can take care of
02126 // closing connection and the necessary memory management.
02127 return -1;
02128 }
02129 else if (t == TAO_PLUGGABLE_MESSAGE_REQUEST ||
02130 t == TAO_PLUGGABLE_MESSAGE_LOCATEREQUEST)
02131 {
02132 // Let us resume the handle before we go ahead to process the
02133 // request. This will open up the handle for other threads.
02134 rh.resume_handle ();
02135
02136 if (this->messaging_object ()->process_request_message (
02137 this,
02138 qd) == -1)
02139 {
02140 // Return a "-1" so that the next stage can take care of
02141 // closing connection and the necessary memory management.
02142 return -1;
02143 }
02144 }
02145 else if (t == TAO_PLUGGABLE_MESSAGE_REPLY ||
02146 t == TAO_PLUGGABLE_MESSAGE_LOCATEREPLY)
02147 {
02148 rh.resume_handle ();
02149
02150 TAO_Pluggable_Reply_Params params (this);
02151
02152 if (this->messaging_object ()->process_reply_message (params,
02153 qd) == -1)
02154 {
02155 if (TAO_debug_level > 0)
02156 ACE_DEBUG ((LM_DEBUG,
02157 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ")
02158 ACE_TEXT ("error in process_reply_message - %m\n"),
02159 this->id ()));
02160
02161 return -1;
02162 }
02163
02164 }
02165 else if (t == TAO_PLUGGABLE_MESSAGE_CANCELREQUEST)
02166 {
02167 // The associated request might be incomplpete residing
02168 // fragmented in messaging object. We must make sure the
02169 // resources allocated by fragments are released.
02170
02171 if (this->messaging_object ()->discard_fragmented_message (qd) == -1)
02172 {
02173 if (TAO_debug_level > 0)
02174 {
02175 ACE_ERROR ((LM_ERROR,
02176 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ")
02177 ACE_TEXT ("error processing CancelRequest\n"),
02178 this->id ()));
02179 }
02180 }
02181
02182 // We are not able to cancel requests being processed already;
02183 // this is declared as optional feature by CORBA, and TAO does
02184 // not support this currently.
02185
02186 // Just continue processing, CancelRequest does not mean to cut
02187 // off the connection.
02188 }
02189 else if (t == TAO_PLUGGABLE_MESSAGE_MESSAGERROR)
02190 {
02191 if (TAO_debug_level > 0)
02192 {
02193 ACE_ERROR ((LM_ERROR,
02194 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ")
02195 ACE_TEXT ("received MessageError, closing connection\n"),
02196 this->id ()));
02197 }
02198 return -1;
02199 }
02200
02201 // If not, just return back..
02202 return 0;
02203 }
|
|
|
Definition at line 2206 of file Transport.cpp. References ACE_DEBUG, ACE_TEXT, TAO_Incoming_Message_Queue::dequeue_head(), incoming_message_queue_, LM_DEBUG, notify_reactor(), process_parsed_messages(), TAO_Incoming_Message_Queue::queue_length(), TAO_Queued_Data::release(), TAO_Resume_Handle::set_flag(), and TAO_debug_level. Referenced by handle_input(), and handle_input_parse_data().
02207 {
02208 if (TAO_debug_level > 3)
02209 {
02210 ACE_DEBUG ((LM_DEBUG,
02211 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_queue_head, %d enqueued\n"),
02212 this->id (), this->incoming_message_queue_.queue_length () ));
02213 }
02214
02215 // See if message in queue ...
02216 if (this->incoming_message_queue_.queue_length () > 0)
02217 {
02218 // Get the message on the head of the queue..
02219 TAO_Queued_Data *qd =
02220 this->incoming_message_queue_.dequeue_head ();
02221
02222 if (TAO_debug_level > 3)
02223 {
02224 ACE_DEBUG ((LM_DEBUG,
02225 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_queue_head, ")
02226 ACE_TEXT ("the size of the queue is [%d]\n"),
02227 this->id (),
02228 this->incoming_message_queue_.queue_length()));
02229 }
02230 // Now that we have pulled out out one message out of the queue,
02231 // check whether we have one more message in the queue...
02232 if (this->incoming_message_queue_.queue_length () > 0)
02233 {
02234 if (TAO_debug_level > 0)
02235 {
02236 ACE_DEBUG ((LM_DEBUG,
02237 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_queue_head, ")
02238 ACE_TEXT ("notify reactor\n"),
02239 this->id ()));
02240
02241 }
02242
02243 const int retval = this->notify_reactor ();
02244
02245 if (retval == 1)
02246 {
02247 // Let the class know that it doesn't need to resume the
02248 // handle..
02249 rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_LEAVE_SUSPENDED);
02250 }
02251 else if (retval < 0)
02252 return -1;
02253 }
02254 else
02255 {
02256 // As we are ready to process the last message just resume
02257 // the handle. Set the flag incase someone had reset the flag..
02258 rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_RESUMABLE);
02259 }
02260
02261 // Process the message...
02262 if (this->process_parsed_messages (qd, rh) == -1)
02263 {
02264 return -1;
02265 }
02266
02267 // Delete the Queued_Data..
02268 TAO_Queued_Data::release (qd);
02269
02270 return 0;
02271 }
02272
02273 return 1;
02274 }
|
|
|
Called by the cache when the ORB is shuting down.
Definition at line 219 of file Transport.cpp. References add_reference(), TAO::Connection_Handler_Set, ACE_Unbounded_Set< T >::insert(), TAO_Wait_Strategy::non_blocking(), and opening_connection_role_.
00220 {
00221 if (this->ws_->non_blocking () ||
00222 this->opening_connection_role_ == TAO::TAO_SERVER_ROLE)
00223 return false;
00224
00225 (void) this->add_reference ();
00226
00227 h.insert (this->connection_handler_i ());
00228
00229 return true;
00230 }
|
|
|
Added event handler to the handlers set. Called by the cache when the cache is closing.
Definition at line 211 of file Transport.cpp. References add_reference(), TAO::Connection_Handler_Set, and ACE_Unbounded_Set< T >::insert().
00212 {
00213 (void) this->add_reference ();
00214
00215 handlers.insert (this->connection_handler_i ());
00216 }
|
|
|
Cache management.
Definition at line 423 of file Transport.cpp. References TAO::Transport_Cache_Manager::purge_entry(), and transport_cache_manager(). Referenced by TAO_Connection_Handler::close_connection_eh(), TAO_IIOP_Connector::complete_connection(), TAO_Connector::connect(), post_open(), recache_transport(), and ~TAO_Transport().
00424 {
00425 return this->transport_cache_manager ().purge_entry (this->cache_map_entry_);
00426 }
|
|
|
Definition at line 76 of file Transport.inl. References purging_order_.
00077 {
00078 // This should only be called by the Transport Cache Manager when
00079 // it is holding it's lock.
00080 // The transport should still be here since the cache manager still
00081 // has a reference to it.
00082 this->purging_order_ = value;
00083 }
|
|
|
Get and Set the purging order. The purging strategy uses the set version to set the purging order. Definition at line 70 of file Transport.inl. References purging_order_. Referenced by TAO_LRU_Connection_Purging_Strategy::update_item().
00071 {
00072 return this->purging_order_;
00073 }
|
|
|
Check if there are messages pending in the queue.
Definition at line 98 of file Transport.inl. References ACE_GUARD_RETURN, and queue_is_empty_i(). Referenced by TAO_Reactive_Flushing_Strategy::flush_transport(), TAO_Leader_Follower_Flushing_Strategy::flush_transport(), and TAO_Block_Flushing_Strategy::flush_transport().
00099 {
00100 ACE_GUARD_RETURN (ACE_Lock,
00101 ace_mon,
00102 *this->handler_lock_,
00103 -1);
00104 return this->queue_is_empty_i ();
00105 }
|
|
|
Check if there are messages pending in the queue. This version assumes that the lock is already held. Use with care!
Definition at line 715 of file Transport.cpp. Referenced by post_open(), queue_is_empty(), and TAO_Block_Flushing_Strategy::schedule_output().
00716 {
00717 return (this->head_ == 0);
00718 }
|
|
|
Queue a message for message_block.
Definition at line 1308 of file Transport.cpp. References ACE_NEW_RETURN, and TAO_Queued_Message::push_back(). Referenced by format_queue_message(), and send_asynchronous_message_i().
01309 {
01310 TAO_Queued_Message *queued_message = 0;
01311 ACE_NEW_RETURN (queued_message,
01312 TAO_Asynch_Queued_Message (message_block,
01313 this->orb_core_,
01314 0,
01315 1),
01316 -1);
01317 queued_message->push_back (this->head_, this->tail_);
01318
01319 return 0;
01320 }
|
|
|
Recache ourselves in the cache.
Definition at line 412 of file Transport.cpp. References TAO::Transport_Cache_Manager::cache_transport(), purge_entry(), and transport_cache_manager(). Referenced by TAO_IIOP_Connection_Handler::process_listen_point_list().
00413 {
00414 // First purge our entry
00415 this->purge_entry ();
00416
00417 // Then add ourselves to the cache
00418 return this->transport_cache_manager ().cache_transport (desc,
00419 this);
00420 }
|
|
||||||||||||||||
|
Read len bytes from into buf. This method serializes on handler_lock_, guaranteeing that only thread can execute it on the same instance concurrently.
Implemented in TAO_IIOP_Transport. Referenced by handle_input_missing_data(), handle_input_parse_data(), TAO_GIOP_Utils::read_buffer(), and TAO_GIOP_Utils::read_bytes_input(). |
|
|
Accessor to recv_buffer_size_.
Definition at line 181 of file Transport.inl.
00182 {
00183 return this->recv_buffer_size_;
00184 }
|
|
|
Register the handler with the reactor. Register the handler with the reactor. This method is used by the Wait_On_Reactor strategy. The transport must register its event handler with the ORB's Reactor.
Definition at line 299 of file Transport.cpp. References ACE_DEBUG, ACE_GUARD_RETURN, ACE_TEXT, event_handler_i(), TAO_Wait_Strategy::is_registered(), LM_DEBUG, ACE_Event_Handler::reactor(), TAO_ORB_Core::reactor(), ACE_Reactor::register_handler(), and TAO_debug_level. Referenced by TAO_Wait_On_Reactor::register_handler(), TAO_Wait_On_Leader_Follower::register_handler(), and TAO_Wait_On_Leader_Follower::sending_request().
00300 {
00301 if (TAO_debug_level > 4)
00302 {
00303 ACE_DEBUG ((LM_DEBUG,
00304 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::register_handler\n"),
00305 this->id ()));
00306 }
00307
00308 ACE_Reactor * const r = this->orb_core_->reactor ();
00309
00310 // @@note: This should be okay since the register handler call will
00311 // not make a nested call into the transport.
00312 ACE_GUARD_RETURN (ACE_Lock,
00313 ace_mon,
00314 *this->handler_lock_,
00315 false);
00316
00317 if (r == this->event_handler_i ()->reactor ())
00318 {
00319 return 0;
00320 }
00321
00322 // Set the flag in the Connection Handler and in the Wait Strategy
00323 // @@Maybe we should set these flags after registering with the
00324 // reactor. What if the registration fails???
00325 this->ws_->is_registered (true);
00326
00327 // Register the handler with the reactor
00328 return r->register_handler (this->event_handler_i (),
00329 ACE_Event_Handler::READ_MASK);
00330 }
|
|
|
Initialising the messaging object. This would be used by the connector side. On the acceptor side the connection handler would take care of the messaging objects. Definition at line 2358 of file Transport.cpp. References event_handler_i(), and ACE_Event_Handler::remove_reference(). Referenced by TAO_Connection_Handler::close_handler(), TAO::Cache_IntId::operator=(), TAO::Transport_Cache_Manager::purge(), TAO_Asynch_Reply_Dispatcher_Base::transport(), TAO::Cache_IntId::~Cache_IntId(), TAO::Profile_Transport_Resolver::~Profile_Transport_Resolver(), TAO_Asynch_Reply_Dispatcher_Base::~TAO_Asynch_Reply_Dispatcher_Base(), and TAO_Thread_Per_Connection_Handler::~TAO_Thread_Per_Connection_Handler().
02359 {
02360 return this->event_handler_i ()->remove_reference ();
02361 }
|
|
|
Print out error messages if the event handler is not valid.
Definition at line 1106 of file Transport.cpp. References ACE_DEBUG, ACE_TEXT, ACE_TEXT_CHAR_TO_TCHAR, LM_DEBUG, and TAO_debug_level.
01107 {
01108 if (TAO_debug_level > 0)
01109 {
01110 ACE_DEBUG ((LM_DEBUG,
01111 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::report_invalid_event_handler")
01112 ACE_TEXT ("(%s) no longer associated with handler [tag=%d]\n"),
01113 this->id (), ACE_TEXT_CHAR_TO_TCHAR (caller), this->tag_));
01114 }
01115 }
|
|
|
The flush timer expired or was explicitly cancelled, mark it as not pending Definition at line 114 of file Transport.inl. References current_deadline_, and flush_timer_id_. Referenced by drain_queue_i(), and handle_timeout().
00115 {
00116 this->flush_timer_id_ = -1;
00117 this->current_deadline_ = ACE_Time_Value::zero;
00118 }
|
|
|
Schedule handle_output() callbacks.
Definition at line 722 of file Transport.cpp. References ACE_DEBUG, ACE_TEXT, event_handler_i(), ACE_Reactor::find_handler(), ACE_Event_Handler::get_handle(), LM_DEBUG, ACE_Event_Handler::reactor(), ACE_Event_Handler::remove_reference(), ACE_Reactor::schedule_wakeup(), and TAO_debug_level. Referenced by TAO_Reactive_Flushing_Strategy::schedule_output(), and TAO_Leader_Follower_Flushing_Strategy::schedule_output().
00723 {
00724 ACE_Event_Handler *eh = this->event_handler_i ();
00725 ACE_Reactor *reactor = eh->reactor ();
00726
00727 // Check to see if our event handler is still registered with the
00728 // reactor. It's possible for another thread to have run close_connection()
00729 // since we last used the event handler.
00730 ACE_Event_Handler * const found = reactor->find_handler (eh->get_handle ());
00731 if (found != eh)
00732 {
00733 if(TAO_debug_level > 3)
00734 {
00735 ACE_DEBUG ((LM_DEBUG,
00736 "TAO (%P|%t) - Transport[%d]::schedule_output_i "
00737 "event handler not found in reactor, returning -1\n",
00738 this->id ()));
00739 }
00740 if (found)
00741 {
00742 found->remove_reference ();
00743 }
00744 return -1;
00745 }
00746 found->remove_reference ();
00747
00748 if (TAO_debug_level > 3)
00749 {
00750 ACE_DEBUG ((LM_DEBUG,
00751 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::schedule_output_i\n"),
00752 this->id ()));
00753 }
00754
00755 return reactor->schedule_wakeup (eh, ACE_Event_Handler::WRITE_MASK);
00756 }
|
|
||||||||||||||||||||
|
Write the complete Message_Block chain to the connection. This method serializes on handler_lock_, guaranteeing that only thread can execute it on the same instance concurrently. Often the implementation simply forwards the arguments to the underlying ACE_Svc_Handler class. Using the code factored out into ACE. Be careful with protocols that perform non-trivial transformations of the data, such as SSLIOP or protocols that compress the stream.
ENOENT.
Implemented in TAO_IIOP_Transport. Referenced by drain_queue_helper(). |
|
||||||||||||||||
|
Send an asynchronous message, i.e. do not block until the message is on the wire Definition at line 1161 of file Transport.cpp. References ACE_DEBUG, ACE_ERROR, ACE_GUARD_RETURN, ACE_TEXT, check_buffering_constraints_i(), ACE_Message_Block::cont(), ETIME, EWOULDBLOCK, TAO_Flushing_Strategy::flush_transport(), TAO_ORB_Core::flushing_strategy(), ACE_Message_Block::length(), LM_DEBUG, LM_ERROR, TAO::Transport_Queueing_Strategy::must_queue(), orb_core(), queue_message_i(), TAO_Flushing_Strategy::schedule_output(), send_message_block_chain_i(), ssize_t, TAO_debug_level, ACE_Message_Block::total_length(), and TAO_Stub::transport_queueing_strategy(). Referenced by send_message_shared_i().
01164 {
01165 // Let's figure out if the message should be queued without trying
01166 // to send first:
01167 bool try_sending_first = true;
01168
01169 const bool queue_empty = (this->head_ == 0);
01170
01171 if (!queue_empty)
01172 {
01173 try_sending_first = false;
01174 }
01175 else if (stub->transport_queueing_strategy ().must_queue (queue_empty))
01176 {
01177 try_sending_first = false;
01178 }
01179
01180 if (try_sending_first)
01181 {
01182 ssize_t n = 0;
01183 size_t byte_count = 0;
01184 // ... in this case we must try to send the message first ...
01185
01186 const size_t total_length = message_block->total_length ();
01187
01188 if (TAO_debug_level > 6)
01189 {
01190 ACE_DEBUG ((LM_DEBUG,
01191 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ")
01192 ACE_TEXT ("trying to send the message (ml = %d)\n"),
01193 this->id (), total_length));
01194 }
01195
01196 // @@ I don't think we want to hold the mutex here, however if
01197 // we release it we need to recheck the status of the transport
01198 // after we return... once I understand the final form for this
01199 // code I will re-visit this decision
01200 n = this->send_message_block_chain_i (message_block,
01201 byte_count,
01202 max_wait_time);
01203 if (n == -1)
01204 {
01205 // ... if this is just an EWOULDBLOCK we must schedule the
01206 // message for later, if it is ETIME we still have to send
01207 // the complete message, because cutting off the message at
01208 // this point will destroy the synchronization with the
01209 // server ...
01210 if (errno != EWOULDBLOCK && errno != ETIME)
01211 {
01212 if (TAO_debug_level > 0)
01213 {
01214 ACE_ERROR ((LM_ERROR,
01215 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ")
01216 ACE_TEXT ("fatal error in ")
01217 ACE_TEXT ("send_message_block_chain_i - %m\n"),
01218 this->id ()));
01219 }
01220 return -1;
01221 }
01222 }
01223
01224 // ... let's figure out if the complete message was sent ...
01225 if (total_length == byte_count)
01226 {
01227 // Done, just return. Notice that there are no allocations
01228 // or copies up to this point (though some fancy calling
01229 // back and forth).
01230 // This is the common case for the critical path, it should
01231 // be fast.
01232 return 0;
01233 }
01234
01235 if (TAO_debug_level > 6)
01236 {
01237 ACE_DEBUG ((LM_DEBUG,
01238 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ")
01239 ACE_TEXT ("partial send %d / %d bytes\n"),
01240 this->id (), byte_count, total_length));
01241 }
01242
01243 // ... part of the data was sent, need to figure out what piece
01244 // of the message block chain must be queued ...
01245 while (message_block != 0 && message_block->length () == 0)
01246 {
01247 message_block = message_block->cont ();
01248 }
01249
01250 // ... at least some portion of the message block chain should
01251 // remain ...
01252 }
01253
01254 // ... either the message must be queued or we need to queue it
01255 // because it was not completely sent out ...
01256
01257 if (TAO_debug_level > 6)
01258 {
01259 ACE_DEBUG ((LM_DEBUG,
01260 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ")
01261 ACE_TEXT ("message is queued\n"),
01262 this->id ()));
01263 }
01264
01265 if (this->queue_message_i(message_block) == -1)
01266 {
01267 if (TAO_debug_level > 0)
01268 ACE_DEBUG ((LM_DEBUG,
01269 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ")
01270 ACE_TEXT ("cannot queue message for ")
01271 ACE_TEXT (" - %m\n"),
01272 this->id ()));
01273 return -1;
01274 }
01275
01276 // ... if the queue is full we need to activate the output on the
01277 // queue ...
01278 bool must_flush = false;
01279 const bool constraints_reached =
01280 this->check_buffering_constraints_i (stub,
01281 must_flush);
01282
01283 // ... but we also want to activate it if the message was partially
01284 // sent.... Plus, when we use the blocking flushing strategy the
01285 // queue is flushed as a side-effect of 'schedule_output()'
01286
01287 TAO_Flushing_Strategy *flushing_strategy =
01288 this->orb_core ()->flushing_strategy ();
01289
01290 if (constraints_reached || try_sending_first)
01291 {
01292 (void) flushing_strategy->schedule_output (this);
01293 }
01294
01295 if (must_flush)
01296 {
01297 typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK;
01298 TAO_REVERSE_LOCK reverse (*this->handler_lock_);
01299 ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1);
01300
01301 (void) flushing_strategy->flush_transport (this);
01302 }
01303
01304 return 0;
01305 }
|
|
|
Notify all the components inside a Transport when the underlying connection is closed. Definition at line 1118 of file Transport.cpp. References ACE_GUARD, TAO_Transport_Mux_Strategy::connection_closed(), send_connection_closed_notifications_i(), and tms(). Referenced by TAO_Connection_Handler::close_connection_eh().
01119 {
01120 {
01121 ACE_MT (ACE_GUARD (ACE_Lock, guard, *this->handler_lock_));
01122
01123 this->send_connection_closed_notifications_i ();
01124 }
01125
01126 this->tms ()->connection_closed ();
01127 }
|
|
|
Assume the lock is held.
Definition at line 1130 of file Transport.cpp. References cleanup_queue_i(), messaging_object(), and TAO_Pluggable_Messaging::reset(). Referenced by send_connection_closed_notifications().
01131 {
01132 this->cleanup_queue_i ();
01133
01134 this->messaging_object ()->reset ();
01135 }
|
|
||||||||||||||||||||
|
Once the ORB is prepared to receive a reply (see send_request() above), and all the arguments have been marshaled the CDR stream must be 'formatted', i.e. the message_size field in the GIOP header can finally be set to the proper value. Implemented in TAO_IIOP_Transport. Referenced by TAO_On_Demand_Fragmentation_Strategy::fragment(), TAO_GIOP_Message_Lite::make_send_locate_reply(), TAO_GIOP_Message_Base::make_send_locate_reply(), TAO_GIOP_Message_Lite::process_request(), TAO_GIOP_Message_Base::process_request(), TAO_ServerRequest::send_cached_reply(), TAO_ServerRequest::send_no_exception_reply(), TAO_GIOP_Message_Lite::send_reply_exception(), TAO_GIOP_Message_Base::send_reply_exception(), TAO_ServerRequest::tao_send_reply(), and TAO_ServerRequest::tao_send_reply_exception(). |
|
||||||||||||||||
|
Send a message block chain,.
Definition at line 490 of file Transport.cpp. References ACE_GUARD_RETURN, and send_message_block_chain_i(). Referenced by TAO_GIOP_Message_Base::send_close_connection(), TAO_GIOP_Message_Lite::send_error(), and TAO_GIOP_Message_Base::send_error().
00493 {
00494 ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1);
00495
00496 return this->send_message_block_chain_i (mb,
00497 bytes_transferred,
00498 max_wait_time);
00499 }
|
|
||||||||||||||||
|
Send a message block chain, assuming the lock is held.
Definition at line 502 of file Transport.cpp. References drain_queue_i(), TAO_Synch_Queued_Message::message_length(), TAO_Queued_Message::push_back(), TAO_Queued_Message::remove_from_list(), and ACE_Message_Block::total_length(). Referenced by send_asynchronous_message_i(), and send_message_block_chain().
00505 {
00506 size_t const total_length = mb->total_length ();
00507
00508 // We are going to block, so there is no need to clone
00509 // the message block.
00510 TAO_Synch_Queued_Message synch_message (mb,
00511 this->orb_core_);
00512
00513 synch_message.push_back (this->head_, this->tail_);
00514
00515 int const n = this->drain_queue_i ();
00516
00517 if (n == -1)
00518 {
00519 synch_message.remove_from_list (this->head_, this->tail_);
00520 return -1; // Error while sending...
00521 }
00522 else if (n == 1)
00523 {
00524 bytes_transferred = total_length;
00525 return 1; // Empty queue, message was sent..
00526 }
00527
00528 // Remove the temporary message from the queue...
00529 synch_message.remove_from_list (this->head_, this->tail_);
00530
00531 bytes_transferred =
00532 total_length - synch_message.message_length ();
00533
00534 return 0;
00535 }
|
|
||||||||||||||||||||
|
Sent the contents of message_block.
Reimplemented in TAO_IIOP_Transport. Definition at line 261 of file Transport.cpp. References ACE_GUARD_RETURN, close_connection(), and send_message_shared_i().
00265 {
00266 int result = 0;
00267
00268 {
00269 ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1);
00270
00271 result =
00272 this->send_message_shared_i (stub, message_semantics,
00273 message_block, max_wait_time);
00274 }
00275
00276 if (result == -1)
00277 {
00278 this->close_connection ();
00279 }
00280
00281 return result;
00282 }
|
|
||||||||||||||||||||
|
Implement send_message_shared() assuming the handler_lock_ is held. Definition at line 1138 of file Transport.cpp. References send_asynchronous_message_i(), send_reply_message_i(), send_synchronous_message_i(), TAO_ONEWAY_REQUEST, TAO_REPLY, and TAO_TWOWAY_REQUEST. Referenced by send_message_shared(), and TAO_IIOP_Transport::send_message_shared().
01142 {
01143 switch (message_semantics)
01144 {
01145 case TAO_Transport::TAO_TWOWAY_REQUEST:
01146 return this->send_synchronous_message_i (message_block,
01147 max_wait_time);
01148 case TAO_Transport::TAO_REPLY:
01149 return this->send_reply_message_i (message_block,
01150 max_wait_time);
01151 case TAO_Transport::TAO_ONEWAY_REQUEST:
01152 return this->send_asynchronous_message_i (stub,
01153 message_block,
01154 max_wait_time);
01155 }
01156
01157 return -1;
01158 }
|
|
||||||||||||
|
Send a reply message, i.e. do not block until the message is on the wire, but just return after adding them to the queue. Definition at line 630 of file Transport.cpp. References ACE_DEBUG, ACE_TEXT, TAO_Synch_Queued_Message::clone(), TAO_Queued_Message::destroy(), TAO_ORB_Core::flushing_strategy(), LM_DEBUG, orb_core(), TAO_Queued_Message::push_back(), TAO_Queued_Message::remove_from_list(), TAO_Flushing_Strategy::schedule_output(), send_synch_message_helper_i(), and TAO_debug_level. Referenced by send_message_shared_i().
00632 {
00633 // Dont clone now.. We could be sent in one shot!
00634 TAO_Synch_Queued_Message synch_message (mb, this->orb_core_);
00635
00636 synch_message.push_back (this->head_,
00637 this->tail_);
00638
00639 int const n =
00640 this->send_synch_message_helper_i (synch_message,
00641 max_wait_time);
00642
00643 if (n == -1 || n == 1)
00644 {
00645 return n;
00646 }
00647
00648 if (TAO_debug_level > 3)
00649 {
00650 ACE_DEBUG ((LM_DEBUG,
00651 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_reply_message_i, ")
00652 ACE_TEXT ("preparing to add to queue before leaving \n"),
00653 this->id ()));
00654 }
00655
00656 // Till this point we shouldn't have any copying and that is the
00657 // point anyway. Now, remove the node from the list
00658 synch_message.remove_from_list (this->head_,
00659 this->tail_);
00660
00661 // Clone the node that we have.
00662 TAO_Queued_Message *msg =
00663 synch_message.clone (this->orb_core_->transport_message_buffer_allocator ());
00664
00665 // Stick it in the queue
00666 msg->push_back (this->head_,
00667 this->tail_);
00668
00669 TAO_Flushing_Strategy *flushing_strategy =
00670 this->orb_core ()->flushing_strategy ();
00671
00672 int result = flushing_strategy->schedule_output (this);
00673
00674 if (result == -1)
00675 {
00676 if (TAO_debug_level > 5)
00677 {
00678 ACE_DEBUG ((LM_DEBUG, "TAO (%P|%t) - Transport[%d]::send_reply_"
00679 "message_i dequeuing msg due to schedule_output "
00680 "failure\n", this->id ()));
00681 }
00682 msg->remove_from_list (this->head_, this->tail_);
00683 msg->destroy ();
00684 }
00685
00686 return 1;
00687 }
|
|
||||||||||||||||||||||||
|
Preparing the ORB to receive the reply only once the request is completely sent opens the system to some subtle race conditions: suppose the ORB is running in a multi-threaded configuration, thread A makes a request while thread B is using the Reactor to process all incoming requests. Thread A could be implemented as follows: 1) send the request 2) setup the ORB to receive the reply 3) wait for the request but in this case thread B may receive the reply between step (1) and (2), and drop it as an invalid or unexpected message. Consequently the correct implementation is: 1) setup the ORB to receive the reply 2) send the request 3) wait for the reply The following method encapsulates this idiom.
Implemented in TAO_IIOP_Transport. Referenced by TAO::Remote_Invocation::send_message(). |
|
||||||||||||
|
A helper method used by send_synchronous_message_i() and send_reply_message_i(). Reusable code that could be used by both the methods. Definition at line 690 of file Transport.cpp. References TAO_Synch_Queued_Message::all_data_sent(), drain_queue_i(), and TAO_Queued_Message::remove_from_list(). Referenced by send_reply_message_i(), and send_synchronous_message_i().
00692 {
00693 // @@todo: Need to send timeouts for writing..
00694 int const n = this->drain_queue_i ();
00695
00696 if (n == -1)
00697 {
00698 synch_message.remove_from_list (this->head_, this->tail_);
00699 return -1; // Error while sending...
00700 }
00701 else if (n == 1)
00702 {
00703 return 1; // Empty queue, message was sent..
00704 }
00705
00706 if (synch_message.all_data_sent ())
00707 {
00708 return 1;
00709 }
00710
00711 return 0;
00712 }
|
|
||||||||||||
|
Send a synchronous message, i.e. block until the message is on the wire Definition at line 538 of file Transport.cpp. References ACE_ERROR, ACE_GUARD_RETURN, ACE_NEW_RETURN, ACE_TEXT, TAO_Synch_Queued_Message::current_block(), ETIME, TAO_Flushing_Strategy::flush_message(), TAO_ORB_Core::flushing_strategy(), LM_ERROR, orb_core(), TAO_Queued_Message::push_back(), TAO_Queued_Message::push_front(), TAO_Queued_Message::remove_from_list(), TAO_Flushing_Strategy::schedule_output(), send_synch_message_helper_i(), and TAO_debug_level. Referenced by send_message_shared_i().
00540 {
00541 // We are going to block, so there is no need to clone
00542 // the message block.
00543 TAO_Synch_Queued_Message synch_message (mb, this->orb_core_);
00544
00545 // Push synch_message on to the back of the queue.
00546 synch_message.push_back (this->head_, this->tail_);
00547
00548 int const n =
00549 this->send_synch_message_helper_i (synch_message,
00550 max_wait_time);
00551
00552 if (n == -1 || n == 1)
00553 {
00554 return n;
00555 }
00556
00557 // @todo: Check for timeouts!
00558 // if (max_wait_time != 0 && errno == ETIME) return -1;
00559 TAO_Flushing_Strategy *flushing_strategy =
00560 this->orb_core ()->flushing_strategy ();
00561 (void) flushing_strategy->schedule_output (this);
00562
00563 // Release the mutex, other threads may modify the queue as we
00564 // block for a long time writing out data.
00565 int result;
00566 {
00567 typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK;
00568 TAO_REVERSE_LOCK reverse (*this->handler_lock_);
00569 ACE_GUARD_RETURN (TAO_REVERSE_LOCK,
00570 ace_mon,
00571 reverse,
00572 -1);
00573
00574 result = flushing_strategy->flush_message (this,
00575 &synch_message,
00576 max_wait_time);
00577 }
00578
00579 if (result == -1)
00580 {
00581 synch_message.remove_from_list (this->head_, this->tail_);
00582
00583 if (errno == ETIME)
00584 {
00585 if (this->head_ == &synch_message)
00586 {
00587 // This is a timeout, there is only one nasty case: the
00588 // message has been partially sent! We simply cannot take
00589 // the message out of the queue, because that would corrupt
00590 // the connection.
00591 //
00592 // What we do is replace the queued message with an
00593 // asynchronous message, that contains only what remains of
00594 // the timed out request. If you think about sending
00595 // CancelRequests in this case: there is no much point in
00596 // doing that: the receiving ORB would probably ignore it,
00597 // and figuring out the request ID would be a bit of a
00598 // nightmare.
00599 //
00600
00601 synch_message.remove_from_list (this->head_, this->tail_);
00602 TAO_Queued_Message *queued_message = 0;
00603 ACE_NEW_RETURN (queued_message,
00604 TAO_Asynch_Queued_Message (
00605 synch_message.current_block (),
00606 this->orb_core_,
00607 0,
00608 1),
00609 -1);
00610 queued_message->push_front (this->head_, this->tail_);
00611 }
00612 }
00613
00614 if (TAO_debug_level > 0)
00615 {
00616 ACE_ERROR ((LM_ERROR,
00617 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_synchronous_message_i, ")
00618 ACE_TEXT ("error while flushing message - %m\n"),
00619 this->id ()));
00620 }
00621
00622 return -1;
00623 }
00624
00625 return 1;
00626 }
|
|
|
Accessor to sent_byte_count_.
Definition at line 187 of file Transport.inl. References sent_byte_count_.
00188 {
00189 return this->sent_byte_count_;
00190 }
|
|
|
Return the protocol tag. The OMG assigns unique tags (a 32-bit unsigned number) to each protocol. New protocol tags can be obtained free of charge from the OMG, check the documents in corbafwd.h for more details. Definition at line 8 of file Transport.inl.
00009 {
00010 return this->tag_;
00011 }
|
|
|
Extracts the list of listen points from the cdr stream. The list would have the protocol specific details of the ListenPoints Reimplemented in TAO_IIOP_Transport. Definition at line 255 of file Transport.cpp. References ACE_NOTSUP_RETURN. Referenced by TAO_GIOP_Message_Generator_Parser_12::process_bidir_context().
00256 {
00257 ACE_NOTSUP_RETURN (-1);
00258 }
|
|
|
Get the TAO_Tranport_Mux_Strategy used by this object. The role of the TAO_Transport_Mux_Strategy is described in more detail in that class' documentation. Enough is to say that the class is used to control how many threads can have pending requests over the same connection. Multiplexing multiple threads over the same connection conserves resources and is almost required for AMI, but having only one pending request per connection is more efficient and reduces the possibilities of priority inversions. Definition at line 20 of file Transport.inl. Referenced by idle_after_reply(), idle_after_send(), TAO::LocateRequest_Invocation_Adapter::invoke(), TAO::Invocation_Adapter::invoke_remote_i(), TAO_GIOP_Message_Lite::parse_reply(), TAO_GIOP_Message_Base::process_reply_message(), and send_connection_closed_notifications().
00021 {
00022 return tms_;
00023 }
|
|
|
Helper method that returns the Transport Cache Manager.
Definition at line 2316 of file Transport.cpp. References TAO_ORB_Core::lane_resources(), and TAO_Thread_Lane_Resources::transport_cache(). Referenced by make_idle(), purge_entry(), recache_transport(), and update_transport().
02317 {
02318 return this->orb_core_->lane_resources ().transport_cache ();
02319 }
|
|
|
Cache management.
Definition at line 442 of file Transport.cpp. References transport_cache_manager(), and TAO::Transport_Cache_Manager::update_entry(). Referenced by TAO_Connection_Handler::handle_input_internal(), TAO_Connection_Handler::handle_output_eh(), and TAO_Connection_Handler::svc_i().
00443 {
00444 return this->transport_cache_manager ().update_entry (this->cache_map_entry_);
00445 }
|
|
|
Return the TAO_Wait_Strategy used by this object. The role of the TAO_Wait_Strategy is described in more detail in that class' documentation. Enough is to say that the ORB can wait for a reply blocking on read(), using the Reactor to wait for multiple events concurrently or using the Leader/Followers protocol. Definition at line 27 of file Transport.inl. Referenced by TAO_Connection_Handler::close_connection_eh(), TAO_IIOP_Connector::complete_connection(), TAO_Connector::connect(), TAO::Transport_Cache_Manager::find_transport(), TAO_Connection_Handler::handle_input_eh(), TAO_IIOP_Connection_Handler::open(), and post_open().
00028 {
00029 return this->ws_;
00030 }
|
|
|
CodeSet negotiation - Set the wchar codeset translator factory.
Definition at line 144 of file Transport.inl. References tcs_set_.
00145 {
00146 this->wchar_translator_ = tf;
00147 this->tcs_set_ = 1;
00148 }
|
|
|
CodeSet Negotiation - Get the wchar codeset translator factory.
Definition at line 131 of file Transport.inl.
00132 {
00133 return this->wchar_translator_;
00134 }
|
|
|
This class needs priviledged access to Definition at line 785 of file Transport.h. |
|
|
Definition at line 803 of file Transport.h. |
|
|
These classes need privileged access to: Definition at line 802 of file Transport.h. |
|
|
Needs priveleged access to event_handler_i () Definition at line 807 of file Transport.h. |
|
|
Have we sent any info on bidirectional information or have we received any info regarding making the connection served by this transport bidirectional. The flag is used as follows: + We dont want to send the bidirectional context info more than once on the connection. Why? Waste of marshalling and demarshalling time on the client. + On the server side -- once a client that has established the connection asks the server to use the connection both ways, we *dont* want the server to pack service info to the client. That is not allowed. We need a flag to prevent such a things from happening. The value of this flag will be 0 if the client sends info and 1 if the server receives the info. Definition at line 958 of file Transport.h. Referenced by bidirectional_flag(). |
|
|
Our entry in the cache. We don't own this. It is here for our convenience. We cannot just change things around. Definition at line 930 of file Transport.h. Referenced by cache_map_entry(). |
|
|
Additional member values required to support codeset translation. @Phil, I think it would be nice if we could think of a way to do the following. We have been trying to use the transport for marking about translator factories and such! IMHO this is a wrong encapulation ie. trying to populate the transport object with these details. We should probably have a class something like TAO_Message_Property or TAO_Message_Translator or whatever (I am sure you get the idea) and encapsulate all these details. Coupling these seems odd. if I have to be more cynical we can move this to the connection_handler and it may more sense with the DSCP stuff around there. Do you agree? Definition at line 1033 of file Transport.h. |
|
|
The queue will start draining no later than if* the deadline is Definition at line 975 of file Transport.h. Referenced by check_buffering_constraints_i(), handle_timeout(), and reset_flush_timer(). |
|
|
First_request_ is true until the first request is sent or received. This is necessary since codeset context information is necessary only on the first request. After that, the translators are fixed for the life of the connection. Definition at line 1045 of file Transport.h. Referenced by first_request_sent(), and generate_request_header(). |
|
|
The timer ID.
Definition at line 978 of file Transport.h. Referenced by check_buffering_constraints_i(), flush_timer_pending(), and reset_flush_timer(). |
|
|
This is an Definition at line 992 of file Transport.h. Referenced by ~TAO_Transport(). |
|
|
Implement the outgoing data queue.
Definition at line 963 of file Transport.h. |
|
|
A unique identifier for the transport. This never *never* changes over the lifespan, so we don't have to worry about locking it. HINT: Protocol-specific transports that use connection handler might choose to set this to the handle for their connection. Definition at line 1002 of file Transport.h. |
|
|
Queue of the consolidated, incoming messages..
Definition at line 967 of file Transport.h. Referenced by consolidate_enqueue_message(), handle_input_parse_data(), and process_queue_head(). |
|
|
Stack of incoming fragments, consolidated messages are going to be enqueued in "incoming_message_queue_" Definition at line 971 of file Transport.h. Referenced by handle_input(), handle_input_missing_data(), handle_input_parse_data(), and handle_input_parse_extra_messages(). |
|
|
Is this transport really connected or not. In case of oneways with SYNC_NONE Policy we don't wait until the connection is ready and we buffer the requests in this transport until the connection is ready Definition at line 1016 of file Transport.h. Referenced by is_connected(), post_open(), and ~TAO_Transport(). |
|
|
Definition at line 960 of file Transport.h. Referenced by opened_as(), and provide_blockable_handler(). |
|
|
Global orbcore resource.
Definition at line 926 of file Transport.h. Referenced by TAO_GIOP_Message_Lite::process_locate_request(), TAO_GIOP_Message_Base::process_locate_request(), TAO_GIOP_Message_Lite::process_request(), and TAO_GIOP_Message_Base::process_request(). |
|
|
Holds the partial GIOP message (if there is one).
Definition at line 1048 of file Transport.h. Referenced by allocate_partial_message_block(), and handle_input_parse_data(). |
|
|
Used by the LRU, LFU and FIFO Connection Purging Strategies.
Definition at line 1005 of file Transport.h. Referenced by purging_order(). |
|
|
Size of the buffer received.
Definition at line 1008 of file Transport.h. |
|
|
Number of bytes sent.
Definition at line 1011 of file Transport.h. Referenced by drain_queue_helper(), drain_queue_i(), and sent_byte_count(). |
|
|
IOP protocol tag.
Definition at line 923 of file Transport.h. |
|
|
Definition at line 964 of file Transport.h. |
|
|
The tcs_set_ flag indicates that negotiation has occured and so the translators are correct, since a null translator is valid if both ends are using the same codeset, whatever that codeset might be. Definition at line 1039 of file Transport.h. Referenced by char_translator(), is_tcs_set(), and wchar_translator(). |
|
|
Strategy to decide whether multiple requests can be sent over the same connection or the connection is exclusive for a request. Definition at line 934 of file Transport.h. |
|
|
The adapter used to receive timeout callbacks from the Reactor.
Definition at line 981 of file Transport.h. |
|
|
Definition at line 1034 of file Transport.h. |
|
|
Strategy for waiting for the reply after sending the request.
Definition at line 937 of file Transport.h. |
1.3.6