TAO_Transport Class Reference

Generic definitions for the Transport class. More...

#include <Transport.h>

Inheritance diagram for TAO_Transport:

Inheritance graph
[legend]
Collaboration diagram for TAO_Transport:

Collaboration graph
[legend]
List of all members.

Template methods

The Transport class uses the Template Method Pattern to implement the protocol specific functionality. Implementors of a pluggable protocol should override the following methods with the semantics documented below.

enum  TAO_Message_Semantics { TAO_ONEWAY_REQUEST = 0, TAO_TWOWAY_REQUEST = 1, TAO_REPLY }
virtual ACE_Event_Handlerevent_handler_i (void)=0
bool is_connected (void) const
 Is this transport really connected.
bool post_open (size_t id)
 Perform all the actions when this transport get opened.
void pre_close (void)
 do what needs to be done when closing the transport
TAO_Connection_Handlerconnection_handler (void)
 Get the connection handler for this transport.
TAO_OutputCDRout_stream (void)
 Accessor for the output CDR stream.
int generate_locate_request (TAO_Target_Specification &spec, TAO_Operation_Details &opdetails, TAO_OutputCDR &output)
virtual int generate_request_header (TAO_Operation_Details &opd, TAO_Target_Specification &spec, TAO_OutputCDR &msg)
int recache_transport (TAO_Transport_Descriptor_Interface *desc)
 Recache ourselves in the cache.
virtual int handle_input (TAO_Resume_Handle &rh, ACE_Time_Value *max_wait_time=0)
 Callback to read incoming data.
virtual int send_request (TAO_Stub *stub, TAO_ORB_Core *orb_core, TAO_OutputCDR &stream, TAO_Message_Semantics message_semantics, ACE_Time_Value *max_time_wait)=0
virtual int send_message (TAO_OutputCDR &stream, TAO_Stub *stub=0, TAO_Message_Semantics message_semantics=TAO_Transport::TAO_TWOWAY_REQUEST, ACE_Time_Value *max_time_wait=0)=0
virtual int send_message_shared (TAO_Stub *stub, TAO_Message_Semantics message_semantics, const ACE_Message_Block *message_block, ACE_Time_Value *max_wait_time)
 Sent the contents of message_block.
int format_queue_message (TAO_OutputCDR &stream, ACE_Time_Value *max_wait_time)
int send_message_block_chain (const ACE_Message_Block *message_block, size_t &bytes_transferred, ACE_Time_Value *max_wait_time=0)
 Send a message block chain,.
int send_message_block_chain_i (const ACE_Message_Block *message_block, size_t &bytes_transferred, ACE_Time_Value *max_wait_time)
 Send a message block chain, assuming the lock is held.
int purge_entry (void)
 Cache management.
int make_idle (void)
 Cache management.
int update_transport (void)
 Cache management.
int handle_timeout (const ACE_Time_Value &current_time, const void *act)
size_t recv_buffer_size (void) const
 Accessor to recv_buffer_size_.
size_t sent_byte_count (void) const
 Accessor to sent_byte_count_.
TAO_Codeset_Translator_Basechar_translator (void) const
 CodeSet Negotiation - Get the char codeset translator factory.
TAO_Codeset_Translator_Basewchar_translator (void) const
 CodeSet Negotiation - Get the wchar codeset translator factory.
void char_translator (TAO_Codeset_Translator_Base *)
 CodeSet negotiation - Set the char codeset translator factory.
void wchar_translator (TAO_Codeset_Translator_Base *)
 CodeSet negotiation - Set the wchar codeset translator factory.
void assign_translators (TAO_InputCDR *, TAO_OutputCDR *)
void clear_translators (TAO_InputCDR *, TAO_OutputCDR *)
CORBA::Boolean is_tcs_set () const
 Return true if the tcs has been set.
void first_request_sent ()
 Set the state of the first_request_ flag to 0.
void send_connection_closed_notifications (void)
TAO::Transport::Statsstats (void) const
 Transport statistics.
virtual TAO_Connection_Handlerconnection_handler_i (void)=0
int process_parsed_messages (TAO_Queued_Data *qd, TAO_Resume_Handle &rh)
int send_message_shared_i (TAO_Stub *stub, TAO_Message_Semantics message_semantics, const ACE_Message_Block *message_block, ACE_Time_Value *max_wait_time)
int queue_message_i (const ACE_Message_Block *message_block, ACE_Time_Value *max_wait_time, bool back=true)
CORBA::ULong const tag_
 IOP protocol tag.
TAO_ORB_Core *const orb_core_
 Global orbcore resource.
TAO::Transport_Cache_Manager::HASH_MAP_ENTRY * cache_map_entry_
TAO_Transport_Mux_Strategytms_
TAO_Wait_Strategyws_
 Strategy for waiting for the reply after sending the request.
int bidirectional_flag_
TAO::Connection_Role opening_connection_role_
TAO_Queued_Messagehead_
 Implement the outgoing data queue.
TAO_Queued_Messagetail_
TAO_Incoming_Message_Queue incoming_message_queue_
 Queue of the consolidated, incoming messages..
TAO::Incoming_Message_Stack incoming_message_stack_
ACE_Time_Value current_deadline_
long flush_timer_id_
 The timer ID.
TAO_Transport_Timer transport_timer_
 The adapter used to receive timeout callbacks from the Reactor.
ACE_Lockhandler_lock_
size_t id_
 A unique identifier for the transport.
unsigned long purging_order_
 Used by the LRU, LFU and FIFO Connection Purging Strategies.
size_t recv_buffer_size_
 Size of the buffer received.
size_t sent_byte_count_
 Number of bytes sent.
bool is_connected_
TAO_GIOP_Message_Basemessaging_object_
 Our messaging object.
TAO::Transport_Cache_Manager & transport_cache_manager (void)
 Helper method that returns the Transport Cache Manager.
int drain_queue (ACE_Time_Value *max_wait_time)
 Send some of the data in the queue.
int drain_queue_i (ACE_Time_Value *max_wait_time)
 Implement drain_queue() assuming the lock is held.
bool queue_is_empty_i (void)
 Check if there are messages pending in the queue.
int drain_queue_helper (int &iovcnt, iovec iov[], ACE_Time_Value *max_wait_time)
 A helper routine used in drain_queue_i().
int schedule_output_i (void)
 Schedule handle_output() callbacks.
int cancel_output_i (void)
 Cancel handle_output() callbacks.
void cleanup_queue (size_t byte_count)
 Cleanup the queue.
void cleanup_queue_i ()
 Cleanup the complete queue.
int check_buffering_constraints_i (TAO_Stub *stub, bool &must_flush)
 Check if the buffering constraints have been reached.
int send_synchronous_message_i (const ACE_Message_Block *message_block, ACE_Time_Value *max_wait_time)
int send_reply_message_i (const ACE_Message_Block *message_block, ACE_Time_Value *max_wait_time)
int send_asynchronous_message_i (TAO_Stub *stub, const ACE_Message_Block *message_block, ACE_Time_Value *max_wait_time)
int send_synch_message_helper_i (TAO_Synch_Queued_Message &s, ACE_Time_Value *max_wait_time)
int flush_timer_pending (void) const
 Check if the flush timer is still pending.
void reset_flush_timer (void)
void report_invalid_event_handler (const char *caller)
 Print out error messages if the event handler is not valid.
int handle_input_missing_data (TAO_Resume_Handle &rh, ACE_Time_Value *max_wait_time, TAO_Queued_Data *q_data)
int handle_input_parse_data (TAO_Resume_Handle &rh, ACE_Time_Value *max_wait_time)
int handle_input_parse_extra_messages (ACE_Message_Block &message_block)
int consolidate_enqueue_message (TAO_Queued_Data *qd)
int consolidate_process_message (TAO_Queued_Data *qd, TAO_Resume_Handle &rh)
int process_queue_head (TAO_Resume_Handle &rh)
int notify_reactor (void)
void send_connection_closed_notifications_i (void)
 Assume the lock is held.
void allocate_partial_message_block (void)
 TAO_Transport (const TAO_Transport &)
void operator= (const TAO_Transport &)
TAO_Codeset_Translator_Basechar_translator_
 Additional member values required to support codeset translation.
TAO_Codeset_Translator_Basewchar_translator_
CORBA::Boolean tcs_set_
CORBA::Boolean first_request_
ACE_Message_Blockpartial_message_
 Holds the partial GIOP message (if there is one).
TAO::Transport::Statsstats_
 Statistics.
class TAO_Reactive_Flushing_Strategy
class TAO_Leader_Follower_Flushing_Strategy
class TAO_Thread_Per_Connection_Handler

Public Member Functions

 TAO_Transport (CORBA::ULong tag, TAO_ORB_Core *orb_core, size_t input_cdr_size=ACE_CDR::DEFAULT_BUFSIZE)
 Default creator, requires the tag value be supplied.
virtual ~TAO_Transport (void)
 Destructor.
CORBA::ULong tag (void) const
 Return the protocol tag.
TAO_ORB_Coreorb_core (void) const
 Access the ORB that owns this connection.
TAO_Transport_Mux_Strategytms (void) const
 Get the TAO_Tranport_Mux_Strategy used by this object.
TAO_Wait_Strategywait_strategy (void) const
 Return the TAO_Wait_Strategy used by this object.
int handle_output (ACE_Time_Value *max_wait_time)
 Callback method to reactively drain the outgoing data queue.
int bidirectional_flag (void) const
 Get the bidirectional flag.
void bidirectional_flag (int flag)
 Set the bidirectional flag.
void cache_map_entry (TAO::Transport_Cache_Manager::HASH_MAP_ENTRY *entry)
 Set the Cache Map entry.
TAO::Transport_Cache_Manager::HASH_MAP_ENTRY * cache_map_entry (void)
 Get the Cache Map entry.
size_t id (void) const
 Set and Get the identifier for this transport instance.
void id (size_t id)
TAO::Connection_Role opened_as (void) const
void opened_as (TAO::Connection_Role)
unsigned long purging_order (void) const
void purging_order (unsigned long value)
bool queue_is_empty (void)
 Check if there are messages pending in the queue.
void provide_handler (TAO::Connection_Handler_Set &handlers)
 Added event handler to the handlers set.
bool provide_blockable_handler (TAO::Connection_Handler_Set &handlers)
virtual int register_handler (void)
 Register the handler with the reactor.
virtual ssize_t send (iovec *iov, int iovcnt, size_t &bytes_transferred, const ACE_Time_Value *timeout=0)=0
 Write the complete Message_Block chain to the connection.
virtual ssize_t recv (char *buffer, size_t len, const ACE_Time_Value *timeout=0)=0
 Read len bytes from into buf.
Control connection lifecycle
These methods are routed through the TMS object. The TMS strategies implement them correctly.

bool idle_after_send (void)
bool idle_after_reply (void)
virtual void close_connection (void)
 Call the implementation method after obtaining the lock.
Template methods
The Transport class uses the Template Method Pattern to implement the protocol specific functionality. Implementors of a pluggable protocol should override the following methods with the semantics documented below.

void messaging_init (TAO_GIOP_Message_Version const &version)
virtual int tear_listen_point_list (TAO_InputCDR &cdr)
virtual bool post_connect_hook (void)
 Hooks that can be overridden in concrete transports.
ACE_Event_Handler::Reference_Count add_reference (void)
 Memory management routines.
ACE_Event_Handler::Reference_Count remove_reference (void)
TAO_GIOP_Message_Basemessaging_object (void)

Detailed Description

Generic definitions for the Transport class.

The transport object is created in the Service handler constructor and deleted in the Service Handler's destructor!!

The main responsability of a Transport object is to encapsulate a connection, and provide a transport independent way to send and receive data. Since TAO is heavily based on the Reactor for all if not all its I/O the Transport class is usually implemented with a helper Connection Handler that adapts the generic Transport interface to the Reactor types.

The outgoing data path:

One of the responsibilities of the TAO_Transport class is to send out GIOP messages as efficiently as possible. In most cases messages are put out in FIFO order, the transport object will put out the message using a single system call and return control to the application. However, for oneways and AMI requests it may be more efficient (or required if the SYNC_NONE policy is in effect) to queue the messages until a large enough data set is available. Another reason to queue is that some applications cannot block for I/O, yet they want to send messages so large that a single write() operation would not be able to cope with them. In such cases we need to queue the data and use the Reactor to drain the queue.

Therefore, the Transport class may need to use a queue to temporarily hold the messages, and, in some configurations, it may need to use the Reactor to concurrently drain such queues.

Out of order messages:

TAO provides explicit policies to send 'urgent' messages. Such messages may put at the head of the queue. However, they cannot be sent immediately because the transport may already be sending another message in a reactive fashion.

Consequently, the Transport must also know if the head of the queue has been partially sent. In that case new messages can only follow the head. Only once the head is completely sent we can start sending new messages.

Waiting threads:

One or more threads can be blocked waiting for the connection to completely send the message. The thread should return as soon as its message has been sent, so a per-thread condition is required. This suggest that simply using a ACE_Message_Queue would not be enough: there is a significant amount of ancillary information, to keep on each message that the Message_Block class does not provide room for.

Blocking I/O is still attractive for some applications. First, my eliminating the Reactor overhead performance is improved when sending large blocks of data. Second, using the Reactor to send out data opens the door for nested upcalls, yet some applications cannot deal with the reentrancy issues in this case.

Timeouts:

Some or all messages could have a timeout period attached to them. The timeout source could either be some high-level policy or maybe some strategy to prevent denial of service attacks. In any case the timeouts are per-message, and later messages could have shorter timeouts. In fact, some kind of scheduling (such as EDF) could be required in a few applications.

Conclusions:

The outgoing data path consist in several components:

The Transport object provides a single method to send request messages (send_request_message ()).

The incoming data path:

One of the main responsibilities of the transport is to read and process the incoming GIOP message as quickly and efficiently as possible. There are other forces that needs to be given due consideration. They are

Parsing messages (GIOP) & processing the message:

The messages should be checked for validity and the right information should be sent to the higher layer for processing. The process of doing a sanity check and preparing the messages for the higher layers of the ORB are done by the messaging protocol.

Design forces and Challenges

To keep things as efficient as possible for medium sized requests, it would be good to minimise data copying and locking along the incoming path ie. from the time of reading the data from the handle to the application. We achieve this by creating a buffer on stack and reading the data from the handle into the buffer. We then pass the same data block (the buffer is encapsulated into a data block) to the higher layers of the ORB. The problems stem from the following (a) Data is bigger than the buffer that we have on stack (b) Transports like TCP do not guarantee availability of the whole chunk of data in one shot. Data could trickle in byte by byte. (c) Single read gives multiple messages

We solve the problems as follows

(a) First do a read with the buffer on stack. Query the underlying messaging object whether the message has any incomplete portion. If so, data will be copied into new buffer being able to hold full message and is queued; succeeding events will read data from socket and write directly into this buffer. Otherwise, if if the message in local buffer is complete, we free the handle and then send the message to the higher layers of the ORB for processing.

(b) If buffer with incomplete message has been enqueued, while trying to do the above, the reactor will call us back when the handle becomes read ready. The read-operation will copy data directly into the enqueued buffer. If the message has bee read completely the message is sent to the higher layers of the ORB for processing.

(c) If we get multiple messages (possible if the client connected to the server sends oneways or AMI requests), we parse and split the messages. Every message is put in the queue. Once the messages are queued, the thread picks up one message to send to the higher layers of the ORB. Before doing that, if it finds more messages, it sends a notify to the reactor without resuming the handle. The next thread picks up a message from the queue and processes that. Once the queue is drained the last thread resumes the handle.

Sending Replies

We could use the outgoing path of the ORB to send replies. This would allow us to reuse most of the code in the outgoing data path. We were doing this till TAO-1.2.3. We run in to problems. When writing the reply the ORB gets flow controlled, and the ORB tries to flush the message by going into the reactor. This resulted in unnecessary nesting. The thread that gets into the Reactor could potentially handle other messages (incoming or outgoing) and the stack starts growing leading to crashes.

Solution to the nesting problem

