TAO_Transport Class Reference

Generic definitions for the Transport class. More...

#include <Transport.h>

Inheritance diagram for TAO_Transport:

Inheritance graph
[legend]
Collaboration diagram for TAO_Transport:

Collaboration graph
[legend]
List of all members.

Template methods

The Transport class uses the Template Method Pattern to implement the protocol specific functionality. Implementors of a pluggable protocol should override the following methods with the semantics documented below.

enum  { TAO_ONEWAY_REQUEST = 0, TAO_TWOWAY_REQUEST = 1, TAO_REPLY }
virtual ACE_Event_Handler * event_handler_i (void)=0
bool is_connected (void) const
 Is this transport really connected.

bool post_open (size_t id)
 Perform all the actions when this transport gets opened.

void pre_close (void)
 do what needs to be done when closing the transport

TAO_Connection_Handlerconnection_handler (void)
 Get the connection handler for this transport.

TAO_OutputCDRout_stream (void)
 Accessor for the output CDR stream.

int generate_locate_request (TAO_Target_Specification &spec, TAO_Operation_Details &opdetails, TAO_OutputCDR &output)
virtual int generate_request_header (TAO_Operation_Details &opd, TAO_Target_Specification &spec, TAO_OutputCDR &msg)
int recache_transport (TAO_Transport_Descriptor_Interface *desc)
 Recache ourselves in the cache.

virtual int handle_input (TAO_Resume_Handle &rh, ACE_Time_Value *max_wait_time=0)
 Callback to read incoming data.

virtual int send_request (TAO_Stub *stub, TAO_ORB_Core *orb_core, TAO_OutputCDR &stream, int message_semantics, ACE_Time_Value *max_time_wait)=0
virtual int send_message (TAO_OutputCDR &stream, TAO_Stub *stub=0, int message_semantics=TAO_Transport::TAO_TWOWAY_REQUEST, ACE_Time_Value *max_time_wait=0)=0
virtual int send_message_shared (TAO_Stub *stub, int message_semantics, const ACE_Message_Block *message_block, ACE_Time_Value *max_wait_time)
 Send the contents of message_block.

int format_queue_message (TAO_OutputCDR &stream, ACE_Time_Value *max_wait_time)
int send_message_block_chain (const ACE_Message_Block *message_block, size_t &bytes_transferred, ACE_Time_Value *max_wait_time=0)
 Send a message block chain.

int send_message_block_chain_i (const ACE_Message_Block *message_block, size_t &bytes_transferred, ACE_Time_Value *max_wait_time)
 Send a message block chain, assuming the lock is held.

int purge_entry (void)
 Cache management.

int make_idle (void)
 Cache management.

int update_transport (void)
 Cache management.

int handle_timeout (const ACE_Time_Value &current_time, const void *act)
size_t recv_buffer_size (void) const
 Accessor to recv_buffer_size_.

size_t sent_byte_count (void) const
 Accessor to sent_byte_count_.

TAO_Codeset_Translator_Base * char_translator (void) const
 CodeSet Negotiation - Get the char codeset translator factory.

TAO_Codeset_Translator_Basewchar_translator (void) const
 CodeSet Negotiation - Get the wchar codeset translator factory.

void char_translator (TAO_Codeset_Translator_Base *)
 CodeSet negotiation - Set the char codeset translator factory.

void wchar_translator (TAO_Codeset_Translator_Base *)
 CodeSet negotiation - Set the wchar codeset translator factory.

void assign_translators (TAO_InputCDR *, TAO_OutputCDR *)
void clear_translators (TAO_InputCDR *, TAO_OutputCDR *)
CORBA::Boolean is_tcs_set () const
 Return true if the tcs has been set.

void first_request_sent ()
 Set the state of the first_request_ flag to 0.

void send_connection_closed_notifications (void)
TAO::Transport::Statsstats (void) const
 Transport statistics.

virtual TAO_Connection_Handlerconnection_handler_i (void)=0
int process_parsed_messages (TAO_Queued_Data *qd, TAO_Resume_Handle &rh)
int send_message_shared_i (TAO_Stub *stub, int message_semantics, const ACE_Message_Block *message_block, ACE_Time_Value *max_wait_time)
int queue_message_i (const ACE_Message_Block *message_block, ACE_Time_Value *max_wait_time)
CORBA::ULong const  tag_
 IOP protocol tag.

TAO_ORB_Core *const  orb_core_
 Global orbcore resource.

TAO::Transport_Cache_Manager::HASH_MAP_ENTRY * cache_map_entry_
TAO_Transport_Mux_Strategy * tms_
TAO_Wait_Strategy * ws_
 Strategy for waiting for the reply after sending the request.

int bidirectional_flag_
TAO::Connection_Role opening_connection_role_
TAO_Queued_Message * head_
 Implement the outgoing data queue.

TAO_Queued_Messagetail_
TAO_Incoming_Message_Queue incoming_message_queue_
 Queue of the consolidated, incoming messages.

TAO::Incoming_Message_Stack incoming_message_stack_
ACE_Time_Value current_deadline_
long flush_timer_id_
 The timer ID.

TAO_Transport_Timer transport_timer_
 The adapter used to receive timeout callbacks from the Reactor.

ACE_Lock * handler_lock_
size_t id_
 A unique identifier for the transport.

unsigned long purging_order_
 Used by the LRU, LFU and FIFO Connection Purging Strategies.

size_t recv_buffer_size_
 Size of the buffer received.

size_t sent_byte_count_
 Number of bytes sent.

bool is_connected_
TAO::Transport_Cache_Managertransport_cache_manager (void)
 Helper method that returns the Transport Cache Manager.

int drain_queue (void)
 Send some of the data in the queue.

int drain_queue_i (void)
 Implement drain_queue() assuming the lock is held.

bool queue_is_empty_i (void)
 Check if there are messages pending in the queue.

int drain_queue_helper (int &iovcnt, iovec iov[])
 A helper routine used in drain_queue_i().

int schedule_output_i (void)
 Schedule handle_output() callbacks.

int cancel_output_i (void)
 Cancel handle_output() callbacks.

void cleanup_queue (size_t byte_count)
 Cleanup the queue.

void cleanup_queue_i ()
 Cleanup the complete queue.

int check_buffering_constraints_i (TAO_Stub *stub, bool &must_flush)
 Check if the buffering constraints have been reached.

int send_synchronous_message_i (const ACE_Message_Block *message_block, ACE_Time_Value *max_wait_time)
int send_reply_message_i (const ACE_Message_Block *message_block, ACE_Time_Value *max_wait_time)
int send_asynchronous_message_i (TAO_Stub *stub, const ACE_Message_Block *message_block, ACE_Time_Value *max_wait_time)
int send_synch_message_helper_i (TAO_Synch_Queued_Message &s, ACE_Time_Value *max_wait_time)
int flush_timer_pending (void) const
 Check if the flush timer is still pending.

void reset_flush_timer (void)
void report_invalid_event_handler (const char *caller)
 Print out error messages if the event handler is not valid.

int handle_input_missing_data (TAO_Resume_Handle &rh, ACE_Time_Value *max_wait_time, TAO_Queued_Data *q_data)
int handle_input_parse_data (TAO_Resume_Handle &rh, ACE_Time_Value *max_wait_time)
int handle_input_parse_extra_messages (ACE_Message_Block &message_block)
int consolidate_enqueue_message (TAO_Queued_Data *qd)
 -1 error, otherwise 0

int consolidate_process_message (TAO_Queued_Data *qd, TAO_Resume_Handle &rh)
 -1 error, otherwise 0

int process_queue_head (TAO_Resume_Handle &rh)
int notify_reactor (void)
void send_connection_closed_notifications_i (void)
 Assume the lock is held.

void allocate_partial_message_block (void)
 TAO_Transport (const TAO_Transport &)
void operator= (const TAO_Transport &)
TAO_Codeset_Translator_Base * char_translator_
 Additional member values required to support codeset translation.

TAO_Codeset_Translator_Base * wchar_translator_
CORBA::Boolean tcs_set_
CORBA::Boolean first_request_
ACE_Message_Block * partial_message_
 Holds the partial GIOP message (if there is one).

TAO::Transport::Stats * stats_
 Statistics.

class TAO_Reactive_Flushing_Strategy
class TAO_Leader_Follower_Flushing_Strategy
class TAO_Thread_Per_Connection_Handler

Public Types


Public Member Functions

 TAO_Transport (CORBA::ULong tag, TAO_ORB_Core *orb_core)
 Default creator, requires the tag value be supplied.

virtual ~TAO_Transport (void)
 Destructor.

CORBA::ULong tag (void) const
 Return the protocol tag.

TAO_ORB_Coreorb_core (void) const
 Access the ORB that owns this connection.

TAO_Transport_Mux_Strategytms (void) const
 Get the TAO_Transport_Mux_Strategy used by this object.

TAO_Wait_Strategywait_strategy (void) const
 Return the TAO_Wait_Strategy used by this object.

int handle_output (void)
 Callback method to reactively drain the outgoing data queue.

int bidirectional_flag (void) const
 Get the bidirectional flag.

void bidirectional_flag (int flag)
 Set the bidirectional flag.

void cache_map_entry (TAO::Transport_Cache_Manager::HASH_MAP_ENTRY *entry)
 Set the Cache Map entry.

TAO::Transport_Cache_Manager::HASH_MAP_ENTRY * cache_map_entry (void)
 Get the Cache Map entry.

size_t id (void) const
 Set and Get the identifier for this transport instance.

void id (size_t id)
TAO::Connection_Role opened_as (void) const
void opened_as (TAO::Connection_Role)
unsigned long purging_order (void) const
void purging_order (unsigned long value)
bool queue_is_empty (void)
 Check if there are messages pending in the queue.

void provide_handler (TAO::Connection_Handler_Set &handlers)
 Add the event handler to the handlers set.

bool provide_blockable_handler (TAO::Connection_Handler_Set &handlers)
virtual int register_handler (void)
 Register the handler with the reactor.

virtual ssize_t send (iovec *iov, int iovcnt, size_t &bytes_transferred, const ACE_Time_Value *timeout=0)=0
 Write the complete Message_Block chain to the connection.

virtual ssize_t recv (char *buffer, size_t len, const ACE_Time_Value *timeout=0)=0
 Read len bytes into buffer.

Control connection lifecycle
These methods are routed through the TMS object. The TMS strategies implement them correctly.

bool idle_after_send (void)
bool idle_after_reply (void)
virtual void close_connection (void)
 Call the implementation method after obtaining the lock.

Template methods
The Transport class uses the Template Method Pattern to implement the protocol specific functionality. Implementors of a pluggable protocol should override the following methods with the semantics documented below.

virtual int messaging_init (CORBA::Octet major, CORBA::Octet minor)=0
virtual int tear_listen_point_list (TAO_InputCDR &cdr)
virtual bool post_connect_hook (void)
 Hooks that can be overridden in concrete transports.

ACE_Event_Handler::Reference_Count add_reference (void)
 Memory management routines.

ACE_Event_Handler::Reference_Count remove_reference (void)
virtual TAO_Pluggable_Messagingmessaging_object (void)=0

Detailed Description

Generic definitions for the Transport class.

The transport object is created in the Service handler constructor and deleted in the Service Handler's destructor!!

The main responsibility of a Transport object is to encapsulate a connection, and provide a transport independent way to send and receive data. Since TAO is heavily based on the Reactor for most if not all of its I/O, the Transport class is usually implemented with a helper Connection Handler that adapts the generic Transport interface to the Reactor types.

The outgoing data path:

One of the responsibilities of the TAO_Transport class is to send out GIOP messages as efficiently as possible. In most cases messages are put out in FIFO order, the transport object will put out the message using a single system call and return control to the application. However, for oneways and AMI requests it may be more efficient (or required if the SYNC_NONE policy is in effect) to queue the messages until a large enough data set is available. Another reason to queue is that some applications cannot block for I/O, yet they want to send messages so large that a single write() operation would not be able to cope with them. In such cases we need to queue the data and use the Reactor to drain the queue.

Therefore, the Transport class may need to use a queue to temporarily hold the messages, and, in some configurations, it may need to use the Reactor to concurrently drain such queues.

Out of order messages:

TAO provides explicit policies to send 'urgent' messages. Such messages may be put at the head of the queue. However, they cannot be sent immediately because the transport may already be sending another message in a reactive fashion.

Consequently, the Transport must also know if the head of the queue has been partially sent. In that case new messages can only follow the head. Only once the head is completely sent we can start sending new messages.

Waiting threads:

One or more threads can be blocked waiting for the connection to completely send the message. The thread should return as soon as its message has been sent, so a per-thread condition is required. This suggests that simply using an ACE_Message_Queue would not be enough: there is a significant amount of ancillary information to keep on each message that the Message_Block class does not provide room for.

Blocking I/O is still attractive for some applications. First, by eliminating the Reactor overhead, performance is improved when sending large blocks of data. Second, using the Reactor to send out data opens the door for nested upcalls, yet some applications cannot deal with the reentrancy issues in this case.

Timeouts:

Some or all messages could have a timeout period attached to them. The timeout source could either be some high-level policy or maybe some strategy to prevent denial of service attacks. In any case the timeouts are per-message, and later messages could have shorter timeouts. In fact, some kind of scheduling (such as EDF) could be required in a few applications.

Conclusions:

The outgoing data path consists of several components:

The Transport object provides a single method to send request messages (send_request_message ()).

The incoming data path:

One of the main responsibilities of the transport is to read and process the incoming GIOP message as quickly and efficiently as possible. There are other forces that need to be given due consideration. They are

Parsing messages (GIOP) & processing the message:

The messages should be checked for validity and the right information should be sent to the higher layer for processing. The process of doing a sanity check and preparing the messages for the higher layers of the ORB is done by the messaging protocol.

Design forces and Challenges

To keep things as efficient as possible for medium sized requests, it would be good to minimise data copying and locking along the incoming path, i.e., from the time of reading the data from the handle to the application. We achieve this by creating a buffer on stack and reading the data from the handle into the buffer. We then pass the same data block (the buffer is encapsulated into a data block) to the higher layers of the ORB. The problems stem from the following: (a) Data is bigger than the buffer that we have on stack (b) Transports like TCP do not guarantee availability of the whole chunk of data in one shot. Data could trickle in byte by byte. (c) Single read gives multiple messages.

We solve the problems as follows

(a) First do a read with the buffer on stack. Query the underlying messaging object whether the message has any incomplete portion. If so, the data is copied into a new buffer large enough to hold the full message, and that buffer is queued; succeeding events will read data from the socket and write directly into this buffer. Otherwise, if the message in the local buffer is complete, we free the handle and then send the message to the higher layers of the ORB for processing.

(b) If a buffer with an incomplete message has been enqueued while trying to do the above, the reactor will call us back when the handle becomes read ready. The read operation will copy data directly into the enqueued buffer. If the message has been read completely, the message is sent to the higher layers of the ORB for processing.

(c) If we get multiple messages (possible if the client connected to the server sends oneways or AMI requests), we parse and split the messages. Every message is put in the queue. Once the messages are queued, the thread picks up one message to send to the higher layers of the ORB. Before doing that, if it finds more messages, it sends a notify to the reactor without resuming the handle. The next thread picks up a message from the queue and processes that. Once the queue is drained the last thread resumes the handle.

Sending Replies

We could use the outgoing path of the ORB to send replies. This would allow us to reuse most of the code in the outgoing data path. We were doing this till TAO-1.2.3. We ran into problems. When writing the reply the ORB gets flow controlled, and the ORB tries to flush the message by going into the reactor. This resulted in unnecessary nesting. The thread that gets into the Reactor could potentially handle other messages (incoming or outgoing) and the stack starts growing leading to crashes.

Solution to the nesting problem

The solution that we (plan to) adopt is pretty straightforward. The thread sending replies will not block to send the replies but queue the replies and return to the Reactor. (Note the careful usage of the terms "blocking in the Reactor" as opposed to "return back to the Reactor".)

See Also:

https://svn.dre.vanderbilt.edu/viewvc/Middleware/trunk/TAO/docs/pluggable_protocols/index.html?revision=HEAD

Definition at line 245 of file Transport.h.


Member Enumeration Documentation

anonymous enum
 

Enumeration values:
TAO_ONEWAY_REQUEST 
TAO_TWOWAY_REQUEST 
TAO_REPLY 

Definition at line 597 of file Transport.h.

00598     {
00599       TAO_ONEWAY_REQUEST = 0,
00600       TAO_TWOWAY_REQUEST = 1,
00601       TAO_REPLY
00602     };


Constructor & Destructor Documentation

TAO_Transport::TAO_Transport CORBA::ULong  tag,
TAO_ORB_Core orb_core
 

Default creator, requires the tag value be supplied.

Definition at line 126 of file Transport.cpp.

References ACE_NEW_THROW_EX, TAO_ORB_Core::client_factory(), TAO_Client_Strategy_Factory::create_transport_mux_strategy(), TAO_Client_Strategy_Factory::create_wait_strategy(), TAO_HAS_SENDFILE, and ws_.

00128   : tag_ (tag)
00129   , orb_core_ (orb_core)
00130   , cache_map_entry_ (0)
00131   , bidirectional_flag_ (-1)
00132   , opening_connection_role_ (TAO::TAO_UNSPECIFIED_ROLE)
00133   , head_ (0)
00134   , tail_ (0)
00135   , incoming_message_queue_ (orb_core)
00136   , current_deadline_ (ACE_Time_Value::zero)
00137   , flush_timer_id_ (-1)
00138   , transport_timer_ (this)
00139   , handler_lock_ (orb_core->resource_factory ()->create_cached_connection_lock ())
00140   , id_ ((size_t) this)
00141   , purging_order_ (0)
00142   , recv_buffer_size_ (0)
00143   , sent_byte_count_ (0)
00144   , is_connected_ (false)
00145   , char_translator_ (0)
00146   , wchar_translator_ (0)
00147   , tcs_set_ (0)
00148   , first_request_ (1)
00149   , partial_message_ (0)
00150 #if TAO_HAS_SENDFILE == 1
00151     // The ORB has been configured to use the MMAP allocator, meaning
00152     // we could/should use sendfile() to send data.  Cast once rather
00153     // here rather than during each send.  This assumes that all
00154     // TAO_OutputCDR instances are using the same TAO_MMAP_Allocator
00155     // instance as the underlying output CDR buffer allocator.
00156   , mmap_allocator_ (
00157       dynamic_cast<TAO_MMAP_Allocator *> (
00158         orb_core->output_cdr_buffer_allocator ()))
00159 #endif  /* TAO_HAS_SENDFILE==1 */
00160 {
00161   TAO_Client_Strategy_Factory *cf =
00162     this->orb_core_->client_factory ();
00163 
00164   // Create WS now.
00165   this->ws_ = cf->create_wait_strategy (this);
00166 
00167   // Create TMS now.
00168   this->tms_ = cf->create_transport_mux_strategy (this);
00169 
00170 #if TAO_HAS_TRANSPORT_CURRENT == 1
00171   // Allocate stats
00172   ACE_NEW_THROW_EX (this->stats_,
00173                     TAO::Transport::Stats,
00174                     CORBA::NO_MEMORY ());
00175 #endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */
00176 
00177   /*
00178    * Hook to add code that initializes components that
00179    * belong to the concrete protocol implementation.
00180    * Further additions to this Transport class will
00181    * need to add code *before* this hook.
00182    */
00183   //@@ TAO_TRANSPORT_SPL_CONSTRUCTOR_ADD_HOOK
00184 }

TAO_Transport::~TAO_Transport void   )  [virtual]
 

Destructor.

Definition at line 186 of file Transport.cpp.

References ACE_ASSERT, cleanup_queue_i(), handler_lock_, purge_entry(), ACE_Message_Block::release(), stats_, and ws_.

00187 {
00188   delete this->ws_;
00189 
00190   delete this->tms_;
00191 
00192   delete this->handler_lock_;
00193 
00194   if (!this->is_connected_)
00195     {
00196       // When we have a not connected transport we could have buffered
00197       // messages on this transport which we have to cleanup now.
00198       this->cleanup_queue_i();
00199 
00200       // Cleanup our cache entry
00201       this->purge_entry();
00202     }
00203 
00204   // Release the partial message block, however we may
00205   // have never allocated one.
00206   ACE_Message_Block::release (this->partial_message_);
00207 
00208   // By the time the destructor is reached here all the connection stuff
00209   // *must* have been cleaned up.
00210 
00211   // The following assert is needed for the test "Bug_2494_Regression".
00212   // See the bugzilla bug #2494 for details.
00213   ACE_ASSERT (this->head_ == 0);
00214   ACE_ASSERT (this->cache_map_entry_ == 0);
00215 
00216 #if TAO_HAS_TRANSPORT_CURRENT == 1
00217   delete this->stats_;
00218 #endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */
00219 
00220   /*
00221    * Hook to add code that cleans up components
00222    * belong to the concrete protocol implementation.
00223    * Further additions to this Transport class will
00224    * need to add code *before* this hook.
00225    */
00226   //@@ TAO_TRANSPORT_SPL_DESTRUCTOR_ADD_HOOK
00227 }

TAO_Transport::TAO_Transport const TAO_Transport  )  [private]
 


Member Function Documentation

ACE_Event_Handler::Reference_Count TAO_Transport::add_reference void   ) 
 

Memory management routines.

Definition at line 2484 of file Transport.cpp.

References ACE_Event_Handler::add_reference(), and event_handler_i().

Referenced by TAO::Cache_IntId::Cache_IntId(), TAO_Connection_Handler::connection_pending(), TAO::Cache_IntId::operator=(), provide_blockable_handler(), provide_handler(), TAO::Transport_Cache_Manager::purge(), TAO_Thread_Per_Connection_Handler::TAO_Thread_Per_Connection_Handler(), and TAO_Asynch_Reply_Dispatcher_Base::transport().

02485 {
02486   return this->event_handler_i ()->add_reference ();
02487 }

void TAO_Transport::allocate_partial_message_block void   )  [private]
 

Allocate a partial message block and store it in our partial_message_ data member.

Definition at line 2558 of file Transport.cpp.

References ACE_NEW, TAO_Pluggable_Messaging::header_length(), messaging_object(), and partial_message_.

Referenced by handle_input_parse_data().

02559 {
02560   if (this->partial_message_ == 0)
02561     {
02562       // This value must be at least large enough to hold a GIOP message
02563       // header plus a GIOP fragment header
02564       size_t const partial_message_size =
02565         this->messaging_object ()->header_length ();
02566        // + this->messaging_object ()->fragment_header_length ();
02567        // deprecated, conflicts with not-single_read_opt.
02568 
02569       ACE_NEW (this->partial_message_,
02570                ACE_Message_Block (partial_message_size));
02571     }
02572 }

void TAO_Transport::assign_translators TAO_InputCDR ,
TAO_OutputCDR
 

Use the Transport's codeset factories to set the translator for input and output CDRs.

Definition at line 2454 of file Transport.cpp.

References TAO_Codeset_Translator_Base::assign().

Referenced by TAO::LocateRequest_Invocation::check_reply(), TAO::Synch_Twoway_Invocation::check_reply_status(), TAO_GIOP_Message_Base::process_reply_message(), TAO_GIOP_Message_Base::process_request(), TAO_GIOP_Message_Base::process_request_message(), and TAO::Remote_Invocation::write_header().

02455 {
02456   if (this->char_translator_)
02457     {
02458       this->char_translator_->assign (inp);
02459       this->char_translator_->assign (outp);
02460     }
02461   if (this->wchar_translator_)
02462     {
02463       this->wchar_translator_->assign (inp);
02464       this->wchar_translator_->assign (outp);
02465     }
02466 }

ACE_INLINE void TAO_Transport::bidirectional_flag int  flag  ) 
 

Set the bidirectional flag.

Definition at line 39 of file Transport.inl.

References bidirectional_flag_.

00040 {
00041   this->bidirectional_flag_ = flag;
00042 }

ACE_INLINE int TAO_Transport::bidirectional_flag void   )  const
 

Get the bidirectional flag.

Definition at line 33 of file Transport.inl.

References bidirectional_flag_.

Referenced by TAO::Wait_On_LF_No_Upcall::can_process_upcalls(), TAO_IIOP_Transport::generate_request_header(), TAO_Muxed_TMS::request_id(), TAO_Exclusive_TMS::request_id(), and TAO_IIOP_Transport::tear_listen_point_list().

00034 {
00035   return this->bidirectional_flag_;
00036 }

ACE_INLINE TAO::Transport_Cache_Manager::HASH_MAP_ENTRY * TAO_Transport::cache_map_entry void   ) 
 

Get the Cache Map entry.

Definition at line 57 of file Transport.inl.

References cache_map_entry_.

00058 {
00059   return this->cache_map_entry_;
00060 }

ACE_INLINE void TAO_Transport::cache_map_entry TAO::Transport_Cache_Manager::HASH_MAP_ENTRY entry  ) 
 

Set the Cache Map entry.

Definition at line 63 of file Transport.inl.

References cache_map_entry_, and TAO::Transport_Cache_Manager::HASH_MAP_ENTRY.

Referenced by TAO::Transport_Cache_Manager::bind_i().

00065 {
00066   this->cache_map_entry_ = entry;
00067 }

int TAO_Transport::cancel_output_i void   )  [private]
 

Cancel handle_output() callbacks.

Definition at line 794 of file Transport.cpp.

References ACE_DEBUG, ACE_TEXT, ACE_Reactor::cancel_wakeup(), event_handler_i(), LM_DEBUG, ACE_Event_Handler::reactor(), and TAO_debug_level.

Referenced by TAO_Reactive_Flushing_Strategy::cancel_output(), and TAO_Leader_Follower_Flushing_Strategy::cancel_output().

00795 {
00796   ACE_Event_Handler * const eh = this->event_handler_i ();
00797   ACE_Reactor *const reactor = eh->reactor ();
00798 
00799   if (TAO_debug_level > 3)
00800     {
00801       ACE_DEBUG ((LM_DEBUG,
00802          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cancel_output_i\n"),
00803          this->id ()));
00804     }
00805 
00806   return reactor->cancel_wakeup (eh, ACE_Event_Handler::WRITE_MASK);
00807 }

ACE_INLINE void TAO_Transport::char_translator TAO_Codeset_Translator_Base  ) 
 

CodeSet negotiation - Set the char codeset translator factory.

Definition at line 137 of file Transport.inl.

References tcs_set_.

00138 {
00139   this->char_translator_ = tf;
00140   this->tcs_set_ = 1;
00141 }

ACE_INLINE TAO_Codeset_Translator_Base * TAO_Transport::char_translator void   )  const
 

CodeSet Negotiation - Get the char codeset translator factory.

Definition at line 125 of file Transport.inl.

00126 {
00127   return this->char_translator_;
00128 }

int TAO_Transport::check_buffering_constraints_i TAO_Stub stub,
bool &  must_flush
[private]
 

Check if the buffering constraints have been reached.

Definition at line 1138 of file Transport.cpp.

References TAO::Transport_Queueing_Strategy::buffering_constraints_reached(), ACE_Reactor::cancel_timer(), current_deadline_, event_handler_i(), flush_timer_id_, flush_timer_pending(), ACE_OS::gettimeofday(), TAO_Queued_Message::message_length(), TAO_Queued_Message::next(), ACE_Event_Handler::reactor(), ACE_Reactor::schedule_timer(), and TAO_Stub::transport_queueing_strategy().

Referenced by send_asynchronous_message_i().

01139 {
01140   // First let's compute the size of the queue:
01141   size_t msg_count = 0;
01142   size_t total_bytes = 0;
01143 
01144   for (TAO_Queued_Message *i = this->head_; i != 0; i = i->next ())
01145     {
01146       ++msg_count;
01147       total_bytes += i->message_length ();
01148     }
01149 
01150   bool set_timer = false;
01151   ACE_Time_Value new_deadline;
01152 
01153   TAO::Transport_Queueing_Strategy *queue_strategy =
01154     stub->transport_queueing_strategy ();
01155 
01156   bool constraints_reached = true;
01157 
01158   if (queue_strategy)
01159     {
01160       constraints_reached =
01161         queue_strategy->buffering_constraints_reached (stub,
01162                                                        msg_count,
01163                                                        total_bytes,
01164                                                        must_flush,
01165                                                        this->current_deadline_,
01166                                                        set_timer,
01167                                                        new_deadline);
01168     }
01169   else
01170     {
01171       must_flush = false;
01172     }
01173 
01174   // ... set the new timer, also cancel any previous timers ...
01175   if (set_timer)
01176     {
01177       ACE_Event_Handler *eh = this->event_handler_i ();
01178       ACE_Reactor * const reactor = eh->reactor ();
01179       this->current_deadline_ = new_deadline;
01180       ACE_Time_Value delay = new_deadline - ACE_OS::gettimeofday ();
01181 
01182       if (this->flush_timer_pending ())
01183         {
01184           reactor->cancel_timer (this->flush_timer_id_);
01185         }
01186 
01187       this->flush_timer_id_ =
01188         reactor->schedule_timer (&this->transport_timer_,
01189                                  &this->current_deadline_,
01190                                  delay);
01191     }
01192 
01193   return constraints_reached;
01194 }

void TAO_Transport::cleanup_queue size_t  byte_count  )  [private]
 

Cleanup the queue.

Exactly byte_count bytes have been sent, the queue must be cleaned up as potentially several messages have been completely sent out. It leaves on head_ the next message to send out.

Definition at line 1101 of file Transport.cpp.

References ACE_DEBUG, ACE_TEXT, TAO_Queued_Message::all_data_sent(), TAO_Queued_Message::bytes_transferred(), TAO_Queued_Message::destroy(), LM_DEBUG, TAO_Queued_Message::message_length(), TAO_Queued_Message::remove_from_list(), and TAO_debug_level.

Referenced by drain_queue_helper().

01102 {
01103   while (this->head_ != 0 && byte_count > 0)
01104     {
01105       TAO_Queued_Message *i = this->head_;
01106 
01107       if (TAO_debug_level > 4)
01108         {
01109           ACE_DEBUG ((LM_DEBUG,
01110              ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue, ")
01111              ACE_TEXT ("byte_count = %d\n"),
01112              this->id (), byte_count));
01113         }
01114 
01115       // Update the state of the first message
01116       i->bytes_transferred (byte_count);
01117 
01118       if (TAO_debug_level > 4)
01119         {
01120           ACE_DEBUG ((LM_DEBUG,
01121              ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue, ")
01122              ACE_TEXT ("after transfer, bc = %d, all_sent = %d, ml = %d\n"),
01123              this->id (), byte_count, i->all_data_sent (),
01124              i->message_length ()));
01125         }
01126 
01127       // ... if all the data was sent the message must be removed from
01128       // the queue...
01129       if (i->all_data_sent ())
01130         {
01131           i->remove_from_list (this->head_, this->tail_);
01132           i->destroy ();
01133         }
01134     }
01135 }

void TAO_Transport::cleanup_queue_i () [private]
 

Cleanup the complete queue.

Definition at line 1058 of file Transport.cpp.

References ACE_DEBUG, ACE_TEXT, TAO_Queued_Message::destroy(), LM_DEBUG, TAO_Queued_Message::message_length(), TAO_Queued_Message::remove_from_list(), TAO_LF_Event::state_changed(), and TAO_debug_level.

Referenced by pre_close(), send_connection_closed_notifications_i(), and ~TAO_Transport().

01059 {
01060   if (TAO_debug_level > 4)
01061     {
01062       ACE_DEBUG ((LM_DEBUG,
01063          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue_i, ")
01064          ACE_TEXT ("cleaning up complete queue\n"),
01065          this->id ()));
01066     }
01067 
01068   size_t byte_count = 0;
01069   int msg_count = 0;
01070 
01071   // Cleanup all messages
01072   while (this->head_ != 0)
01073     {
01074       TAO_Queued_Message *i = this->head_;
01075 
01076       if (TAO_debug_level > 4)
01077         {
01078           byte_count += i->message_length();
01079           ++msg_count;
01080         }
01081        // @@ This is a good point to insert a flag to indicate that a
01082        //    CloseConnection message was successfully received.
01083       i->state_changed (TAO_LF_Event::LFS_CONNECTION_CLOSED,
01084                         this->orb_core_->leader_follower ());
01085 
01086       i->remove_from_list (this->head_, this->tail_);
01087 
01088       i->destroy ();
01089     }
01090 
01091   if (TAO_debug_level > 4)
01092     {
01093       ACE_DEBUG ((LM_DEBUG,
01094                   ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue_i, ")
01095                   ACE_TEXT ("discarded %d messages, %u bytes.\n"),
01096                   this->id (), msg_count, byte_count));
01097     }
01098 }

void TAO_Transport::clear_translators (TAO_InputCDR *inp, TAO_OutputCDR *outp)
 

It is necessary to clear the codeset translator when a CDR stream is used for more than one GIOP message. This is required since the header must not be translated, whereas the body must be.

Definition at line 2469 of file Transport.cpp.

References ACE_OutputCDR::char_translator(), ACE_InputCDR::char_translator(), ACE_OutputCDR::wchar_translator(), and ACE_InputCDR::wchar_translator().

Referenced by TAO::Remote_Invocation::write_header().

02470 {
02471   if (inp)
02472     {
02473       inp->char_translator (0);
02474       inp->wchar_translator (0);
02475     }
02476   if (outp)
02477     {
02478       outp->char_translator (0);
02479       outp->wchar_translator (0);
02480     }
02481 }

void TAO_Transport::close_connection (void) [virtual]
 

Call the implementation method after obtaining the lock.

Definition at line 312 of file Transport.cpp.

References TAO_Connection_Handler::close_connection(), and connection_handler_i().

Referenced by TAO_IIOP_Connector::complete_connection(), TAO::LocateRequest_Invocation::invoke(), post_open(), TAO::Transport_Cache_Manager::purge(), TAO::Synch_Twoway_Invocation::remote_twoway(), TAO_GIOP_Message_Base::send_close_connection(), TAO::Remote_Invocation::send_message(), send_message_shared(), TAO_IIOP_Transport::send_message_shared(), TAO_Wait_On_Read::wait(), TAO::Synch_Twoway_Invocation::wait_for_reply(), and TransportCleanupGuard::~TransportCleanupGuard().

00313 {
00314   this->connection_handler_i ()->close_connection ();
00315 }

ACE_INLINE TAO_Connection_Handler * TAO_Transport::connection_handler (void)
 

Get the connection handler for this transport.

Definition at line 175 of file Transport.inl.

References connection_handler_i().

Referenced by TAO_IIOP_Connector::complete_connection(), TAO::Remote_Invocation::send_message(), TAO_Connect_Strategy::wait(), and TAO_Connector::wait_for_connection_completion().

00176 {
00177   return this->connection_handler_i();
00178 }

virtual TAO_Connection_Handler* TAO_Transport::connection_handler_i (void) [protected, pure virtual]
 

Implemented in TAO_IIOP_Transport.

Referenced by close_connection(), and connection_handler().

int TAO_Transport::consolidate_enqueue_message (TAO_Queued_Data *qd) [private]
 

Returns -1 on error, otherwise 0.

Definition at line 1616 of file Transport.cpp.

References ACE_ERROR, ACE_TEXT, TAO_Pluggable_Messaging::consolidate_fragmented_message(), TAO_Incoming_Message_Queue::enqueue_tail(), incoming_message_queue_, LM_ERROR, messaging_object(), TAO_Queued_Data::missing_data(), TAO_Queued_Data::more_fragments(), TAO_Queued_Data::msg_type(), TAO_Queued_Data::release(), TAO_debug_level, and TAO_PLUGGABLE_MESSAGE_FRAGMENT.

Referenced by handle_input_parse_data(), and handle_input_parse_extra_messages().

01617 {
01618   // consolidate message on top of stack, only for fragmented messages
01619 
01620   // paranoid check
01621   if (q_data->missing_data () != 0)
01622     {
01623        return -1;
01624     }
01625 
01626   if (q_data->more_fragments () ||
01627       q_data->msg_type () == TAO_PLUGGABLE_MESSAGE_FRAGMENT)
01628     {
01629       TAO_Queued_Data *new_q_data = 0;
01630 
01631       switch (this->messaging_object()->consolidate_fragmented_message (q_data, new_q_data))
01632         {
01633         case -1: // error
01634           return -1;
01635 
01636         case 0:  // returning consolidated message in new_q_data
01637           if (!new_q_data)
01638             {
01639               if (TAO_debug_level > 0)
01640                 {
01641                   ACE_ERROR ((LM_ERROR,
01642                      ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_enqueue_message, ")
01643                      ACE_TEXT ("error, consolidated message is NULL\n"),
01644                      this->id ()));
01645                 }
01646               return -1;
01647             }
01648 
01649           if (this->incoming_message_queue_.enqueue_tail (new_q_data) != 0)
01650             {
01651               TAO_Queued_Data::release (new_q_data);
01652               return -1;
01653             }
01654           break;
01655 
01656         case 1:  // fragment has been stored in messaging_object()
01657           break;
01658         }
01659     }
01660   else
01661     {
01662       if (this->incoming_message_queue_.enqueue_tail (q_data) != 0)
01663         {
01664           TAO_Queued_Data::release (q_data);
01665           return -1;
01666         }
01667     }
01668 
01669   return 0; // success
01670 }

int TAO_Transport::consolidate_process_message (TAO_Queued_Data *qd, TAO_Resume_Handle &rh) [private]
 

Returns -1 on error, otherwise 0.

Definition at line 1529 of file Transport.cpp.

References ACE_ERROR, ACE_TEXT, TAO_Pluggable_Messaging::consolidate_fragmented_message(), LM_ERROR, messaging_object(), TAO_Queued_Data::missing_data(), TAO_Queued_Data::more_fragments(), TAO_Queued_Data::msg_type(), process_parsed_messages(), TAO_Queued_Data::release(), TAO_debug_level, and TAO_PLUGGABLE_MESSAGE_FRAGMENT.

Referenced by handle_input_missing_data().

01531 {
01532   // paranoid check
01533   if (q_data->missing_data () != 0)
01534     {
01535       if (TAO_debug_level > 0)
01536         {
01537            ACE_ERROR ((LM_ERROR,
01538               ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ")
01539               ACE_TEXT ("missing data\n"),
01540               this->id ()));
01541         }
01542        return -1;
01543     }
01544 
01545   if (q_data->more_fragments () ||
01546       q_data->msg_type () == TAO_PLUGGABLE_MESSAGE_FRAGMENT)
01547     {
01548       // consolidate message on top of stack, only for fragmented messages
01549       TAO_Queued_Data *new_q_data = 0;
01550 
01551       switch (this->messaging_object()->consolidate_fragmented_message (q_data, new_q_data))
01552         {
01553         case -1: // error
01554           return -1;
01555 
01556         case 0:  // returning consolidated message in new_q_data
01557           if (!new_q_data)
01558             {
01559               if (TAO_debug_level > 0)
01560                 {
01561                   ACE_ERROR ((LM_ERROR,
01562                      ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ")
01563                      ACE_TEXT ("error, consolidated message is NULL\n"),
01564                      this->id ()));
01565                 }
01566               return -1;
01567             }
01568 
01569 
01570           if (this->process_parsed_messages (new_q_data, rh) == -1)
01571             {
01572               TAO_Queued_Data::release (new_q_data);
01573 
01574               if (TAO_debug_level > 0)
01575                 {
01576                   ACE_ERROR ((LM_ERROR,
01577                      ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ")
01578                      ACE_TEXT ("error processing consolidated message\n"),
01579                      this->id ()));
01580                 }
01581               return -1;
01582             }
01583 
01584           TAO_Queued_Data::release (new_q_data);
01585 
01586           break;
01587 
01588         case 1:  // fragment has been stored in messaging_object()
01589           break;
01590         }
01591     }
01592   else
01593     {
01594       if (this->process_parsed_messages (q_data, rh) == -1)
01595         {
01596           TAO_Queued_Data::release (q_data);
01597 
01598           if (TAO_debug_level > 0)
01599             {
01600               ACE_ERROR ((LM_ERROR,
01601                  ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ")
01602                  ACE_TEXT ("error processing message\n"),
01603                  this->id ()));
01604             }
01605           return -1;
01606         }
01607 
01608       TAO_Queued_Data::release (q_data);
01609 
01610     }
01611 
01612   return 0;
01613 }

int TAO_Transport::drain_queue (void) [private]
 

Send some of the data in the queue.

As the outgoing data is drained this method is invoked to send as much of the current message as possible.

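Returns -1 if there was an error and 0 otherwise. When drain_queue_i() reports that the queue was completely sent (return value 1), output is cancelled through the flushing strategy and 0 is returned to the caller; the value 1 is never propagated.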

Definition at line 849 of file Transport.cpp.

References ACE_GUARD_RETURN, TAO_Flushing_Strategy::cancel_output(), drain_queue_i(), TAO_ORB_Core::flushing_strategy(), and orb_core().

Referenced by handle_output().

00850 {
00851   ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1);
00852   int const retval = this->drain_queue_i ();
00853 
00854   if (retval == 1)
00855     {
00856       // ... there is no current message or it was completely
00857       // sent, cancel output...
00858       TAO_Flushing_Strategy *flushing_strategy =
00859         this->orb_core ()->flushing_strategy ();
00860 
00861       flushing_strategy->cancel_output (this);
00862 
00863       return 0;
00864     }
00865 
00866   return retval;
00867 }

int TAO_Transport::drain_queue_helper (int &iovcnt, iovec iov[]) [private]
 

A helper routine used in drain_queue_i().

Definition at line 870 of file Transport.cpp.

References ACE_DEBUG, ACE_TEXT, cleanup_queue(), dump_iov(), EWOULDBLOCK, LM_DEBUG, send(), sent_byte_count_, ssize_t, and TAO_debug_level.

Referenced by drain_queue_i().

00871 {
00872   size_t byte_count = 0;
00873 
00874   // ... send the message ...
00875   ssize_t retval = -1;
00876 
00877 #if TAO_HAS_SENDFILE == 1
00878   if (this->mmap_allocator_)
00879     retval = this->sendfile (this->mmap_allocator_,
00880                              iov,
00881                              iovcnt,
00882                              byte_count);
00883   else
00884 #endif  /* TAO_HAS_SENDFILE==1 */
00885     retval = this->send (iov, iovcnt, byte_count);
00886 
00887   if (TAO_debug_level == 5)
00888     {
00889       dump_iov (iov, iovcnt, this->id (),
00890                 byte_count, "drain_queue_helper");
00891     }
00892 
00893   // ... now we need to update the queue, removing elements
00894   // that have been sent, and updating the last element if it
00895   // was only partially sent ...
00896   this->cleanup_queue (byte_count);
00897   iovcnt = 0;
00898 
00899   if (retval == 0)
00900     {
00901       if (TAO_debug_level > 4)
00902         {
00903           ACE_DEBUG ((LM_DEBUG,
00904              ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_helper, ")
00905              ACE_TEXT ("send() returns 0\n"),
00906              this->id ()));
00907         }
00908       return -1;
00909     }
00910   else if (retval == -1)
00911     {
00912       if (TAO_debug_level > 4)
00913         {
00914           ACE_DEBUG ((LM_DEBUG,
00915              ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_helper, ")
00916              ACE_TEXT ("error during %p\n"),
00917              this->id (), ACE_TEXT ("send()")));
00918         }
00919 
00920       if (errno == EWOULDBLOCK || errno == EAGAIN)
00921         {
00922           return 0;
00923         }
00924 
00925       return -1;
00926     }
00927 
00928   // ... start over, how do we guarantee progress?  Because if
00929   // no bytes are sent send() can only return 0 or -1
00930 
00931   // Total no. of bytes sent for a send call
00932   this->sent_byte_count_ += byte_count;
00933 
00934   if (TAO_debug_level > 4)
00935     {
00936       ACE_DEBUG ((LM_DEBUG,
00937          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_helper, ")
00938          ACE_TEXT ("byte_count = %d, head_is_empty = %d\n"),
00939          this->id(), byte_count, (this->head_ == 0)));
00940     }
00941 
00942   return 1;
00943 }

int TAO_Transport::drain_queue_i (void) [private]
 

Implement drain_queue() assuming the lock is held.

Definition at line 946 of file Transport.cpp.

References ACE_DEBUG, ACE_IOV_MAX, ACE_TEXT, ACE_Reactor::cancel_timer(), TAO_Queued_Message::destroy(), drain_queue_helper(), event_handler_i(), TAO_Queued_Message::fill_iov(), flush_timer_pending(), ACE_High_Res_Timer::gettimeofday_hr(), TAO_Queued_Message::is_expired(), LM_DEBUG, TAO_Queued_Message::next(), ACE_Event_Handler::reactor(), TAO_Queued_Message::remove_from_list(), reset_flush_timer(), sent_byte_count_, TAO_LF_Event::state_changed(), and TAO_debug_level.

Referenced by drain_queue(), send_message_block_chain_i(), and send_synch_message_helper_i().

00947 {
00948   // This is the vector used to send data, it must be declared outside
00949   // the loop because after the loop there may still be data to be
00950   // sent
00951   int iovcnt = 0;
00952 #if defined (ACE_INITIALIZE_MEMORY_BEFORE_USE)
00953   iovec iov[ACE_IOV_MAX] = { 0 , 0 };
00954 #else
00955   iovec iov[ACE_IOV_MAX];
00956 #endif /* ACE_INITIALIZE_MEMORY_BEFORE_USE */
00957 
00958   // We loop over all the elements in the queue ...
00959   TAO_Queued_Message *i = this->head_;
00960 
00961   // Reset the value so that the counting is done for each new send
00962   // call.
00963   this->sent_byte_count_ = 0;
00964 
00965   // Avoid calling this expensive function each time through the loop. Instead
00966   // we'll assume that the time is unlikely to change much during the loop.
00967   // If we are forced to send in the loop then we'll recompute the time.
00968   ACE_Time_Value now = ACE_High_Res_Timer::gettimeofday_hr ();
00969 
00970   while (i != 0)
00971     {
00972       if (i->is_expired (now))
00973         {
00974           if (TAO_debug_level > 3)
00975           {
00976             ACE_DEBUG ((LM_DEBUG,
00977               ACE_TEXT ("TAO (%P|%t - Transport[%d]::drain_queue_i, ")
00978               ACE_TEXT ("Discarding expired queued message.\n"),
00979               this->id ()));
00980           }
00981           TAO_Queued_Message *next = i->next ();
00982           i->state_changed (TAO_LF_Event::LFS_TIMEOUT,
00983                             this->orb_core_->leader_follower ());
00984           i->remove_from_list (this->head_, this->tail_);
00985           i->destroy ();
00986           i = next;
00987           continue;
00988         }
00989       // ... each element fills the iovector ...
00990       i->fill_iov (ACE_IOV_MAX, iovcnt, iov);
00991 
00992       // ... the vector is full, no choice but to send some data out.
00993       // We need to loop because a single message can span multiple
00994       // IOV_MAX elements ...
00995       if (iovcnt == ACE_IOV_MAX)
00996         {
00997           int const retval =
00998             this->drain_queue_helper (iovcnt, iov);
00999 
01000           now = ACE_High_Res_Timer::gettimeofday_hr ();
01001 
01002           if (TAO_debug_level > 4)
01003             {
01004               ACE_DEBUG ((LM_DEBUG,
01005                  ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_i, ")
01006                  ACE_TEXT ("helper retval = %d\n"),
01007                  this->id (), retval));
01008             }
01009 
01010           if (retval != 1)
01011             {
01012               return retval;
01013             }
01014 
01015           i = this->head_;
01016           continue;
01017         }
01018       // ... notice that this line is only reached if there is still
01019       // room in the iovector ...
01020       i = i->next ();
01021     }
01022 
01023   if (iovcnt != 0)
01024     {
01025       int const retval = this->drain_queue_helper (iovcnt, iov);
01026 
01027       if (TAO_debug_level > 4)
01028         {
01029           ACE_DEBUG ((LM_DEBUG,
01030               ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_i, ")
01031               ACE_TEXT ("helper retval = %d\n"),
01032               this->id (), retval));
01033         }
01034 
01035       if (retval != 1)
01036         {
01037           return retval;
01038         }
01039     }
01040 
01041   if (this->head_ == 0)
01042     {
01043       if (this->flush_timer_pending ())
01044         {
01045           ACE_Event_Handler *eh = this->event_handler_i ();
01046           ACE_Reactor * const reactor = eh->reactor ();
01047           reactor->cancel_timer (this->flush_timer_id_);
01048           this->reset_flush_timer ();
01049         }
01050 
01051       return 1;
01052     }
01053 
01054   return 0;
01055 }

virtual ACE_Event_Handler* TAO_Transport::event_handler_i (void) [pure virtual]
 

Normally a concrete TAO_Transport object has-a ACE_Event_Handler member that functions as an adapter between the ACE_Reactor framework and the TAO pluggable protocol framework. In all the protocols implemented so far this role is fulfilled by an instance of ACE_Svc_Handler.

Todo:
Since we only use a limited functionality of ACE_Svc_Handler we could probably implement a generic adapter class (TAO_Transport_Event_Handler or something), this will reduce footprint and simplify the process of implementing a pluggable protocol.

This method has to be renamed to event_handler()

Implemented in TAO_IIOP_Transport.

Referenced by add_reference(), cancel_output_i(), check_buffering_constraints_i(), drain_queue_i(), TAO::Transport_Cache_Manager::find_transport(), notify_reactor(), register_handler(), remove_reference(), schedule_output_i(), TAO_Connection_Handler::transport(), and TAO_Wait_On_Read::wait().

ACE_INLINE void TAO_Transport::first_request_sent ()
 

Set the state of the first_request_ flag to 0.

Definition at line 158 of file Transport.inl.

References first_request_.

Referenced by TAO_IIOP_Transport::send_request().

00159 {
00160   this->first_request_ = 0;
00161 }

ACE_INLINE int TAO_Transport::flush_timer_pending (void) const [private]
 

Check if the flush timer is still pending.

Definition at line 108 of file Transport.inl.

References flush_timer_id_.

Referenced by check_buffering_constraints_i(), drain_queue_i(), and handle_timeout().

00109 {
00110   return this->flush_timer_id_ != -1;
00111 }

int TAO_Transport::format_queue_message (TAO_OutputCDR &stream, ACE_Time_Value *max_wait_time)
 

Format and queue a message for stream

Parameters:
max_wait_time The maximum time that the operation can block, used in the implementation of timeouts.

Definition at line 499 of file Transport.cpp.

References ACE_OutputCDR::begin(), TAO_Pluggable_Messaging::format_message(), messaging_object(), and queue_message_i().

Referenced by TAO::Synch_Oneway_Invocation::remote_oneway().

00501 {
00502   if (this->messaging_object ()->format_message (stream) != 0)
00503     return -1;
00504 
00505   return this->queue_message_i (stream.begin (), max_wait_time);
00506 }

int TAO_Transport::generate_locate_request (TAO_Target_Specification &spec, TAO_Operation_Details &opdetails, TAO_OutputCDR &output)
 

This is a request for the transport object to write a LocateRequest header before it is sent out.

Definition at line 371 of file Transport.cpp.

References ACE_DEBUG, ACE_TEXT, TAO_Pluggable_Messaging::generate_locate_request_header(), LM_DEBUG, messaging_object(), and TAO_debug_level.

Referenced by TAO::LocateRequest_Invocation::invoke().

00375 {
00376   if (this->messaging_object ()->generate_locate_request_header (opdetails,
00377                                                                  spec,
00378                                                                  output) == -1)
00379     {
00380       if (TAO_debug_level > 0)
00381         {
00382           ACE_DEBUG ((LM_DEBUG,
00383                       ACE_TEXT ("TAO (%P|%t) - Transport[%d]::generate_locate_request, ")
00384                       ACE_TEXT ("error while marshalling the LocateRequest header\n"),
00385                       this->id ()));
00386         }
00387 
00388       return -1;
00389     }
00390 
00391   return 0;
00392 }

int TAO_Transport::generate_request_header (TAO_Operation_Details &opd, TAO_Target_Specification &spec, TAO_OutputCDR &msg) [virtual]
 

This is a request for the transport object to write a request header before it sends out the request

Reimplemented in TAO_IIOP_Transport.

Definition at line 395 of file Transport.cpp.

References ACE_DEBUG, ACE_TEXT, TAO_ORB_Core::codeset_manager(), first_request_, TAO_Pluggable_Messaging::generate_request_header(), TAO_Codeset_Manager::generate_service_context(), LM_DEBUG, messaging_object(), orb_core(), and TAO_debug_level.

Referenced by TAO_IIOP_Transport::generate_request_header(), and TAO::Remote_Invocation::write_header().

00399 {
00400   // codeset service context is only supposed to be sent in the first request
00401   // on a particular connection.
00402   if (this->first_request_)
00403     {
00404       TAO_Codeset_Manager * const csm = this->orb_core ()->codeset_manager ();
00405       if (csm)
00406         csm->generate_service_context (opdetails,*this);
00407     }
00408 
00409   if (this->messaging_object ()->generate_request_header (opdetails,
00410                                                           spec,
00411                                                           output) == -1)
00412     {
00413       if (TAO_debug_level > 0)
00414         {
00415         ACE_DEBUG ((LM_DEBUG,
00416                    ACE_TEXT ("(%P|%t) - Transport[%d]::generate_request_header, ")
00417                    ACE_TEXT ("error while marshalling the Request header\n"),
00418                       this->id()));
00419         }
00420 
00421       return -1;
00422     }
00423 
00424   return 0;
00425 }

int TAO_Transport::handle_input (TAO_Resume_Handle &rh, ACE_Time_Value *max_wait_time = 0) [virtual]
 

Callback to read incoming data.

The ACE_Event_Handler adapter invokes this method as part of its handle_input() operation.

Todo:
the method name is confusing! Calling it handle_input() would probably make things easier to understand and follow!
Once a complete message is read the Transport class delegates on the Messaging layer to invoke the right upcall (on the server) or the TAO_Reply_Dispatcher (on the client side).

Parameters:
max_wait_time In some cases the I/O is synchronous, e.g. a thread-per-connection server or when Wait_On_Read is enabled. In those cases a maximum read time can be specified.

Definition at line 1455 of file Transport.cpp.

References ACE_DEBUG, ACE_ERROR, ACE_TEXT, handle_input_missing_data(), handle_input_parse_data(), incoming_message_stack_, LM_DEBUG, LM_ERROR, TAO_Queued_Data::missing_data(), process_queue_head(), TAO_debug_level, TAO_MISSING_DATA_UNDEFINED, and TAO::Incoming_Message_Stack::top().

Referenced by TAO_Connection_Handler::handle_input_internal(), TAO_Connection_Handler::svc_i(), and TAO_Wait_On_Read::wait().

01457 {
01458   if (TAO_debug_level > 3)
01459     {
01460       ACE_DEBUG ((LM_DEBUG,
01461          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input\n"),
01462          this->id ()));
01463     }
01464 
01465   // First try to process messages of the head of the incoming queue.
01466   int const retval = this->process_queue_head (rh);
01467 
01468   if (retval <= 0)
01469     {
01470       if (retval == -1)
01471         {
01472           if (TAO_debug_level > 2)
01473             {
01474               ACE_DEBUG ((LM_DEBUG,
01475                  ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input, ")
01476                  ACE_TEXT ("error while parsing the head of the queue\n"),
01477                  this->id()));
01478 
01479             }
01480           return -1;
01481         }
01482       else
01483         {
01484           // retval == 0
01485 
01486           // Processed a message in queue successfully. This
01487           // thread must return to thread-pool now.
01488           return 0;
01489         }
01490     }
01491 
01492   TAO_Queued_Data *q_data = 0;
01493 
01494   if (this->incoming_message_stack_.top (q_data) != -1
01495       && q_data->missing_data () != TAO_MISSING_DATA_UNDEFINED)
01496     {
01497       /* PRE: q_data->missing_data_ > 0 as all QD on stack must be incomplete  */
01498       if (this->handle_input_missing_data (rh, max_wait_time, q_data) == -1)
01499         {
01500           if (TAO_debug_level > 0)
01501             {
01502               ACE_ERROR ((LM_ERROR,
01503                  ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input, ")
01504                  ACE_TEXT ("error consolidating incoming message\n"),
01505                  this->id ()));
01506             }
01507           return -1;
01508         }
01509     }
01510   else
01511     {
01512       if (this->handle_input_parse_data (rh, max_wait_time) == -1)
01513         {
01514           if (TAO_debug_level > 0)
01515             {
01516               ACE_ERROR ((LM_ERROR,
01517                  ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input, ")
01518                  ACE_TEXT ("error parsing incoming message\n"),
01519                  this->id ()));
01520             }
01521           return -1;
01522         }
01523     }
01524 
01525   return 0;
01526 }

int TAO_Transport::handle_input_missing_data (TAO_Resume_Handle &rh, ACE_Time_Value *max_wait_time, TAO_Queued_Data *q_data) [private]
 

Invoked by the handle_input() operation. It consolidates the message on top of incoming_message_stack_. The amount of missing data is known, so the recv operation copies data directly into the message buffer, as much as a single recv invocation provides.

Definition at line 1673 of file Transport.cpp.

References ACE_DEBUG, ACE_TEXT, consolidate_process_message(), ACE_CDR::grow(), incoming_message_stack_, ACE_Message_Block::length(), LM_DEBUG, TAO_Queued_Data::missing_data(), TAO_Queued_Data::msg_block(), TAO::Incoming_Message_Stack::pop(), recv(), ACE_Message_Block::space(), ssize_t, TAO_debug_level, and ACE_Message_Block::wr_ptr().

Referenced by handle_input().

01676 {
01677   // paranoid check
01678   if (q_data == 0)
01679     {
01680       return -1;
01681     }
01682 
01683   if (TAO_debug_level > 3)
01684     {
01685       ACE_DEBUG ((LM_DEBUG,
01686          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_missing_data_message, ")
01687          ACE_TEXT ("enter (missing data == %d)\n"),
01688          this->id (), q_data->missing_data ()));
01689     }
01690 
01691   size_t const recv_size = q_data->missing_data ();
01692 
01693   if (q_data->msg_block ()->space() < recv_size)
01694     {
01695       // make sure the message_block has enough space
01696       size_t const message_size = recv_size + q_data->msg_block ()->length();
01697 
01698       if (ACE_CDR::grow (q_data->msg_block (), message_size) == -1)
01699         {
01700           return -1;
01701         }
01702     }
01703 
01704   // Saving the size of the received buffer in case any one needs to
01705   // get the size of the message thats received in the
01706   // context. Obviously the value will be changed for each recv call
01707   // and the user is supposed to invoke the accessor only in the
01708   // invocation context to get meaningful information.
01709   this->recv_buffer_size_ = recv_size;
01710 
01711   // Read the message into the existing message block on heap
01712   ssize_t const n = this->recv (q_data->msg_block ()->wr_ptr(),
01713                                 recv_size,
01714                                 max_wait_time);
01715 
01716 
01717   if (n <= 0)
01718     {
01719       return n;
01720     }
01721 
01722   if (TAO_debug_level > 3)
01723     {
01724       ACE_DEBUG ((LM_DEBUG,
01725          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_missing_data_message, ")
01726          ACE_TEXT ("read bytes %d\n"),
01727          this->id (), n));
01728     }
01729 
01730   q_data->msg_block ()->wr_ptr(n);
01731   q_data->missing_data (q_data->missing_data () - n);
01732 
01733   if (q_data->missing_data () == 0)
01734     {
01735       // paranoid check
01736       if (this->incoming_message_stack_.pop (q_data) == -1)
01737         {
01738           return -1;
01739         }
01740 
01741       if (this->consolidate_process_message (q_data, rh) == -1)
01742         {
01743           return -1;
01744         }
01745     }
01746 
01747   return 0;
01748 }

int TAO_Transport::handle_input_parse_data (TAO_Resume_Handle &rh, ACE_Time_Value *max_wait_time) [private]
 

Is invoked by handle_input operation. It parses new messages from input stream or consolidates messages whose header has been partially read, the message size being unknown so far. It parses as much data as a single recv-invocation provides.

Definition at line 1795 of file Transport.cpp.

References ACE_DEBUG, ACE_ERROR, ACE_TEXT, allocate_partial_message_block(), consolidate_enqueue_message(), TAO_Pluggable_Messaging::consolidate_node(), ACE_Message_Block::copy(), TAO_Queued_Data::duplicate(), ACE_CDR::grow(), handle_input_parse_extra_messages(), TAO_Pluggable_Messaging::header_length(), incoming_message_queue_, incoming_message_stack_, TAO_ORB_Core::input_cdr_dblock_allocator(), ACE_Message_Block::length(), LM_DEBUG, LM_ERROR, TAO_ORB_Core::locking_strategy(), ACE_CDR::mb_align(), ACE_OS::memset(), messaging_object(), TAO_Queued_Data::missing_data(), TAO_Queued_Data::more_fragments(), TAO_Queued_Data::msg_block(), TAO_Queued_Data::msg_type(), notify_reactor(), TAO_ORB_Core::orb_params(), TAO_Pluggable_Messaging::parse_next_message(), partial_message_, TAO::Incoming_Message_Stack::pop(), process_parsed_messages(), process_queue_head(), TAO::Incoming_Message_Stack::push(), TAO_Incoming_Message_Queue::queue_length(), ACE_Message_Block::rd_ptr(), recv(), ACE_Message_Block::reset(), TAO_Resume_Handle::set_flag(), TAO_ORB_Parameters::single_read_optimization(), ACE_Message_Block::space(), ssize_t, TAO_debug_level, TAO_MAXBUFSIZE, TAO_MISSING_DATA_UNDEFINED, TAO_PLUGGABLE_MESSAGE_FRAGMENT, TAO::Incoming_Message_Stack::top(), and ACE_Message_Block::wr_ptr().

Referenced by handle_input().

01797 {
01798 
01799   if (TAO_debug_level > 3)
01800     {
01801       ACE_DEBUG ((LM_DEBUG,
01802          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ")
01803          ACE_TEXT ("enter\n"),
01804          this->id ()));
01805     }
01806 
01807 
01808   // The buffer on the stack which will be used to hold the input
01809   // messages; the extra ACE_CDR::MAX_ALIGNMENT bytes compensate for
01810   // memory alignment. This improves performance with SUN-Java-ORB-1.4
01811   // and higher that sends fragmented requests of size 1024 bytes.
01812   char buf [TAO_MAXBUFSIZE + ACE_CDR::MAX_ALIGNMENT];
01813 
01814 #if defined (ACE_INITIALIZE_MEMORY_BEFORE_USE)
01815   (void) ACE_OS::memset (buf,
01816                          '\0',
01817                          sizeof buf);
01818 #endif /* ACE_INITIALIZE_MEMORY_BEFORE_USE */
01819 
01820   // Create a data block that wraps the stack buffer (DONT_DELETE)
01821   ACE_Data_Block db (sizeof (buf),
01822                      ACE_Message_Block::MB_DATA,
01823                      buf,
01824                      this->orb_core_->input_cdr_buffer_allocator (),
01825                      this->orb_core_->locking_strategy (),
01826                      ACE_Message_Block::DONT_DELETE,
01827                      this->orb_core_->input_cdr_dblock_allocator ());
01828 
01829   // Create a message block on top of that data block
01830   ACE_Message_Block message_block (&db,
01831                                    ACE_Message_Block::DONT_DELETE,
01832                                    this->orb_core_->input_cdr_msgblock_allocator ());
01833 
01834 
01835   // Align the message block
01836   ACE_CDR::mb_align (&message_block);
01837 
01838   size_t recv_size = 0; // Note: unsigned integer
01839 
01840   // Pointer to newly parsed message
01841   TAO_Queued_Data *q_data = 0;
01842 
01843   // cache the header length; it is used several times below
01844   size_t const header_length = this->messaging_object ()->header_length ();
01845 
01846   // paranoid check: the aligned stack buffer must hold at least a header
01847   if (header_length > message_block.space ())
01848     {
01849       return -1;
01850     }
01851 
01852   if (this->orb_core_->orb_params ()->single_read_optimization ())
01853     {
01854       recv_size = message_block.space ();
01855     }
01856   else
01857     {
01858       // Single read optimization has been de-activated. That means
01859       // that we need to read from transport the GIOP header first
01860       // before the payload. This code first checks the incoming
01861       // stack for partial messages which need to be
01862       // consolidated. Otherwise we are in a new cycle, reading the complete
01863       // GIOP header of a new incoming message.
01864       if (this->incoming_message_stack_.top (q_data) != -1
01865            && q_data->missing_data () == TAO_MISSING_DATA_UNDEFINED)
01866         {
01867           // There is a partial message on incoming_message_stack_
01868           // whose length is unknown so far. We need to consolidate
01869           // the GIOP header to learn the payload size.
01870           recv_size = header_length - q_data->msg_block ()->length ();
01871         }
01872       else
01873         {
01874           // Read amount of data forming GIOP header of new incoming
01875           // message.
01876           recv_size = header_length;
01877         }
01878       // POST: 0 <= recv_size <= header_length
01879     }
01880   // POST: 0 <= recv_size <= message_block.space ()
01881 
01882   // If we have a partial message, copy it into our message block and
01883   // clear out the partial message.
01884   if (this->partial_message_ != 0 && this->partial_message_->length () > 0)
01885     {
01886       // (*) Copy back the partial message into current read-buffer,
01887       // verify that the read-strategy of "recv_size" bytes is not
01888       // exceeded. The latter check guarantees that recv_size does not
01889       // roll over and stays in range
01890       // 0<=recv_size<=message_block.space()
01891       if (this->partial_message_->length () <= recv_size &&
01892           message_block.copy (this->partial_message_->rd_ptr (),
01893                               this->partial_message_->length ()) == 0)
01894         {
01895 
01896           recv_size -= this->partial_message_->length ();
01897           this->partial_message_->reset ();
01898         }
01899       else
01900         {
01901           return -1;
01902         }
01903     }
01904   // POST: 0 <= recv_size <= buffer_space
01905 
01906   if (0 >= recv_size) // paranoid: the check above (*) guarantees no underflow; recv_size is unsigned, so this tests recv_size == 0
01907     {
01908       // This event would cause endless looping, trying frequently to
01909       // read zero bytes from stream.  This might happen if TAO's
01910       // protocol implementation is not correct and tries to read data
01911       // beyond header without "single_read_optimization" being
01912       // activated.
01913       if (TAO_debug_level > 0)
01914         {
01915           ACE_ERROR ((LM_ERROR,
01916              ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ")
01917              ACE_TEXT ("Error - endless loop detection, closing connection"),
01918              this->id ()));
01919         }
01920       return -1;
01921     }
01922 
01923   // Saving the size of the received buffer in case anyone needs to
01924   // get the size of the message that's received in the
01925   // context. Obviously the value will be changed for each recv call
01926   // and the user is supposed to invoke the accessor only in the
01927   // invocation context to get meaningful information.
01928   this->recv_buffer_size_ = recv_size;
01929 
01930   // Read the message into the message block that we have created on
01931   // the stack.
01932   ssize_t const n = this->recv (message_block.wr_ptr (),
01933                                 recv_size,
01934                                 max_wait_time);
01935 
01936   // If there is an error (or nothing was read) return n to the caller.
01937   if (n <= 0)
01938     {
01939       return n;
01940     }
01941 
01942   if (TAO_debug_level > 3)
01943     {
01944       ACE_DEBUG ((LM_DEBUG,
01945          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ")
01946          ACE_TEXT ("read %d bytes\n"),
01947          this->id (), n));
01948     }
01949 
01950   // Advance the write pointer in the stack buffer by the bytes read
01951   message_block.wr_ptr (n);
01952 
01953   //
01954   // STACK PROCESSING OR MESSAGE CONSOLIDATION
01955   //
01956 
01957   // PRE: data in buffer is aligned && message_block.length() > 0
01958 
01959   if (this->incoming_message_stack_.top (q_data) != -1
01960       && q_data->missing_data () == TAO_MISSING_DATA_UNDEFINED)
01961     {
01962       //
01963       // MESSAGE CONSOLIDATION
01964       //
01965 
01966       // Partial message on incoming_message_stack_ needs to be
01967       // consolidated.  The message header could not be parsed so far
01968       // and therefore the message size is unknown yet. Consolidating
01969       // the message destroys the memory alignment of succeeding
01970       // messages sharing the buffer, for that reason consolidation
01971       // and stack based processing are mutually exclusive.
01972       if (this->messaging_object ()->consolidate_node (q_data,
01973                                                        message_block) == -1)
01974         {
01975            if (TAO_debug_level > 0)
01976             {
01977                 ACE_ERROR ((LM_ERROR,
01978                    ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ")
01979                    ACE_TEXT ("error consolidating message from input buffer\n"),
01980                    this->id () ));
01981              }
01982            return -1;
01983         }
01984 
01985       // Complete messages are to be enqueued and later processed
01986       if (q_data->missing_data () == 0)
01987         {
01988           if (this->incoming_message_stack_.pop (q_data) == -1)
01989             {
01990               return -1;
01991             }
01992 
01993           if (this->consolidate_enqueue_message (q_data) == -1)
01994             {
01995               return -1;
01996             }
01997         }
01998 
01999       if (message_block.length () > 0
02000           && this->handle_input_parse_extra_messages (message_block) == -1)
02001         {
02002           return -1;
02003         }
02004 
02005       // In any case try to process the enqueued messages
02006       if (this->process_queue_head (rh) == -1)
02007         {
02008           return -1;
02009         }
02010     }
02011   else
02012     {
02013       //
02014       // STACK PROCESSING (critical path)
02015       //
02016 
02017       // Process the first message in buffer on stack
02018 
02019       // (PRE: first message resides in aligned memory) Make a node of
02020       // the message-block..
02021 
02022       TAO_Queued_Data qd (&message_block,
02023                           this->orb_core_->transport_message_buffer_allocator ());
02024 
02025       size_t mesg_length  = 0;
02026 
02027       if (this->messaging_object ()->parse_next_message (qd,
02028                                                          mesg_length) == -1
02029           || (qd.missing_data () == 0
02030               && mesg_length > message_block.length ()) )
02031         {
02032           // extracting message failed
02033           return -1;
02034         }
02035       // POST: qd.missing_data_ == 0 --> mesg_length <= message_block.length()
02036       // This prevents seeking rd_ptr past the wr_ptr
02037 
02038       if (qd.missing_data () != 0 ||
02039           qd.more_fragments ()   ||
02040           qd.msg_type () == TAO_PLUGGABLE_MESSAGE_FRAGMENT)
02041         {
02042           if (qd.missing_data () == 0)
02043             {
02044               // Dealing with a fragment
02045               TAO_Queued_Data *nqd =
02046                 TAO_Queued_Data::duplicate (qd);
02047 
02048               if (nqd == 0)
02049                 {
02050                   return -1;
02051                 }
02052 
02053               // mark the end of message in new buffer
02054               char* end_mark = nqd->msg_block ()->rd_ptr ()
02055                              + mesg_length;
02056               nqd->msg_block ()->wr_ptr (end_mark);
02057 
02058               // move the read pointer forward in old buffer
02059               message_block.rd_ptr (mesg_length);
02060 
02061               // enqueue the message
02062               if (this->consolidate_enqueue_message (nqd) == -1)
02063                 {
02064                   return -1;
02065                 }
02066 
02067               if (message_block.length () > 0
02068                   && this->handle_input_parse_extra_messages (message_block) == -1)
02069                 {
02070                   return -1;
02071                 }
02072 
02073               // In any case try to process the enqueued messages
02074               if (this->process_queue_head (rh) == -1)
02075                 {
02076                   return -1;
02077                 }
02078             }
02079           else if (qd.missing_data () != TAO_MISSING_DATA_UNDEFINED)
02080             {
02081               // Incomplete message, must be the last one in buffer
02082 
02083               if (qd.missing_data () != TAO_MISSING_DATA_UNDEFINED && // NOTE(review): already implied by the enclosing else-if
02084                   qd.missing_data () > message_block.space ())
02085                 {
02086                   // Re-Allocate correct size on heap
02087                   if (ACE_CDR::grow (qd.msg_block (),
02088                                      message_block.length ()
02089                                      + qd.missing_data ()) == -1)
02090                     {
02091                       return -1;
02092                     }
02093                 }
02094 
02095               TAO_Queued_Data *nqd =
02096                 TAO_Queued_Data::duplicate (qd);
02097 
02098               if (nqd == 0)
02099                 {
02100                   return -1;
02101                 }
02102 
02103               // move read-pointer to end of buffer
02104               message_block.rd_ptr (message_block.length());
02105 
02106               this->incoming_message_stack_.push (nqd);
02107             }
02108         }
02109       else
02110         {
02111           //
02112           // critical path
02113           //
02114 
02115           // We can't process the message on stack right now. First we
02116           // have got to parse extra messages from message_block,
02117           // putting them into queue.  When this is done we can return
02118           // to process this message, notifying other threads to
02119           // process the messages in queue.
02120 
02121           char * end_marker = message_block.rd_ptr ()
02122                             + mesg_length;
02123 
02124           if (message_block.length () > mesg_length)
02125             {
02126               // There are more messages in data stream to be parsed.
02127               // Save the rd_ptr to restore later.
02128               char *rd_ptr_stack_mesg = message_block.rd_ptr ();
02129 
02130               // Skip parsed message, jump to next message in buffer
02131               // PRE: mesg_length <= message_block.length ()
02132               message_block.rd_ptr (mesg_length);
02133 
02134               // Extract remaining messages and enqueue them for later
02135               // heap processing
02136               if (this->handle_input_parse_extra_messages (message_block) == -1)
02137                 {
02138                   return -1;
02139                 }
02140 
02141               // correct the end_marker
02142               end_marker = message_block.rd_ptr ();
02143 
02144               // Restore rd_ptr
02145               message_block.rd_ptr (rd_ptr_stack_mesg);
02146             }
02147 
02148           // The following if-else has been copied from
02149           // process_queue_head().  While process_queue_head()
02150           // processes message on heap, here we will process a message
02151           // on stack.
02152 
02153           // Now that we have one message on stack to be processed,
02154           // check whether we have one more message in the queue...
02155           if (this->incoming_message_queue_.queue_length () > 0)
02156             {
02157               if (TAO_debug_level > 0)
02158                 {
02159                   ACE_DEBUG ((LM_DEBUG,
02160                      ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ")
02161                      ACE_TEXT ("notify reactor\n"),
02162                      this->id ()));
02163 
02164                 }
02165 
02166               const int retval = this->notify_reactor ();
02167 
02168               if (retval == 1)
02169                 {
02170                   // Let the class know that it doesn't need to resume the
02171                   // handle..
02172                   rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_LEAVE_SUSPENDED);
02173                 }
02174               else if (retval < 0) // NOTE(review): notify_reactor() as listed on this page only returns 0 or 1
02175                 return -1;
02176             }
02177           else
02178             {
02179               // As there are no further messages in queue just resume
02180               // the handle. Set the flag in case someone had reset the flag..
02181               rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_RESUMABLE);
02182             }
02183 
02184           // PRE: incoming_message_queue is empty
02185           if (this->process_parsed_messages (&qd,
02186                                              rh) == -1)
02187             {
02188               return -1;
02189             }
02190 
02191           // move the rd_ptr to position of end_marker
02192           message_block.rd_ptr (end_marker);
02193         }
02194     }
02195 
02196   // Now that all cases have been processed, some data might remain
02197   // in buffer that needs to be saved for next "handle_input" invocations.
02198    if (message_block.length () > 0)
02199      {
02200        if (this->partial_message_ == 0)
02201          {
02202            this->allocate_partial_message_block ();
02203          }
02204 
02205        if (this->partial_message_ != 0 &&
02206            this->partial_message_->copy (message_block.rd_ptr (),
02207                                          message_block.length ()) == 0)
02208          {
02209            message_block.rd_ptr (message_block.length ());
02210          }
02211        else
02212          {
02213            return -1;
02214          }
02215      }
02216 
02217    return 0;
02218 }

int TAO_Transport::handle_input_parse_extra_messages (ACE_Message_Block &message_block) [private]
 

Is invoked by handle_input_parse_data. Parses all messages remaining in message_block.

Definition at line 1752 of file Transport.cpp.

References consolidate_enqueue_message(), TAO_Pluggable_Messaging::extract_next_message(), incoming_message_stack_, ACE_Message_Block::length(), messaging_object(), TAO_Queued_Data::missing_data(), and TAO::Incoming_Message_Stack::push().

Referenced by handle_input_parse_data().

01753 {
01754 
01755   // store buffer status of last extraction: -1 parse error, 0
01756   // incomplete message header in buffer, 1 complete message header
01757   // parsed
01758   int buf_status = 0;
01759 
01760   TAO_Queued_Data *q_data = 0;     // init
01761 
01762   // parse buffer until all messages have been extracted, consolidate
01763   // and enqueue complete messages; if the last message being parsed
01764   // has missing data, it stays on top of incoming_message_stack.
01765   while (message_block.length () > 0 &&
01766          (buf_status = this->messaging_object ()->extract_next_message
01767           (message_block, q_data)) != -1 &&
01768          q_data != 0) // paranoid check
01769     {
01770       if (q_data->missing_data () == 0)
01771         {
01772           if (this->consolidate_enqueue_message (q_data) == -1)
01773             {
01774               return -1;
01775             }
01776         }
01777       else  // incomplete message read, probably the last message in buffer
01778         {
01779           // return value ignored; push is assumed not to fail
01780           this->incoming_message_stack_.push (q_data);
01781         }
01782 
01783       q_data = 0; // reset
01784     } // while
01785 
01786   if (buf_status == -1) // the loop stopped because of a parse error
01787     {
01788       return -1;
01789     }
01790 
01791   return 0;
01792 }

int TAO_Transport::handle_output (void)
 

Callback method to reactively drain the outgoing data queue.

Definition at line 471 of file Transport.cpp.

References ACE_DEBUG, ACE_TEXT, drain_queue(), LM_DEBUG, and TAO_debug_level.

Referenced by TAO_Block_Flushing_Strategy::flush_message(), TAO_Block_Flushing_Strategy::flush_transport(), and TAO_Connection_Handler::handle_output_eh().

00472 {
00473   if (TAO_debug_level > 3)
00474     {
00475       ACE_DEBUG ((LM_DEBUG,
00476                   ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_output\n"),
00477                   this->id ()));
00478     }
00479 
00480   // The flushing strategy (potentially via the Reactor) wants to send
00481   // more data; first check whether there is a current message that
00482   // still needs sending...
00483   int const retval = this->drain_queue ();
00484 
00485   if (TAO_debug_level > 3)
00486     {
00487       ACE_DEBUG ((LM_DEBUG,
00488                   ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_output, ")
00489                   ACE_TEXT ("drain_queue returns %d/%d\n"),
00490                   this->id (),
00491                   retval, errno)); // errno is logged next to the drain_queue() result
00492     }
00493 
00494   // The drain_queue() result, including any error, is returned unchanged
00495   return retval;
00496 }

int TAO_Transport::handle_timeout const ACE_Time_Value current_time,
const void *  act
 

Parameters:
current_time The current time as reported from the Reactor
act The Asynchronous Completion Token. Currently it is interpreted as follows:
  • If the ACT is the address of this->current_deadline_ the queueing timeout has expired and the queue should start flushing.
Returns:
Returns 0 if there are no problems, -1 if there is an error

Todo:
In the future this function could be used to expire messages (oneways) that have been sitting for too long on the queue.

Definition at line 810 of file Transport.cpp.

References ACE_DEBUG, ACE_GUARD_RETURN, ACE_TEXT, current_deadline_, flush_timer_pending(), TAO_Flushing_Strategy::flush_transport(), TAO_ORB_Core::flushing_strategy(), LM_DEBUG, orb_core(), reset_flush_timer(), TAO_Flushing_Strategy::schedule_output(), and TAO_debug_level.

Referenced by TAO_Transport_Timer::handle_timeout().

00812 {
00813   if (TAO_debug_level > 6)
00814     {
00815       ACE_DEBUG ((LM_DEBUG,
00816          ACE_TEXT ("TAO (%P|%t) - TAO_Transport[%d]::handle_timeout, ")
00817          ACE_TEXT ("timer expired\n"),
00818          this->id ()));
00819     }
00820 
00821   /// This is the only legal ACT in the current configuration; any other is rejected.
00822   if (act != &this->current_deadline_)
00823     {
00824       return -1;
00825     }
00826 
00827   if (this->flush_timer_pending ())
00828     {
00829       // The timer is always a oneshot timer, so mark it as not
00830       // pending.
00831       this->reset_flush_timer ();
00832 
00833       TAO_Flushing_Strategy *flushing_strategy =
00834         this->orb_core ()->flushing_strategy ();
00835       int const result = flushing_strategy->schedule_output (this);
00836       if (result == TAO_Flushing_Strategy::MUST_FLUSH)
00837         {
00838           typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK;
00839           TAO_REVERSE_LOCK reverse (*this->handler_lock_);
00840           ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1); // NOTE(review): presumably releases handler_lock_ for the duration of the flush - confirm against ACE_Reverse_Lock
00841           (void) flushing_strategy->flush_transport (this); // result deliberately ignored
00842         }
00843     }
00844 
00845   return 0;
00846 }

ACE_INLINE void TAO_Transport::id size_t  id  ) 
 

Definition at line 92 of file Transport.inl.

00093 {
00094   this->id_ = id; // overwrite the stored transport identifier
00095 }

ACE_INLINE size_t TAO_Transport::id void   )  const
 

Set and Get the identifier for this transport instance.

If not set, this will return an integer representation of the this pointer for the instance on which it's called.

Definition at line 86 of file Transport.inl.

Referenced by TAO::Transport_Cache_Manager::bind_i(), TAO_Connection_Handler::close_connection_eh(), TAO_IIOP_Connector::complete_connection(), TAO_Connector::connect(), TAO::Transport_Cache_Manager::find_transport(), TAO_Connection_Handler::handle_input_internal(), TAO_Connector::parallel_connect(), TAO_GIOP_Message_Base::process_reply_message(), TAO::Transport_Cache_Manager::purge(), TAO_GIOP_Message_Base::send_close_connection(), TAO_GIOP_Message_Base::send_error(), TAO_Connector::wait_for_connection_completion(), and TAO_Leader_Follower::wait_for_event().

00087 {
00088   return this->id_; // the stored transport identifier
00089 }

bool TAO_Transport::idle_after_reply void   ) 
 

The request has been sent and the reply has been received. Idle the transport now.

Definition at line 258 of file Transport.cpp.

References TAO_Transport_Mux_Strategy::idle_after_reply(), and tms().

Referenced by TAO::LocateRequest_Invocation::invoke(), and TAO::Synch_Twoway_Invocation::remote_twoway().

00259 {
00260   return this->tms ()->idle_after_reply (); // delegate to the object returned by tms()
00261 }

bool TAO_Transport::idle_after_send void   ) 
 

The request has just been sent, but the reply has not been received yet. Idle the transport now.

Definition at line 252 of file Transport.cpp.

References TAO_Transport_Mux_Strategy::idle_after_send(), and tms().

Referenced by TAO::LocateRequest_Invocation::invoke(), and TAO::Synch_Twoway_Invocation::remote_twoway().

00253 {
00254   return this->tms ()->idle_after_send (); // delegate to the object returned by tms()
00255 }

ACE_INLINE bool TAO_Transport::is_connected void   )  const
 

Is this transport really connected.

Definition at line 164 of file Transport.inl.

References ACE_GUARD_RETURN.

Referenced by TAO::Cache_IntId::Cache_IntId(), TAO_IIOP_Connector::complete_connection(), TAO_Connector::connect(), TAO::Synch_Oneway_Invocation::remote_oneway(), and TAO_Connector::wait_for_connection_completion().

00165 {
00166   ACE_GUARD_RETURN (ACE_Lock,
00167                     ace_mon,
00168                     *this->handler_lock_,
00169                     false);
00170 
00171   return this->is_connected_; // read while the guard on handler_lock_ declared above is in scope
00172 }

ACE_INLINE CORBA::Boolean TAO_Transport::is_tcs_set  )  const
 

Return true if the tcs has been set.

Definition at line 152 of file Transport.inl.

References tcs_set_.

00153 {
00154   return tcs_set_; // plain read of the member flag
00155 }

int TAO_Transport::make_idle void   ) 
 

Cache management.

Definition at line 447 of file Transport.cpp.

References ACE_DEBUG, ACE_TEXT, LM_DEBUG, TAO::Transport_Cache_Manager::make_idle(), TAO_debug_level, and transport_cache_manager().

Referenced by TAO_Exclusive_TMS::idle_after_reply(), TAO_Muxed_TMS::idle_after_send(), and TAO_IIOP_Connection_Handler::process_listen_point_list().

00448 {
00449   if (TAO_debug_level > 3)
00450     {
00451       ACE_DEBUG ((LM_DEBUG,
00452                   ACE_TEXT ("TAO (%P|%t) - Transport[%d]::make_idle\n"),
00453                   this->id ()));
00454     }
00455 
00456   return this->transport_cache_manager ().make_idle (this->cache_map_entry_); // the cache manager's result is returned unchanged
00457 }

virtual int TAO_Transport::messaging_init CORBA::Octet  major,
CORBA::Octet  minor
[pure virtual]
 

Initialising the messaging object. This would be used by the connector side. On the acceptor side the connection handler would take care of the messaging objects.

Implemented in TAO_IIOP_Transport.

virtual TAO_Pluggable_Messaging* TAO_Transport::messaging_object void   )  [pure virtual]
 

Return the messaging object that is used to format the data that needs to be sent.

Implemented in TAO_IIOP_Transport.

Referenced by allocate_partial_message_block(), consolidate_enqueue_message(), consolidate_process_message(), format_queue_message(), TAO_On_Demand_Fragmentation_Strategy::fragment(), generate_locate_request(), generate_request_header(), handle_input_parse_data(), handle_input_parse_extra_messages(), out_stream(), process_parsed_messages(), and send_connection_closed_notifications_i().

int TAO_Transport::notify_reactor void   )  [private]
 

Definition at line 2410 of file Transport.cpp.

References ACE_DEBUG, ACE_TEXT, event_handler_i(), TAO_Wait_Strategy::is_registered(), LM_DEBUG, ACE_Reactor::notify(), orb_core(), TAO_ORB_Core::reactor(), TAO_debug_level, and ws_.

Referenced by handle_input_parse_data(), and process_queue_head().

02411 {
02412   if (!this->ws_->is_registered ()) // no notification is sent when ws_ reports it is not registered
02413     {
02414       return 0;
02415     }
02416 
02417   ACE_Event_Handler *eh = this->event_handler_i ();
02418 
02419   // Get the reactor from the ORB core
02420   ACE_Reactor *reactor = this->orb_core ()->reactor ();
02421 
02422   if (TAO_debug_level > 0)
02423     {
02424       ACE_DEBUG ((LM_DEBUG,
02425          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::notify_reactor, ")
02426          ACE_TEXT ("notify to Reactor\n"),
02427          this->id ()));
02428     }
02429 
02430 
02431   // Send a READ_MASK notification for our event handler to the reactor...
02432   int const retval = reactor->notify (eh, ACE_Event_Handler::READ_MASK);
02433 
02434   if (retval < 0 && TAO_debug_level > 2)
02435     {
02436       // @todo: need to think about what action
02437       // we can take when we get here.
02438       ACE_DEBUG ((LM_DEBUG,
02439          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::notify_reactor, ")
02440          ACE_TEXT ("notify to the reactor failed..\n"),
02441          this->id ()));
02442     }
02443 
02444   return 1; // NOTE(review): 1 is returned even when notify() failed above, so callers never see a negative result
02445 }

ACE_INLINE void TAO_Transport::opened_as TAO::Connection_Role   ) 
 

Definition at line 51 of file Transport.inl.

References opening_connection_role_.

00052 {
00053   this->opening_connection_role_ = role; // record the role passed in by the caller
00054 }

ACE_INLINE TAO::Connection_Role TAO_Transport::opened_as void   )  const
 

Methods dealing with the role of the connection, e.g., CLIENT or SERVER. See CORBA 2.6 Specification, Section 15.5.1 for origin of definitions.

Definition at line 45 of file Transport.inl.

References opening_connection_role_.

Referenced by TAO::Wait_On_LF_No_Upcall::can_process_upcalls(), TAO_Connector::connect(), and TAO_IIOP_Connection_Handler::open().

00046 {
00047   return this->opening_connection_role_; // the stored connection role
00048 }

void TAO_Transport::operator= const TAO_Transport  )  [private]
 

ACE_INLINE TAO_ORB_Core * TAO_Transport::orb_core void   )  const
 

Access the ORB that owns this connection.

Definition at line 14 of file Transport.inl.

Referenced by TAO::Wait_On_LF_No_Upcall::can_process_upcalls(), TAO_Connection_Handler::close_connection_eh(), drain_queue(), TAO::Transport_Cache_Manager::find_transport(), TAO_Reactive_Flushing_Strategy::flush_message(), TAO_Leader_Follower_Flushing_Strategy::flush_message(), TAO_Reactive_Flushing_Strategy::flush_transport(), TAO_Leader_Follower_Flushing_Strategy::flush_transport(), generate_request_header(), TAO_IIOP_Transport::generate_request_header(), handle_timeout(), TAO::Nested_Upcall_Guard::Nested_Upcall_Guard(), notify_reactor(), send_asynchronous_message_i(), send_reply_message_i(), send_synchronous_message_i(), TAO_IIOP_Transport::set_bidir_context_info(), TAO_Wait_On_Read::wait(), TAO_Wait_On_Reactor::wait(), TAO_Wait_On_Leader_Follower::wait(), and TAO::Nested_Upcall_Guard::~Nested_Upcall_Guard().

00015 {
00016   return this->orb_core_; // the stored ORB core pointer is returned as is
00017 }

TAO_OutputCDR & TAO_Transport::out_stream void   ) 
 

Accessor for the output CDR stream.

Definition at line 2496 of file Transport.cpp.

References messaging_object(), and TAO_Pluggable_Messaging::out_stream().

Referenced by TAO::LocateRequest_Invocation::invoke(), TAO::Synch_Oneway_Invocation::remote_oneway(), and TAO::Synch_Twoway_Invocation::remote_twoway().

02497 {
02498   return this->messaging_object ()->out_stream (); // the stream comes from the messaging object
02499 }

bool TAO_Transport::post_connect_hook void   )  [virtual]
 

Hooks that can be overridden in concrete transports.

These hooks are invoked just after connection establishment (or after a connection is fetched from cache). The return value signifies whether the invoker should proceed with post connection establishment activities. Protocols like SSLIOP need this to verify whether connections already established have valid certificates. There are no pre_connect_hooks () since the transport doesn't exist before a connection establishment. :-)

Note:
The methods are not made const for a reason.

Definition at line 306 of file Transport.cpp.

Referenced by TAO_Connector::connect().

00307 {
00308   return true;
00309 }

bool TAO_Transport::post_open size_t  id  ) 
 

Perform all the actions when this transport gets opened.

Definition at line 2513 of file Transport.cpp.

References ACE_ERROR, ACE_GUARD_RETURN, ACE_TEXT, close_connection(), LM_ERROR, purge_entry(), queue_is_empty_i(), TAO_Wait_Strategy::register_handler(), TAO_debug_level, and wait_strategy().

Referenced by TAO_IIOP_Connection_Handler::open().

02514 {
02515   this->id_ = id;
02516 
02517   {
02518     ACE_GUARD_RETURN (ACE_Lock,
02519                       ace_mon,
02520                       *this->handler_lock_,
02521                       false);
02522     this->is_connected_ = true;
02523   }
02524 
02525   // When we have data in our outgoing queue schedule ourselves
02526   // for output
02527   if (this->queue_is_empty_i ())
02528     return true;
02529 
02530   // If the wait strategy wants us to be registered with the reactor
02531   // then we do so. If registeration is required and it succeeds,
02532   // #REFCOUNT# becomes two.
02533   if (this->wait_strategy ()->register_handler () != 0)
02534     {
02535       // Registration failures.
02536 
02537       // Purge from the connection cache, if we are not in the cache, this
02538       // just does nothing.
02539       (void) this->purge_entry ();
02540 
02541       // Close the handler.
02542       (void) this->close_connection ();
02543 
02544       if (TAO_debug_level > 0)
02545         ACE_ERROR ((LM_ERROR,
02546            ACE_TEXT ("TAO (%P|%t) - Transport[%d]::post_connect , ")
02547            ACE_TEXT ("could not register the transport ")
02548            ACE_TEXT ("in the reactor.\n"),
02549            this->id ()));
02550 
02551       return false;
02552     }
02553 
02554   return true;
02555 }

void TAO_Transport::pre_close void   ) 
 

Do what needs to be done when closing the transport.

Definition at line 2502 of file Transport.cpp.

References ACE_GUARD, cleanup_queue_i(), and purge_entry().

Referenced by TAO_Connection_Handler::close_connection_eh().

02503 {
02504   this->is_connected_ = false;
02505   this->purge_entry ();
02506   {
02507     ACE_MT (ACE_GUARD (ACE_Lock, guard, *this->handler_lock_));
02508     this->cleanup_queue_i ();
02509   }
02510 }

int TAO_Transport::process_parsed_messages TAO_Queued_Data qd,
TAO_Resume_Handle rh
[protected]
 

Process the message by sending it to the higher layers of the ORB.

Definition at line 2222 of file Transport.cpp.

References ACE_DEBUG, ACE_ERROR, ACE_TEXT, TAO_Pluggable_Messaging::discard_fragmented_message(), ACE_Message_Block::length(), LM_DEBUG, LM_ERROR, TAO::Transport::Stats::messages_received(), messaging_object(), TAO_Queued_Data::missing_data(), TAO_Queued_Data::msg_block(), TAO_Queued_Data::msg_type(), TAO_Pluggable_Messaging::process_reply_message(), TAO_Pluggable_Messaging::process_request_message(), TAO_Resume_Handle::resume_handle(), stats_, TAO_debug_level, TAO_PLUGGABLE_MESSAGE_CANCELREQUEST, TAO_PLUGGABLE_MESSAGE_CLOSECONNECTION, TAO_PLUGGABLE_MESSAGE_FRAGMENT, TAO_PLUGGABLE_MESSAGE_LOCATEREPLY, TAO_PLUGGABLE_MESSAGE_LOCATEREQUEST, TAO_PLUGGABLE_MESSAGE_MESSAGERROR, TAO_PLUGGABLE_MESSAGE_REPLY, and TAO_PLUGGABLE_MESSAGE_REQUEST.

Referenced by consolidate_process_message(), handle_input_parse_data(), and process_queue_head().

02224 {
02225   if (TAO_debug_level > 7)
02226     {
02227       ACE_DEBUG ((LM_DEBUG,
02228          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ")
02229          ACE_TEXT ("entering (missing data == %d)\n"),
02230          this->id(), qd->missing_data ()));
02231     }
02232 
02233 #if TAO_HAS_TRANSPORT_CURRENT == 1
02234   // Update stats, if any
02235   if (this->stats_ != 0)
02236     this->stats_->messages_received (qd->msg_block ()->length ());
02237 #endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */
02238 
02239   switch (qd->msg_type ())
02240   {
02241     case TAO_PLUGGABLE_MESSAGE_CLOSECONNECTION:
02242     {
02243       if (TAO_debug_level > 0)
02244         ACE_DEBUG ((LM_DEBUG,
02245            ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ")
02246            ACE_TEXT ("received CloseConnection message - %m\n"),
02247            this->id()));
02248 
02249       // Return a "-1" so that the next stage can take care of
02250       // closing connection and the necessary memory management.
02251       return -1;
02252     }
02253     break;
02254     case TAO_PLUGGABLE_MESSAGE_REQUEST:
02255     case TAO_PLUGGABLE_MESSAGE_LOCATEREQUEST:
02256     {
02257       // Let us resume the handle before we go ahead to process the
02258       // request. This will open up the handle for other threads.
02259       rh.resume_handle ();
02260 
02261       if (this->messaging_object ()->process_request_message (
02262             this,
02263             qd) == -1)
02264         {
02265           // Return a "-1" so that the next stage can take care of
02266           // closing connection and the necessary memory management.
02267           return -1;
02268         }
02269     }
02270     break;
02271     case TAO_PLUGGABLE_MESSAGE_REPLY:
02272     case TAO_PLUGGABLE_MESSAGE_LOCATEREPLY:
02273     {
02274       rh.resume_handle ();
02275 
02276       TAO_Pluggable_Reply_Params params (this);
02277 
02278       if (this->messaging_object ()->process_reply_message (params,
02279                                                             qd) == -1)
02280         {
02281           if (TAO_debug_level > 0)
02282             ACE_DEBUG ((LM_DEBUG,
02283                ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ")
02284                ACE_TEXT ("error in process_reply_message - %m\n"),
02285                this->id ()));
02286 
02287           return -1;
02288         }
02289 
02290     }
02291     break;
02292     case TAO_PLUGGABLE_MESSAGE_CANCELREQUEST:
02293     {
02294       // The associated request might be incomplete residing
02295       // fragmented in messaging object. We must make sure the
02296       // resources allocated by fragments are released.
02297       if (this->messaging_object ()->discard_fragmented_message (qd) == -1)
02298         {
02299           if (TAO_debug_level > 0)
02300             {
02301               ACE_ERROR ((LM_ERROR,
02302                  ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ")
02303                  ACE_TEXT ("error processing CancelRequest\n"),
02304                  this->id ()));
02305             }
02306         }
02307 
02308       // We are not able to cancel requests being processed already;
02309       // this is declared as optional feature by CORBA, and TAO does
02310       // not support this currently.
02311 
02312       // Just continue processing, CancelRequest does not mean to cut
02313       // off the connection.
02314     }
02315     break;
02316     case TAO_PLUGGABLE_MESSAGE_MESSAGERROR:
02317     {
02318       if (TAO_debug_level > 0)
02319         {
02320           ACE_ERROR ((LM_ERROR,
02321              ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ")
02322              ACE_TEXT ("received MessageError, closing connection\n"),
02323              this->id ()));
02324         }
02325       return -1;
02326     }
02327     break;
02328     case TAO_PLUGGABLE_MESSAGE_FRAGMENT:
02329     {
02330       // Nothing to be done.
02331     }
02332     break;
02333   }
02334 
02335   // If not, just return back..
02336   return 0;
02337 }

int TAO_Transport::process_queue_head TAO_Resume_Handle rh  )  [private]
 

Definition at line 2340 of file Transport.cpp.

References ACE_DEBUG, ACE_TEXT, TAO_Incoming_Message_Queue::dequeue_head(), incoming_message_queue_, LM_DEBUG, notify_reactor(), process_parsed_messages(), TAO_Incoming_Message_Queue::queue_length(), TAO_Queued_Data::release(), TAO_Resume_Handle::set_flag(), and TAO_debug_level.

Referenced by handle_input(), and handle_input_parse_data().

02341 {
02342   if (TAO_debug_level > 3)
02343     {
02344       ACE_DEBUG ((LM_DEBUG,
02345          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_queue_head, %d enqueued\n"),
02346          this->id (), this->incoming_message_queue_.queue_length () ));
02347     }
02348 
02349   // See if  message in queue ...
02350   if (this->incoming_message_queue_.queue_length () > 0)
02351     {
02352       // Get the message on the head of the queue..
02353       TAO_Queued_Data *qd =
02354         this->incoming_message_queue_.dequeue_head ();
02355 
02356       if (TAO_debug_level > 3)
02357         {
02358           ACE_DEBUG ((LM_DEBUG,
02359              ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_queue_head, ")
02360              ACE_TEXT ("the size of the queue is [%d]\n"),
02361              this->id (),
02362              this->incoming_message_queue_.queue_length()));
02363         }
02364       // Now that we have pulled out out one message out of the queue,
02365       // check whether we have one more message in the queue...
02366       if (this->incoming_message_queue_.queue_length () > 0)
02367         {
02368           if (TAO_debug_level > 0)
02369             {
02370               ACE_DEBUG ((LM_DEBUG,
02371                  ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_queue_head, ")
02372                  ACE_TEXT ("notify reactor\n"),
02373                  this->id ()));
02374             }
02375 
02376           int const retval = this->notify_reactor ();
02377 
02378           if (retval == 1)
02379             {
02380               // Let the class know that it doesn't need to resume  the
02381               // handle..
02382               rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_LEAVE_SUSPENDED);
02383             }
02384           else if (retval < 0)
02385             return -1;
02386         }
02387       else
02388         {
02389           // As we are ready to process the last message just resume
02390           // the handle. Set the flag incase someone had reset the flag..
02391           rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_RESUMABLE);
02392         }
02393 
02394       // Process the message...
02395       if (this->process_parsed_messages (qd, rh) == -1)
02396         {
02397           return -1;
02398         }
02399 
02400       // Delete the Queued_Data..
02401       TAO_Queued_Data::release (qd);
02402 
02403       return 0;
02404     }
02405 
02406   return 1;
02407 }

bool TAO_Transport::provide_blockable_handler TAO::Connection_Handler_Set handlers  ) 
 

Called by the cache when the ORB is shutting down.

Parameters:
handlers The TAO_Connection_Handler_Set into which the transport should place its handler if the transport has RW strategy on.
Returns:
true indicates a handler was added to the handler set. false indicates that the transport did not have a blockable handler that could be added.

Definition at line 238 of file Transport.cpp.

References add_reference(), TAO::Connection_Handler_Set, ACE_Unbounded_Set< T >::insert(), TAO_Wait_Strategy::non_blocking(), opening_connection_role_, and ws_.

00239 {
00240   if (this->ws_->non_blocking () ||
00241       this->opening_connection_role_ == TAO::TAO_SERVER_ROLE)
00242     return false;
00243 
00244   (void) this->add_reference ();
00245 
00246   h.insert (this->connection_handler_i ());
00247 
00248   return true;
00249 }

void TAO_Transport::provide_handler TAO::Connection_Handler_Set handlers  ) 
 

Add the event handler to the handlers set.

Called by the cache when the cache is closing.

Parameters:
handlers The TAO_Connection_Handler_Set into which the transport should place its handler

Definition at line 230 of file Transport.cpp.

References add_reference(), TAO::Connection_Handler_Set, and ACE_Unbounded_Set< T >::insert().

00231 {
00232   (void) this->add_reference ();
00233 
00234   handlers.insert (this->connection_handler_i ());
00235 }

int TAO_Transport::purge_entry void   ) 
 

Cache management.

Definition at line 441 of file Transport.cpp.

References TAO::Transport_Cache_Manager::purge_entry(), and transport_cache_manager().

Referenced by TAO_Connection_Handler::close_handler(), TAO_IIOP_Connector::complete_connection(), TAO_Connector::connect(), post_open(), pre_close(), recache_transport(), ~TAO_Transport(), and TransportCleanupGuard::~TransportCleanupGuard().

00442 {
00443   return this->transport_cache_manager ().purge_entry (this->cache_map_entry_);
00444 }

ACE_INLINE void TAO_Transport::purging_order unsigned long  value  ) 
 

Definition at line 76 of file Transport.inl.

References purging_order_.

00077 {
00078   // This should only be called by the Transport Cache Manager when
00079   // it is holding it's lock.
00080   // The transport should still be here since the cache manager still
00081   // has a reference to it.
00082   this->purging_order_ = value;
00083 }

ACE_INLINE unsigned long TAO_Transport::purging_order void   )  const
 

Get and Set the purging order. The purging strategy uses the set version to set the purging order.

Definition at line 70 of file Transport.inl.

References purging_order_.

Referenced by TAO_LRU_Connection_Purging_Strategy::update_item().

00071 {
00072   return this->purging_order_;
00073 }

ACE_INLINE bool TAO_Transport::queue_is_empty void   ) 
 

Check if there are messages pending in the queue.

Returns:
true if the queue is empty

Definition at line 98 of file Transport.inl.

References ACE_GUARD_RETURN, and queue_is_empty_i().

Referenced by TAO_Reactive_Flushing_Strategy::flush_transport(), TAO_Leader_Follower_Flushing_Strategy::flush_transport(), and TAO_Block_Flushing_Strategy::flush_transport().

00099 {
00100   ACE_GUARD_RETURN (ACE_Lock,
00101                     ace_mon,
00102                     *this->handler_lock_,
00103                     false);
00104   return this->queue_is_empty_i ();
00105 }

bool TAO_Transport::queue_is_empty_i void   )  [private]
 

Check if there are messages pending in the queue.

This version assumes that the lock is already held. Use with care!

Returns:
true if the queue is empty

Definition at line 747 of file Transport.cpp.

Referenced by post_open(), and queue_is_empty().

00748 {
00749   return (this->head_ == 0);
00750 }

int TAO_Transport::queue_message_i const ACE_Message_Block message_block,
ACE_Time_Value max_wait_time
[protected]
 

Queue a message for message_block

Parameters:
max_wait_time The maximum time that the operation can block, used in the implementation of timeouts.

Definition at line 1432 of file Transport.cpp.

References ACE_NEW_RETURN, and TAO_Queued_Message::push_back().

Referenced by format_queue_message(), and send_asynchronous_message_i().

01434 {
01435   TAO_Queued_Message *queued_message = 0;
01436   ACE_NEW_RETURN (queued_message,
01437                   TAO_Asynch_Queued_Message (message_block,
01438                                              this->orb_core_,
01439                                              max_wait_time,
01440                                              0,
01441                                              true),
01442                   -1);
01443   queued_message->push_back (this->head_, this->tail_);
01444 
01445   return 0;
01446 }

int TAO_Transport::recache_transport TAO_Transport_Descriptor_Interface desc  ) 
 

Recache ourselves in the cache.

Todo:
Ideally the following should be inline.

purge_entry has a return value, use it

Definition at line 430 of file Transport.cpp.

References TAO::Transport_Cache_Manager::cache_transport(), purge_entry(), and transport_cache_manager().

Referenced by TAO_IIOP_Connection_Handler::process_listen_point_list().

00431 {
00432   // First purge our entry
00433   this->purge_entry ();
00434 
00435   // Then add ourselves to the cache
00436   return this->transport_cache_manager ().cache_transport (desc,
00437                                                            this);
00438 }

virtual ssize_t TAO_Transport::recv char *  buffer,
size_t  len,
const ACE_Time_Value timeout = 0
[pure virtual]
 

Read len bytes into buffer.

This method serializes on handler_lock_, guaranteeing that only one thread can execute it on the same instance concurrently.

Parameters:
buffer ORB allocated buffer where the data should be placed. Note: the ACE_Time_Value timeout parameter is just a placeholder for now. It is not clear that this is the best place to specify this. The actual timeout values will be kept in the Policies.

Implemented in TAO_IIOP_Transport.

Referenced by handle_input_missing_data(), and handle_input_parse_data().

ACE_INLINE size_t TAO_Transport::recv_buffer_size void   )  const
 

Accessor to recv_buffer_size_.

Definition at line 181 of file Transport.inl.

00182 {
00183   return this->recv_buffer_size_;
00184 }

int TAO_Transport::register_handler void   )  [virtual]
 

Register the handler with the reactor.

Register the handler with the reactor. This method is used by the Wait_On_Reactor strategy. The transport must register its event handler with the ORB's Reactor.

Todo:
I think this method is pretty much useless, the connections are *always* registered with the Reactor, except in thread-per-connection mode. In that case putting the connection in the Reactor would produce unpredictable results anyway.

Definition at line 318 of file Transport.cpp.

References ACE_DEBUG, ACE_GUARD_RETURN, ACE_TEXT, event_handler_i(), TAO_Wait_Strategy::is_registered(), LM_DEBUG, ACE_Event_Handler::reactor(), TAO_ORB_Core::reactor(), ACE_Reactor::register_handler(), TAO_debug_level, and ws_.

Referenced by TAO_Wait_On_Reactor::register_handler(), TAO_Wait_On_Leader_Follower::register_handler(), and TAO_Wait_On_Leader_Follower::sending_request().

00319 {
00320   if (TAO_debug_level > 4)
00321     {
00322       ACE_DEBUG ((LM_DEBUG,
00323                   ACE_TEXT ("TAO (%P|%t) - Transport[%d]::register_handler\n"),
00324                   this->id ()));
00325     }
00326 
00327   ACE_Reactor * const r = this->orb_core_->reactor ();
00328 
00329   // @@note: This should be okay since the register handler call will
00330   // not make a nested call into the transport.
00331   ACE_GUARD_RETURN (ACE_Lock,
00332                     ace_mon,
00333                     *this->handler_lock_,
00334                     false);
00335 
00336   if (r == this->event_handler_i ()->reactor ())
00337     {
00338       return 0;
00339     }
00340 
00341   // Set the flag in the Connection Handler and in the Wait Strategy
00342   // @@Maybe we should set these flags after registering with the
00343   // reactor. What if the  registration fails???
00344   this->ws_->is_registered (true);
00345 
00346   // Register the handler with the reactor
00347   return r->register_handler (this->event_handler_i (),
00348                               ACE_Event_Handler::READ_MASK);
00349 }

ACE_Event_Handler::Reference_Count TAO_Transport::remove_reference void   ) 
 

Initialising the messaging object. This would be used by the connector side. On the acceptor side the connection handler would take care of the messaging objects.

Definition at line 2490 of file Transport.cpp.

References event_handler_i(), and ACE_Event_Handler::remove_reference().

Referenced by TAO_Connection_Handler::cancel_pending_connection(), TAO_Connection_Handler::close_handler(), TAO::Cache_IntId::operator=(), TAO::Transport_Cache_Manager::purge(), TAO_Asynch_Reply_Dispatcher_Base::transport(), TAO::Cache_IntId::~Cache_IntId(), TAO_Asynch_Reply_Dispatcher_Base::~TAO_Asynch_Reply_Dispatcher_Base(), TAO_Thread_Per_Connection_Handler::~TAO_Thread_Per_Connection_Handler(), and TransportCleanupGuard::~TransportCleanupGuard().

02491 {
02492   return this->event_handler_i ()->remove_reference ();
02493 }

void TAO_Transport::report_invalid_event_handler const char *  caller  )  [private]
 

Print out error messages if the event handler is not valid.

Definition at line 1197 of file Transport.cpp.

References ACE_DEBUG, ACE_TEXT, ACE_TEXT_CHAR_TO_TCHAR, LM_DEBUG, and TAO_debug_level.

01198 {
01199   if (TAO_debug_level > 0)
01200     {
01201       ACE_DEBUG ((LM_DEBUG,
01202          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::report_invalid_event_handler")
01203          ACE_TEXT ("(%s) no longer associated with handler [tag=%d]\n"),
01204          this->id (), ACE_TEXT_CHAR_TO_TCHAR (caller), this->tag_));
01205     }
01206 }

ACE_INLINE void TAO_Transport::reset_flush_timer void   )  [private]
 

The flush timer expired or was explicitly cancelled, mark it as not pending

Definition at line 114 of file Transport.inl.

References current_deadline_, and flush_timer_id_.

Referenced by drain_queue_i(), and handle_timeout().

00115 {
00116   this->flush_timer_id_ = -1;
00117   this->current_deadline_ = ACE_Time_Value::zero;
00118 }

int TAO_Transport::schedule_output_i void   )  [private]
 

Schedule handle_output() callbacks.

Definition at line 754 of file Transport.cpp.

References ACE_DEBUG, ACE_TEXT, event_handler_i(), ACE_Reactor::find_handler(), ACE_Event_Handler::get_handle(), LM_DEBUG, ACE_Event_Handler::reactor(), ACE_Event_Handler::remove_reference(), ACE_Reactor::schedule_wakeup(), and TAO_debug_level.

Referenced by TAO_Reactive_Flushing_Strategy::schedule_output(), and TAO_Leader_Follower_Flushing_Strategy::schedule_output().

00755 {
00756   ACE_Event_Handler * const eh = this->event_handler_i ();
00757   ACE_Reactor * const reactor = eh->reactor ();
00758 
00759   if (reactor == 0)
00760      return -1;
00761 
00762   // Check to see if our event handler is still registered with the
00763   // reactor.  It's possible for another thread to have run close_connection()
00764   // since we last used the event handler.
00765   ACE_Event_Handler * const found = reactor->find_handler (eh->get_handle ());
00766   if (found != eh)
00767     {
00768       if(TAO_debug_level > 3)
00769         {
00770           ACE_DEBUG ((LM_DEBUG,
00771                       "TAO (%P|%t) - Transport[%d]::schedule_output_i "
00772                       "event handler not found in reactor, returning -1\n",
00773                       this->id ()));
00774         }
00775       if (found)
00776         {
00777           found->remove_reference ();
00778         }
00779       return -1;
00780     }
00781   found->remove_reference ();
00782 
00783   if (TAO_debug_level > 3)
00784     {
00785       ACE_DEBUG ((LM_DEBUG,
00786          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::schedule_output_i\n"),
00787          this->id ()));
00788     }
00789 
00790   return reactor->schedule_wakeup (eh, ACE_Event_Handler::WRITE_MASK);
00791 }

virtual ssize_t TAO_Transport::send iovec *  iov,
int  iovcnt,
size_t &  bytes_transferred,
const ACE_Time_Value timeout = 0
[pure virtual]
 

Write the complete Message_Block chain to the connection.

This method serializes on handler_lock_, guaranteeing that only one thread can execute it on the same instance concurrently.

Often the implementation simply forwards the arguments to the underlying ACE_Svc_Handler class, using the code factored out into ACE.

Be careful with protocols that perform non-trivial transformations of the data, such as SSLIOP or protocols that compress the stream.

Parameters:
iov contains the data that must be sent.
timeout is the maximum time that the application is willing to wait for the data to be sent, useful in platforms that implement timed writes. The timeout value is obtained from the policies set by the application.
bytes_transferred should return the total number of bytes successfully transferred before the connection blocked. This is required because in some platforms and/or protocols multiple system calls may be required to send the chain of message blocks. The first few calls can work successfully, but the final one can fail or signal a flow control situation (via EAGAIN). In this case the ORB expects the function to return -1, errno to be appropriately set and this argument to return the number of bytes already on the OS I/O subsystem.
This call can also fail if the transport instance is no longer associated with a connection (e.g., the connection handler closed down). In that case, it returns -1 and sets errno to ENOENT.

Implemented in TAO_IIOP_Transport.

Referenced by drain_queue_helper().

int TAO_Transport::send_asynchronous_message_i TAO_Stub stub,
const ACE_Message_Block message_block,
ACE_Time_Value max_wait_time
[private]
 

Send an asynchronous message, i.e. do not block until the message is on the wire

Definition at line 1269 of file Transport.cpp.

References ACE_DEBUG, ACE_ERROR, ACE_GUARD_RETURN, ACE_TEXT, check_buffering_constraints_i(), ACE_Message_Block::cont(), ETIME, EWOULDBLOCK, TAO_Flushing_Strategy::flush_transport(), TAO_ORB_Core::flushing_strategy(), ACE_Message_Block::length(), LM_DEBUG, LM_ERROR, TAO::Transport_Queueing_Strategy::must_queue(), orb_core(), queue_message_i(), TAO_Flushing_Strategy::schedule_output(), send_message_block_chain_i(), ssize_t, TAO_debug_level, ACE_Message_Block::total_length(), and TAO_Stub::transport_queueing_strategy().

Referenced by send_message_shared_i().

01272 {
01273   // Let's figure out if the message should be queued without trying
01274   // to send first:
01275   bool try_sending_first = true;
01276 
01277   bool const queue_empty = (this->head_ == 0);
01278 
01279   TAO::Transport_Queueing_Strategy *queue_strategy =
01280     stub->transport_queueing_strategy ();
01281 
01282   if (!queue_empty)
01283     {
01284       try_sending_first = false;
01285     }
01286   else if (queue_strategy)
01287     {
01288       if (queue_strategy->must_queue (queue_empty))
01289         {
01290           try_sending_first = false;
01291         }
01292     }
01293 
01294   if (try_sending_first)
01295     {
01296       ssize_t n = 0;
01297       size_t byte_count = 0;
01298       // ... in this case we must try to send the message first ...
01299 
01300       size_t const total_length = message_block->total_length ();
01301 
01302       if (TAO_debug_level > 6)
01303         {
01304           ACE_DEBUG ((LM_DEBUG,
01305              ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ")
01306              ACE_TEXT ("trying to send the message (ml = %d)\n"),
01307              this->id (), total_length));
01308         }
01309 
01310       // @@ I don't think we want to hold the mutex here, however if
01311       // we release it we need to recheck the status of the transport
01312       // after we return... once I understand the final form for this
01313       // code I will re-visit this decision
01314       n = this->send_message_block_chain_i (message_block,
01315                                             byte_count,
01316                                             max_wait_time);
01317       if (n == -1)
01318         {
01319           // ... if this is just an EWOULDBLOCK we must schedule the
01320           // message for later, if it is ETIME we still have to send
01321           // the complete message, because cutting off the message at
01322           // this point will destroy the synchronization with the
01323           // server ...
01324           if (errno != EWOULDBLOCK && errno != ETIME)
01325             {
01326               if (TAO_debug_level > 0)
01327                 {
01328                   ACE_ERROR ((LM_ERROR,
01329                      ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ")
01330                      ACE_TEXT ("fatal error in ")
01331                      ACE_TEXT ("send_message_block_chain_i - %m\n"),
01332                      this->id ()));
01333                 }
01334               return -1;
01335             }
01336         }
01337 
01338       // ... let's figure out if the complete message was sent ...
01339       if (total_length == byte_count)
01340         {
01341           // Done, just return.  Notice that there are no allocations
01342           // or copies up to this point (though some fancy calling
01343           // back and forth).
01344           // This is the common case for the critical path, it should
01345           // be fast.
01346           return 0;
01347         }
01348 
01349       // If it was partially sent, then we can't allow a timeout
01350       if (byte_count > 0)
01351         max_wait_time = 0;
01352 
01353       if (TAO_debug_level > 6)
01354         {
01355           ACE_DEBUG ((LM_DEBUG,
01356              ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ")
01357              ACE_TEXT ("partial send %d / %d bytes\n"),
01358              this->id (), byte_count, total_length));
01359         }
01360 
01361       // ... part of the data was sent, need to figure out what piece
01362       // of the message block chain must be queued ...
01363       while (message_block != 0 && message_block->length () == 0)
01364         {
01365           message_block = message_block->cont ();
01366         }
01367 
01368       // ... at least some portion of the message block chain should
01369       // remain ...
01370     }
01371 
01372   // ... either the message must be queued or we need to queue it
01373   // because it was not completely sent out ...
01374 
01375   if (TAO_debug_level > 6)
01376     {
01377       ACE_DEBUG ((LM_DEBUG,
01378          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ")
01379          ACE_TEXT ("message is queued\n"),
01380          this->id ()));
01381     }
01382 
01383   if (this->queue_message_i (message_block, max_wait_time) == -1)
01384   {
01385     if (TAO_debug_level > 0)
01386       {
01387         ACE_DEBUG ((LM_DEBUG,
01388                     ACE_TEXT ("TAO (%P|%t) - Transport[%d]::")
01389                     ACE_TEXT ("send_asynchronous_message_i, ")
01390                     ACE_TEXT ("cannot queue message for  - %m\n"),
01391                     this->id ()));
01392       }
01393     return -1;
01394   }
01395 
01396   // ... if the queue is full we need to activate the output on the
01397   // queue ...
01398   bool must_flush = false;
01399   const bool constraints_reached =
01400     this->check_buffering_constraints_i (stub,
01401                                          must_flush);
01402 
01403   // ... but we also want to activate it if the message was partially
01404   // sent.... Plus, when we use the blocking flushing strategy the
01405   // queue is flushed as a side-effect of 'schedule_output()'
01406 
01407   TAO_Flushing_Strategy *flushing_strategy =
01408     this->orb_core ()->flushing_strategy ();
01409 
01410   if (constraints_reached || try_sending_first)
01411     {
01412       int const result = flushing_strategy->schedule_output (this);
01413       if (result == TAO_Flushing_Strategy::MUST_FLUSH)
01414         {
01415           must_flush = true;
01416         }
01417     }
01418 
01419   if (must_flush)
01420     {
01421       typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK;
01422       TAO_REVERSE_LOCK reverse (*this->handler_lock_);
01423       ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1);
01424 
01425       (void) flushing_strategy->flush_transport (this);
01426     }
01427 
01428   return 0;
01429 }

void TAO_Transport::send_connection_closed_notifications void   ) 
 

Notify all the components inside a Transport when the underlying connection is closed.

Definition at line 1209 of file Transport.cpp.

References ACE_GUARD, TAO_Transport_Mux_Strategy::connection_closed(), send_connection_closed_notifications_i(), and tms().

Referenced by TAO_Connection_Handler::close_connection_eh().

01210 {
01211   {
01212     ACE_MT (ACE_GUARD (ACE_Lock, guard, *this->handler_lock_));
01213 
01214     this->send_connection_closed_notifications_i ();
01215   }
01216 
01217   this->tms ()->connection_closed ();
01218 }

void TAO_Transport::send_connection_closed_notifications_i void   )  [private]
 

Assume the lock is held.

Definition at line 1221 of file Transport.cpp.

References cleanup_queue_i(), messaging_object(), and TAO_Pluggable_Messaging::reset().

Referenced by send_connection_closed_notifications().

01222 {
01223   this->cleanup_queue_i ();
01224 
01225   this->messaging_object ()->reset ();
01226 }

virtual int TAO_Transport::send_message TAO_OutputCDR stream,
TAO_Stub stub = 0,
int  message_semantics = TAO_Transport::TAO_TWOWAY_REQUEST,
ACE_Time_Value max_time_wait = 0
[pure virtual]
 

Once the ORB is prepared to receive a reply (see send_request() above), and all the arguments have been marshaled the CDR stream must be 'formatted', i.e. the message_size field in the GIOP header can finally be set to the proper value.

Implemented in TAO_IIOP_Transport.

Referenced by TAO_On_Demand_Fragmentation_Strategy::fragment(), TAO_GIOP_Message_Base::make_send_locate_reply(), TAO_GIOP_Message_Base::process_request(), and TAO_GIOP_Message_Base::send_reply_exception().

int TAO_Transport::send_message_block_chain const ACE_Message_Block message_block,
size_t &  bytes_transferred,
ACE_Time_Value max_wait_time = 0
 

Send a message block chain.

Definition at line 509 of file Transport.cpp.

References ACE_GUARD_RETURN, and send_message_block_chain_i().

Referenced by TAO_GIOP_Message_Base::send_close_connection(), and TAO_GIOP_Message_Base::send_error().

00512 {
00513   ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1);
00514 
00515   return this->send_message_block_chain_i (mb,
00516                                            bytes_transferred,
00517                                            max_wait_time);
00518 }

int TAO_Transport::send_message_block_chain_i const ACE_Message_Block message_block,
size_t &  bytes_transferred,
ACE_Time_Value max_wait_time
 

Send a message block chain, assuming the lock is held.

Definition at line 521 of file Transport.cpp.

References drain_queue_i(), TAO_Synch_Queued_Message::message_length(), TAO_Queued_Message::push_back(), TAO_Queued_Message::remove_from_list(), and ACE_Message_Block::total_length().

Referenced by send_asynchronous_message_i(), and send_message_block_chain().

00524 {
00525   size_t const total_length = mb->total_length ();
00526 
00527   // We are going to block, so there is no need to clone
00528   // the message block.
00529   TAO_Synch_Queued_Message synch_message (mb, this->orb_core_);
00530 
00531   synch_message.push_back (this->head_, this->tail_);
00532 
00533   int const n = this->drain_queue_i ();
00534 
00535   if (n == -1)
00536     {
00537       synch_message.remove_from_list (this->head_, this->tail_);
00538       return -1; // Error while sending...
00539     }
00540   else if (n == 1)
00541     {
00542       bytes_transferred = total_length;
00543       return 1;  // Empty queue, message was sent..
00544     }
00545 
00546   // Remove the temporary message from the queue...
00547   synch_message.remove_from_list (this->head_, this->tail_);
00548 
00549   bytes_transferred = total_length - synch_message.message_length ();
00550 
00551   return 0;
00552 }

int TAO_Transport::send_message_shared TAO_Stub stub,
int  message_semantics,
const ACE_Message_Block message_block,
ACE_Time_Value max_wait_time
[virtual]
 

Send the contents of message_block.

Parameters:
stub The object reference used for this operation, useful to obtain the current policies.
message_semantics If this is set to TAO_TWOWAY_REQUEST this method will block until the operation is completely written on the wire. If it is set to other values this operation may return before the message has been completely written.
message_block The CDR encapsulation of the GIOP message that must be sent. The message may consist of multiple Message Blocks chained through the cont() field.
max_wait_time The maximum time that the operation can block, used in the implementation of timeouts.

Reimplemented in TAO_IIOP_Transport.

Definition at line 280 of file Transport.cpp.

References ACE_GUARD_RETURN, close_connection(), and send_message_shared_i().

00284 {
00285   int result = 0;
00286 
00287   {
00288     ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1);
00289 
00290     result =
00291       this->send_message_shared_i (stub, message_semantics,
00292                                    message_block, max_wait_time);
00293   }
00294 
00295   if (result == -1)
00296     {
00297       this->close_connection ();
00298     }
00299 
00300   return result;
00301 }

int TAO_Transport::send_message_shared_i TAO_Stub stub,
int  message_semantics,
const ACE_Message_Block message_block,
ACE_Time_Value max_wait_time
[protected]
 

Implement send_message_shared() assuming the handler_lock_ is held.

Definition at line 1229 of file Transport.cpp.

References ACE_Message_Block::length(), TAO::Transport::Stats::messages_sent(), send_asynchronous_message_i(), send_reply_message_i(), send_synchronous_message_i(), stats_, TAO_ONEWAY_REQUEST, TAO_REPLY, and TAO_TWOWAY_REQUEST.

Referenced by send_message_shared(), and TAO_IIOP_Transport::send_message_shared().

01233 {
01234   int ret = 0;
01235 
01236 #if TAO_HAS_TRANSPORT_CURRENT == 1
01237   size_t const message_length = message_block->length ();
01238 #endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */
01239 
01240   switch (message_semantics)
01241     {
01242       case TAO_Transport::TAO_TWOWAY_REQUEST:
01243         ret = this->send_synchronous_message_i (message_block,
01244                                                 max_wait_time);
01245         break;
01246 
01247       case TAO_Transport::TAO_REPLY:
01248         ret = this->send_reply_message_i (message_block,
01249                                           max_wait_time);
01250         break;
01251 
01252       case TAO_Transport::TAO_ONEWAY_REQUEST:
01253         ret = this->send_asynchronous_message_i (stub,
01254                                                  message_block,
01255                                                  max_wait_time);
01256         break;
01257     }
01258 
01259 #if TAO_HAS_TRANSPORT_CURRENT == 1
01260   // "Count" the message, only if no error was encountered.
01261   if (ret != -1 && this->stats_ != 0)
01262     this->stats_->messages_sent (message_length);
01263 #endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */
01264 
01265   return ret;
01266 }

int TAO_Transport::send_reply_message_i const ACE_Message_Block message_block,
ACE_Time_Value max_wait_time
[private]
 

Send a reply message, i.e. do not block until the message is on the wire, but just return after adding it to the queue.

Definition at line 657 of file Transport.cpp.

References ACE_DEBUG, ACE_GUARD_RETURN, ACE_TEXT, TAO_Synch_Queued_Message::clone(), TAO_Queued_Message::destroy(), TAO_Flushing_Strategy::flush_message(), TAO_ORB_Core::flushing_strategy(), LM_DEBUG, orb_core(), TAO_Queued_Message::push_back(), TAO_Queued_Message::remove_from_list(), TAO_Flushing_Strategy::schedule_output(), send_synch_message_helper_i(), and TAO_debug_level.

Referenced by send_message_shared_i().

00659 {
00660   // Dont clone now.. We could be sent in one shot!
00661   TAO_Synch_Queued_Message synch_message (mb, this->orb_core_);
00662 
00663   synch_message.push_back (this->head_,
00664                            this->tail_);
00665 
00666   int const n =
00667     this->send_synch_message_helper_i (synch_message,
00668                                        max_wait_time);
00669 
00670   if (n == -1 || n == 1)
00671     {
00672       return n;
00673     }
00674 
00675   if (TAO_debug_level > 3)
00676     {
00677       ACE_DEBUG ((LM_DEBUG,
00678          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_reply_message_i, ")
00679          ACE_TEXT ("preparing to add to queue before leaving\n"),
00680          this->id ()));
00681     }
00682 
00683   // Till this point we shouldn't have any copying and that is the
00684   // point anyway. Now, remove the node from the list
00685   synch_message.remove_from_list (this->head_, this->tail_);
00686 
00687   // Clone the node that we have.
00688   TAO_Queued_Message *msg =
00689     synch_message.clone (this->orb_core_->transport_message_buffer_allocator ());
00690 
00691   // Stick it in the queue
00692   msg->push_back (this->head_, this->tail_);
00693 
00694   TAO_Flushing_Strategy *flushing_strategy =
00695     this->orb_core ()->flushing_strategy ();
00696 
00697   int const result = flushing_strategy->schedule_output (this);
00698 
00699   if (result == -1)
00700     {
00701       if (TAO_debug_level > 5)
00702         {
00703           ACE_DEBUG ((LM_DEBUG, "TAO (%P|%t) - Transport[%d]::send_reply_"
00704                       "message_i, dequeuing msg due to schedule_output "
00705                       "failure\n", this->id ()));
00706         }
00707       msg->remove_from_list (this->head_, this->tail_);
00708       msg->destroy ();
00709     }
00710   else if (result == TAO_Flushing_Strategy::MUST_FLUSH)
00711     {
00712       typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK;
00713       TAO_REVERSE_LOCK reverse (*this->handler_lock_);
00714       ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1);
00715       (void) flushing_strategy->flush_message(this, msg, 0);
00716     }
00717 
00718   return 1;
00719 }

virtual int TAO_Transport::send_request TAO_Stub stub,
TAO_ORB_Core orb_core,
TAO_OutputCDR stream,
int  message_semantics,
ACE_Time_Value max_time_wait
[pure virtual]
 

Preparing the ORB to receive the reply only once the request is completely sent opens the system to some subtle race conditions: suppose the ORB is running in a multi-threaded configuration, thread A makes a request while thread B is using the Reactor to process all incoming requests. Thread A could be implemented as follows: 1) send the request 2) setup the ORB to receive the reply 3) wait for the reply

but in this case thread B may receive the reply between step (1) and (2), and drop it as an invalid or unexpected message. Consequently the correct implementation is: 1) setup the ORB to receive the reply 2) send the request 3) wait for the reply

The following method encapsulates this idiom.

Todo:
This is generic code, it should be factored out into the Transport class.

Implemented in TAO_IIOP_Transport.

Referenced by TAO::Remote_Invocation::send_message().

int TAO_Transport::send_synch_message_helper_i TAO_Synch_Queued_Message s,
ACE_Time_Value max_wait_time
[private]
 

A helper method used by send_synchronous_message_i() and send_reply_message_i(). Reusable code that could be used by both the methods.

Definition at line 722 of file Transport.cpp.

References TAO_Synch_Queued_Message::all_data_sent(), drain_queue_i(), and TAO_Queued_Message::remove_from_list().

Referenced by send_reply_message_i(), and send_synchronous_message_i().

00724 {
00725   // @todo: Need to send timeouts for writing..
00726   int const n = this->drain_queue_i ();
00727 
00728   if (n == -1)
00729     {
00730       synch_message.remove_from_list (this->head_, this->tail_);
00731       return -1; // Error while sending...
00732     }
00733   else if (n == 1)
00734     {
00735       return 1;  // Empty queue, message was sent..
00736     }
00737 
00738   if (synch_message.all_data_sent ())
00739     {
00740       return 1;
00741     }
00742 
00743   return 0;
00744 }

int TAO_Transport::send_synchronous_message_i const ACE_Message_Block message_block,
ACE_Time_Value max_wait_time
[private]
 

Send a synchronous message, i.e. block until the message is on the wire

Definition at line 555 of file Transport.cpp.

References ACE_ERROR, ACE_GUARD_RETURN, ACE_NEW_RETURN, ACE_TEXT, TAO_Synch_Queued_Message::current_block(), ETIME, TAO_Flushing_Strategy::flush_message(), TAO_ORB_Core::flushing_strategy(), LM_ERROR, TAO_Synch_Queued_Message::message_length(), orb_core(), TAO_Queued_Message::push_back(), TAO_Queued_Message::push_front(), TAO_Queued_Message::remove_from_list(), TAO_Flushing_Strategy::schedule_output(), send_synch_message_helper_i(), and TAO_debug_level.

Referenced by send_message_shared_i().

00557 {
00558   // We are going to block, so there is no need to clone
00559   // the message block.
00560   TAO_Synch_Queued_Message synch_message (mb, this->orb_core_);
00561   size_t const message_length = synch_message.message_length ();
00562 
00563   synch_message.push_back (this->head_, this->tail_);
00564 
00565   int const n = this->send_synch_message_helper_i (synch_message,
00566                                                    0 /*ignored*/);
00567   if (n == -1 || n == 1)
00568     {
00569       return n;
00570     }
00571 
00572   // @todo: Check for timeouts!
00573   // if (max_wait_time != 0 && errno == ETIME) return -1;
00574   TAO_Flushing_Strategy *flushing_strategy =
00575     this->orb_core ()->flushing_strategy ();
00576   int result = flushing_strategy->schedule_output (this);
00577   if (result == -1)
00578     {
00579       synch_message.remove_from_list (this->head_, this->tail_);
00580       if (TAO_debug_level > 0)
00581         {
00582           ACE_ERROR ((LM_ERROR,
00583                       ACE_TEXT ("TAO (%P|%t) - Transport[%d]::")
00584                       ACE_TEXT ("send_synchronous_message_i, ")
00585                       ACE_TEXT ("error while scheduling flush - %m\n"),
00586                       this->id ()));
00587         }
00588       return -1;
00589     }
00590 
00591   // No need to check for result == TAO_Flushing_Strategy::MUST_FLUSH,
00592   // because we're always going to flush anyway.
00593 
00594   // Release the mutex, other threads may modify the queue as we
00595   // block for a long time writing out data.
00596   {
00597     typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK;
00598     TAO_REVERSE_LOCK reverse (*this->handler_lock_);
00599     ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1);
00600 
00601     result = flushing_strategy->flush_message (this,
00602                                                &synch_message,
00603                                                max_wait_time);
00604   }
00605 
00606   if (result == -1)
00607     {
00608       synch_message.remove_from_list (this->head_, this->tail_);
00609 
00610       if (errno == ETIME)
00611         {
00612           // If partially sent, then we must queue the remainder.
00613           if (message_length != synch_message.message_length ())
00614             {
00615               // This is a timeout, there is only one nasty case: the
00616               // message has been partially sent!  We simply cannot take
00617               // the message out of the queue, because that would corrupt
00618               // the connection.
00619               //
00620               // What we do is replace the queued message with an
00621               // asynchronous message, that contains only what remains of
00622               // the timed out request.  If you think about sending
00623               // CancelRequests in this case: there is no much point in
00624               // doing that: the receiving ORB would probably ignore it,
00625               // and figuring out the request ID would be a bit of a
00626               // nightmare.
00627               //
00628               TAO_Queued_Message *queued_message = 0;
00629               ACE_NEW_RETURN (queued_message,
00630                               TAO_Asynch_Queued_Message (
00631                                   synch_message.current_block (),
00632                                   this->orb_core_,
00633                                   0, // no timeout
00634                                   0,
00635                                   true),
00636                               -1);
00637               queued_message->push_front (this->head_, this->tail_);
00638             }
00639         }
00640 
00641       if (TAO_debug_level > 0)
00642         {
00643           ACE_ERROR ((LM_ERROR,
00644              ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_synchronous_message_i, ")
00645              ACE_TEXT ("error while flushing message - %m\n"),
00646              this->id ()));
00647         }
00648 
00649       return -1;
00650     }
00651 
00652   return 1;
00653 }

ACE_INLINE size_t TAO_Transport::sent_byte_count void   )  const
 

Accessor to sent_byte_count_.

Definition at line 187 of file Transport.inl.

References sent_byte_count_.

00188 {
00189   return this->sent_byte_count_;
00190 }

TAO::Transport::Stats* TAO_Transport::stats void   )  const
 

Transport statistics.

TAO_BEGIN_VERSIONED_NAMESPACE_DECL ACE_INLINE CORBA::ULong TAO_Transport::tag void   )  const
 

Return the protocol tag.

The OMG assigns unique tags (a 32-bit unsigned number) to each protocol. New protocol tags can be obtained free of charge from the OMG, check the documents in corbafwd.h for more details.

Definition at line 8 of file Transport.inl.

00009 {
00010   return this->tag_;
00011 }

int TAO_Transport::tear_listen_point_list TAO_InputCDR cdr  )  [virtual]
 

Extracts the list of listen points from the cdr stream. The list would have the protocol specific details of the ListenPoints

Reimplemented in TAO_IIOP_Transport.

Definition at line 274 of file Transport.cpp.

References ACE_NOTSUP_RETURN.

Referenced by TAO_GIOP_Message_Generator_Parser_12::process_bidir_context().

00275 {
00276   ACE_NOTSUP_RETURN (-1);
00277 }

ACE_INLINE TAO_Transport_Mux_Strategy * TAO_Transport::tms void   )  const
 

Get the TAO_Transport_Mux_Strategy used by this object.

The role of the TAO_Transport_Mux_Strategy is described in more detail in that class' documentation. Suffice it to say that the class is used to control how many threads can have pending requests over the same connection. Multiplexing multiple threads over the same connection conserves resources and is almost required for AMI, but having only one pending request per connection is more efficient and reduces the possibilities of priority inversions.

Definition at line 20 of file Transport.inl.

Referenced by idle_after_reply(), idle_after_send(), TAO::LocateRequest_Invocation_Adapter::invoke(), TAO::LocateRequest_Invocation::invoke(), TAO::Invocation_Adapter::invoke_remote_i(), TAO_GIOP_Message_Base::process_reply_message(), TAO::Synch_Twoway_Invocation::remote_twoway(), and send_connection_closed_notifications().

00021 {
00022   return tms_;
00023 }

TAO::Transport_Cache_Manager & TAO_Transport::transport_cache_manager void   )  [private]
 

Helper method that returns the Transport Cache Manager.

Definition at line 2448 of file Transport.cpp.

References TAO_ORB_Core::lane_resources(), and TAO_Thread_Lane_Resources::transport_cache().

Referenced by make_idle(), purge_entry(), recache_transport(), and update_transport().

02449 {
02450   return this->orb_core_->lane_resources ().transport_cache ();
02451 }

int TAO_Transport::update_transport void   ) 
 

Cache management.

Definition at line 460 of file Transport.cpp.

References transport_cache_manager(), and TAO::Transport_Cache_Manager::update_entry().

Referenced by TAO_Connection_Handler::handle_input_internal(), TAO_Connection_Handler::handle_output_eh(), and TAO_Connection_Handler::svc_i().

00461 {
00462   return this->transport_cache_manager ().update_entry (this->cache_map_entry_);
00463 }

ACE_INLINE TAO_Wait_Strategy * TAO_Transport::wait_strategy void   )  const
 

Return the TAO_Wait_Strategy used by this object.

The role of the TAO_Wait_Strategy is described in more detail in that class' documentation. Suffice it to say that the ORB can wait for a reply blocking on read(), using the Reactor to wait for multiple events concurrently or using the Leader/Followers protocol.

Definition at line 27 of file Transport.inl.

References ws_.

Referenced by TAO_Connection_Handler::close_connection_eh(), TAO_IIOP_Connector::complete_connection(), TAO_Connector::connect(), TAO::Transport_Cache_Manager::find_transport(), TAO_Connection_Handler::handle_input_eh(), TAO_IIOP_Connection_Handler::open(), post_open(), and TAO::Synch_Twoway_Invocation::wait_for_reply().

00028 {
00029   return this->ws_;
00030 }

ACE_INLINE void TAO_Transport::wchar_translator TAO_Codeset_Translator_Base  ) 
 

CodeSet negotiation - Set the wchar codeset translator factory.

Definition at line 144 of file Transport.inl.

References tcs_set_.

00145 {
00146   this->wchar_translator_ = tf;
00147   this->tcs_set_ = 1;
00148 }

ACE_INLINE TAO_Codeset_Translator_Base * TAO_Transport::wchar_translator void   )  const
 

CodeSet Negotiation - Get the wchar codeset translator factory.

Definition at line 131 of file Transport.inl.

00132 {
00133   return this->wchar_translator_;
00134 }


Friends And Related Function Documentation

friend class TAO_Leader_Follower_Flushing_Strategy [friend]
 

Definition at line 808 of file Transport.h.

friend class TAO_Reactive_Flushing_Strategy [friend]
 

These classes need privileged access to:

Definition at line 807 of file Transport.h.

friend class TAO_Thread_Per_Connection_Handler [friend]
 

Needs privileged access to event_handler_i ()

Definition at line 812 of file Transport.h.


Member Data Documentation

int TAO_Transport::bidirectional_flag_ [protected]
 

Have we sent any info on bidirectional information or have we received any info regarding making the connection served by this transport bidirectional. The flag is used as follows: + We don't want to send the bidirectional context info more than once on the connection. Why? Waste of marshalling and demarshalling time on the client. + On the server side -- once a client that has established the connection asks the server to use the connection both ways, we *don't* want the server to pack service info to the client. That is not allowed. We need a flag to prevent such things from happening.

The value of this flag will be 0 if the client sends info and 1 if the server receives the info.

Definition at line 963 of file Transport.h.

Referenced by bidirectional_flag().

TAO::Transport_Cache_Manager::HASH_MAP_ENTRY* TAO_Transport::cache_map_entry_ [protected]
 

Our entry in the cache. We don't own this. It is here for our convenience. We cannot just change things around.

Definition at line 935 of file Transport.h.

Referenced by cache_map_entry().

TAO_Codeset_Translator_Base* TAO_Transport::char_translator_ [private]
 

Additional member values required to support codeset translation.

@Phil, I think it would be nice if we could think of a way to do the following. We have been trying to use the transport for marking about translator factories and such! IMHO this is a wrong encapsulation, i.e. trying to populate the transport object with these details. We should probably have a class something like TAO_Message_Property or TAO_Message_Translator or whatever (I am sure you get the idea) and encapsulate all these details. Coupling these seems odd. If I have to be more cynical we can move this to the connection_handler and it may make more sense with the DSCP stuff around there. Do you agree?

Definition at line 1038 of file Transport.h.

ACE_Time_Value TAO_Transport::current_deadline_ [protected]
 

The queue will start draining no later than this deadline, *if* a deadline is set.

Definition at line 980 of file Transport.h.

Referenced by check_buffering_constraints_i(), handle_timeout(), and reset_flush_timer().

CORBA::Boolean TAO_Transport::first_request_ [private]
 

First_request_ is true until the first request is sent or received. This is necessary since codeset context information is necessary only on the first request. After that, the translators are fixed for the life of the connection.

Definition at line 1050 of file Transport.h.

Referenced by first_request_sent(), and generate_request_header().

long TAO_Transport::flush_timer_id_ [protected]
 

The timer ID.

Definition at line 983 of file Transport.h.

Referenced by check_buffering_constraints_i(), flush_timer_pending(), and reset_flush_timer().

ACE_Lock* TAO_Transport::handler_lock_ [mutable, protected]
 

This is an ACE_Lock that gets initialized from TAO_ORB_Core::resource_factory()->create_cached_connection_lock(). This way, one can use a lock appropriate for the type of system, i.e., a null lock for single-threaded systems, and a real lock for multi-threaded systems.

Definition at line 997 of file Transport.h.

Referenced by ~TAO_Transport().

TAO_Queued_Message* TAO_Transport::head_ [protected]
 

Implement the outgoing data queue.

Definition at line 968 of file Transport.h.

size_t TAO_Transport::id_ [protected]
 

A unique identifier for the transport.

This never *never* changes over the lifespan, so we don't have to worry about locking it.

HINT: Protocol-specific transports that use connection handler might choose to set this to the handle for their connection.

Definition at line 1007 of file Transport.h.

TAO_Incoming_Message_Queue TAO_Transport::incoming_message_queue_ [protected]
 

Queue of the consolidated, incoming messages.

Definition at line 972 of file Transport.h.

Referenced by consolidate_enqueue_message(), handle_input_parse_data(), and process_queue_head().

TAO::Incoming_Message_Stack TAO_Transport::incoming_message_stack_ [protected]
 

Stack of incoming fragments, consolidated messages are going to be enqueued in "incoming_message_queue_"

Definition at line 976 of file Transport.h.

Referenced by handle_input(), handle_input_missing_data(), handle_input_parse_data(), and handle_input_parse_extra_messages().

bool TAO_Transport::is_connected_ [protected]
 

Is this transport really connected or not. In case of oneways with SYNC_NONE Policy we don't wait until the connection is ready and we buffer the requests in this transport until the connection is ready

Definition at line 1021 of file Transport.h.

TAO::Connection_Role TAO_Transport::opening_connection_role_ [protected]
 

Definition at line 965 of file Transport.h.

Referenced by opened_as(), and provide_blockable_handler().

TAO_ORB_Core* const TAO_Transport::orb_core_ [protected]
 

Global orbcore resource.

Definition at line 931 of file Transport.h.

Referenced by TAO_GIOP_Message_Base::process_locate_request(), and TAO_GIOP_Message_Base::process_request().

ACE_Message_Block* TAO_Transport::partial_message_ [private]
 

Holds the partial GIOP message (if there is one).

Definition at line 1053 of file Transport.h.

Referenced by allocate_partial_message_block(), and handle_input_parse_data().

unsigned long TAO_Transport::purging_order_ [protected]
 

Used by the LRU, LFU and FIFO Connection Purging Strategies.

Definition at line 1010 of file Transport.h.

Referenced by purging_order().

size_t TAO_Transport::recv_buffer_size_ [protected]
 

Size of the buffer received.

Definition at line 1013 of file Transport.h.

size_t TAO_Transport::sent_byte_count_ [protected]
 

Number of bytes sent.

Definition at line 1016 of file Transport.h.

Referenced by drain_queue_helper(), drain_queue_i(), and sent_byte_count().

TAO::Transport::Stats* TAO_Transport::stats_ [private]
 

Statistics.

Definition at line 1066 of file Transport.h.

Referenced by process_parsed_messages(), send_message_shared_i(), and ~TAO_Transport().

CORBA::ULong const TAO_Transport::tag_ [protected]
 

IOP protocol tag.

Definition at line 928 of file Transport.h.

TAO_Queued_Message* TAO_Transport::tail_ [protected]
 

Definition at line 969 of file Transport.h.

CORBA::Boolean TAO_Transport::tcs_set_ [private]
 

The tcs_set_ flag indicates that negotiation has occurred and so the translators are correct, since a null translator is valid if both ends are using the same codeset, whatever that codeset might be.

Definition at line 1044 of file Transport.h.

Referenced by char_translator(), is_tcs_set(), and wchar_translator().

TAO_Transport_Mux_Strategy* TAO_Transport::tms_ [protected]
 

Strategy to decide whether multiple requests can be sent over the same connection or the connection is exclusive for a request.

Definition at line 939 of file Transport.h.

TAO_Transport_Timer TAO_Transport::transport_timer_ [protected]
 

The adapter used to receive timeout callbacks from the Reactor.

Definition at line 986 of file Transport.h.

TAO_Codeset_Translator_Base* TAO_Transport::wchar_translator_ [private]
 

Definition at line 1039 of file Transport.h.

TAO_Wait_Strategy* TAO_Transport::ws_ [protected]
 

Strategy for waiting for the reply after sending the request.

Definition at line 942 of file Transport.h.

Referenced by notify_reactor(), provide_blockable_handler(), register_handler(), TAO_Transport(), wait_strategy(), and ~TAO_Transport().


The documentation for this class was generated from the following files:
Generated on Sun Jan 27 13:16:45 2008 for TAO by doxygen 1.3.6