The solution that we (plan to) adopt is pretty straight forward. The thread sending replies will not block to send the replies but queue the replies and return to the Reactor. (Note the careful usages of the terms "blocking in the Reactor" as opposed to "return back to the Reactor".

See Also:

https://svn.dre.vanderbilt.edu/viewvc/Middleware/trunk/TAO/docs/pluggable_protocols/index.html?revision=HEAD

Definition at line 245 of file Transport.h.


Member Enumeration Documentation

enum TAO_Transport::TAO_Message_Semantics

Enumerator:
TAO_ONEWAY_REQUEST 
TAO_TWOWAY_REQUEST 
TAO_REPLY 

Definition at line 599 of file Transport.h.

00600     {
00601       TAO_ONEWAY_REQUEST = 0,
00602       TAO_TWOWAY_REQUEST = 1,
00603       TAO_REPLY
00604     };


Constructor & Destructor Documentation

TAO_Transport::TAO_Transport ( CORBA::ULong  tag,
TAO_ORB_Core orb_core,
size_t  input_cdr_size = ACE_CDR::DEFAULT_BUFSIZE 
)

Default creator, requires the tag value be supplied.

Definition at line 127 of file Transport.cpp.

References ACE_NEW, ACE_NEW_THROW_EX, TAO_ORB_Core::client_factory(), TAO_Client_Strategy_Factory::create_transport_mux_strategy(), TAO_Client_Strategy_Factory::create_wait_strategy(), orb_core(), orb_core_, tms_, and ws_.

00130   : tag_ (tag)
00131   , orb_core_ (orb_core)
00132   , cache_map_entry_ (0)
00133   , bidirectional_flag_ (-1)
00134   , opening_connection_role_ (TAO::TAO_UNSPECIFIED_ROLE)
00135   , head_ (0)
00136   , tail_ (0)
00137   , incoming_message_queue_ (orb_core)
00138   , current_deadline_ (ACE_Time_Value::zero)
00139   , flush_timer_id_ (-1)
00140   , transport_timer_ (this)
00141   , handler_lock_ (orb_core->resource_factory ()->create_cached_connection_lock ())
00142   , id_ ((size_t) this)
00143   , purging_order_ (0)
00144   , recv_buffer_size_ (0)
00145   , sent_byte_count_ (0)
00146   , is_connected_ (false)
00147   , messaging_object_ (0)
00148   , char_translator_ (0)
00149   , wchar_translator_ (0)
00150   , tcs_set_ (0)
00151   , first_request_ (1)
00152   , partial_message_ (0)
00153 #if TAO_HAS_SENDFILE == 1
00154     // The ORB has been configured to use the MMAP allocator, meaning
00155     // we could/should use sendfile() to send data.  Cast once rather
00156     // here rather than during each send.  This assumes that all
00157     // TAO_OutputCDR instances are using the same TAO_MMAP_Allocator
00158     // instance as the underlying output CDR buffer allocator.
00159   , mmap_allocator_ (
00160       dynamic_cast<TAO_MMAP_Allocator *> (
00161         orb_core->output_cdr_buffer_allocator ()))
00162 #endif  /* TAO_HAS_SENDFILE==1 */
00163 #if TAO_HAS_TRANSPORT_CURRENT == 1
00164   , stats_ (0)
00165 #endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */
00166 {
00167   ACE_NEW (this->messaging_object_,
00168             TAO_GIOP_Message_Base (orb_core,
00169                                    this,
00170                                    input_cdr_size));
00171 
00172   TAO_Client_Strategy_Factory *cf =
00173     this->orb_core_->client_factory ();
00174 
00175   // Create WS now.
00176   this->ws_ = cf->create_wait_strategy (this);
00177 
00178   // Create TMS now.
00179   this->tms_ = cf->create_transport_mux_strategy (this);
00180 
00181 #if TAO_HAS_TRANSPORT_CURRENT == 1
00182   // Allocate stats
00183   ACE_NEW_THROW_EX (this->stats_,
00184                     TAO::Transport::Stats,
00185                     CORBA::NO_MEMORY ());
00186 #endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */
00187 
00188   /*
00189    * Hook to add code that initializes components that
00190    * belong to the concrete protocol implementation.
00191    * Further additions to this Transport class will
00192    * need to add code *before* this hook.
00193    */
00194   //@@ TAO_TRANSPORT_SPL_CONSTRUCTOR_ADD_HOOK
00195 }

TAO_Transport::~TAO_Transport ( void   )  [virtual]

Destructor.

Definition at line 197 of file Transport.cpp.

References ACE_ASSERT, cleanup_queue_i(), handler_lock_, messaging_object_, purge_entry(), ACE_Message_Block::release(), stats_, tms_, and ws_.

00198 {
00199   delete this->messaging_object_;
00200 
00201   delete this->ws_;
00202 
00203   delete this->tms_;
00204 
00205   delete this->handler_lock_;
00206 
00207   if (!this->is_connected_)
00208     {
00209       // When we have a not connected transport we could have buffered
00210       // messages on this transport which we have to cleanup now.
00211       this->cleanup_queue_i();
00212 
00213       // Cleanup our cache entry
00214       this->purge_entry();
00215     }
00216 
00217   // Release the partial message block, however we may
00218   // have never allocated one.
00219   ACE_Message_Block::release (this->partial_message_);
00220 
00221   // By the time the destructor is reached here all the connection stuff
00222   // *must* have been cleaned up.
00223 
00224   // The following assert is needed for the test "Bug_2494_Regression".
00225   // See the bugzilla bug #2494 for details.
00226   ACE_ASSERT (this->head_ == 0);
00227   ACE_ASSERT (this->cache_map_entry_ == 0);
00228 
00229 #if TAO_HAS_TRANSPORT_CURRENT == 1
00230   delete this->stats_;
00231 #endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */
00232 
00233   /*
00234    * Hook to add code that cleans up components
00235    * belong to the concrete protocol implementation.
00236    * Further additions to this Transport class will
00237    * need to add code *before* this hook.
00238    */
00239   //@@ TAO_TRANSPORT_SPL_DESTRUCTOR_ADD_HOOK
00240 }

TAO_Transport::TAO_Transport ( const TAO_Transport  )  [private]


Member Function Documentation

ACE_Event_Handler::Reference_Count TAO_Transport::add_reference ( void   ) 

Memory management routines.

Definition at line 2582 of file Transport.cpp.

References ACE_Event_Handler::add_reference(), and event_handler_i().

Referenced by TAO::Cache_IntId::Cache_IntId(), TAO_Connection_Handler::connection_pending(), TAO::Cache_IntId::operator=(), TAO::Transport_Cache_Manager::purge(), and TAO_Asynch_Reply_Dispatcher_Base::transport().

02583 {
02584   return this->event_handler_i ()->add_reference ();
02585 }

void TAO_Transport::allocate_partial_message_block ( void   )  [private]

Allocate a partial message block and store it in our partial_message_ data member.

Definition at line 2662 of file Transport.cpp.

References ACE_NEW, TAO_GIOP_Message_Base::header_length(), and messaging_object().

Referenced by handle_input_parse_data().

02663 {
02664   if (this->partial_message_ == 0)
02665     {
02666       // This value must be at least large enough to hold a GIOP message
02667       // header plus a GIOP fragment header
02668       size_t const partial_message_size =
02669         this->messaging_object ()->header_length ();
02670        // + this->messaging_object ()->fragment_header_length ();
02671        // deprecated, conflicts with not-single_read_opt.
02672 
02673       ACE_NEW (this->partial_message_,
02674                ACE_Message_Block (partial_message_size));
02675     }
02676 }

void TAO_Transport::assign_translators ( TAO_InputCDR ,
TAO_OutputCDR  
)

Use the Transport's codeset factories to set the translator for input and output CDRs.

Definition at line 2552 of file Transport.cpp.

References TAO_Codeset_Translator_Base::assign(), char_translator_, and wchar_translator_.

Referenced by TAO::LocateRequest_Invocation::check_reply(), TAO::Synch_Twoway_Invocation::check_reply_status(), TAO_GIOP_Message_Base::process_reply_message(), TAO_GIOP_Message_Base::process_request(), TAO_GIOP_Message_Base::process_request_message(), and TAO::Remote_Invocation::write_header().

02553 {
02554   if (this->char_translator_)
02555     {
02556       this->char_translator_->assign (inp);
02557       this->char_translator_->assign (outp);
02558     }
02559   if (this->wchar_translator_)
02560     {
02561       this->wchar_translator_->assign (inp);
02562       this->wchar_translator_->assign (outp);
02563     }
02564 }

ACE_INLINE void TAO_Transport::bidirectional_flag ( int  flag  ) 

Set the bidirectional flag.

Definition at line 39 of file Transport.inl.

References bidirectional_flag_.

00040 {
00041   this->bidirectional_flag_ = flag;
00042 }

ACE_INLINE int TAO_Transport::bidirectional_flag ( void   )  const

Get the bidirectional flag.

Definition at line 33 of file Transport.inl.

References bidirectional_flag_.

Referenced by TAO_IIOP_Transport::generate_request_header(), TAO_Muxed_TMS::request_id(), TAO_Exclusive_TMS::request_id(), and TAO_IIOP_Transport::tear_listen_point_list().

00034 {
00035   return this->bidirectional_flag_;
00036 }

ACE_INLINE TAO::Transport_Cache_Manager::HASH_MAP_ENTRY * TAO_Transport::cache_map_entry ( void   ) 

Get the Cache Map entry.

Definition at line 57 of file Transport.inl.

References cache_map_entry_.

00058 {
00059   return this->cache_map_entry_;
00060 }

ACE_INLINE void TAO_Transport::cache_map_entry ( TAO::Transport_Cache_Manager::HASH_MAP_ENTRY *  entry  ) 

Set the Cache Map entry.

Definition at line 63 of file Transport.inl.

References cache_map_entry_.

Referenced by TAO::Transport_Cache_Manager::bind_i().

00065 {
00066   this->cache_map_entry_ = entry;
00067 }

int TAO_Transport::cancel_output_i ( void   )  [private]

Cancel handle_output() callbacks.

Definition at line 805 of file Transport.cpp.

References ACE_DEBUG, ACE_TEXT, ACE_Reactor::cancel_wakeup(), event_handler_i(), LM_DEBUG, ACE_Event_Handler::reactor(), TAO_debug_level, and ACE_Event_Handler::WRITE_MASK.

Referenced by TAO_Reactive_Flushing_Strategy::cancel_output(), and TAO_Leader_Follower_Flushing_Strategy::cancel_output().

00806 {
00807   ACE_Event_Handler * const eh = this->event_handler_i ();
00808   ACE_Reactor *const reactor = eh->reactor ();
00809 
00810   if (TAO_debug_level > 3)
00811     {
00812       ACE_DEBUG ((LM_DEBUG,
00813          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cancel_output_i\n"),
00814          this->id ()));
00815     }
00816 
00817   return reactor->cancel_wakeup (eh, ACE_Event_Handler::WRITE_MASK);
00818 }

ACE_INLINE void TAO_Transport::char_translator ( TAO_Codeset_Translator_Base  ) 

CodeSet negotiation - Set the char codeset translator factory.

Definition at line 143 of file Transport.inl.

References char_translator_, and tcs_set_.

00144 {
00145   this->char_translator_ = tf;
00146   this->tcs_set_ = 1;
00147 }

ACE_INLINE TAO_Codeset_Translator_Base * TAO_Transport::char_translator ( void   )  const

CodeSet Negotiation - Get the char codeset translator factory.

Definition at line 131 of file Transport.inl.

References char_translator_.

00132 {
00133   return this->char_translator_;
00134 }

int TAO_Transport::check_buffering_constraints_i ( TAO_Stub stub,
bool &  must_flush 
) [private]

Check if the buffering constraints have been reached.

Definition at line 1152 of file Transport.cpp.

References ACE_Reactor::cancel_timer(), current_deadline_, event_handler_i(), flush_timer_id_, ACE_OS::gettimeofday(), ACE_Event_Handler::reactor(), ACE_Reactor::schedule_timer(), and TAO_Stub::transport_queueing_strategy().

Referenced by send_asynchronous_message_i().

01153 {
01154   // First let's compute the size of the queue:
01155   size_t msg_count = 0;
01156   size_t total_bytes = 0;
01157 
01158   for (TAO_Queued_Message *i = this->head_; i != 0; i = i->next ())
01159     {
01160       ++msg_count;
01161       total_bytes += i->message_length ();
01162     }
01163 
01164   bool set_timer = false;
01165   ACE_Time_Value new_deadline;
01166 
01167   TAO::Transport_Queueing_Strategy *queue_strategy =
01168     stub->transport_queueing_strategy ();
01169 
01170   bool constraints_reached = true;
01171 
01172   if (queue_strategy)
01173     {
01174       constraints_reached =
01175         queue_strategy->buffering_constraints_reached (stub,
01176                                                        msg_count,
01177                                                        total_bytes,
01178                                                        must_flush,
01179                                                        this->current_deadline_,
01180                                                        set_timer,
01181                                                        new_deadline);
01182     }
01183   else
01184     {
01185       must_flush = false;
01186     }
01187 
01188   // ... set the new timer, also cancel any previous timers ...
01189   if (set_timer)
01190     {
01191       ACE_Event_Handler *eh = this->event_handler_i ();
01192       ACE_Reactor * const reactor = eh->reactor ();
01193       this->current_deadline_ = new_deadline;
01194       ACE_Time_Value delay = new_deadline - ACE_OS::gettimeofday ();
01195 
01196       if (this->flush_timer_pending ())
01197         {
01198           reactor->cancel_timer (this->flush_timer_id_);
01199         }
01200 
01201       this->flush_timer_id_ =
01202         reactor->schedule_timer (&this->transport_timer_,
01203                                  &this->current_deadline_,
01204                                  delay);
01205     }
01206 
01207   return constraints_reached;
01208 }

void TAO_Transport::cleanup_queue ( size_t  byte_count  )  [private]

Cleanup the queue.

Exactly byte_count bytes have been sent, the queue must be cleaned up as potentially several messages have been completely sent out. It leaves on head_ the next message to send out.

Definition at line 1115 of file Transport.cpp.

References ACE_DEBUG, ACE_TEXT, TAO_Queued_Message::all_data_sent(), TAO_Queued_Message::bytes_transferred(), TAO_Queued_Message::destroy(), head_, LM_DEBUG, TAO_Queued_Message::message_length(), TAO_Queued_Message::remove_from_list(), and TAO_debug_level.

Referenced by drain_queue_helper().

01116 {
01117   while (this->head_ != 0 && byte_count > 0)
01118     {
01119       TAO_Queued_Message *i = this->head_;
01120 
01121       if (TAO_debug_level > 4)
01122         {
01123           ACE_DEBUG ((LM_DEBUG,
01124              ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue, ")
01125              ACE_TEXT ("byte_count = %d\n"),
01126              this->id (), byte_count));
01127         }
01128 
01129       // Update the state of the first message
01130       i->bytes_transferred (byte_count);
01131 
01132       if (TAO_debug_level > 4)
01133         {
01134           ACE_DEBUG ((LM_DEBUG,
01135              ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue, ")
01136              ACE_TEXT ("after transfer, bc = %d, all_sent = %d, ml = %d\n"),
01137              this->id (), byte_count, i->all_data_sent (),
01138              i->message_length ()));
01139         }
01140 
01141       // ... if all the data was sent the message must be removed from
01142       // the queue...
01143       if (i->all_data_sent ())
01144         {
01145           i->remove_from_list (this->head_, this->tail_);
01146           i->destroy ();
01147         }
01148     }
01149 }

void TAO_Transport::cleanup_queue_i (  )  [private]

Cleanup the complete queue.

Definition at line 1072 of file Transport.cpp.

References ACE_DEBUG, ACE_TEXT, TAO_Queued_Message::destroy(), head_, TAO_LF_Event::LFS_CONNECTION_CLOSED, LM_DEBUG, TAO_Queued_Message::message_length(), TAO_Queued_Message::remove_from_list(), TAO_LF_Event::state_changed(), and TAO_debug_level.

Referenced by pre_close(), send_connection_closed_notifications_i(), and ~TAO_Transport().

01073 {
01074   if (TAO_debug_level > 4)
01075     {
01076       ACE_DEBUG ((LM_DEBUG,
01077          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue_i, ")
01078          ACE_TEXT ("cleaning up complete queue\n"),
01079          this->id ()));
01080     }
01081 
01082   size_t byte_count = 0;
01083   int msg_count = 0;
01084 
01085   // Cleanup all messages
01086   while (this->head_ != 0)
01087     {
01088       TAO_Queued_Message *i = this->head_;
01089 
01090       if (TAO_debug_level > 4)
01091         {
01092           byte_count += i->message_length();
01093           ++msg_count;
01094         }
01095        // @@ This is a good point to insert a flag to indicate that a
01096        //    CloseConnection message was successfully received.
01097       i->state_changed (TAO_LF_Event::LFS_CONNECTION_CLOSED,
01098                         this->orb_core_->leader_follower ());
01099 
01100       i->remove_from_list (this->head_, this->tail_);
01101 
01102       i->destroy ();
01103     }
01104 
01105   if (TAO_debug_level > 4)
01106     {
01107       ACE_DEBUG ((LM_DEBUG,
01108                   ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue_i, ")
01109                   ACE_TEXT ("discarded %d messages, %u bytes.\n"),
01110                   this->id (), msg_count, byte_count));
01111     }
01112 }

void TAO_Transport::clear_translators ( TAO_InputCDR ,
TAO_OutputCDR  
)

It is necessary to clear the codeset translator when a CDR stream is used for more than one GIOP message. This is required since the header must not be translated, whereas the body must be.

Definition at line 2567 of file Transport.cpp.

References ACE_OutputCDR::char_translator(), ACE_InputCDR::char_translator(), ACE_OutputCDR::wchar_translator(), and ACE_InputCDR::wchar_translator().

Referenced by TAO::Remote_Invocation::write_header().

02568 {
02569   if (inp)
02570     {
02571       inp->char_translator (0);
02572       inp->wchar_translator (0);
02573     }
02574   if (outp)
02575     {
02576       outp->char_translator (0);
02577       outp->wchar_translator (0);
02578     }
02579 }

void TAO_Transport::close_connection ( void   )  [virtual]

Call the implementation method after obtaining the lock.

Definition at line 329 of file Transport.cpp.

References TAO_Connection_Handler::close_connection(), and connection_handler_i().

Referenced by TAO_IIOP_Connector::complete_connection(), TAO::Synch_Twoway_Invocation::remote_twoway(), TAO_GIOP_Message_Base::send_close_connection(), TAO::Remote_Invocation::send_message(), send_message_shared(), and TAO::Synch_Twoway_Invocation::wait_for_reply().

00330 {
00331   this->connection_handler_i ()->close_connection ();
00332 }

ACE_INLINE TAO_Connection_Handler * TAO_Transport::connection_handler ( void   ) 

Get the connection handler for this transport.

Definition at line 181 of file Transport.inl.

References connection_handler_i().

Referenced by TAO_IIOP_Connector::complete_connection(), TAO::Remote_Invocation::send_message(), TAO_Connect_Strategy::wait(), and TAO_Connector::wait_for_connection_completion().

00182 {
00183   return this->connection_handler_i();
00184 }

virtual TAO_Connection_Handler* TAO_Transport::connection_handler_i ( void   )  [protected, pure virtual]

Implemented in TAO_IIOP_Transport.

Referenced by close_connection(), and connection_handler().

int TAO_Transport::consolidate_enqueue_message ( TAO_Queued_Data qd  )  [private]

Returns:
-1 error, otherwise 0

Definition at line 1716 of file Transport.cpp.

References ACE_ERROR, ACE_TEXT, LM_ERROR, TAO_Queued_Data::missing_data(), TAO_Queued_Data::more_fragments(), TAO_Queued_Data::msg_type(), TAO_Queued_Data::release(), and TAO_debug_level.

01717 {
01718   // consolidate message on top of stack, only for fragmented messages
01719 
01720   // paranoid check
01721   if (q_data->missing_data () != 0)
01722     {
01723        return -1;
01724     }
01725 
01726   if (q_data->more_fragments () ||
01727       q_data->msg_type () == GIOP::Fragment)
01728     {
01729       TAO_Queued_Data *new_q_data = 0;
01730 
01731       switch (this->messaging_object()->consolidate_fragmented_message (q_data, new_q_data))
01732         {
01733         case -1: // error
01734           return -1;
01735 
01736         case 0:  // returning consolidated message in new_q_data
01737           if (!new_q_data)
01738             {
01739               if (TAO_debug_level > 0)
01740                 {
01741                   ACE_ERROR ((LM_ERROR,
01742                      ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_enqueue_message, ")
01743                      ACE_TEXT ("error, consolidated message is NULL\n"),
01744                      this->id ()));
01745                 }
01746               return -1;
01747             }
01748 
01749           if (this->incoming_message_queue_.enqueue_tail (new_q_data) != 0)
01750             {
01751               TAO_Queued_Data::release (new_q_data);
01752               return -1;
01753             }
01754           break;
01755 
01756         case 1:  // fragment has been stored in messaging_oject()
01757           break;
01758         }
01759     }
01760   else
01761     {
01762       if (this->incoming_message_queue_.enqueue_tail (q_data) != 0)
01763         {
01764           TAO_Queued_Data::release (q_data);
01765           return -1;
01766         }
01767     }
01768 
01769   return 0; // success
01770 }

int TAO_Transport::consolidate_process_message ( TAO_Queued_Data qd,
TAO_Resume_Handle rh 
) [private]

Returns:
-1 error, otherwise 0

Definition at line 1629 of file Transport.cpp.

References ACE_ERROR, ACE_TEXT, LM_ERROR, TAO_Queued_Data::missing_data(), TAO_Queued_Data::more_fragments(), TAO_Queued_Data::msg_type(), TAO_Queued_Data::release(), and TAO_debug_level.

01631 {
01632   // paranoid check
01633   if (q_data->missing_data () != 0)
01634     {
01635       if (TAO_debug_level > 0)
01636         {
01637            ACE_ERROR ((LM_ERROR,
01638               ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ")
01639               ACE_TEXT ("missing data\n"),
01640               this->id ()));
01641         }
01642        return -1;
01643     }
01644 
01645   if (q_data->more_fragments () ||
01646       q_data->msg_type () == GIOP::Fragment)
01647     {
01648       // consolidate message on top of stack, only for fragmented messages
01649       TAO_Queued_Data *new_q_data = 0;
01650 
01651       switch (this->messaging_object()->consolidate_fragmented_message (q_data, new_q_data))
01652         {
01653         case -1: // error
01654           return -1;
01655 
01656         case 0:  // returning consolidated message in q_data
01657           if (!new_q_data)
01658             {
01659               if (TAO_debug_level > 0)
01660                 {
01661                   ACE_ERROR ((LM_ERROR,
01662                      ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ")
01663                      ACE_TEXT ("error, consolidated message is NULL\n"),
01664                      this->id ()));
01665                 }
01666               return -1;
01667             }
01668 
01669 
01670           if (this->process_parsed_messages (new_q_data, rh) == -1)
01671             {
01672               TAO_Queued_Data::release (new_q_data);
01673 
01674               if (TAO_debug_level > 0)
01675                 {
01676                   ACE_ERROR ((LM_ERROR,
01677                      ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ")
01678                      ACE_TEXT ("error processing consolidated message\n"),
01679                      this->id ()));
01680                 }
01681               return -1;
01682             }
01683 
01684           TAO_Queued_Data::release (new_q_data);
01685 
01686           break;
01687 
01688         case 1:  // fragment has been stored in messaging_oject()
01689           break;
01690         }
01691     }
01692   else
01693     {
01694       if (this->process_parsed_messages (q_data, rh) == -1)
01695         {
01696           TAO_Queued_Data::release (q_data);
01697 
01698           if (TAO_debug_level > 0)
01699             {
01700               ACE_ERROR ((LM_ERROR,
01701                  ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ")
01702                  ACE_TEXT ("error processing message\n"),
01703                  this->id ()));
01704             }
01705           return -1;
01706         }
01707 
01708       TAO_Queued_Data::release (q_data);
01709 
01710     }
01711 
01712   return 0;
01713 }

int TAO_Transport::drain_queue ( ACE_Time_Value max_wait_time  )  [private]

Send some of the data in the queue.

As the outgoing data is drained this method is invoked to send as much of the current message as possible.

Returns 0 if there is more data to send, -1 if there was an error and 1 if the message was completely sent.

Definition at line 862 of file Transport.cpp.

References ACE_GUARD_RETURN, TAO_Flushing_Strategy::cancel_output(), drain_queue_i(), TAO_ORB_Core::flushing_strategy(), and orb_core().

Referenced by handle_output().

00863 {
00864   ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1);
00865   int const retval = this->drain_queue_i (max_wait_time);
00866 
00867   if (retval == 1)
00868     {
00869       // ... there is no current message or it was completely
00870       // sent, cancel output...
00871       TAO_Flushing_Strategy *flushing_strategy =
00872         this->orb_core ()->flushing_strategy ();
00873 
00874       flushing_strategy->cancel_output (this);
00875 
00876       return 0;
00877     }
00878 
00879   return retval;
00880 }

int TAO_Transport::drain_queue_helper ( int &  iovcnt,
iovec  iov[],
ACE_Time_Value max_wait_time 
) [private]

A helper routine used in drain_queue_i().

Definition at line 883 of file Transport.cpp.

References ACE_DEBUG, ACE_TEXT, cleanup_queue(), dump_iov(), head_, LM_DEBUG, send(), sendfile(), sent_byte_count_, and TAO_debug_level.

Referenced by drain_queue_i().

00884 {
00885   size_t byte_count = 0;
00886   ACE_Countdown_Time countdown (max_wait_time);
00887 
00888   // ... send the message ...
00889   ssize_t retval = -1;
00890 
00891 #if TAO_HAS_SENDFILE == 1
00892   if (this->mmap_allocator_)
00893     retval = this->sendfile (this->mmap_allocator_,
00894                              iov,
00895                              iovcnt,
00896                              byte_count);
00897   else
00898 #endif  /* TAO_HAS_SENDFILE==1 */
00899     retval = this->send (iov, iovcnt, byte_count, max_wait_time);
00900 
00901   if (TAO_debug_level == 5)
00902     {
00903       dump_iov (iov, iovcnt, this->id (),
00904                 byte_count, "drain_queue_helper");
00905     }
00906 
00907   // ... now we need to update the queue, removing elements
00908   // that have been sent, and updating the last element if it
00909   // was only partially sent ...
00910   this->cleanup_queue (byte_count);
00911   iovcnt = 0;
00912 
00913   if (retval == 0)
00914     {
00915       if (TAO_debug_level > 4)
00916         {
00917           ACE_DEBUG ((LM_DEBUG,
00918              ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_helper, ")
00919              ACE_TEXT ("send() returns 0\n"),
00920              this->id ()));
00921         }
00922       return -1;
00923     }
00924   else if (retval == -1)
00925     {
00926       if (TAO_debug_level > 4)
00927         {
00928           ACE_DEBUG ((LM_DEBUG,
00929              ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_helper, ")
00930              ACE_TEXT ("error during send (errno: %d) - %m\n"),
00931              this->id (), errno));
00932         }
00933 
00934       if (errno == EWOULDBLOCK || errno == EAGAIN)
00935         {
00936           return 0;
00937         }
00938 
00939       return -1;
00940     }
00941 
00942   // ... start over, how do we guarantee progress?  Because if
00943   // no bytes are sent send() can only return 0 or -1
00944 
00945   // Total no. of bytes sent for a send call
00946   this->sent_byte_count_ += byte_count;
00947 
00948   if (TAO_debug_level > 4)
00949     {
00950       ACE_DEBUG ((LM_DEBUG,
00951          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_helper, ")
00952          ACE_TEXT ("byte_count = %d, head_is_empty = %d\n"),
00953          this->id(), byte_count, (this->head_ == 0)));
00954     }
00955 
00956   return 1;
00957 }

int TAO_Transport::drain_queue_i ( ACE_Time_Value max_wait_time  )  [private]

Implement drain_queue() assuming the lock is held.

Definition at line 960 of file Transport.cpp.

References ACE_DEBUG, ACE_IOV_MAX, ACE_TEXT, ACE_Reactor::cancel_timer(), TAO_Queued_Message::destroy(), drain_queue_helper(), TAO_Queued_Message::fill_iov(), ACE_High_Res_Timer::gettimeofday_hr(), head_, TAO_Queued_Message::is_expired(), TAO_LF_Event::LFS_TIMEOUT, LM_DEBUG, TAO_Queued_Message::next(), TAO_Queued_Message::remove_from_list(), reset_flush_timer(), sent_byte_count_, TAO_LF_Event::state_changed(), and TAO_debug_level.

Referenced by drain_queue(), send_message_block_chain_i(), and send_synch_message_helper_i().

00961 {
00962   // This is the vector used to send data, it must be declared outside
00963   // the loop because after the loop there may still be data to be
00964   // sent
00965   int iovcnt = 0;
00966 #if defined (ACE_INITIALIZE_MEMORY_BEFORE_USE)
00967   iovec iov[ACE_IOV_MAX] = { { 0 , 0 } };
00968 #else
00969   iovec iov[ACE_IOV_MAX];
00970 #endif /* ACE_INITIALIZE_MEMORY_BEFORE_USE */
00971 
00972   // We loop over all the elements in the queue ...
00973   TAO_Queued_Message *i = this->head_;
00974 
00975   // Reset the value so that the counting is done for each new send
00976   // call.
00977   this->sent_byte_count_ = 0;
00978 
00979   // Avoid calling this expensive function each time through the loop. Instead
00980   // we'll assume that the time is unlikely to change much during the loop.
00981   // If we are forced to send in the loop then we'll recompute the time.
00982   ACE_Time_Value now = ACE_High_Res_Timer::gettimeofday_hr ();
00983 
00984   while (i != 0)
00985     {
00986       if (i->is_expired (now))
00987         {
00988           if (TAO_debug_level > 3)
00989           {
00990             ACE_DEBUG ((LM_DEBUG,
00991               ACE_TEXT ("TAO (%P|%t - Transport[%d]::drain_queue_i, ")
00992               ACE_TEXT ("Discarding expired queued message.\n"),
00993               this->id ()));
00994           }
00995           TAO_Queued_Message *next = i->next ();
00996           i->state_changed (TAO_LF_Event::LFS_TIMEOUT,
00997                             this->orb_core_->leader_follower ());
00998           i->remove_from_list (this->head_, this->tail_);
00999           i->destroy ();
01000           i = next;
01001           continue;
01002         }
01003       // ... each element fills the iovector ...
01004       i->fill_iov (ACE_IOV_MAX, iovcnt, iov);
01005 
01006       // ... the vector is full, no choice but to send some data out.
01007       // We need to loop because a single message can span multiple
01008       // IOV_MAX elements ...
01009       if (iovcnt == ACE_IOV_MAX)
01010         {
01011           int const retval = this->drain_queue_helper (iovcnt, iov,
01012                                                        max_wait_time);
01013 
01014           now = ACE_High_Res_Timer::gettimeofday_hr ();
01015 
01016           if (TAO_debug_level > 4)
01017             {
01018               ACE_DEBUG ((LM_DEBUG,
01019                  ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_i, ")
01020                  ACE_TEXT ("helper retval = %d\n"),
01021                  this->id (), retval));
01022             }
01023 
01024           if (retval != 1)
01025             {
01026               return retval;
01027             }
01028 
01029           i = this->head_;
01030           continue;
01031         }
01032       // ... notice that this line is only reached if there is still
01033       // room in the iovector ...
01034       i = i->next ();
01035     }
01036 
01037   if (iovcnt != 0)
01038     {
01039       int const retval = this->drain_queue_helper (iovcnt, iov, max_wait_time);
01040 
01041       if (TAO_debug_level > 4)
01042         {
01043           ACE_DEBUG ((LM_DEBUG,
01044               ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_i, ")
01045               ACE_TEXT ("helper retval = %d\n"),
01046               this->id (), retval));
01047         }
01048 
01049       if (retval != 1)
01050         {
01051           return retval;
01052         }
01053     }
01054 
01055   if (this->head_ == 0)
01056     {
01057       if (this->flush_timer_pending ())
01058         {
01059           ACE_Event_Handler *eh = this->event_handler_i ();
01060           ACE_Reactor * const reactor = eh->reactor ();
01061           reactor->cancel_timer (this->flush_timer_id_);
01062           this->reset_flush_timer ();
01063         }
01064 
01065       return 1;
01066     }
01067 
01068   return 0;
01069 }

virtual ACE_Event_Handler* TAO_Transport::event_handler_i ( void   )  [pure virtual]

Normally a concrete TAO_Transport object has-a ACE_Event_Handler member that functions as an adapter between the ACE_Reactor framework and the TAO pluggable protocol framework. In all the protocols implemented so far this role is fullfilled by an instance of ACE_Svc_Handler.

Todo:
Since we only use a limited functionality of ACE_Svc_Handler we could probably implement a generic adapter class (TAO_Transport_Event_Handler or something), this will reduce footprint and simplify the process of implementing a pluggable protocol.
Todo:
This method has to be renamed to event_handler()

Implemented in TAO_IIOP_Transport.

Referenced by add_reference(), cancel_output_i(), check_buffering_constraints_i(), TAO::Transport_Cache_Manager::find_transport(), notify_reactor(), remove_reference(), schedule_output_i(), TAO_Connection_Handler::transport(), and TAO_Wait_On_Read::wait().

ACE_INLINE void TAO_Transport::first_request_sent (  ) 

Set the state of the first_request_ flag to 0.

Definition at line 164 of file Transport.inl.

References first_request_.

Referenced by TAO_IIOP_Transport::send_request().

00165 {
00166   this->first_request_ = 0;
00167 }

ACE_INLINE int TAO_Transport::flush_timer_pending ( void   )  const [private]

Check if the flush timer is still pending.

Definition at line 108 of file Transport.inl.

References flush_timer_id_.

00109 {
00110   return this->flush_timer_id_ != -1;
00111 }

int TAO_Transport::format_queue_message ( TAO_OutputCDR stream,
ACE_Time_Value max_wait_time 
)

Format and queue a message for stream

Parameters:
max_wait_time The maximum time that the operation can block, used in the implementation of timeouts.

Definition at line 515 of file Transport.cpp.

References ACE_OutputCDR::begin(), and queue_message_i().

Referenced by TAO::Synch_Oneway_Invocation::remote_oneway().

00517 {
00518   if (this->messaging_object ()->format_message (stream) != 0)
00519     return -1;
00520 
00521   return this->queue_message_i (stream.begin (), max_wait_time);
00522 }

int TAO_Transport::generate_locate_request ( TAO_Target_Specification spec,
TAO_Operation_Details opdetails,
TAO_OutputCDR output 
)

This is a request for the transport object to write a LocateRequest header before it is sent out.

Definition at line 388 of file Transport.cpp.

References ACE_DEBUG, ACE_TEXT, LM_DEBUG, and TAO_debug_level.

Referenced by TAO::LocateRequest_Invocation::invoke().

00392 {
00393   if (this->messaging_object ()->generate_locate_request_header (opdetails,
00394                                                                  spec,
00395                                                                  output) == -1)
00396     {
00397       if (TAO_debug_level > 0)
00398         {
00399           ACE_DEBUG ((LM_DEBUG,
00400                       ACE_TEXT ("TAO (%P|%t) - Transport[%d]::generate_locate_request, ")
00401                       ACE_TEXT ("error while marshalling the LocateRequest header\n"),
00402                       this->id ()));
00403         }
00404 
00405       return -1;
00406     }
00407 
00408   return 0;
00409 }

int TAO_Transport::generate_request_header ( TAO_Operation_Details opd,
TAO_Target_Specification spec,
TAO_OutputCDR msg 
) [virtual]

This is a request for the transport object to write a request header before it sends out the request

Reimplemented in TAO_IIOP_Transport.

Definition at line 412 of file Transport.cpp.

References ACE_DEBUG, ACE_TEXT, TAO_ORB_Core::codeset_manager(), TAO_Codeset_Manager::generate_service_context(), LM_DEBUG, orb_core(), and TAO_debug_level.

Referenced by TAO_IIOP_Transport::generate_request_header().

00416 {
00417   // codeset service context is only supposed to be sent in the first request
00418   // on a particular connection.
00419   if (this->first_request_)
00420     {
00421       TAO_Codeset_Manager * const csm = this->orb_core ()->codeset_manager ();
00422       if (csm)
00423         csm->generate_service_context (opdetails,*this);
00424     }
00425 
00426   if (this->messaging_object ()->generate_request_header (opdetails,
00427                                                           spec,
00428                                                           output) == -1)
00429     {
00430       if (TAO_debug_level > 0)
00431         {
00432         ACE_DEBUG ((LM_DEBUG,
00433                    ACE_TEXT ("(%P|%t) - Transport[%d]::generate_request_header, ")
00434                    ACE_TEXT ("error while marshalling the Request header\n"),
00435                       this->id()));
00436         }
00437 
00438       return -1;
00439     }
00440 
00441   return 0;
00442 }

int TAO_Transport::handle_input ( TAO_Resume_Handle rh,
ACE_Time_Value max_wait_time = 0 
) [virtual]

Callback to read incoming data.

The ACE_Event_Handler adapter invokes this method as part of its handle_input() operation.

Todo:
the method name is confusing! Calling it handle_input() would probably make things easier to understand and follow!
Once a complete message is read the Transport class delegates on the Messaging layer to invoke the right upcall (on the server) or the TAO_Reply_Dispatcher (on the client side).

Parameters:
max_wait_time In some cases the I/O is synchronous, e.g. a thread-per-connection server or when Wait_On_Read is enabled. In those cases a maximum read time can be specified.

Definition at line 1555 of file Transport.cpp.

References ACE_DEBUG, ACE_ERROR, ACE_TEXT, LM_DEBUG, LM_ERROR, TAO_Queued_Data::missing_data(), process_queue_head(), TAO_debug_level, and TAO_MISSING_DATA_UNDEFINED.

Referenced by TAO_Connection_Handler::handle_input_internal(), TAO_Connection_Handler::svc_i(), and TAO_Wait_On_Read::wait().

01557 {
01558   if (TAO_debug_level > 3)
01559     {
01560       ACE_DEBUG ((LM_DEBUG,
01561          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input\n"),
01562          this->id ()));
01563     }
01564 
01565   // First try to process messages of the head of the incoming queue.
01566   int const retval = this->process_queue_head (rh);
01567 
01568   if (retval <= 0)
01569     {
01570       if (retval == -1)
01571         {
01572           if (TAO_debug_level > 2)
01573             {
01574               ACE_DEBUG ((LM_DEBUG,
01575                  ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input, ")
01576                  ACE_TEXT ("error while parsing the head of the queue\n"),
01577                  this->id()));
01578 
01579             }
01580           return -1;
01581         }
01582       else
01583         {
01584           // retval == 0
01585 
01586           // Processed a message in queue successfully. This
01587           // thread must return to thread-pool now.
01588           return 0;
01589         }
01590     }
01591 
01592   TAO_Queued_Data *q_data = 0;
01593 
01594   if (this->incoming_message_stack_.top (q_data) != -1
01595       && q_data->missing_data () != TAO_MISSING_DATA_UNDEFINED)
01596     {
01597       /* PRE: q_data->missing_data_ > 0 as all QD on stack must be incomplete  */
01598       if (this->handle_input_missing_data (rh, max_wait_time, q_data) == -1)
01599         {
01600           if (TAO_debug_level > 0)
01601             {
01602               ACE_ERROR ((LM_ERROR,
01603                  ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input, ")
01604                  ACE_TEXT ("error consolidating incoming message\n"),
01605                  this->id ()));
01606             }
01607           return -1;
01608         }
01609     }
01610   else
01611     {
01612       if (this->handle_input_parse_data (rh, max_wait_time) == -1)
01613         {
01614           if (TAO_debug_level > 0)
01615             {
01616               ACE_ERROR ((LM_ERROR,
01617                  ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input, ")
01618                  ACE_TEXT ("error parsing incoming message\n"),
01619                  this->id ()));
01620             }
01621           return -1;
01622         }
01623     }
01624 
01625   return 0;
01626 }

int TAO_Transport::handle_input_missing_data ( TAO_Resume_Handle rh,
ACE_Time_Value max_wait_time,
TAO_Queued_Data q_data 
) [private]

Is invoked by handle_input operation. It consolidate message on top of incoming_message_stack. The amount of missing data is known and recv operation copies data directly into message buffer, as much as a single recv-invocation provides.

Definition at line 1773 of file Transport.cpp.

References ACE_DEBUG, ACE_TEXT, ACE_CDR::grow(), ACE_Message_Block::length(), LM_DEBUG, TAO_Queued_Data::missing_data(), TAO_Queued_Data::msg_block(), recv(), recv_buffer_size_, ACE_Message_Block::space(), TAO_debug_level, and ACE_Message_Block::wr_ptr().

01776 {
01777   // paranoid check
01778   if (q_data == 0)
01779     {
01780       return -1;
01781     }
01782 
01783   if (TAO_debug_level > 3)
01784     {
01785       ACE_DEBUG ((LM_DEBUG,
01786          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_missing_data_message, ")
01787          ACE_TEXT ("enter (missing data == %d)\n"),
01788          this->id (), q_data->missing_data ()));
01789     }
01790 
01791   size_t const recv_size = q_data->missing_data ();
01792 
01793   if (q_data->msg_block ()->space() < recv_size)
01794     {
01795       // make sure the message_block has enough space
01796       size_t const message_size = recv_size + q_data->msg_block ()->length();
01797 
01798       if (ACE_CDR::grow (q_data->msg_block (), message_size) == -1)
01799         {
01800           return -1;
01801         }
01802     }
01803 
01804   // Saving the size of the received buffer in case any one needs to
01805   // get the size of the message thats received in the
01806   // context. Obviously the value will be changed for each recv call
01807   // and the user is supposed to invoke the accessor only in the
01808   // invocation context to get meaningful information.
01809   this->recv_buffer_size_ = recv_size;
01810 
01811   // Read the message into the existing message block on heap
01812   ssize_t const n = this->recv (q_data->msg_block ()->wr_ptr(),
01813                                 recv_size,
01814                                 max_wait_time);
01815 
01816   if (n <= 0)
01817     {
01818       return n;
01819     }
01820 
01821   if (TAO_debug_level > 3)
01822     {
01823       ACE_DEBUG ((LM_DEBUG,
01824          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_missing_data_message, ")
01825          ACE_TEXT ("read bytes %d\n"),
01826          this->id (), n));
01827     }
01828 
01829   q_data->msg_block ()->wr_ptr(n);
01830   q_data->missing_data (q_data->missing_data () - n);
01831 
01832   if (q_data->missing_data () == 0)
01833     {
01834       // paranoid check
01835       if (this->incoming_message_stack_.pop (q_data) == -1)
01836         {
01837           return -1;
01838         }
01839 
01840       if (this->consolidate_process_message (q_data, rh) == -1)
01841         {
01842           return -1;
01843         }
01844     }
01845 
01846   return 0;
01847 }

int TAO_Transport::handle_input_parse_data ( TAO_Resume_Handle rh,
ACE_Time_Value max_wait_time 
) [private]

Is invoked by handle_input operation. It parses new messages from input stream or consolidates messages whose header has been partially read, the message size being unknown so far. It parses as much data as a single recv-invocation provides.

Definition at line 1894 of file Transport.cpp.

References ACE_DEBUG, ACE_ERROR, ACE_TEXT, allocate_partial_message_block(), ACE_Message_Block::DONT_DELETE, TAO_Queued_Data::duplicate(), ACE_CDR::grow(), handle_input_parse_extra_messages(), TAO_GIOP_Message_Base::header_length(), incoming_message_stack_, TAO_ORB_Core::input_cdr_dblock_allocator(), ACE_Message_Block::length(), LM_DEBUG, LM_ERROR, TAO_ORB_Core::locking_strategy(), ACE_CDR::MAX_ALIGNMENT, ACE_CDR::mb_align(), ACE_Message_Block::MB_DATA, ACE_OS::memset(), messaging_object(), TAO_Queued_Data::missing_data(), TAO_Queued_Data::msg_block(), notify_reactor(), orb_core_, partial_message_, ACE_Message_Block::rd_ptr(), recv(), recv_buffer_size_, ACE_Message_Block::reset(), TAO_Resume_Handle::set_flag(), TAO_debug_level, TAO_Resume_Handle::TAO_HANDLE_LEAVE_SUSPENDED, TAO_Resume_Handle::TAO_HANDLE_RESUMABLE, TAO_MAXBUFSIZE, and TAO_MISSING_DATA_UNDEFINED.

01896 {
01897 
01898   if (TAO_debug_level > 3)
01899     {
01900       ACE_DEBUG ((LM_DEBUG,
01901          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ")
01902          ACE_TEXT ("enter\n"),
01903          this->id ()));
01904     }
01905 
01906 
01907   // The buffer on the stack which will be used to hold the input
01908   // messages, ACE_CDR::MAX_ALIGNMENT compensates the
01909   // memory-alignment. This improves performance with SUN-Java-ORB-1.4
01910   // and higher that sends fragmented requests of size 1024 bytes.
01911   char buf [TAO_MAXBUFSIZE + ACE_CDR::MAX_ALIGNMENT];
01912 
01913 #if defined (ACE_INITIALIZE_MEMORY_BEFORE_USE)
01914   (void) ACE_OS::memset (buf,
01915                          '\0',
01916                          sizeof buf);
01917 #endif /* ACE_INITIALIZE_MEMORY_BEFORE_USE */
01918 
01919   // Create a data block
01920   ACE_Data_Block db (sizeof (buf),
01921                      ACE_Message_Block::MB_DATA,
01922                      buf,
01923                      this->orb_core_->input_cdr_buffer_allocator (),
01924                      this->orb_core_->locking_strategy (),
01925                      ACE_Message_Block::DONT_DELETE,
01926                      this->orb_core_->input_cdr_dblock_allocator ());
01927 
01928   // Create a message block
01929   ACE_Message_Block message_block (&db,
01930                                    ACE_Message_Block::DONT_DELETE,
01931                                    this->orb_core_->input_cdr_msgblock_allocator ());
01932 
01933 
01934   // Align the message block
01935   ACE_CDR::mb_align (&message_block);
01936 
01937   size_t recv_size = 0; // Note: unsigned integer
01938 
01939   // Pointer to newly parsed message
01940   TAO_Queued_Data *q_data = 0;
01941 
01942   // optimizing access of constants
01943   size_t const header_length = this->messaging_object ()->header_length ();
01944 
01945   // paranoid check
01946   if (header_length > message_block.space ())
01947     {
01948       return -1;
01949     }
01950 
01951   if (this->orb_core_->orb_params ()->single_read_optimization ())
01952     {
01953       recv_size = message_block.space ();
01954     }
01955   else
01956     {
01957       // Single read optimization has been de-activated. That means
01958       // that we need to read from transport the GIOP header first
01959       // before the payload. This codes first checks the incoming
01960       // stack for partial messages which needs to be
01961       // consolidated. Otherwise we are in new cycle, reading complete
01962       // GIOP header of new incoming message.
01963       if (this->incoming_message_stack_.top (q_data) != -1
01964            && q_data->missing_data () == TAO_MISSING_DATA_UNDEFINED)
01965         {
01966           // There is a partial message on incoming_message_stack_
01967           // whose length is unknown so far. We need to consolidate
01968           // the GIOP header to get to know the payload size,
01969           recv_size = header_length - q_data->msg_block ()->length ();
01970         }
01971       else
01972         {
01973           // Read amount of data forming GIOP header of new incoming
01974           // message.
01975           recv_size = header_length;
01976         }
01977       // POST: 0 <= recv_size <= header_length
01978     }
01979   // POST: 0 <= recv_size <= message_block->space ()
01980 
01981   // If we have a partial message, copy it into our message block and
01982   // clear out the partial message.
01983   if (this->partial_message_ != 0 && this->partial_message_->length () > 0)
01984     {
01985       // (*) Copy back the partial message into current read-buffer,
01986       // verify that the read-strategy of "recv_size" bytes is not
01987       // exceeded. The latter check guarantees that recv_size does not
01988       // roll-over and keeps in range
01989       // 0<=recv_size<=message_block->space()
01990       if (this->partial_message_->length () <= recv_size &&
01991           message_block.copy (this->partial_message_->rd_ptr (),
01992                               this->partial_message_->length ()) == 0)
01993         {
01994 
01995           recv_size -= this->partial_message_->length ();
01996           this->partial_message_->reset ();
01997         }
01998       else
01999         {
02000           return -1;
02001         }
02002     }
02003   // POST: 0 <= recv_size <= buffer_space
02004 
02005   if (0 >= recv_size) // paranoid: the check above (*) guarantees recv_size>=0
02006     {
02007       // This event would cause endless looping, trying frequently to
02008       // read zero bytes from stream.  This might happen, if TAOs
02009       // protocol implementation is not correct and tries to read data
02010       // beyond header without "single_read_optimazation" being
02011       // activated.
02012       if (TAO_debug_level > 0)
02013         {
02014           ACE_ERROR ((LM_ERROR,
02015              ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ")
02016              ACE_TEXT ("Error - endless loop detection, closing connection"),
02017              this->id ()));
02018         }
02019       return -1;
02020     }
02021 
02022   // Saving the size of the received buffer in case any one needs to
02023   // get the size of the message thats received in the
02024   // context. Obviously the value will be changed for each recv call
02025   // and the user is supposed to invoke the accessor only in the
02026   // invocation context to get meaningful information.
02027   this->recv_buffer_size_ = recv_size;
02028 
02029   // Read the message into the  message block that we have created on
02030   // the stack.
02031   ssize_t const n = this->recv (message_block.wr_ptr (),
02032                                 recv_size,
02033                                 max_wait_time);
02034 
02035   // If there is an error return to the reactor..
02036   if (n <= 0)
02037     {
02038       return n;
02039     }
02040 
02041   if (TAO_debug_level > 3)
02042     {
02043       ACE_DEBUG ((LM_DEBUG,
02044          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ")
02045          ACE_TEXT ("read %d bytes\n"),
02046          this->id (), n));
02047     }
02048 
02049   // Set the write pointer in the stack buffer
02050   message_block.wr_ptr (n);
02051 
02052   //
02053   // STACK PROCESSING OR MESSAGE CONSOLIDATION
02054   //
02055 
02056   // PRE: data in buffer is aligned && message_block.length() > 0
02057 
02058   if (this->incoming_message_stack_.top (q_data) != -1
02059       && q_data->missing_data () == TAO_MISSING_DATA_UNDEFINED)
02060     {
02061       //
02062       // MESSAGE CONSOLIDATION
02063       //
02064 
02065       // Partial message on incoming_message_stack_ needs to be
02066       // consolidated.  The message header could not be parsed so far
02067       // and therefor the message size is unknown yet. Consolidating
02068       // the message destroys the memory alignment of succeeding
02069       // messages sharing the buffer, for that reason consolidation
02070       // and stack based processing are mutial exclusive.
02071       if (this->messaging_object ()->consolidate_node (q_data,
02072                                                        message_block) == -1)
02073         {
02074            if (TAO_debug_level > 0)
02075             {
02076                 ACE_ERROR ((LM_ERROR,
02077                    ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ")
02078                    ACE_TEXT ("error consolidating message from input buffer\n"),
02079                    this->id () ));
02080              }
02081            return -1;
02082         }
02083 
02084       // Complete message are to be enqueued and later processed
02085       if (q_data->missing_data () == 0)
02086         {
02087           if (this->incoming_message_stack_.pop (q_data) == -1)
02088             {
02089               return -1;
02090             }
02091 
02092           if (this->consolidate_enqueue_message (q_data) == -1)
02093             {
02094               return -1;
02095             }
02096         }
02097 
02098       if (message_block.length () > 0
02099           && this->handle_input_parse_extra_messages (message_block) == -1)
02100         {
02101           return -1;
02102         }
02103 
02104       // In any case try to process the enqueued messages
02105       if (this->process_queue_head (rh) == -1)
02106         {
02107           return -1;
02108         }
02109     }
02110   else
02111     {
02112       //
02113       // STACK PROCESSING (critical path)
02114       //
02115 
02116       // Process the first message in buffer on stack
02117 
02118       // (PRE: first message resides in aligned memory) Make a node of
02119       // the message-block..
02120 
02121       TAO_Queued_Data qd (&message_block,
02122                           this->orb_core_->transport_message_buffer_allocator ());
02123 
02124       size_t mesg_length  = 0;
02125 
02126       if (this->messaging_object ()->parse_next_message (qd,
02127                                                          mesg_length) == -1
02128           || (qd.missing_data () == 0
02129               && mesg_length > message_block.length ()) )
02130         {
02131           // extracting message failed
02132           return -1;
02133         }
02134       // POST: qd.missing_data_ == 0 --> mesg_length <= message_block.length()
02135       // This prevents seeking rd_ptr behind the wr_ptr
02136 
02137       if (qd.missing_data () != 0 ||
02138           qd.more_fragments ()   ||
02139           qd.msg_type () == GIOP::Fragment)
02140         {
02141           if (qd.missing_data () == 0)
02142             {
02143               // Dealing with a fragment
02144               TAO_Queued_Data *nqd =
02145                 TAO_Queued_Data::duplicate (qd);
02146 
02147               if (nqd == 0)
02148                 {
02149                   return -1;
02150                 }
02151 
02152               // mark the end of message in new buffer
02153               char* end_mark = nqd->msg_block ()->rd_ptr ()
02154                              + mesg_length;
02155               nqd->msg_block ()->wr_ptr (end_mark);
02156 
02157               // move the read pointer forward in old buffer
02158               message_block.rd_ptr (mesg_length);
02159 
02160               // enqueue the message
02161               if (this->consolidate_enqueue_message (nqd) == -1)
02162                 {
02163                   return -1;
02164                 }
02165 
02166               if (message_block.length () > 0
02167                   && this->handle_input_parse_extra_messages (message_block) == -1)
02168                 {
02169                   return -1;
02170                 }
02171 
02172               // In any case try to process the enqueued messages
02173               if (this->process_queue_head (rh) == -1)
02174                 {
02175                   return -1;
02176                 }
02177             }
02178           else if (qd.missing_data () != TAO_MISSING_DATA_UNDEFINED)
02179             {
02180               // Incomplete message, must be the last one in buffer
02181 
02182               if (qd.missing_data () != TAO_MISSING_DATA_UNDEFINED &&
02183                   qd.missing_data () > message_block.space ())
02184                 {
02185                   // Re-Allocate correct size on heap
02186                   if (ACE_CDR::grow (qd.msg_block (),
02187                                      message_block.length ()
02188                                      + qd.missing_data ()) == -1)
02189                     {
02190                       return -1;
02191                     }
02192                 }
02193 
02194               TAO_Queued_Data *nqd =
02195                 TAO_Queued_Data::duplicate (qd);
02196 
02197               if (nqd == 0)
02198                 {
02199                   return -1;
02200                 }
02201 
02202               // move read-pointer to end of buffer
02203               message_block.rd_ptr (message_block.length());
02204 
02205               this->incoming_message_stack_.push (nqd);
02206             }
02207         }
02208       else
02209         {
02210           //
02211           // critical path
02212           //
02213 
02214           // We cant process the message on stack right now. First we
02215           // have got to parse extra messages from message_block,
02216           // putting them into queue.  When this is done we can return
02217           // to process this message, and notifying other threads to
02218           // process the messages in queue.
02219 
02220           char * end_marker = message_block.rd_ptr ()
02221                             + mesg_length;
02222 
02223           if (message_block.length () > mesg_length)
02224             {
02225               // There are more message in data stream to be parsed.
02226               // Safe the rd_ptr to restore later.
02227               char *rd_ptr_stack_mesg = message_block.rd_ptr ();
02228 
02229               // Skip parsed message, jump to next message in buffer
02230               // PRE: mesg_length <= message_block.length ()
02231               message_block.rd_ptr (mesg_length);
02232 
02233               // Extract remaining messages and enqueue them for later
02234               // heap processing
02235               if (this->handle_input_parse_extra_messages (message_block) == -1)
02236                 {
02237                   return -1;
02238                 }
02239 
02240               // correct the end_marker
02241               end_marker = message_block.rd_ptr ();
02242 
02243               // Restore rd_ptr
02244               message_block.rd_ptr (rd_ptr_stack_mesg);
02245             }
02246 
02247           // The following if-else has been copied from
02248           // process_queue_head().  While process_queue_head()
02249           // processes message on heap, here we will process a message
02250           // on stack.
02251 
02252           // Now that we have one message on stack to be processed,
02253           // check whether we have one more message in the queue...
02254           if (this->incoming_message_queue_.queue_length () > 0)
02255             {
02256               if (TAO_debug_level > 0)
02257                 {
02258                   ACE_DEBUG ((LM_DEBUG,
02259                      ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ")
02260                      ACE_TEXT ("notify reactor\n"),
02261                      this->id ()));
02262 
02263                 }
02264 
02265               const int retval = this->notify_reactor ();
02266 
02267               if (retval == 1)
02268                 {
02269                   // Let the class know that it doesn't need to resume  the
02270                   // handle..
02271                   rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_LEAVE_SUSPENDED);
02272                 }
02273               else if (retval < 0)
02274                 return -1;
02275             }
02276           else
02277             {
02278               // As there are no further messages in queue just resume
02279               // the handle. Set the flag incase someone had reset the flag..
02280               rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_RESUMABLE);
02281             }
02282 
02283           // PRE: incoming_message_queue is empty
02284           if (this->process_parsed_messages (&qd,
02285                                              rh) == -1)
02286             {
02287               return -1;
02288             }
02289 
02290           // move the rd_ptr tp position of end_marker
02291           message_block.rd_ptr (end_marker);
02292         }
02293     }
02294 
02295   // Now that all cases have been processed, there might be kept some data
02296   // in buffer that needs to be safed for next "handle_input" invocations.
02297    if (message_block.length () > 0)
02298      {
02299        if (this->partial_message_ == 0)
02300          {
02301            this->allocate_partial_message_block ();
02302          }
02303 
02304        if (this->partial_message_ != 0 &&
02305            this->partial_message_->copy (message_block.rd_ptr (),
02306                                          message_block.length ()) == 0)
02307          {
02308            message_block.rd_ptr (message_block.length ());
02309          }
02310        else
02311          {
02312            return -1;
02313          }
02314      }
02315 
02316    return 0;
02317 }

int TAO_Transport::handle_input_parse_extra_messages ( ACE_Message_Block message_block  )  [private]

Is invoked by handle_input_parse_data. Parses all messages remaining in message_block.

Definition at line 1851 of file Transport.cpp.

References TAO_GIOP_Message_Base::extract_next_message(), incoming_message_stack_, ACE_Message_Block::length(), messaging_object(), and TAO_Queued_Data::missing_data().

Referenced by handle_input_parse_data().

01852 {
01853 
01854   // store buffer status of last extraction: -1 parse error, 0
01855   // incomplete message header in buffer, 1 complete messages header
01856   // parsed
01857   int buf_status = 0;
01858 
01859   TAO_Queued_Data *q_data = 0;     // init
01860 
01861   // parse buffer until all messages have been extracted, consolidate
01862   // and enqueue complete messages, if the last message being parsed
01863   // has missin data, it is stays on top of incoming_message_stack.
01864   while (message_block.length () > 0 &&
01865          (buf_status = this->messaging_object ()->extract_next_message
01866           (message_block, q_data)) != -1 &&
01867          q_data != 0) // paranoid check
01868     {
01869       if (q_data->missing_data () == 0)
01870         {
01871           if (this->consolidate_enqueue_message (q_data) == -1)
01872             {
01873               return -1;
01874             }
01875         }
01876       else  // incomplete message read, probably the last message in buffer
01877         {
01878           // can not fail
01879           this->incoming_message_stack_.push (q_data);
01880         }
01881 
01882       q_data = 0; // reset
01883     } // while
01884 
01885   if (buf_status == -1)
01886     {
01887       return -1;
01888     }
01889 
01890   return 0;
01891 }

int TAO_Transport::handle_output ( ACE_Time_Value max_wait_time  ) 

Callback method to reactively drain the outgoing data queue.

Definition at line 487 of file Transport.cpp.

References ACE_DEBUG, ACE_TEXT, drain_queue(), LM_DEBUG, and TAO_debug_level.

Referenced by TAO_Block_Flushing_Strategy::flush_message(), TAO_Block_Flushing_Strategy::flush_transport(), and TAO_Connection_Handler::handle_output_eh().

00488 {
00489   if (TAO_debug_level > 3)
00490     {
00491       ACE_DEBUG ((LM_DEBUG,
00492                   ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_output\n"),
00493                   this->id ()));
00494     }
00495 
00496   // The flushing strategy (potentially via the Reactor) wants to send
00497   // more data, first check if there is a current message that needs
00498   // more sending...
00499   int const retval = this->drain_queue (max_wait_time);
00500 
00501   if (TAO_debug_level > 3)
00502     {
00503       ACE_DEBUG ((LM_DEBUG,
00504                   ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_output, ")
00505                   ACE_TEXT ("drain_queue returns %d/%d\n"),
00506                   this->id (),
00507                   retval, errno));
00508     }
00509 
00510   // Any errors are returned directly to the Reactor
00511   return retval;
00512 }

int TAO_Transport::handle_timeout ( const ACE_Time_Value current_time,
const void *  act 
)

Parameters:
current_time The current time as reported from the Reactor
act The Asynchronous Completion Token. Currently it is interpreted as follows:
  • If the ACT is the address of this->current_deadline_ the queueing timeout has expired and the queue should start flushing.
Returns:
Returns 0 if there are no problems, -1 if there is an error
Todo:
In the future this function could be used to expire messages (oneways) that have been sitting for too long on the queue.

This is the only legal ACT in the current configuration....

Definition at line 821 of file Transport.cpp.

References ACE_DEBUG, ACE_GUARD_RETURN, ACE_TEXT, TAO_Flushing_Strategy::flush_transport(), TAO_ORB_Core::flushing_strategy(), LM_DEBUG, TAO_Flushing_Strategy::MUST_FLUSH, orb_core(), TAO_Flushing_Strategy::schedule_output(), and TAO_debug_level.

Referenced by TAO_Transport_Timer::handle_timeout().

00823 {
00824   if (TAO_debug_level > 6)
00825     {
00826       ACE_DEBUG ((LM_DEBUG,
00827          ACE_TEXT ("TAO (%P|%t) - TAO_Transport[%d]::handle_timeout, ")
00828          ACE_TEXT ("timer expired\n"),
00829          this->id ()));
00830     }
00831 
00832   /// This is the only legal ACT in the current configuration....
00833   if (act != &this->current_deadline_)
00834     {
00835       return -1;
00836     }
00837 
00838   if (this->flush_timer_pending ())
00839     {
00840       // The timer is always a oneshot timer, so mark is as not
00841       // pending.
00842       this->reset_flush_timer ();
00843 
00844       TAO_Flushing_Strategy *flushing_strategy =
00845         this->orb_core ()->flushing_strategy ();
00846       int const result = flushing_strategy->schedule_output (this);
00847       if (result == TAO_Flushing_Strategy::MUST_FLUSH)
00848         {
00849           typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK;
00850           TAO_REVERSE_LOCK reverse (*this->handler_lock_);
00851           ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1);
00852           if (flushing_strategy->flush_transport (this, 0) == -1) {
00853             return -1;
00854           }
00855         }
00856     }
00857 
00858   return 0;
00859 }

ACE_INLINE void TAO_Transport::id ( size_t  id  ) 

Definition at line 92 of file Transport.inl.

References id_.

00093 {
00094   this->id_ = id;
00095 }

ACE_INLINE size_t TAO_Transport::id ( void   )  const

Set and Get the identifier for this transport instance.

If not set, this will return an integer representation of the this pointer for the instance on which it's called.

Definition at line 86 of file Transport.inl.

References id_.

Referenced by TAO::Transport_Cache_Manager::bind_i(), TAO_Connection_Handler::close_connection_eh(), TAO_IIOP_Connector::complete_connection(), TAO_Connector::connect(), TAO::Transport_Cache_Manager::find_transport(), TAO_Connection_Handler::handle_input_internal(), TAO_Connector::parallel_connect(), TAO_GIOP_Message_Base::process_reply_message(), TAO::Transport_Cache_Manager::purge(), TAO_GIOP_Message_Base::send_close_connection(), TAO_GIOP_Message_Base::send_error(), TAO_Connector::wait_for_connection_completion(), and TAO_Leader_Follower::wait_for_event().

00087 {
00088   return this->id_;
00089 }

bool TAO_Transport::idle_after_reply ( void   ) 

Request is sent and the reply is received. Idle the transport now.

Definition at line 271 of file Transport.cpp.

References TAO_Transport_Mux_Strategy::idle_after_reply(), and tms().

Referenced by TAO::Synch_Twoway_Invocation::remote_twoway().

00272 {
00273   return this->tms ()->idle_after_reply ();
00274 }

bool TAO_Transport::idle_after_send ( void   ) 

Request has been just sent, but the reply is not received. Idle the transport now.

Definition at line 265 of file Transport.cpp.

References TAO_Transport_Mux_Strategy::idle_after_send(), and tms().

Referenced by TAO::Synch_Twoway_Invocation::remote_twoway().

00266 {
00267   return this->tms ()->idle_after_send ();
00268 }

ACE_INLINE bool TAO_Transport::is_connected ( void   )  const

Is this transport really connected.

Definition at line 170 of file Transport.inl.

References ACE_GUARD_RETURN, and is_connected_.

Referenced by TAO::Cache_IntId::Cache_IntId(), TAO_IIOP_Connector::complete_connection(), TAO_Connector::connect(), TAO::Synch_Oneway_Invocation::remote_oneway(), and TAO_Connector::wait_for_connection_completion().

00171 {
00172   ACE_GUARD_RETURN (ACE_Lock,
00173                     ace_mon,
00174                     *this->handler_lock_,
00175                     false);
00176 
00177   return this->is_connected_;
00178 }

ACE_INLINE CORBA::Boolean TAO_Transport::is_tcs_set (  )  const

Return true if the tcs has been set.

Definition at line 158 of file Transport.inl.

References tcs_set_.

00159 {
00160   return tcs_set_;
00161 }

int TAO_Transport::make_idle ( void   ) 

Cache management.

Definition at line 463 of file Transport.cpp.

References ACE_DEBUG, ACE_TEXT, LM_DEBUG, TAO_debug_level, and transport_cache_manager().

Referenced by TAO_IIOP_Connection_Handler::process_listen_point_list().

00464 {
00465   if (TAO_debug_level > 3)
00466     {
00467       ACE_DEBUG ((LM_DEBUG,
00468                   ACE_TEXT ("TAO (%P|%t) - Transport[%d]::make_idle\n"),
00469                   this->id ()));
00470     }
00471 
00472   return this->transport_cache_manager ().make_idle (this->cache_map_entry_);
00473 }

void TAO_Transport::messaging_init ( TAO_GIOP_Message_Version const &  version  ) 

Initialising the messaging object. This would be used by the connector side. On the acceptor side the connection handler would take care of the messaging objects.

Definition at line 2600 of file Transport.cpp.

References TAO_GIOP_Message_Base::init(), TAO_GIOP_Message_Version::major, messaging_object(), and TAO_GIOP_Message_Version::minor.

02601 {
02602   this->messaging_object ()->init (version.major, version.minor);
02603 }

ACE_INLINE TAO_GIOP_Message_Base * TAO_Transport::messaging_object ( void   ) 

Return the messaging object that is used to format the data that needs to be sent.

Definition at line 121 of file Transport.inl.

References messaging_object_.

Referenced by allocate_partial_message_block(), handle_input_parse_data(), handle_input_parse_extra_messages(), messaging_init(), and out_stream().

00122 {
00123   return this->messaging_object_;
00124 }

int TAO_Transport::notify_reactor ( void   )  [private]

Definition at line 2508 of file Transport.cpp.

References ACE_DEBUG, ACE_TEXT, event_handler_i(), LM_DEBUG, ACE_Reactor::notify(), orb_core(), TAO_ORB_Core::reactor(), ACE_Event_Handler::READ_MASK, and TAO_debug_level.

Referenced by handle_input_parse_data(), and process_queue_head().

02509 {
02510   if (!this->ws_->is_registered ())
02511     {
02512       return 0;
02513     }
02514 
02515   ACE_Event_Handler *eh = this->event_handler_i ();
02516 
02517   // Get the reactor associated with the event handler
02518   ACE_Reactor *reactor = this->orb_core ()->reactor ();
02519 
02520   if (TAO_debug_level > 0)
02521     {
02522       ACE_DEBUG ((LM_DEBUG,
02523          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::notify_reactor, ")
02524          ACE_TEXT ("notify to Reactor\n"),
02525          this->id ()));
02526     }
02527 
02528 
02529   // Send a notification to the reactor...
02530   int const retval = reactor->notify (eh, ACE_Event_Handler::READ_MASK);
02531 
02532   if (retval < 0 && TAO_debug_level > 2)
02533     {
02534       // @todo: need to think about what is the action that
02535       // we can take when we get here.
02536       ACE_DEBUG ((LM_DEBUG,
02537          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::notify_reactor, ")
02538          ACE_TEXT ("notify to the reactor failed..\n"),
02539          this->id ()));
02540     }
02541 
02542   return 1;
02543 }

ACE_INLINE void TAO_Transport::opened_as ( TAO::Connection_Role   ) 

Definition at line 51 of file Transport.inl.

References opening_connection_role_.

00052 {
00053   this->opening_connection_role_ = role;
00054 }

ACE_INLINE TAO::Connection_Role TAO_Transport::opened_as ( void   )  const

Methods dealing with the role of the connection, e.g., CLIENT or SERVER. See CORBA 2.6 Specification, Section 15.5.1 for origin of definitions.

Definition at line 45 of file Transport.inl.

References opening_connection_role_.

Referenced by TAO_Connector::connect(), and TAO_IIOP_Connection_Handler::open().

00046 {
00047   return this->opening_connection_role_;
00048 }

void TAO_Transport::operator= ( const TAO_Transport  )  [private]

ACE_INLINE TAO_ORB_Core * TAO_Transport::orb_core ( void   )  const

Access the ORB that owns this connection.

Definition at line 14 of file Transport.inl.

References orb_core_.

Referenced by TAO::Wait_On_LF_No_Upcall::can_process_upcalls(), TAO_Connection_Handler::close_connection_eh(), drain_queue(), TAO::Transport_Cache_Manager::find_transport(), TAO_Reactive_Flushing_Strategy::flush_message(), TAO_Leader_Follower_Flushing_Strategy::flush_message(), TAO_Reactive_Flushing_Strategy::flush_transport(), TAO_Leader_Follower_Flushing_Strategy::flush_transport(), generate_request_header(), handle_timeout(), notify_reactor(), send_asynchronous_message_i(), send_reply_message_i(), TAO_IIOP_Transport::send_request(), send_synchronous_message_i(), TAO_IIOP_Transport::set_bidir_context_info(), TAO_Transport(), TAO_Wait_On_Read::wait(), TAO_Wait_On_Reactor::wait(), and TAO_Wait_On_Leader_Follower::wait().

00015 {
00016   return this->orb_core_;
00017 }

TAO_OutputCDR & TAO_Transport::out_stream ( void   ) 

Accessor for the output CDR stream.

Definition at line 2594 of file Transport.cpp.

References messaging_object(), and TAO_GIOP_Message_Base::out_stream().

Referenced by TAO::LocateRequest_Invocation::invoke(), TAO::Synch_Oneway_Invocation::remote_oneway(), and TAO::Synch_Twoway_Invocation::remote_twoway().

02595 {
02596   return this->messaging_object ()->out_stream ();
02597 }

bool TAO_Transport::post_connect_hook ( void   )  [virtual]

Hooks that can be overridden in concrete transports.

These hooks are invoked just after connection establishment (or after a connection is fetched from cache). The return value signifies whether the invoker should proceed with post connection establishment activities. Protocols like SSLIOP need this to verify whether connections already established have valid certificates. There are no pre_connect_hooks () since the transport doesn't exist before a connection establishment. :-)

Note:
The methods are not made const with a reason.

Definition at line 323 of file Transport.cpp.

Referenced by TAO_Connector::connect().

00324 {
00325   return true;
00326 }

bool TAO_Transport::post_open ( size_t  id  ) 

Perform all the actions when this transport get opened.

Definition at line 2617 of file Transport.cpp.

References ACE_ERROR, ACE_GUARD_RETURN, ACE_TEXT, id_, is_connected_, LM_ERROR, purge_entry(), and TAO_debug_level.

02618 {
02619   this->id_ = id;
02620 
02621   {
02622     ACE_GUARD_RETURN (ACE_Lock,
02623                       ace_mon,
02624                       *this->handler_lock_,
02625                       false);
02626     this->is_connected_ = true;
02627   }
02628 
02629   // When we have data in our outgoing queue schedule ourselves
02630   // for output
02631   if (this->queue_is_empty_i ())
02632     return true;
02633 
02634   // If the wait strategy wants us to be registered with the reactor
02635   // then we do so. If registeration is required and it succeeds,
02636   // #REFCOUNT# becomes two.
02637   if (this->wait_strategy ()->register_handler () != 0)
02638     {
02639       // Registration failures.
02640 
02641       // Purge from the connection cache, if we are not in the cache, this
02642       // just does nothing.
02643       (void) this->purge_entry ();
02644 
02645       // Close the handler.
02646       (void) this->close_connection ();
02647 
02648       if (TAO_debug_level > 0)
02649         ACE_ERROR ((LM_ERROR,
02650            ACE_TEXT ("TAO (%P|%t) - Transport[%d]::post_connect , ")
02651            ACE_TEXT ("could not register the transport ")
02652            ACE_TEXT ("in the reactor.\n"),
02653            this->id ()));
02654 
02655       return false;
02656     }
02657 
02658   return true;
02659 }

void TAO_Transport::pre_close ( void   ) 

do what needs to be done when closing the transport

Definition at line 2606 of file Transport.cpp.

References ACE_GUARD, cleanup_queue_i(), is_connected_, and purge_entry().

Referenced by TAO_Connection_Handler::close_connection_eh().

02607 {
02608   this->is_connected_ = false;
02609   this->purge_entry ();
02610   {
02611     ACE_MT (ACE_GUARD (ACE_Lock, guard, *this->handler_lock_));
02612     this->cleanup_queue_i ();
02613   }
02614 }

int TAO_Transport::process_parsed_messages ( TAO_Queued_Data qd,
TAO_Resume_Handle rh 
) [protected]

Process the message by sending it to the higher layers of the ORB.

Definition at line 2321 of file Transport.cpp.

References ACE_DEBUG, ACE_ERROR, ACE_TEXT, ACE_Message_Block::length(), LM_DEBUG, LM_ERROR, TAO::Transport::Stats::messages_received(), TAO_Queued_Data::missing_data(), TAO_Queued_Data::msg_block(), TAO_Queued_Data::msg_type(), TAO_Resume_Handle::resume_handle(), stats_, and TAO_debug_level.

02323 {
02324   if (TAO_debug_level > 7)
02325     {
02326       ACE_DEBUG ((LM_DEBUG,
02327          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ")
02328          ACE_TEXT ("entering (missing data == %d)\n"),
02329          this->id(), qd->missing_data ()));
02330     }
02331 
02332 #if TAO_HAS_TRANSPORT_CURRENT == 1
02333   // Update stats, if any
02334   if (this->stats_ != 0)
02335     this->stats_->messages_received (qd->msg_block ()->length ());
02336 #endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */
02337 
02338   switch (qd->msg_type ())
02339   {
02340     case GIOP::CloseConnection:
02341     {
02342       if (TAO_debug_level > 0)
02343         ACE_DEBUG ((LM_DEBUG,
02344            ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ")
02345            ACE_TEXT ("received CloseConnection message - %m\n"),
02346            this->id()));
02347 
02348       // Return a "-1" so that the next stage can take care of
02349       // closing connection and the necessary memory management.
02350       return -1;
02351     }
02352     break;
02353     case GIOP::Request:
02354     case GIOP::LocateRequest:
02355     {
02356       // Let us resume the handle before we go ahead to process the
02357       // request. This will open up the handle for other threads.
02358       rh.resume_handle ();
02359 
02360       if (this->messaging_object ()->process_request_message (
02361             this,
02362             qd) == -1)
02363         {
02364           // Return a "-1" so that the next stage can take care of
02365           // closing connection and the necessary memory management.
02366           return -1;
02367         }
02368     }
02369     break;
02370     case GIOP::Reply:
02371     case GIOP::LocateReply:
02372     {
02373       rh.resume_handle ();
02374 
02375       TAO_Pluggable_Reply_Params params (this);
02376 
02377       if (this->messaging_object ()->process_reply_message (params, qd) == -1)
02378         {
02379           if (TAO_debug_level > 0)
02380             ACE_DEBUG ((LM_DEBUG,
02381                ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ")
02382                ACE_TEXT ("error in process_reply_message - %m\n"),
02383                this->id ()));
02384 
02385           return -1;
02386         }
02387 
02388     }
02389     break;
02390     case GIOP::CancelRequest:
02391     {
02392       // The associated request might be incomplete residing
02393       // fragmented in messaging object. We must make sure the
02394       // resources allocated by fragments are released.
02395       if (this->messaging_object ()->discard_fragmented_message (qd) == -1)
02396         {
02397           if (TAO_debug_level > 0)
02398             {
02399               ACE_ERROR ((LM_ERROR,
02400                  ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ")
02401                  ACE_TEXT ("error processing CancelRequest\n"),
02402                  this->id ()));
02403             }
02404         }
02405 
02406       // We are not able to cancel requests being processed already;
02407       // this is declared as optional feature by CORBA, and TAO does
02408       // not support this currently.
02409 
02410       // Just continue processing, CancelRequest does not mean to cut
02411       // off the connection.
02412     }
02413     break;
02414     case GIOP::MessageError:
02415     {
02416       if (TAO_debug_level > 0)
02417         {
02418           ACE_ERROR ((LM_ERROR,
02419              ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ")
02420              ACE_TEXT ("received MessageError, closing connection\n"),
02421              this->id ()));
02422         }
02423       return -1;
02424     }
02425     break;
02426     case GIOP::Fragment:
02427     {
02428       // Nothing to be done.
02429     }
02430     break;
02431   }
02432 
02433   // If not, just return back..
02434   return 0;
02435 }

int TAO_Transport::process_queue_head ( TAO_Resume_Handle rh  )  [private]

Definition at line 2438 of file Transport.cpp.

References ACE_DEBUG, ACE_TEXT, incoming_message_queue_, LM_DEBUG, notify_reactor(), TAO_Incoming_Message_Queue::queue_length(), TAO_Queued_Data::release(), TAO_Resume_Handle::set_flag(), TAO_debug_level, TAO_Resume_Handle::TAO_HANDLE_LEAVE_SUSPENDED, and TAO_Resume_Handle::TAO_HANDLE_RESUMABLE.

Referenced by handle_input().

02439 {
02440   if (TAO_debug_level > 3)
02441     {
02442       ACE_DEBUG ((LM_DEBUG,
02443          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_queue_head, %d enqueued\n"),
02444          this->id (), this->incoming_message_queue_.queue_length () ));
02445     }
02446 
02447   // See if  message in queue ...
02448   if (this->incoming_message_queue_.queue_length () > 0)
02449     {
02450       // Get the message on the head of the queue..
02451       TAO_Queued_Data *qd =
02452         this->incoming_message_queue_.dequeue_head ();
02453 
02454       if (TAO_debug_level > 3)
02455         {
02456           ACE_DEBUG ((LM_DEBUG,
02457              ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_queue_head, ")
02458              ACE_TEXT ("the size of the queue is [%d]\n"),
02459              this->id (),
02460              this->incoming_message_queue_.queue_length()));
02461         }
02462       // Now that we have pulled out out one message out of the queue,
02463       // check whether we have one more message in the queue...
02464       if (this->incoming_message_queue_.queue_length () > 0)
02465         {
02466           if (TAO_debug_level > 0)
02467             {
02468               ACE_DEBUG ((LM_DEBUG,
02469                  ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_queue_head, ")
02470                  ACE_TEXT ("notify reactor\n"),
02471                  this->id ()));
02472             }
02473 
02474           int const retval = this->notify_reactor ();
02475 
02476           if (retval == 1)
02477             {
02478               // Let the class know that it doesn't need to resume  the
02479               // handle..
02480               rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_LEAVE_SUSPENDED);
02481             }
02482           else if (retval < 0)
02483             return -1;
02484         }
02485       else
02486         {
02487           // As we are ready to process the last message just resume
02488           // the handle. Set the flag incase someone had reset the flag..
02489           rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_RESUMABLE);
02490         }
02491 
02492       // Process the message...
02493       if (this->process_parsed_messages (qd, rh) == -1)
02494         {
02495           return -1;
02496         }
02497 
02498       // Delete the Queued_Data..
02499       TAO_Queued_Data::release (qd);
02500 
02501       return 0;
02502     }
02503 
02504   return 1;
02505 }

bool TAO_Transport::provide_blockable_handler ( TAO::Connection_Handler_Set handlers  ) 

Called by the cache when the ORB is shuting down.

Parameters:
handlers The TAO_Connection_Handler_Set into which the transport should place its handler if the transport has RW strategy on.
Returns:
true indicates a handler was added to the handler set. false indocates that the transport did not have a blockable handler that could be added.

Definition at line 251 of file Transport.cpp.

References opening_connection_role_, and TAO::TAO_SERVER_ROLE.

00252 {
00253   if (this->ws_->non_blocking () ||
00254       this->opening_connection_role_ == TAO::TAO_SERVER_ROLE)
00255     return false;
00256 
00257   (void) this->add_reference ();
00258 
00259   h.insert (this->connection_handler_i ());
00260 
00261   return true;
00262 }

void TAO_Transport::provide_handler ( TAO::Connection_Handler_Set handlers  ) 

Added event handler to the handlers set.

Called by the cache when the cache is closing.

Parameters:
handlers The TAO_Connection_Handler_Set into which the transport should place its handler

Definition at line 243 of file Transport.cpp.

00244 {
00245   (void) this->add_reference ();
00246 
00247   handlers.insert (this->connection_handler_i ());
00248 }

int TAO_Transport::purge_entry ( void   ) 

Cache management.

Definition at line 457 of file Transport.cpp.

References transport_cache_manager().

Referenced by TAO_Connection_Handler::close_handler(), TAO_IIOP_Connector::complete_connection(), TAO_Connector::connect(), post_open(), pre_close(), recache_transport(), and ~TAO_Transport().

00458 {
00459   return this->transport_cache_manager ().purge_entry (this->cache_map_entry_);
00460 }

ACE_INLINE void TAO_Transport::purging_order ( unsigned long  value  ) 

Definition at line 76 of file Transport.inl.

References purging_order_.

00077 {
00078   // This should only be called by the Transport Cache Manager when
00079   // it is holding it's lock.
00080   // The transport should still be here since the cache manager still
00081   // has a reference to it.
00082   this->purging_order_ = value;
00083 }

ACE_INLINE unsigned long TAO_Transport::purging_order ( void   )  const

Get and Set the purging order. The purging strategy uses the set version to set the purging order.

Definition at line 70 of file Transport.inl.

References purging_order_.

Referenced by TAO_LRU_Connection_Purging_Strategy::update_item().

00071 {
00072   return this->purging_order_;
00073 }

ACE_INLINE bool TAO_Transport::queue_is_empty ( void   ) 

Check if there are messages pending in the queue.

Returns:
true if the queue is empty

Definition at line 98 of file Transport.inl.

References ACE_GUARD_RETURN, and queue_is_empty_i().

Referenced by TAO_Reactive_Flushing_Strategy::flush_transport(), TAO_Leader_Follower_Flushing_Strategy::flush_transport(), and TAO_Block_Flushing_Strategy::flush_transport().

00099 {
00100   ACE_GUARD_RETURN (ACE_Lock,
00101                     ace_mon,
00102                     *this->handler_lock_,
00103                     false);
00104   return this->queue_is_empty_i ();
00105 }

bool TAO_Transport::queue_is_empty_i ( void   )  [private]

Check if there are messages pending in the queue.

This version assumes that the lock is already held. Use with care!

Returns:
true if the queue is empty

Definition at line 755 of file Transport.cpp.

Referenced by queue_is_empty().

00756 {
00757   return (this->head_ == 0);
00758 }

int TAO_Transport::queue_message_i ( const ACE_Message_Block message_block,
ACE_Time_Value max_wait_time,
bool  back = true 
) [protected]

Queue a message for message_block

Parameters:
max_wait_time The maximum time that the operation can block, used in the implementation of timeouts.
back If true, the message will be pushed to the back of the queue. If false, the message will be pushed to the front of the queue.

Definition at line 1527 of file Transport.cpp.

References ACE_NEW_RETURN, TAO_Queued_Message::push_back(), and TAO_Queued_Message::push_front().

Referenced by format_queue_message().

01529 {
01530   TAO_Queued_Message *queued_message = 0;
01531   ACE_NEW_RETURN (queued_message,
01532                   TAO_Asynch_Queued_Message (message_block,
01533                                              this->orb_core_,
01534                                              max_wait_time,
01535                                              0,
01536                                              true),
01537                   -1);
01538   if (back) {
01539     queued_message->push_back (this->head_, this->tail_);
01540   }
01541   else {
01542     queued_message->push_front (this->head_, this->tail_);
01543   }
01544 
01545   return 0;
01546 }

int TAO_Transport::recache_transport ( TAO_Transport_Descriptor_Interface desc  ) 

Recache ourselves in the cache.

Todo:
Ideally the following should be inline.

purge_entry has a return value, use it

Definition at line 447 of file Transport.cpp.

References purge_entry(), and transport_cache_manager().

00448 {
00449   // First purge our entry
00450   this->purge_entry ();
00451 
00452   // Then add ourselves to the cache
00453   return this->transport_cache_manager ().cache_transport (desc, this);
00454 }

virtual ssize_t TAO_Transport::recv ( char *  buffer,
size_t  len,
const ACE_Time_Value timeout = 0 
) [pure virtual]

Read len bytes from into buf.

This method serializes on handler_lock_, guaranteeing that only thread can execute it on the same instance concurrently.

Parameters:
buffer ORB allocated buffer where the data should be @ The ACE_Time_Value *s is just a place holder for now. It is not clear this this is the best place to specify this. The actual timeout values will be kept in the Policies.

Implemented in TAO_IIOP_Transport.

Referenced by handle_input_missing_data(), and handle_input_parse_data().

ACE_INLINE size_t TAO_Transport::recv_buffer_size ( void   )  const

Accessor to recv_buffer_size_.

Definition at line 187 of file Transport.inl.

References recv_buffer_size_.

00188 {
00189   return this->recv_buffer_size_;
00190 }

int TAO_Transport::register_handler ( void   )  [virtual]

Register the handler with the reactor.

Register the handler with the reactor. This method is used by the Wait_On_Reactor strategy. The transport must register its event handler with the ORB's Reactor.

Todo:
I think this method is pretty much useless, the connections are *always* registered with the Reactor, except in thread-per-connection mode. In that case putting the connection in the Reactor would produce unpredictable results anyway.

Definition at line 335 of file Transport.cpp.

References ACE_DEBUG, ACE_GUARD_RETURN, ACE_TEXT, TAO_Wait_Strategy::is_registered(), LM_DEBUG, orb_core_, TAO_ORB_Core::reactor(), ACE_Event_Handler::READ_MASK, ACE_Reactor::register_handler(), TAO_debug_level, and ws_.

Referenced by TAO_Wait_On_Reactor::register_handler(), TAO_Wait_On_Leader_Follower::register_handler(), and TAO_Wait_On_Leader_Follower::sending_request().

00336 {
00337   if (TAO_debug_level > 4)
00338     {
00339       ACE_DEBUG ((LM_DEBUG,
00340                   ACE_TEXT ("TAO (%P|%t) - Transport[%d]::register_handler\n"),
00341                   this->id ()));
00342     }
00343 
00344   ACE_Reactor * const r = this->orb_core_->reactor ();
00345 
00346   // @@note: This should be okay since the register handler call will
00347   // not make a nested call into the transport.
00348   ACE_GUARD_RETURN (ACE_Lock,
00349                     ace_mon,
00350                     *this->handler_lock_,
00351                     false);
00352 
00353   if (r == this->event_handler_i ()->reactor ())
00354     {
00355       return 0;
00356     }
00357 
00358   // Set the flag in the Connection Handler and in the Wait Strategy
00359   // @@Maybe we should set these flags after registering with the
00360   // reactor. What if the  registration fails???
00361   this->ws_->is_registered (true);
00362 
00363   // Register the handler with the reactor
00364   return r->register_handler (this->event_handler_i (),
00365                               ACE_Event_Handler::READ_MASK);
00366 }

ACE_Event_Handler::Reference_Count TAO_Transport::remove_reference ( void   ) 

Initialising the messaging object. This would be used by the connector side. On the acceptor side the connection handler would take care of the messaging objects.

Definition at line 2588 of file Transport.cpp.

References event_handler_i(), and ACE_Event_Handler::remove_reference().

Referenced by TAO_Connection_Handler::cancel_pending_connection(), TAO_Connection_Handler::close_handler(), TAO::Cache_IntId::operator=(), TAO_Asynch_Reply_Dispatcher_Base::transport(), TAO::Cache_IntId::~Cache_IntId(), TAO_Asynch_Reply_Dispatcher_Base::~TAO_Asynch_Reply_Dispatcher_Base(), and TAO_Thread_Per_Connection_Handler::~TAO_Thread_Per_Connection_Handler().

02589 {
02590   return this->event_handler_i ()->remove_reference ();
02591 }

void TAO_Transport::report_invalid_event_handler ( const char *  caller  )  [private]

Print out error messages if the event handler is not valid.

Definition at line 1211 of file Transport.cpp.

References ACE_DEBUG, ACE_TEXT, ACE_TEXT_CHAR_TO_TCHAR, LM_DEBUG, tag_, and TAO_debug_level.

01212 {
01213   if (TAO_debug_level > 0)
01214     {
01215       ACE_DEBUG ((LM_DEBUG,
01216          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::report_invalid_event_handler")
01217          ACE_TEXT ("(%s) no longer associated with handler [tag=%d]\n"),
01218          this->id (), ACE_TEXT_CHAR_TO_TCHAR (caller), this->tag_));
01219     }
01220 }

ACE_INLINE void TAO_Transport::reset_flush_timer ( void   )  [private]

The flush timer expired or was explicitly cancelled, mark it as not pending

Definition at line 114 of file Transport.inl.

References current_deadline_, flush_timer_id_, and ACE_Time_Value::zero.

Referenced by drain_queue_i().

00115 {
00116   this->flush_timer_id_ = -1;
00117   this->current_deadline_ = ACE_Time_Value::zero;
00118 }

int TAO_Transport::schedule_output_i ( void   )  [private]

Schedule handle_output() callbacks.

Definition at line 762 of file Transport.cpp.

References ACE_DEBUG, ACE_TEXT, event_handler_i(), ACE_Reactor::find_handler(), ACE_Event_Handler::get_handle(), LM_DEBUG, ACE_Event_Handler::reactor(), ACE_Event_Handler::remove_reference(), ACE_Reactor::schedule_wakeup(), TAO_debug_level, and ACE_Event_Handler::WRITE_MASK.

00763 {
00764   ACE_Event_Handler * const eh = this->event_handler_i ();
00765   ACE_Reactor * const reactor = eh->reactor ();
00766 
00767   if (reactor == 0)
00768      return -1;
00769 
00770   // Check to see if our event handler is still registered with the
00771   // reactor.  It's possible for another thread to have run close_connection()
00772   // since we last used the event handler.
00773   ACE_Event_Handler * const found = reactor->find_handler (eh->get_handle ());
00774   if (found)
00775     {
00776       found->remove_reference ();
00777 
00778       if (found != eh)
00779         {
00780           if (TAO_debug_level > 3)
00781             {
00782               ACE_DEBUG ((LM_DEBUG,
00783                           ACE_TEXT ("TAO (%P|%t) - ")
00784                           ACE_TEXT ("Transport[%d]::schedule_output_i ")
00785                           ACE_TEXT ("event handler not found in reactor,")
00786                           ACE_TEXT ("returning -1\n"),
00787                           this->id ()));
00788             }
00789 
00790           return -1;
00791         }
00792     }
00793 
00794   if (TAO_debug_level > 3)
00795     {
00796       ACE_DEBUG ((LM_DEBUG,
00797          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::schedule_output_i\n"),
00798          this->id ()));
00799     }
00800 
00801   return reactor->schedule_wakeup (eh, ACE_Event_Handler::WRITE_MASK);
00802 }

virtual ssize_t TAO_Transport::send ( iovec *  iov,
int  iovcnt,
size_t &  bytes_transferred,
const ACE_Time_Value timeout = 0 
) [pure virtual]

Write the complete Message_Block chain to the connection.

This method serializes on handler_lock_, guaranteeing that only thread can execute it on the same instance concurrently.

Often the implementation simply forwards the arguments to the underlying ACE_Svc_Handler class. Using the code factored out into ACE.

Be careful with protocols that perform non-trivial transformations of the data, such as SSLIOP or protocols that compress the stream.

Parameters:
iov contains the data that must be sent.
timeout is the maximum time that the application is willing to wait for the data to be sent, useful in platforms that implement timed writes. The timeout value is obtained from the policies set by the application.
bytes_transferred should return the total number of bytes successfully transferred before the connection blocked. This is required because in some platforms and/or protocols multiple system calls may be required to send the chain of message blocks. The first few calls can work successfully, but the final one can fail or signal a flow control situation (via EAGAIN). In this case the ORB expects the function to return -1, errno to be appropriately set and this argument to return the number of bytes already on the OS I/O subsystem.
This call can also fail if the transport instance is no longer associated with a connection (e.g., the connection handler closed down). In that case, it returns -1 and sets errno to ENOENT.

Implemented in TAO_IIOP_Transport.

Referenced by drain_queue_helper().

int TAO_Transport::send_asynchronous_message_i ( TAO_Stub stub,
const ACE_Message_Block message_block,
ACE_Time_Value max_wait_time 
) [private]

Send an asynchronous message, i.e. do not block until the message is on the wire

Definition at line 1279 of file Transport.cpp.

References CORBA::SystemException::_tao_minor_code(), ACE_DEBUG, ACE_ERROR, ACE_GUARD_RETURN, ACE_TEXT, check_buffering_constraints_i(), CORBA::COMPLETED_NO, ACE_Message_Block::cont(), TAO_Flushing_Strategy::flush_transport(), TAO_ORB_Core::flushing_strategy(), head_, ACE_Message_Block::length(), LM_DEBUG, LM_ERROR, TAO_Flushing_Strategy::MUST_FLUSH, orb_core(), TAO_Flushing_Strategy::schedule_output(), send_message_block_chain_i(), sent_byte_count_, TAO_debug_level, TAO_TIMEOUT_SEND_MINOR_CODE, ACE_Message_Block::total_length(), and TAO_Stub::transport_queueing_strategy().

Referenced by send_message_shared_i().

01282 {
01283   // Let's figure out if the message should be queued without trying
01284   // to send first:
01285   bool try_sending_first = true;
01286 
01287   bool const queue_empty = (this->head_ == 0);
01288 
01289   TAO::Transport_Queueing_Strategy *queue_strategy =
01290     stub->transport_queueing_strategy ();
01291 
01292   if (!queue_empty)
01293     {
01294       try_sending_first = false;
01295     }
01296   else if (queue_strategy)
01297     {
01298       if (queue_strategy->must_queue (queue_empty))
01299         {
01300           try_sending_first = false;
01301         }
01302     }
01303 
01304   bool partially_sent = false;
01305   bool timeout_encountered = false;
01306 
01307   if (try_sending_first)
01308     {
01309       ssize_t n = 0;
01310       size_t byte_count = 0;
01311       // ... in this case we must try to send the message first ...
01312 
01313       size_t const total_length = message_block->total_length ();
01314 
01315       if (TAO_debug_level > 6)
01316         {
01317           ACE_DEBUG ((LM_DEBUG,
01318              ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ")
01319              ACE_TEXT ("trying to send the message (ml = %d)\n"),
01320              this->id (), total_length));
01321         }
01322 
01323       // @@ I don't think we want to hold the mutex here, however if
01324       // we release it we need to recheck the status of the transport
01325       // after we return... once I understand the final form for this
01326       // code I will re-visit this decision
01327       n = this->send_message_block_chain_i (message_block,
01328                                             byte_count,
01329                                             max_wait_time);
01330 
01331       if (n == -1)
01332         {
01333           // ... if this is just an EWOULDBLOCK we must schedule the
01334           // message for later, if it is ETIME we still have to send
01335           // the complete message, because cutting off the message at
01336           // this point will destroy the synchronization with the
01337           // server ...
01338           if (errno != EWOULDBLOCK && errno != ETIME)
01339             {
01340               if (TAO_debug_level > 0)
01341                 {
01342                   ACE_ERROR ((LM_ERROR,
01343                      ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ")
01344                      ACE_TEXT ("fatal error in ")
01345                      ACE_TEXT ("send_message_block_chain_i - %m\n"),
01346                      this->id ()));
01347                 }
01348               return -1;
01349             }
01350         }
01351 
01352       // ... let's figure out if the complete message was sent ...
01353       if (total_length == byte_count)
01354         {
01355           // Done, just return.  Notice that there are no allocations
01356           // or copies up to this point (though some fancy calling
01357           // back and forth).
01358           // This is the common case for the critical path, it should
01359           // be fast.
01360           return 0;
01361         }
01362 
01363       if (byte_count > 0)
01364       {
01365         partially_sent = true;
01366       }
01367 
01368       // If it was partially sent, then push to front of queue and don't flush
01369       if (errno == ETIME)
01370       {
01371         timeout_encountered = true;
01372         if (byte_count == 0)
01373         {
01374           //This request has timed out and none of it was sent to the transport
01375           //We can't return -1 here, since that would end up closing the tranpsort
01376           if (TAO_debug_level > 2)
01377             {
01378               ACE_DEBUG ((LM_DEBUG,
01379                           ACE_TEXT ("TAO (%P|%t) - ")
01380                           ACE_TEXT ("Transport[%d]::send_asynchronous_message_i, ")
01381                           ACE_TEXT ("timeout encountered before any bytes sent\n"),
01382                           this->id ()));
01383             }
01384           throw ::CORBA::TIMEOUT (
01385             CORBA::SystemException::_tao_minor_code (
01386               TAO_TIMEOUT_SEND_MINOR_CODE,
01387               ETIME),
01388             CORBA::COMPLETED_NO);
01389         }
01390       }
01391 
01392       if (TAO_debug_level > 6)
01393         {
01394           ACE_DEBUG ((LM_DEBUG,
01395              ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ")
01396              ACE_TEXT ("partial send %d / %d bytes\n"),
01397              this->id (), byte_count, total_length));
01398         }
01399 
01400       // ... part of the data was sent, need to figure out what piece
01401       // of the message block chain must be queued ...
01402       while (message_block != 0 && message_block->length () == 0)
01403         {
01404           message_block = message_block->cont ();
01405         }
01406 
01407       // ... at least some portion of the message block chain should
01408       // remain ...
01409     }
01410 
01411   // ... either the message must be queued or we need to queue it
01412   // because it was not completely sent out ...
01413 
01414   ACE_Time_Value *wait_time = (partially_sent ? 0: max_wait_time);
01415   if (this->queue_message_i (message_block, wait_time, !partially_sent)
01416       == -1)
01417     {
01418       if (TAO_debug_level > 0)
01419         {
01420           ACE_DEBUG ((LM_DEBUG,
01421                       ACE_TEXT ("TAO (%P|%t) - Transport[%d]::")
01422                       ACE_TEXT ("send_asynchronous_message_i, ")
01423                       ACE_TEXT ("cannot queue message for  - %m\n"),
01424                       this->id ()));
01425         }
01426       return -1;
01427     }
01428 
01429   if (TAO_debug_level > 6)
01430     {
01431       ACE_DEBUG ((LM_DEBUG,
01432          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ")
01433          ACE_TEXT ("message is queued\n"),
01434          this->id ()));
01435     }
01436 
01437   if (timeout_encountered && partially_sent)
01438     {
01439       //Must close down the transport here since we can't guarantee the
01440       //integrity of the GIOP stream (the next send may try to write to
01441       //the socket before looking at the queue).
01442       if (TAO_debug_level > 0)
01443         {
01444           ACE_DEBUG ((LM_DEBUG,
01445                       ACE_TEXT ("TAO (%P|%t) - Transport[%d]::")
01446                       ACE_TEXT ("send_asynchronous_message_i, ")
01447                       ACE_TEXT ("timeout after partial send, closing.\n"),
01448                       this->id ()));
01449         }
01450       return -1;
01451     }
01452   else if (!timeout_encountered)
01453     {
01454       // We can't flush if we have already encountered a timeout
01455       // ... if the queue is full we need to activate the output on the
01456       // queue ...
01457       bool must_flush = false;
01458       const bool constraints_reached =
01459         this->check_buffering_constraints_i (stub,
01460                                              must_flush);
01461 
01462       // ... but we also want to activate it if the message was partially
01463       // sent.... Plus, when we use the blocking flushing strategy the
01464       // queue is flushed as a side-effect of 'schedule_output()'
01465 
01466       TAO_Flushing_Strategy *flushing_strategy =
01467         this->orb_core ()->flushing_strategy ();
01468 
01469       if (constraints_reached || try_sending_first)
01470         {
01471           int const result = flushing_strategy->schedule_output (this);
01472           if (result == TAO_Flushing_Strategy::MUST_FLUSH)
01473             {
01474               must_flush = true;
01475             }
01476         }
01477 
01478       if (must_flush)
01479         {
01480           if (TAO_debug_level > 0)
01481             {
01482               ACE_DEBUG ((LM_DEBUG,
01483                           ACE_TEXT ("TAO (%P|%t) - Transport[%d]::")
01484                           ACE_TEXT ("send_asynchronous_message_i, ")
01485                           ACE_TEXT ("flushing transport.\n"),
01486                           this->id ()));
01487             }
01488 
01489           size_t sent_byte = sent_byte_count_;
01490           int ret = 0;
01491           {
01492             typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK;
01493             TAO_REVERSE_LOCK reverse (*this->handler_lock_);
01494             ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1);
01495             ret = flushing_strategy->flush_transport (this, max_wait_time);
01496           }
01497 
01498           if (ret == -1)
01499             {
01500               if (errno == ETIME)
01501                 {
01502                   if (sent_byte == sent_byte_count_) // if nothing was actually flushed
01503                     {
01504                       //This request has timed out and none of it was sent to the transport
01505                       //We can't return -1 here, since that would end up closing the tranpsort
01506                       if (TAO_debug_level > 2)
01507                         {
01508                           ACE_DEBUG ((LM_DEBUG,
01509                                       ACE_TEXT ("TAO (%P|%t) - ")
01510                                       ACE_TEXT ("Transport[%d]::send_asynchronous_message_i, ")
01511                                       ACE_TEXT ("2 timeout encountered before any bytes sent\n"),
01512                                       this->id ()));
01513                         }
01514                       throw ::CORBA::TIMEOUT (CORBA::SystemException::_tao_minor_code
01515                                               (TAO_TIMEOUT_SEND_MINOR_CODE, ETIME),
01516                                               CORBA::COMPLETED_NO);
01517                     }
01518                 }
01519               return -1;
01520             }
01521         }
01522     }
01523   return 0;
01524 }

void TAO_Transport::send_connection_closed_notifications ( void   ) 

Notify all the components inside a Transport when the underlying connection is closed.

Definition at line 1223 of file Transport.cpp.

References ACE_GUARD, TAO_Transport_Mux_Strategy::connection_closed(), send_connection_closed_notifications_i(), and tms().

Referenced by TAO_Connection_Handler::close_connection_eh().

01224 {
01225   {
01226     ACE_MT (ACE_GUARD (ACE_Lock, guard, *this->handler_lock_));
01227 
01228     this->send_connection_closed_notifications_i ();
01229   }
01230 
01231   this->tms ()->connection_closed ();
01232 }

void TAO_Transport::send_connection_closed_notifications_i ( void   )  [private]

Assume the lock is held.

Definition at line 1235 of file Transport.cpp.

References cleanup_queue_i().

Referenced by send_connection_closed_notifications().

01236 {
01237   this->cleanup_queue_i ();
01238 }

virtual int TAO_Transport::send_message ( TAO_OutputCDR stream,
TAO_Stub stub = 0,
TAO_Message_Semantics  message_semantics = TAO_Transport::TAO_TWOWAY_REQUEST,
ACE_Time_Value max_time_wait = 0 
) [pure virtual]

Once the ORB is prepared to receive a reply (see send_request() above), and all the arguments have been marshaled the CDR stream must be 'formatted', i.e. the message_size field in the GIOP header can finally be set to the proper value.

Referenced by TAO_GIOP_Message_Base::make_send_locate_reply(), TAO_GIOP_Message_Base::process_request(), and TAO_GIOP_Message_Base::send_reply_exception().

int TAO_Transport::send_message_block_chain ( const ACE_Message_Block message_block,
size_t &  bytes_transferred,
ACE_Time_Value max_wait_time = 0 
)

Send a message block chain,.

Definition at line 525 of file Transport.cpp.

References ACE_GUARD_RETURN, and send_message_block_chain_i().

Referenced by TAO_GIOP_Message_Base::send_close_connection(), and TAO_GIOP_Message_Base::send_error().

00528 {
00529   ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1);
00530 
00531   return this->send_message_block_chain_i (mb,
00532                                            bytes_transferred,
00533                                            max_wait_time);
00534 }

int TAO_Transport::send_message_block_chain_i ( const ACE_Message_Block message_block,
size_t &  bytes_transferred,
ACE_Time_Value max_wait_time 
)

Send a message block chain, assuming the lock is held.

Definition at line 537 of file Transport.cpp.

References drain_queue_i(), TAO_Synch_Queued_Message::message_length(), TAO_Queued_Message::push_back(), TAO_Queued_Message::remove_from_list(), and ACE_Message_Block::total_length().

Referenced by send_asynchronous_message_i(), and send_message_block_chain().

00540 {
00541   size_t const total_length = mb->total_length ();
00542 
00543   // We are going to block, so there is no need to clone
00544   // the message block.
00545   TAO_Synch_Queued_Message synch_message (mb, this->orb_core_);
00546 
00547   synch_message.push_back (this->head_, this->tail_);
00548 
00549   int const n = this->drain_queue_i (max_wait_time);
00550 
00551   if (n == -1)
00552     {
00553       synch_message.remove_from_list (this->head_, this->tail_);
00554       return -1; // Error while sending...
00555     }
00556   else if (n == 1)
00557     {
00558       bytes_transferred = total_length;
00559       return 1;  // Empty queue, message was sent..
00560     }
00561 
00562   // Remove the temporary message from the queue...
00563   synch_message.remove_from_list (this->head_, this->tail_);
00564 
00565   bytes_transferred = total_length - synch_message.message_length ();
00566 
00567   return 0;
00568 }

int TAO_Transport::send_message_shared ( TAO_Stub stub,
TAO_Message_Semantics  message_semantics,
const ACE_Message_Block message_block,
ACE_Time_Value max_wait_time 
) [virtual]

Sent the contents of message_block.

Parameters:
stub The object reference used for this operation, useful to obtain the current policies.
message_semantics If this is set to TAO_TWO_REQUEST this method will block until the operation is completely written on the wire. If it is set to other values this operation could return.
message_block The CDR encapsulation of the GIOP message that must be sent. The message may consist of multiple Message Blocks chained through the cont() field.
max_wait_time The maximum time that the operation can block, used in the implementation of timeouts.

Definition at line 293 of file Transport.cpp.

References ACE_GUARD_RETURN, close_connection(), and send_message_shared_i().

Referenced by TAO_IIOP_Transport::send_message().

00297 {
00298   int result = 0;
00299 
00300   {
00301     ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1);
00302 
00303     result =
00304       this->send_message_shared_i (stub, message_semantics,
00305                                    message_block, max_wait_time);
00306   }
00307 
00308   if (result == -1)
00309     {
00310       // The connection needs to be closed here.
00311       // In the case of a partially written message this is the only way to cleanup
00312       //  the physical connection as well as the Transport. An EOF on the remote end
00313       //  will cancel the partially received message.
00314       this->close_connection ();
00315     }
00316 
00317   return result;
00318 }

int TAO_Transport::send_message_shared_i ( TAO_Stub stub,
TAO_Message_Semantics  message_semantics,
const ACE_Message_Block message_block,
ACE_Time_Value max_wait_time 
) [protected]

Implement send_message_shared() assuming the handler_lock_ is held.

Definition at line 1241 of file Transport.cpp.

References ACE_Message_Block::length(), TAO::Transport::Stats::messages_sent(), send_asynchronous_message_i(), send_reply_message_i(), send_synchronous_message_i(), stats_, TAO_ONEWAY_REQUEST, TAO_REPLY, and TAO_TWOWAY_REQUEST.

Referenced by send_message_shared().

01245 {
01246   int ret = 0;
01247 
01248 #if TAO_HAS_TRANSPORT_CURRENT == 1
01249   size_t const message_length = message_block->length ();
01250 #endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */
01251 
01252   switch (message_semantics)
01253     {
01254       case TAO_Transport::TAO_TWOWAY_REQUEST:
01255         ret = this->send_synchronous_message_i (message_block, max_wait_time);
01256         break;
01257 
01258       case TAO_Transport::TAO_REPLY:
01259         ret = this->send_reply_message_i (message_block, max_wait_time);
01260         break;
01261 
01262       case TAO_Transport::TAO_ONEWAY_REQUEST:
01263         ret = this->send_asynchronous_message_i (stub,
01264                                                  message_block,
01265                                                  max_wait_time);
01266         break;
01267     }
01268 
01269 #if TAO_HAS_TRANSPORT_CURRENT == 1
01270   // "Count" the message, only if no error was encountered.
01271   if (ret != -1 && this->stats_ != 0)
01272     this->stats_->messages_sent (message_length);
01273 #endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */
01274 
01275   return ret;
01276 }

int TAO_Transport::send_reply_message_i ( const ACE_Message_Block message_block,
ACE_Time_Value max_wait_time 
) [private]

Send a reply message, i.e. do not block until the message is on the wire, but just return after adding them to the queue.

Definition at line 667 of file Transport.cpp.

References ACE_DEBUG, ACE_GUARD_RETURN, ACE_TEXT, TAO_Synch_Queued_Message::clone(), TAO_Queued_Message::destroy(), TAO_Flushing_Strategy::flush_message(), TAO_ORB_Core::flushing_strategy(), LM_DEBUG, TAO_Flushing_Strategy::MUST_FLUSH, orb_core(), TAO_Queued_Message::push_back(), TAO_Queued_Message::remove_from_list(), TAO_Flushing_Strategy::schedule_output(), send_synch_message_helper_i(), and TAO_debug_level.

Referenced by send_message_shared_i().

00669 {
00670   // Dont clone now.. We could be sent in one shot!
00671   TAO_Synch_Queued_Message synch_message (mb, this->orb_core_);
00672 
00673   synch_message.push_back (this->head_, this->tail_);
00674 
00675   int const n =
00676     this->send_synch_message_helper_i (synch_message, max_wait_time);
00677 
00678   // What about partially sent messages.
00679   if (n == -1 || n == 1)
00680     {
00681       return n;
00682     }
00683 
00684   if (TAO_debug_level > 3)
00685     {
00686       ACE_DEBUG ((LM_DEBUG,
00687          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_reply_message_i, ")
00688          ACE_TEXT ("preparing to add to queue before leaving\n"),
00689          this->id ()));
00690     }
00691 
00692   // Till this point we shouldn't have any copying and that is the
00693   // point anyway. Now, remove the node from the list
00694   synch_message.remove_from_list (this->head_, this->tail_);
00695 
00696   // Clone the node that we have.
00697   TAO_Queued_Message *msg =
00698     synch_message.clone (this->orb_core_->transport_message_buffer_allocator ());
00699 
00700   // Stick it in the queue
00701   msg->push_back (this->head_, this->tail_);
00702 
00703   TAO_Flushing_Strategy *flushing_strategy =
00704     this->orb_core ()->flushing_strategy ();
00705 
00706   int const result = flushing_strategy->schedule_output (this);
00707 
00708   if (result == -1)
00709     {
00710       if (TAO_debug_level > 5)
00711         {
00712           ACE_DEBUG ((LM_DEBUG, "TAO (%P|%t) - Transport[%d]::send_reply_"
00713                       "message_i, dequeuing msg due to schedule_output "
00714                       "failure\n", this->id ()));
00715         }
00716       msg->remove_from_list (this->head_, this->tail_);
00717       msg->destroy ();
00718     }
00719   else if (result == TAO_Flushing_Strategy::MUST_FLUSH)
00720     {
00721       typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK;
00722       TAO_REVERSE_LOCK reverse (*this->handler_lock_);
00723       ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1);
00724       (void) flushing_strategy->flush_message (this, msg, 0);
00725     }
00726 
00727   return 1;
00728 }

virtual int TAO_Transport::send_request ( TAO_Stub stub,
TAO_ORB_Core orb_core,
TAO_OutputCDR stream,
TAO_Message_Semantics  message_semantics,
ACE_Time_Value max_time_wait 
) [pure virtual]

Preparing the ORB to receive the reply only once the request is completely sent opens the system to some subtle race conditions: suppose the ORB is running in a multi-threaded configuration, thread A makes a request while thread B is using the Reactor to process all incoming requests. Thread A could be implemented as follows: 1) send the request 2) setup the ORB to receive the reply 3) wait for the request

but in this case thread B may receive the reply between step (1) and (2), and drop it as an invalid or unexpected message. Consequently the correct implementation is: 1) setup the ORB to receive the reply 2) send the request 3) wait for the reply

The following method encapsulates this idiom.

Todo:
This is generic code, it should be factored out into the Transport class.

Referenced by TAO::Remote_Invocation::send_message().

int TAO_Transport::send_synch_message_helper_i ( TAO_Synch_Queued_Message s,
ACE_Time_Value max_wait_time 
) [private]

A helper method used by send_synchronous_message_i() and send_reply_message_i(). Reusable code that could be used by both the methods.

Definition at line 731 of file Transport.cpp.

References TAO_Synch_Queued_Message::all_data_sent(), drain_queue_i(), and TAO_Queued_Message::remove_from_list().

Referenced by send_reply_message_i(), and send_synchronous_message_i().

00733 {
00734   int const n = this->drain_queue_i (max_wait_time);
00735 
00736   if (n == -1)
00737     {
00738       synch_message.remove_from_list (this->head_, this->tail_);
00739       return -1; // Error while sending...
00740     }
00741   else if (n == 1)
00742     {
00743       return 1;  // Empty queue, message was sent..
00744     }
00745 
00746   if (synch_message.all_data_sent ())
00747     {
00748       return 1;
00749     }
00750 
00751   return 0;
00752 }

int TAO_Transport::send_synchronous_message_i ( const ACE_Message_Block message_block,
ACE_Time_Value max_wait_time 
) [private]

Send a synchronous message, i.e. block until the message is on the wire

Definition at line 571 of file Transport.cpp.

References CORBA::SystemException::_tao_minor_code(), ACE_DEBUG, ACE_ERROR, ACE_GUARD_RETURN, ACE_TEXT, CORBA::COMPLETED_NO, TAO_Flushing_Strategy::flush_message(), TAO_ORB_Core::flushing_strategy(), LM_DEBUG, LM_ERROR, TAO_Synch_Queued_Message::message_length(), orb_core(), TAO_Queued_Message::push_back(), TAO_Queued_Message::remove_from_list(), TAO_Flushing_Strategy::schedule_output(), send_synch_message_helper_i(), TAO_debug_level, TAO_TIMEOUT_SEND_MINOR_CODE, and ACE_Message_Block::total_length().

Referenced by send_message_shared_i().

00573 {
00574   // We are going to block, so there is no need to clone
00575   // the message block.
00576   size_t const total_length = mb->total_length ();
00577   TAO_Synch_Queued_Message synch_message (mb, this->orb_core_);
00578 
00579   synch_message.push_back (this->head_, this->tail_);
00580 
00581   int const result = this->send_synch_message_helper_i (synch_message,
00582                                                         max_wait_time);
00583   if (result == -1 && errno == ETIME)
00584     {
00585       if (total_length == synch_message.message_length ()) //none was sent
00586         {
00587           if (TAO_debug_level > 2)
00588             {
00589               ACE_DEBUG ((LM_DEBUG,
00590                           ACE_TEXT ("TAO (%P|%t) - ")
00591                           ACE_TEXT ("Transport[%d]::send_synchronous_message_i, ")
00592                           ACE_TEXT ("timeout encountered before any bytes sent\n"),
00593                           this->id ()));
00594             }
00595           throw ::CORBA::TIMEOUT (
00596             CORBA::SystemException::_tao_minor_code (
00597               TAO_TIMEOUT_SEND_MINOR_CODE,
00598               ETIME),
00599             CORBA::COMPLETED_NO);
00600         }
00601       else
00602         {
00603           return -1;
00604         }
00605     }
00606   else if(result == -1 || result == 1)
00607     {
00608       return result;
00609     }
00610 
00611   TAO_Flushing_Strategy *flushing_strategy =
00612     this->orb_core ()->flushing_strategy ();
00613   if (flushing_strategy->schedule_output (this) == -1)
00614     {
00615       synch_message.remove_from_list (this->head_, this->tail_);
00616       if (TAO_debug_level > 0)
00617         {
00618           ACE_ERROR ((LM_ERROR,
00619                       ACE_TEXT ("TAO (%P|%t) - Transport[%d]::")
00620                       ACE_TEXT ("send_synchronous_message_i, ")
00621                       ACE_TEXT ("error while scheduling flush - %m\n"),
00622                       this->id ()));
00623         }
00624       return -1;
00625     }
00626 
00627   // No need to check for result == TAO_Flushing_Strategy::MUST_FLUSH,
00628   // because we're always going to flush anyway.
00629 
00630   // Release the mutex, other threads may modify the queue as we
00631   // block for a long time writing out data.
00632   int flush_result;
00633   {
00634     typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK;
00635     TAO_REVERSE_LOCK reverse (*this->handler_lock_);
00636     ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1);
00637 
00638     flush_result = flushing_strategy->flush_message (this,
00639                                                      &synch_message,
00640                                                      max_wait_time);
00641   }
00642 
00643   if (flush_result == -1)
00644     {
00645       synch_message.remove_from_list (this->head_, this->tail_);
00646 
00647       // We don't need to do anything special for the timeout case.
00648       // The connection is going to get closed and the Transport destroyed.
00649       // The only thing to do maybe is to empty the queue.
00650 
00651       if (TAO_debug_level > 0)
00652         {
00653           ACE_ERROR ((LM_ERROR,
00654              ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_synchronous_message_i, ")
00655              ACE_TEXT ("error while sending message - %m\n"),
00656              this->id ()));
00657         }
00658 
00659       return -1;
00660     }
00661 
00662   return 1;
00663 }

ACE_INLINE size_t TAO_Transport::sent_byte_count ( void   )  const

Accessor to sent_byte_count_.

Definition at line 193 of file Transport.inl.

References sent_byte_count_.

00194 {
00195   return this->sent_byte_count_;
00196 }

TAO::Transport::Stats* TAO_Transport::stats ( void   )  const

Transport statistics.

TAO_BEGIN_VERSIONED_NAMESPACE_DECL ACE_INLINE CORBA::ULong TAO_Transport::tag ( void   )  const

Return the protocol tag.

The OMG assigns unique tags (a 32-bit unsigned number) to each protocol. New protocol tags can be obtained free of charge from the OMG, check the documents in corbafwd.h for more details.

Definition at line 8 of file Transport.inl.

References tag_.

00009 {
00010   return this->tag_;
00011 }

int TAO_Transport::tear_listen_point_list ( TAO_InputCDR cdr  )  [virtual]

Extracts the list of listen points from the cdr stream. The list would have the protocol specific details of the ListenPoints

Reimplemented in TAO_IIOP_Transport.

Definition at line 287 of file Transport.cpp.

References ACE_NOTSUP_RETURN.

00288 {
00289   ACE_NOTSUP_RETURN (-1);
00290 }

ACE_INLINE TAO_Transport_Mux_Strategy * TAO_Transport::tms ( void   )  const

Get the TAO_Tranport_Mux_Strategy used by this object.

The role of the TAO_Transport_Mux_Strategy is described in more detail in that class' documentation. Enough is to say that the class is used to control how many threads can have pending requests over the same connection. Multiplexing multiple threads over the same connection conserves resources and is almost required for AMI, but having only one pending request per connection is more efficient and reduces the possibilities of priority inversions.

Definition at line 20 of file Transport.inl.

References tms_.

Referenced by idle_after_reply(), idle_after_send(), TAO::LocateRequest_Invocation::invoke(), TAO_GIOP_Message_Base::process_reply_message(), TAO::Synch_Twoway_Invocation::remote_twoway(), and send_connection_closed_notifications().

00021 {
00022   return tms_;
00023 }

TAO::Transport_Cache_Manager & TAO_Transport::transport_cache_manager ( void   )  [private]

Helper method that returns the Transport Cache Manager.

Definition at line 2546 of file Transport.cpp.

References TAO_ORB_Core::lane_resources(), orb_core_, and TAO_Thread_Lane_Resources::transport_cache().

Referenced by make_idle(), purge_entry(), recache_transport(), and update_transport().

02547 {
02548   return this->orb_core_->lane_resources ().transport_cache ();
02549 }

int TAO_Transport::update_transport ( void   ) 

Cache management.

Definition at line 476 of file Transport.cpp.

References transport_cache_manager().

Referenced by TAO_Connection_Handler::svc_i().

00477 {
00478   return this->transport_cache_manager ().update_entry (this->cache_map_entry_);
00479 }

ACE_INLINE TAO_Wait_Strategy * TAO_Transport::wait_strategy ( void   )  const

Return the TAO_Wait_Strategy used by this object.

The role of the TAO_Wait_Strategy is described in more detail in that class' documentation. Enough is to say that the ORB can wait for a reply blocking on read(), using the Reactor to wait for multiple events concurrently or using the Leader/Followers protocol.

Definition at line 27 of file Transport.inl.

References ws_.

Referenced by TAO_Connection_Handler::close_connection_eh(), TAO_IIOP_Connector::complete_connection(), TAO::Transport_Cache_Manager::find_transport(), and TAO::Synch_Twoway_Invocation::wait_for_reply().

00028 {
00029   return this->ws_;
00030 }

ACE_INLINE void TAO_Transport::wchar_translator ( TAO_Codeset_Translator_Base  ) 

CodeSet negotiation - Set the wchar codeset translator factory.

Definition at line 150 of file Transport.inl.

References tcs_set_, and wchar_translator_.

00151 {
00152   this->wchar_translator_ = tf;
00153   this->tcs_set_ = 1;
00154 }

ACE_INLINE TAO_Codeset_Translator_Base * TAO_Transport::wchar_translator ( void   )  const

CodeSet Negotiation - Get the wchar codeset translator factory.

Definition at line 137 of file Transport.inl.

References wchar_translator_.

00138 {
00139   return this->wchar_translator_;
00140 }


Friends And Related Function Documentation

friend class TAO_Leader_Follower_Flushing_Strategy [friend]

Definition at line 812 of file Transport.h.

friend class TAO_Reactive_Flushing_Strategy [friend]

These classes need privileged access to:

Definition at line 811 of file Transport.h.

friend class TAO_Thread_Per_Connection_Handler [friend]

Needs priveleged access to event_handler_i ()

Definition at line 816 of file Transport.h.


Member Data Documentation

int TAO_Transport::bidirectional_flag_ [protected]

Have we sent any info on bidirectional information or have we received any info regarding making the connection served by this transport bidirectional. The flag is used as follows: + We dont want to send the bidirectional context info more than once on the connection. Why? Waste of marshalling and demarshalling time on the client. + On the server side -- once a client that has established the connection asks the server to use the connection both ways, we *dont* want the server to pack service info to the client. That is not allowed. We need a flag to prevent such a things from happening.

The value of this flag will be 0 if the client sends info and 1 if the server receives the info.

Definition at line 967 of file Transport.h.

Referenced by bidirectional_flag().

TAO::Transport_Cache_Manager::HASH_MAP_ENTRY* TAO_Transport::cache_map_entry_ [protected]

Our entry in the cache. We don't own this. It is here for our convenience. We cannot just change things around.

Definition at line 939 of file Transport.h.

Referenced by cache_map_entry().

TAO_Codeset_Translator_Base* TAO_Transport::char_translator_ [private]

Additional member values required to support codeset translation.

@Phil, I think it would be nice if we could think of a way to do the following. We have been trying to use the transport for marking about translator factories and such! IMHO this is a wrong encapulation ie. trying to populate the transport object with these details. We should probably have a class something like TAO_Message_Property or TAO_Message_Translator or whatever (I am sure you get the idea) and encapsulate all these details. Coupling these seems odd. if I have to be more cynical we can move this to the connection_handler and it may more sense with the DSCP stuff around there. Do you agree?

Definition at line 1045 of file Transport.h.

Referenced by assign_translators(), and char_translator().

ACE_Time_Value TAO_Transport::current_deadline_ [protected]

The queue will start draining no later than <queeing_deadline_> if* the deadline is

Definition at line 984 of file Transport.h.

Referenced by check_buffering_constraints_i(), and reset_flush_timer().

CORBA::Boolean TAO_Transport::first_request_ [private]

First_request_ is true until the first request is sent or received. This is necessary since codeset context information is necessary only on the first request. After that, the translators are fixed for the life of the connection.

Definition at line 1057 of file Transport.h.

Referenced by first_request_sent().

long TAO_Transport::flush_timer_id_ [protected]

The timer ID.

Definition at line 987 of file Transport.h.

Referenced by check_buffering_constraints_i(), flush_timer_pending(), and reset_flush_timer().

ACE_Lock* TAO_Transport::handler_lock_ [mutable, protected]

This is an ACE_Lock that gets initialized from TAO_ORB_Core::resource_factory()->create_cached_connection_lock(). This way, one can use a lock appropriate for the type of system, i.e., a null lock for single-threaded systems, and a real lock for multi-threaded systems.

Definition at line 1001 of file Transport.h.

Referenced by ~TAO_Transport().

TAO_Queued_Message* TAO_Transport::head_ [protected]

Implement the outgoing data queue.

Definition at line 972 of file Transport.h.

Referenced by cleanup_queue(), cleanup_queue_i(), drain_queue_helper(), drain_queue_i(), and send_asynchronous_message_i().

size_t TAO_Transport::id_ [protected]

A unique identifier for the transport.

This never *never* changes over the lifespan, so we don't have to worry about locking it.

HINT: Protocol-specific transports that use connection handler might choose to set this to the handle for their connection.

Definition at line 1011 of file Transport.h.

Referenced by id(), and post_open().

TAO_Incoming_Message_Queue TAO_Transport::incoming_message_queue_ [protected]

Queue of the consolidated, incoming messages..

Definition at line 976 of file Transport.h.

Referenced by process_queue_head().

TAO::Incoming_Message_Stack TAO_Transport::incoming_message_stack_ [protected]

Stack of incoming fragments, consolidated messages are going to be enqueued in "incoming_message_queue_"

Definition at line 980 of file Transport.h.

Referenced by handle_input_parse_data(), and handle_input_parse_extra_messages().

bool TAO_Transport::is_connected_ [protected]

Is this transport really connected or not. In case of oneways with SYNC_NONE Policy we don't wait until the connection is ready and we buffer the requests in this transport until the connection is ready

Definition at line 1025 of file Transport.h.

Referenced by is_connected(), post_open(), and pre_close().

TAO_GIOP_Message_Base* TAO_Transport::messaging_object_ [protected]

Our messaging object.

Definition at line 1028 of file Transport.h.

Referenced by TAO_IIOP_Transport::generate_request_header(), messaging_object(), and ~TAO_Transport().

TAO::Connection_Role TAO_Transport::opening_connection_role_ [protected]

Definition at line 969 of file Transport.h.

Referenced by opened_as(), and provide_blockable_handler().

TAO_ORB_Core* const TAO_Transport::orb_core_ [protected]

Global orbcore resource.

Definition at line 935 of file Transport.h.

Referenced by handle_input_parse_data(), orb_core(), TAO_GIOP_Message_Base::process_locate_request(), TAO_GIOP_Message_Base::process_request(), register_handler(), TAO_Transport(), and transport_cache_manager().

ACE_Message_Block* TAO_Transport::partial_message_ [private]

Holds the partial GIOP message (if there is one).

Definition at line 1060 of file Transport.h.

Referenced by handle_input_parse_data().

unsigned long TAO_Transport::purging_order_ [protected]

Used by the LRU, LFU and FIFO Connection Purging Strategies.

Definition at line 1014 of file Transport.h.

Referenced by purging_order().

size_t TAO_Transport::recv_buffer_size_ [protected]

Size of the buffer received.

Definition at line 1017 of file Transport.h.

Referenced by handle_input_missing_data(), handle_input_parse_data(), and recv_buffer_size().

size_t TAO_Transport::sent_byte_count_ [protected]

Number of bytes sent.

Definition at line 1020 of file Transport.h.

Referenced by drain_queue_helper(), drain_queue_i(), send_asynchronous_message_i(), and sent_byte_count().

TAO::Transport::Stats* TAO_Transport::stats_ [private]

Statistics.

Definition at line 1073 of file Transport.h.

Referenced by process_parsed_messages(), send_message_shared_i(), and ~TAO_Transport().

CORBA::ULong const TAO_Transport::tag_ [protected]

IOP protocol tag.

Definition at line 932 of file Transport.h.

Referenced by report_invalid_event_handler(), and tag().

TAO_Queued_Message* TAO_Transport::tail_ [protected]

Definition at line 973 of file Transport.h.

CORBA::Boolean TAO_Transport::tcs_set_ [private]

The tcs_set_ flag indicates that negotiation has occured and so the translators are correct, since a null translator is valid if both ends are using the same codeset, whatever that codeset might be.

Definition at line 1051 of file Transport.h.

Referenced by char_translator(), is_tcs_set(), and wchar_translator().

TAO_Transport_Mux_Strategy* TAO_Transport::tms_ [protected]

Strategy to decide whether multiple requests can be sent over the same connection or the connection is exclusive for a request.

Definition at line 943 of file Transport.h.

Referenced by TAO_Transport(), tms(), and ~TAO_Transport().

TAO_Transport_Timer TAO_Transport::transport_timer_ [protected]

The adapter used to receive timeout callbacks from the Reactor.

Definition at line 990 of file Transport.h.

TAO_Codeset_Translator_Base* TAO_Transport::wchar_translator_ [private]

Definition at line 1046 of file Transport.h.

Referenced by assign_translators(), and wchar_translator().

TAO_Wait_Strategy* TAO_Transport::ws_ [protected]

Strategy for waiting for the reply after sending the request.

Definition at line 946 of file Transport.h.

Referenced by register_handler(), TAO_Transport(), wait_strategy(), and ~TAO_Transport().


The documentation for this class was generated from the following files:
Generated on Tue Feb 2 17:39:48 2010 for TAO by  doxygen 1.4.7