TAO_Transport Class Reference

Generic definitions for the Transport class. More...

#include <Transport.h>

Inheritance diagram for TAO_Transport:

Inheritance graph
[legend]
Collaboration diagram for TAO_Transport:

Collaboration graph
[legend]
List of all members.

Template methods

The Transport class uses the Template Method Pattern to implement the protocol specific functionality. Implementors of a pluggable protocol should override the following methods with the semantics documented below.

enum  { TAO_ONEWAY_REQUEST = 0, TAO_TWOWAY_REQUEST = 1, TAO_REPLY }
virtual ACE_Event_Handler * event_handler_i (void)=0
bool is_connected (void) const
 Is this transport really connected.

bool post_open (size_t id)
 Perform all the actions when this transport gets opened.

TAO_Connection_Handler * connection_handler (void)
 Get the connection handler for this transport.

TAO_OutputCDRout_stream (void)
 Accessor for the output CDR stream.

int generate_locate_request (TAO_Target_Specification &spec, TAO_Operation_Details &opdetails, TAO_OutputCDR &output)
virtual int generate_request_header (TAO_Operation_Details &opd, TAO_Target_Specification &spec, TAO_OutputCDR &msg)
int recache_transport (TAO_Transport_Descriptor_Interface *desc)
 Recache ourselves in the cache.

virtual int handle_input (TAO_Resume_Handle &rh, ACE_Time_Value *max_wait_time=0, int block=0)
 Callback to read incoming data.

virtual int send_request (TAO_Stub *stub, TAO_ORB_Core *orb_core, TAO_OutputCDR &stream, int message_semantics, ACE_Time_Value *max_time_wait)=0
virtual int send_message (TAO_OutputCDR &stream, TAO_Stub *stub=0, int message_semantics=TAO_Transport::TAO_TWOWAY_REQUEST, ACE_Time_Value *max_time_wait=0)=0
virtual int send_message_shared (TAO_Stub *stub, int message_semantics, const ACE_Message_Block *message_block, ACE_Time_Value *max_wait_time)
 Send the contents of message_block.

int format_queue_message (TAO_OutputCDR &stream)
 Format and queue a message for stream.

int send_message_block_chain (const ACE_Message_Block *message_block, size_t &bytes_transferred, ACE_Time_Value *max_wait_time=0)
 Send a message block chain.

int send_message_block_chain_i (const ACE_Message_Block *message_block, size_t &bytes_transferred, ACE_Time_Value *max_wait_time)
 Send a message block chain, assuming the lock is held.

int purge_entry (void)
 Cache management.

int make_idle (void)
 Cache management.

int update_transport (void)
 Cache management.

int handle_timeout (const ACE_Time_Value &current_time, const void *act)
size_t recv_buffer_size (void) const
 Accessor to recv_buffer_size_.

size_t sent_byte_count (void) const
 Accessor to sent_byte_count_.

TAO_Codeset_Translator_Base * char_translator (void) const
 CodeSet Negotiation - Get the char codeset translator factory.

TAO_Codeset_Translator_Base * wchar_translator (void) const
 CodeSet Negotiation - Get the wchar codeset translator factory.

void char_translator (TAO_Codeset_Translator_Base *)
 CodeSet negotiation - Set the char codeset translator factory.

void wchar_translator (TAO_Codeset_Translator_Base *)
 CodeSet negotiation - Set the wchar codeset translator factory.

void assign_translators (TAO_InputCDR *, TAO_OutputCDR *)
void clear_translators (TAO_InputCDR *, TAO_OutputCDR *)
CORBA::Boolean is_tcs_set () const
 Return true if the tcs has been set.

void first_request_sent ()
 Set the state of the first_request_ flag to 0.

void send_connection_closed_notifications (void)
virtual TAO_Connection_Handler * connection_handler_i (void)=0
int process_parsed_messages (TAO_Queued_Data *qd, TAO_Resume_Handle &rh)
int send_message_shared_i (TAO_Stub *stub, int message_semantics, const ACE_Message_Block *message_block, ACE_Time_Value *max_wait_time)
int queue_message_i (const ACE_Message_Block *message_block)
 Queue a message for message_block.

CORBA::ULong const  tag_
 IOP protocol tag.

TAO_ORB_Core *const  orb_core_
 Global orbcore resource.

TAO::Transport_Cache_Manager::HASH_MAP_ENTRY * cache_map_entry_
TAO_Transport_Mux_Strategy * tms_
TAO_Wait_Strategy * ws_
 Strategy for waiting for the reply after sending the request.

int bidirectional_flag_
TAO::Connection_Role opening_connection_role_
TAO_Queued_Message * head_
 Implement the outgoing data queue.

TAO_Queued_Message * tail_
TAO_Incoming_Message_Queue incoming_message_queue_
 Queue of the consolidated, incoming messages.

TAO::Incoming_Message_Stack incoming_message_stack_
ACE_Time_Value current_deadline_
long flush_timer_id_
 The timer ID.

TAO_Transport_Timer transport_timer_
 The adapter used to receive timeout callbacks from the Reactor.

ACE_Lock * handler_lock_
size_t id_
 A unique identifier for the transport.

unsigned long purging_order_
 Used by the LRU, LFU and FIFO Connection Purging Strategies.

size_t recv_buffer_size_
 Size of the buffer received.

size_t sent_byte_count_
 Number of bytes sent.

bool is_connected_
TAO::Transport_Cache_Managertransport_cache_manager (void)
 Helper method that returns the Transport Cache Manager.

int drain_queue (void)
 Send some of the data in the queue.

int drain_queue_i (void)
 Implement drain_queue() assuming the lock is held.

int queue_is_empty_i (void)
 Check if there are messages pending in the queue.

int drain_queue_helper (int &iovcnt, iovec iov[])
 A helper routine used in drain_queue_i().

int schedule_output_i (void)
 Schedule handle_output() callbacks.

int cancel_output_i (void)
 Cancel handle_output() callbacks.

void cleanup_queue (size_t byte_count)
 Cleanup the queue.

void cleanup_queue_i ()
 Cleanup the complete queue.

int check_buffering_constraints_i (TAO_Stub *stub, bool &must_flush)
 Check if the buffering constraints have been reached.

int send_synchronous_message_i (const ACE_Message_Block *message_block, ACE_Time_Value *max_wait_time)
int send_reply_message_i (const ACE_Message_Block *message_block, ACE_Time_Value *max_wait_time)
int send_asynchronous_message_i (TAO_Stub *stub, const ACE_Message_Block *message_block, ACE_Time_Value *max_wait_time)
int send_synch_message_helper_i (TAO_Synch_Queued_Message &s, ACE_Time_Value *max_wait_time)
int flush_timer_pending (void) const
 Check if the flush timer is still pending.

void reset_flush_timer (void)
void report_invalid_event_handler (const char *caller)
 Print out error messages if the event handler is not valid.

int handle_input_missing_data (TAO_Resume_Handle &rh, ACE_Time_Value *max_wait_time, TAO_Queued_Data *q_data)
int handle_input_parse_data (TAO_Resume_Handle &rh, ACE_Time_Value *max_wait_time)
int handle_input_parse_extra_messages (ACE_Message_Block &message_block)
int consolidate_enqueue_message (TAO_Queued_Data *qd)
 -1 error, otherwise 0

int consolidate_process_message (TAO_Queued_Data *qd, TAO_Resume_Handle &rh)
 -1 error, otherwise 0

int process_queue_head (TAO_Resume_Handle &rh)
int notify_reactor (void)
void send_connection_closed_notifications_i (void)
 Assume the lock is held.

void allocate_partial_message_block (void)
 TAO_Transport (const TAO_Transport &)
void operator= (const TAO_Transport &)
TAO_Codeset_Translator_Base * char_translator_
 Additional member values required to support codeset translation.

TAO_Codeset_Translator_Base * wchar_translator_
CORBA::Boolean tcs_set_
CORBA::Boolean first_request_
ACE_Message_Block * partial_message_
 Holds the partial GIOP message (if there is one).

class TAO_Block_Flushing_Strategy
class TAO_Reactive_Flushing_Strategy
class TAO_Leader_Follower_Flushing_Strategy
class TAO_Thread_Per_Connection_Handler

Public Types


Public Member Functions

 TAO_Transport (CORBA::ULong tag, TAO_ORB_Core *orb_core)
 Default creator, requires the tag value be supplied.

virtual ~TAO_Transport (void)
 Destructor.

CORBA::ULong tag (void) const
 Return the protocol tag.

TAO_ORB_Core * orb_core (void) const
 Access the ORB that owns this connection.

TAO_Transport_Mux_Strategy * tms (void) const
 Get the TAO_Transport_Mux_Strategy used by this object.

TAO_Wait_Strategy * wait_strategy (void) const
 Return the TAO_Wait_Strategy used by this object.

int handle_output (void)
 Callback method to reactively drain the outgoing data queue.

int bidirectional_flag (void) const
 Get the bidirectional flag.

void bidirectional_flag (int flag)
 Set the bidirectional flag.

void cache_map_entry (TAO::Transport_Cache_Manager::HASH_MAP_ENTRY *entry)
 Set the Cache Map entry.

TAO::Transport_Cache_Manager::HASH_MAP_ENTRY * cache_map_entry (void)
 Get the Cache Map entry.

size_t id (void) const
 Set and Get the identifier for this transport instance.

void id (size_t id)
TAO::Connection_Role opened_as (void) const
void opened_as (TAO::Connection_Role)
unsigned long purging_order (void) const
void purging_order (unsigned long value)
int queue_is_empty (void)
 Check if there are messages pending in the queue.

void provide_handler (TAO::Connection_Handler_Set &handlers)
 Add the event handler to the handlers set.

bool provide_blockable_handler (TAO::Connection_Handler_Set &handlers)
virtual int register_handler (void)
 Register the handler with the reactor.

virtual ssize_t send (iovec *iov, int iovcnt, size_t &bytes_transferred, const ACE_Time_Value *timeout=0)=0
 Write the complete Message_Block chain to the connection.

virtual ssize_t recv (char *buffer, size_t len, const ACE_Time_Value *timeout=0)=0
 Read len bytes into buffer.

Control connection lifecycle
These methods are routed through the TMS object. The TMS strategies implement them correctly.

bool idle_after_send (void)
bool idle_after_reply (void)
virtual void close_connection (void)
 Call the implementation method after obtaining the lock.

Template methods
The Transport class uses the Template Method Pattern to implement the protocol specific functionality. Implementors of a pluggable protocol should override the following methods with the semantics documented below.

virtual int messaging_init (CORBA::Octet major, CORBA::Octet minor)=0
virtual int tear_listen_point_list (TAO_InputCDR &cdr)
virtual bool post_connect_hook (void)
 Hooks that can be overridden in concrete transports.

ACE_Event_Handler::Reference_Count add_reference (void)
 Memory management routines.

ACE_Event_Handler::Reference_Count remove_reference (void)
virtual TAO_Pluggable_Messaging * messaging_object (void)=0

Detailed Description

Generic definitions for the Transport class.

The transport object is created in the Service handler constructor and deleted in the Service Handler's destructor!!

The main responsibility of a Transport object is to encapsulate a connection, and provide a transport independent way to send and receive data. Since TAO is heavily based on the Reactor for most if not all of its I/O, the Transport class is usually implemented with a helper Connection Handler that adapts the generic Transport interface to the Reactor types.

The outgoing data path:

One of the responsibilities of the TAO_Transport class is to send out GIOP messages as efficiently as possible. In most cases messages are put out in FIFO order, the transport object will put out the message using a single system call and return control to the application. However, for oneways and AMI requests it may be more efficient (or required if the SYNC_NONE policy is in effect) to queue the messages until a large enough data set is available. Another reason to queue is that some applications cannot block for I/O, yet they want to send messages so large that a single write() operation would not be able to cope with them. In such cases we need to queue the data and use the Reactor to drain the queue.

Therefore, the Transport class may need to use a queue to temporarily hold the messages, and, in some configurations, it may need to use the Reactor to concurrently drain such queues.

Out of order messages:

TAO provides explicit policies to send 'urgent' messages. Such messages may be put at the head of the queue. However, they cannot be sent immediately because the transport may already be sending another message in a reactive fashion.

Consequently, the Transport must also know if the head of the queue has been partially sent. In that case new messages can only follow the head. Only once the head is completely sent we can start sending new messages.

Waiting threads:

One or more threads can be blocked waiting for the connection to completely send the message. The thread should return as soon as its message has been sent, so a per-thread condition is required. This suggests that simply using an ACE_Message_Queue would not be enough: there is a significant amount of ancillary information to keep on each message that the Message_Block class does not provide room for.

Blocking I/O is still attractive for some applications. First, by eliminating the Reactor overhead, performance is improved when sending large blocks of data. Second, using the Reactor to send out data opens the door for nested upcalls, yet some applications cannot deal with the reentrancy issues in this case.

Timeouts:

Some or all messages could have a timeout period attached to them. The timeout source could either be some high-level policy or maybe some strategy to prevent denial of service attacks. In any case the timeouts are per-message, and later messages could have shorter timeouts. In fact, some kind of scheduling (such as EDF) could be required in a few applications.

Conclusions:

The outgoing data path consists of several components:

The Transport object provides a single method to send request messages (send_request_message ()).

The incoming data path:

One of the main responsibilities of the transport is to read and process the incoming GIOP message as quickly and efficiently as possible. There are other forces that need to be given due consideration. They are:

Parsing messages (GIOP) & processing the message:

The messages should be checked for validity and the right information should be sent to the higher layer for processing. The process of doing a sanity check and preparing the messages for the higher layers of the ORB is done by the messaging protocol.

Design forces and Challenges

To keep things as efficient as possible for medium sized requests, it would be good to minimise data copying and locking along the incoming path, i.e. from the time of reading the data from the handle to the application. We achieve this by creating a buffer on the stack and reading the data from the handle into the buffer. We then pass the same data block (the buffer is encapsulated into a data block) to the higher layers of the ORB. The problems stem from the following: (a) Data is bigger than the buffer that we have on the stack. (b) Transports like TCP do not guarantee availability of the whole chunk of data in one shot. Data could trickle in byte by byte. (c) A single read gives multiple messages.

We solve the problems as follows

(a) First do a read with the buffer on the stack. Query the underlying messaging object whether the message has any incomplete portion. If so, data will be copied into a new buffer able to hold the full message and is queued; succeeding events will read data from the socket and write directly into this buffer. Otherwise, if the message in the local buffer is complete, we free the handle and then send the message to the higher layers of the ORB for processing.

(b) If a buffer with an incomplete message has been enqueued while trying to do the above, the reactor will call us back when the handle becomes read ready. The read operation will copy data directly into the enqueued buffer. If the message has been read completely, the message is sent to the higher layers of the ORB for processing.

(c) If we get multiple messages (possible if the client connected to the server sends oneways or AMI requests), we parse and split the messages. Every message is put in the queue. Once the messages are queued, the thread picks up one message to send to the higher layers of the ORB. Before doing that, if it finds more messages, it sends a notify to the reactor without resuming the handle. The next thread picks up a message from the queue and processes that. Once the queue is drained the last thread resumes the handle.

Sending Replies

We could use the outgoing path of the ORB to send replies. This would allow us to reuse most of the code in the outgoing data path. We were doing this till TAO-1.2.3. We ran into problems. When writing the reply the ORB gets flow controlled, and the ORB tries to flush the message by going into the reactor. This resulted in unnecessary nesting. The thread that gets into the Reactor could potentially handle other messages (incoming or outgoing) and the stack starts growing, leading to crashes.

Solution to the nesting problem

The solution that we (plan to) adopt is pretty straightforward. The thread sending replies will not block to send the replies but queue the replies and return to the Reactor. (Note the careful usage of the terms "blocking in the Reactor" as opposed to "return back to the Reactor".)

See Also:

http://cvs.doc.wustl.edu/ace-latest.cgi/ACE_wrappers/TAO/docs/pluggable_protocols/index.html

Definition at line 237 of file Transport.h.


Member Enumeration Documentation

anonymous enum
 

Enumeration values:
TAO_ONEWAY_REQUEST 
TAO_TWOWAY_REQUEST 
TAO_REPLY 

Definition at line 592 of file Transport.h.

00593     {
00594       TAO_ONEWAY_REQUEST = 0,
00595       TAO_TWOWAY_REQUEST = 1,
00596       TAO_REPLY
00597     };


Constructor & Destructor Documentation

TAO_BEGIN_VERSIONED_NAMESPACE_DECL TAO_Transport::TAO_Transport CORBA::ULong  tag,
TAO_ORB_Core orb_core
 

Default creator, requires the tag value be supplied.

Definition at line 118 of file Transport.cpp.

References TAO_ORB_Core::client_factory(), TAO_Client_Strategy_Factory::create_transport_mux_strategy(), and TAO_Client_Strategy_Factory::create_wait_strategy().

00120   : tag_ (tag)
00121   , orb_core_ (orb_core)
00122   , cache_map_entry_ (0)
00123   , bidirectional_flag_ (-1)
00124   , opening_connection_role_ (TAO::TAO_UNSPECIFIED_ROLE)
00125   , head_ (0)
00126   , tail_ (0)
00127   , incoming_message_queue_ (orb_core)
00128   , current_deadline_ (ACE_Time_Value::zero)
00129   , flush_timer_id_ (-1)
00130   , transport_timer_ (this)
00131   , handler_lock_ (orb_core->resource_factory ()->create_cached_connection_lock ())
00132   , id_ ((size_t) this)
00133   , purging_order_ (0)
00134   , recv_buffer_size_ (0)
00135   , sent_byte_count_ (0)
00136   , is_connected_ (false)
00137   , char_translator_ (0)
00138   , wchar_translator_ (0)
00139   , tcs_set_ (0)
00140   , first_request_ (1)
00141   , partial_message_ (0)
00142 #ifdef ACE_HAS_SENDFILE
00143     // The ORB has been configured to use the MMAP allocator, meaning
00144     // we could/should use sendfile() to send data.  Cast once rather
00145     // here rather than during each send.  This assumes that all
00146     // TAO_OutputCDR instances are using the same TAO_MMAP_Allocator
00147     // instance as the underlying output CDR buffer allocator.
00148   , mmap_allocator_ (
00149       dynamic_cast<TAO_MMAP_Allocator *> (
00150         orb_core->output_cdr_buffer_allocator ()))
00151 #endif  /* ACE_HAS_SENDFILE */
00152 {
00153   TAO_Client_Strategy_Factory *cf =
00154     this->orb_core_->client_factory ();
00155 
00156   // Create WS now.
00157   this->ws_ = cf->create_wait_strategy (this);
00158 
00159   // Create TMS now.
00160   this->tms_ = cf->create_transport_mux_strategy (this);
00161 
00162   /*
00163    * Hook to add code that initializes components that
00164    * belong to the concrete protocol implementation.
00165    * Further additions to this Transport class will
00166    * need to add code *before* this hook.
00167    */
00168   //@@ TAO_TRANSPORT_SPL_CONSTRUCTOR_ADD_HOOK
00169 }

TAO_Transport::~TAO_Transport void   )  [virtual]
 

Destructor.

Definition at line 171 of file Transport.cpp.

References ACE_ASSERT, cleanup_queue_i(), handler_lock_, is_connected_, purge_entry(), and ACE_Message_Block::release().

00172 {
00173   delete this->ws_;
00174 
00175   delete this->tms_;
00176 
00177   delete this->handler_lock_;
00178 
00179   if (!this->is_connected_)
00180     {
00181       // When we have a not connected transport we could have buffered
00182       // messages on this transport which we have to cleanup now.
00183       this->cleanup_queue_i();
00184 
00185       // Cleanup our cache entry
00186       this->purge_entry();
00187     }
00188 
00189   // Release the partial message block, however we may
00190   // have never allocated one.
00191   ACE_Message_Block::release (this->partial_message_);
00192 
00193   // By the time the destructor is reached here all the connection stuff
00194   // *must* have been cleaned up.
00195 
00196   // The following assert is needed for the test "Bug_2494_Regression".
00197   // See the bugzilla bug #2494 for details.
00198   ACE_ASSERT (this->head_ == 0);
00199   ACE_ASSERT (this->cache_map_entry_ == 0);
00200 
00201   /*
00202    * Hook to add code that cleans up components
00203    * belong to the concrete protocol implementation.
00204    * Further additions to this Transport class will
00205    * need to add code *before* this hook.
00206    */
00207   //@@ TAO_TRANSPORT_SPL_DESTRUCTOR_ADD_HOOK
00208 }

TAO_Transport::TAO_Transport const TAO_Transport  )  [private]
 


Member Function Documentation

ACE_Event_Handler::Reference_Count TAO_Transport::add_reference void   ) 
 

Memory management routines.

Definition at line 2352 of file Transport.cpp.

References ACE_Event_Handler::add_reference(), and event_handler_i().

Referenced by TAO::Cache_IntId::Cache_IntId(), TAO::Cache_IntId::operator=(), provide_blockable_handler(), provide_handler(), TAO::Transport_Cache_Manager::purge(), TAO_Thread_Per_Connection_Handler::TAO_Thread_Per_Connection_Handler(), and TAO_Asynch_Reply_Dispatcher_Base::transport().

02353 {
02354   return this->event_handler_i ()->add_reference ();
02355 }

void TAO_Transport::allocate_partial_message_block void   )  [private]
 

Allocate a partial message block and store it in our partial_message_ data member.

Definition at line 2421 of file Transport.cpp.

References ACE_NEW, TAO_Pluggable_Messaging::header_length(), messaging_object(), and partial_message_.

Referenced by handle_input_parse_data().

02422 {
02423   if (this->partial_message_ == 0)
02424     {
02425       // This value must be at least large enough to hold a GIOP message
02426       // header plus a GIOP fragment header
02427       const size_t partial_message_size =
02428         this->messaging_object ()->header_length ();
02429        // + this->messaging_object ()->fragment_header_length ();
02430        // deprecated, conflicts with not-single_read_opt.
02431 
02432       ACE_NEW (this->partial_message_,
02433                ACE_Message_Block (partial_message_size));
02434     }
02435 }

void TAO_Transport::assign_translators TAO_InputCDR ,
TAO_OutputCDR
 

Use the Transport's codeset factories to set the translator for input and output CDRs.

Definition at line 2322 of file Transport.cpp.

References TAO_Codeset_Translator_Base::assign().

Referenced by TAO::LocateRequest_Invocation::check_reply(), TAO::Synch_Twoway_Invocation::check_reply_status(), TAO_ServerRequest::init_reply(), TAO_GIOP_Message_Lite::process_request(), TAO_GIOP_Message_Base::process_request(), TAO_GIOP_Message_Lite::process_request_message(), TAO_GIOP_Message_Base::process_request_message(), TAO_ServerRequest::send_cached_reply(), TAO_GIOP_Message_Lite::send_reply_exception(), TAO_ServerRequest::tao_send_reply_exception(), and TAO::Remote_Invocation::write_header().

02323 {
02324   if (this->char_translator_)
02325     {
02326       this->char_translator_->assign (inp);
02327       this->char_translator_->assign (outp);
02328     }
02329   if (this->wchar_translator_)
02330     {
02331       this->wchar_translator_->assign (inp);
02332       this->wchar_translator_->assign (outp);
02333     }
02334 }

ACE_INLINE void TAO_Transport::bidirectional_flag int  flag  ) 
 

Set the bidirectional flag.

Definition at line 39 of file Transport.inl.

References bidirectional_flag_.

00040 {
00041   this->bidirectional_flag_ = flag;
00042 }

ACE_INLINE int TAO_Transport::bidirectional_flag void   )  const
 

Get the bidirectional flag.

Definition at line 33 of file Transport.inl.

References bidirectional_flag_.

Referenced by TAO::Wait_On_LF_No_Upcall::can_process_upcalls(), TAO_IIOP_Transport::generate_request_header(), TAO_Muxed_TMS::request_id(), TAO_Exclusive_TMS::request_id(), and TAO_IIOP_Transport::tear_listen_point_list().

00034 {
00035   return this->bidirectional_flag_;
00036 }

ACE_INLINE TAO::Transport_Cache_Manager::HASH_MAP_ENTRY * TAO_Transport::cache_map_entry void   ) 
 

Get the Cache Map entry.

Definition at line 57 of file Transport.inl.

References cache_map_entry_.

00058 {
00059   return this->cache_map_entry_;
00060 }

ACE_INLINE void TAO_Transport::cache_map_entry TAO::Transport_Cache_Manager::HASH_MAP_ENTRY entry  ) 
 

Set the Cache Map entry.

Definition at line 63 of file Transport.inl.

References cache_map_entry_, and TAO::Transport_Cache_Manager::HASH_MAP_ENTRY.

Referenced by TAO::Transport_Cache_Manager::bind_i().

00065 {
00066   this->cache_map_entry_ = entry;
00067 }

int TAO_Transport::cancel_output_i void   )  [private]
 

Cancel handle_output() callbacks.

Definition at line 759 of file Transport.cpp.

References ACE_DEBUG, ACE_TEXT, ACE_Reactor::cancel_wakeup(), event_handler_i(), LM_DEBUG, ACE_Event_Handler::reactor(), and TAO_debug_level.

Referenced by TAO_Reactive_Flushing_Strategy::cancel_output(), and TAO_Leader_Follower_Flushing_Strategy::cancel_output().

00760 {
00761   ACE_Event_Handler * const eh = this->event_handler_i ();
00762   ACE_Reactor *const reactor = eh->reactor ();
00763 
00764   if (TAO_debug_level > 3)
00765     {
00766       ACE_DEBUG ((LM_DEBUG,
00767          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cancel_output_i\n"),
00768          this->id ()));
00769     }
00770 
00771   return reactor->cancel_wakeup (eh, ACE_Event_Handler::WRITE_MASK);
00772 }

ACE_INLINE void TAO_Transport::char_translator TAO_Codeset_Translator_Base  ) 
 

CodeSet negotiation - Set the char codeset translator factory.

Definition at line 137 of file Transport.inl.

References tcs_set_.

00138 {
00139   this->char_translator_ = tf;
00140   this->tcs_set_ = 1;
00141 }

ACE_INLINE TAO_Codeset_Translator_Base * TAO_Transport::char_translator void   )  const
 

CodeSet Negotiation - Get the char codeset translator factory.

Definition at line 125 of file Transport.inl.

00126 {
00127   return this->char_translator_;
00128 }

int TAO_Transport::check_buffering_constraints_i TAO_Stub stub,
bool &  must_flush
[private]
 

Check if the buffering constraints have been reached.

Definition at line 1056 of file Transport.cpp.

References ACE_Reactor::cancel_timer(), current_deadline_, event_handler_i(), flush_timer_id_, flush_timer_pending(), ACE_OS::gettimeofday(), TAO_Queued_Message::message_length(), TAO_Queued_Message::next(), ACE_Event_Handler::reactor(), ACE_Reactor::schedule_timer(), and TAO_Stub::transport_queueing_strategy().

Referenced by send_asynchronous_message_i().

01058 {
01059   // First let's compute the size of the queue:
01060   size_t msg_count = 0;
01061   size_t total_bytes = 0;
01062 
01063   for (TAO_Queued_Message *i = this->head_; i != 0; i = i->next ())
01064     {
01065       ++msg_count;
01066       total_bytes += i->message_length ();
01067     }
01068 
01069   bool set_timer;
01070   ACE_Time_Value new_deadline;
01071 
01072   bool constraints_reached =
01073     stub->transport_queueing_strategy ().
01074       buffering_constraints_reached (stub,
01075                                      msg_count,
01076                                      total_bytes,
01077                                      must_flush,
01078                                      this->current_deadline_,
01079                                      set_timer,
01080                                      new_deadline);
01081 
01082   // ... set the new timer, also cancel any previous timers ...
01083   if (set_timer)
01084     {
01085       ACE_Event_Handler *eh = this->event_handler_i ();
01086       ACE_Reactor *reactor = eh->reactor ();
01087       this->current_deadline_ = new_deadline;
01088       ACE_Time_Value delay =
01089         new_deadline - ACE_OS::gettimeofday ();
01090 
01091       if (this->flush_timer_pending ())
01092         {
01093           reactor->cancel_timer (this->flush_timer_id_);
01094         }
01095 
01096       this->flush_timer_id_ =
01097         reactor->schedule_timer (&this->transport_timer_,
01098                                  &this->current_deadline_,
01099                                  delay);
01100     }
01101 
01102   return constraints_reached;
01103 }

void TAO_Transport::cleanup_queue size_t  byte_count  )  [private]
 

Cleanup the queue.

Exactly byte_count bytes have been sent, the queue must be cleaned up as potentially several messages have been completely sent out. It leaves on head_ the next message to send out.

Definition at line 1019 of file Transport.cpp.

References ACE_DEBUG, ACE_TEXT, TAO_Queued_Message::all_data_sent(), TAO_Queued_Message::bytes_transferred(), TAO_Queued_Message::destroy(), LM_DEBUG, TAO_Queued_Message::message_length(), TAO_Queued_Message::remove_from_list(), and TAO_debug_level.

Referenced by drain_queue_helper().

01020 {
01021   while (this->head_ != 0 && byte_count > 0)
01022     {
01023       TAO_Queued_Message *i = this->head_;
01024 
01025       if (TAO_debug_level > 4)
01026         {
01027           ACE_DEBUG ((LM_DEBUG,
01028              ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue, ")
01029              ACE_TEXT ("byte_count = %d\n"),
01030              this->id (), byte_count));
01031         }
01032 
01033       // Update the state of the first message
01034       i->bytes_transferred (byte_count);
01035 
01036       if (TAO_debug_level > 4)
01037         {
01038           ACE_DEBUG ((LM_DEBUG,
01039              ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue, ")
01040              ACE_TEXT ("after transfer, bc = %d, all_sent = %d, ml = %d\n"),
01041              this->id (), byte_count, i->all_data_sent (),
01042              i->message_length ()));
01043         }
01044 
01045       // ... if all the data was sent the message must be removed from
01046       // the queue...
01047       if (i->all_data_sent ())
01048         {
01049           i->remove_from_list (this->head_, this->tail_);
01050           i->destroy ();
01051         }
01052     }
01053 }

void TAO_Transport::cleanup_queue_i  )  [private]
 

Cleanup the complete queue.

Definition at line 992 of file Transport.cpp.

References ACE_DEBUG, ACE_TEXT, TAO_Queued_Message::destroy(), LM_DEBUG, TAO_Queued_Message::remove_from_list(), TAO_LF_Event::state_changed(), and TAO_debug_level.

Referenced by send_connection_closed_notifications_i(), and ~TAO_Transport().

00993 {
00994   if (TAO_debug_level > 4)
00995     {
00996       ACE_DEBUG ((LM_DEBUG,
00997          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue_i, ")
00998          ACE_TEXT ("cleaning up complete queue\n"),
00999          this->id ()));
01000     }
01001 
01002   // Cleanup all messages
01003   while (this->head_ != 0)
01004     {
01005       TAO_Queued_Message *i = this->head_;
01006 
01007        // @@ This is a good point to insert a flag to indicate that a
01008        //    CloseConnection message was successfully received.
01009       i->state_changed (TAO_LF_Event::LFS_CONNECTION_CLOSED,
01010                         this->orb_core_->leader_follower ());
01011 
01012       i->remove_from_list (this->head_, this->tail_);
01013 
01014       i->destroy ();
01015    }
01016 }

void TAO_Transport::clear_translators (TAO_InputCDR *, TAO_OutputCDR *)
 

It is necessary to clear the codeset translator when a CDR stream is used for more than one GIOP message. This is required since the header must not be translated, whereas the body must be.

Definition at line 2337 of file Transport.cpp.

References ACE_OutputCDR::char_translator(), ACE_InputCDR::char_translator(), ACE_OutputCDR::wchar_translator(), and ACE_InputCDR::wchar_translator().

Referenced by TAO::Remote_Invocation::write_header().

02338 {
02339   if (inp)
02340     {
02341       inp->char_translator (0);
02342       inp->wchar_translator (0);
02343     }
02344   if (outp)
02345     {
02346       outp->char_translator (0);
02347       outp->wchar_translator (0);
02348     }
02349 }

void TAO_Transport::close_connection (void)  [virtual]
 

Call the implementation method after obtaining the lock.

Definition at line 293 of file Transport.cpp.

References TAO_Connection_Handler::close_connection(), and connection_handler_i().

Referenced by TAO_IIOP_Connector::complete_connection(), TAO_Connector::connect(), post_open(), TAO::Transport_Cache_Manager::purge(), TAO_GIOP_Message_Base::send_close_connection(), TAO::Remote_Invocation::send_message(), send_message_shared(), TAO_IIOP_Transport::send_message_shared(), and TAO_Wait_On_Read::wait().

00294 {
00295   this->connection_handler_i ()->close_connection ();
00296 }

ACE_INLINE TAO_Connection_Handler * TAO_Transport::connection_handler (void)
 

Get the connection handler for this transport.

Definition at line 175 of file Transport.inl.

References connection_handler_i().

Referenced by TAO_IIOP_Connector::complete_connection(), TAO::Remote_Invocation::send_message(), TAO_Connect_Strategy::wait(), and TAO_Connector::wait_for_connection_completion().

00176 {
00177   return this->connection_handler_i();
00178 }

virtual TAO_Connection_Handler* TAO_Transport::connection_handler_i (void)  [protected, pure virtual]
 

Implemented in TAO_IIOP_Transport.

Referenced by close_connection(), and connection_handler().

int TAO_Transport::consolidate_enqueue_message (TAO_Queued_Data *qd)  [private]
 

Returns -1 on error, otherwise 0.

Definition at line 1491 of file Transport.cpp.

References ACE_ERROR, ACE_TEXT, TAO_Pluggable_Messaging::consolidate_fragmented_message(), TAO_Incoming_Message_Queue::enqueue_tail(), incoming_message_queue_, LM_ERROR, messaging_object(), TAO_Queued_Data::missing_data_, TAO_Queued_Data::more_fragments_, TAO_Queued_Data::msg_type_, TAO_Queued_Data::release(), TAO_debug_level, and TAO_PLUGGABLE_MESSAGE_FRAGMENT.

Referenced by handle_input_parse_data(), and handle_input_parse_extra_messages().

01492 {
01493   // consolidate message on top of stack, only for fragmented messages
01494 
01495   // paranoid check
01496   if (q_data->missing_data_ != 0)
01497     {
01498        return -1;
01499     }
01500 
01501   if (q_data->more_fragments_ ||
01502       q_data->msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT)
01503     {
01504       TAO_Queued_Data *new_q_data = 0;
01505 
01506       switch (this->messaging_object()->consolidate_fragmented_message (q_data, new_q_data))
01507         {
01508         case -1: // error
01509           return -1;
01510 
01511         case 0:  // returning consolidated message in new_q_data
01512           if (!new_q_data)
01513             {
01514               if (TAO_debug_level > 0)
01515                 {
01516                   ACE_ERROR ((LM_ERROR,
01517                      ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_enqueue_message, ")
01518                      ACE_TEXT ("error, consolidated message is NULL\n"),
01519                      this->id ()));
01520                 }
01521               return -1;
01522             }
01523 
01524           if (this->incoming_message_queue_.enqueue_tail (new_q_data) != 0)
01525             {
01526               TAO_Queued_Data::release (new_q_data);
01527               return -1;
01528             }
01529           break;
01530 
01531         case 1:  // fragment has been stored in messaging_oject()
01532           break;
01533         }
01534     }
01535   else
01536     {
01537       if (this->incoming_message_queue_.enqueue_tail (q_data) != 0)
01538         {
01539           TAO_Queued_Data::release (q_data);
01540           return -1;
01541         }
01542     }
01543 
01544   return 0; // success
01545 }

int TAO_Transport::consolidate_process_message (TAO_Queued_Data *qd, TAO_Resume_Handle &rh)  [private]
 

Returns -1 on error, otherwise 0.

Definition at line 1404 of file Transport.cpp.

References ACE_ERROR, ACE_TEXT, TAO_Pluggable_Messaging::consolidate_fragmented_message(), LM_ERROR, messaging_object(), TAO_Queued_Data::missing_data_, TAO_Queued_Data::more_fragments_, TAO_Queued_Data::msg_type_, process_parsed_messages(), TAO_Queued_Data::release(), TAO_debug_level, and TAO_PLUGGABLE_MESSAGE_FRAGMENT.

Referenced by handle_input_missing_data().

01406 {
01407   // paranoid check
01408   if (q_data->missing_data_ != 0)
01409     {
01410       if (TAO_debug_level > 0)
01411         {
01412            ACE_ERROR ((LM_ERROR,
01413               ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ")
01414               ACE_TEXT ("missing data\n"),
01415               this->id ()));
01416         }
01417        return -1;
01418     }
01419 
01420   if (q_data->more_fragments_ ||
01421       q_data->msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT)
01422     {
01423       // consolidate message on top of stack, only for fragmented messages
01424       TAO_Queued_Data *new_q_data = 0;
01425 
01426       switch (this->messaging_object()->consolidate_fragmented_message (q_data, new_q_data))
01427         {
01428         case -1: // error
01429           return -1;
01430 
01431         case 0:  // returning consolidated message in q_data
01432           if (!new_q_data)
01433             {
01434               if (TAO_debug_level > 0)
01435                 {
01436                   ACE_ERROR ((LM_ERROR,
01437                      ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ")
01438                      ACE_TEXT ("error, consolidated message is NULL\n"),
01439                      this->id ()));
01440                 }
01441               return -1;
01442             }
01443 
01444 
01445           if (this->process_parsed_messages (new_q_data, rh) == -1)
01446             {
01447               TAO_Queued_Data::release (new_q_data);
01448 
01449               if (TAO_debug_level > 0)
01450                 {
01451                   ACE_ERROR ((LM_ERROR,
01452                      ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ")
01453                      ACE_TEXT ("error processing consolidated message\n"),
01454                      this->id ()));
01455                 }
01456               return -1;
01457             }
01458 
01459           TAO_Queued_Data::release (new_q_data);
01460 
01461           break;
01462 
01463         case 1:  // fragment has been stored in messaging_oject()
01464           break;
01465         }
01466     }
01467   else
01468     {
01469       if (this->process_parsed_messages (q_data, rh) == -1)
01470         {
01471           TAO_Queued_Data::release (q_data);
01472 
01473           if (TAO_debug_level > 0)
01474             {
01475               ACE_ERROR ((LM_ERROR,
01476                  ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ")
01477                  ACE_TEXT ("error processing message\n"),
01478                  this->id ()));
01479             }
01480           return -1;
01481         }
01482 
01483       TAO_Queued_Data::release (q_data);
01484 
01485     }
01486 
01487   return 0;
01488 }

int TAO_Transport::drain_queue (void)  [private]
 

Send some of the data in the queue.

As the outgoing data is drained this method is invoked to send as much of the current message as possible.

Returns 0 if there is more data to send, -1 if there was an error and 1 if the message was completely sent.

Definition at line 807 of file Transport.cpp.

References ACE_GUARD_RETURN, TAO_Flushing_Strategy::cancel_output(), drain_queue_i(), TAO_ORB_Core::flushing_strategy(), and orb_core().

Referenced by handle_output().

00808 {
00809   ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1);
00810   int const retval = this->drain_queue_i ();
00811 
00812   if (retval == 1)
00813     {
00814       // ... there is no current message or it was completely
00815       // sent, cancel output...
00816       TAO_Flushing_Strategy *flushing_strategy =
00817         this->orb_core ()->flushing_strategy ();
00818 
00819       flushing_strategy->cancel_output (this);
00820 
00821       return 0;
00822     }
00823 
00824   return retval;
00825 }

int TAO_Transport::drain_queue_helper (int &iovcnt, iovec iov[])  [private]
 

A helper routine used in drain_queue_i().

Definition at line 828 of file Transport.cpp.

References ACE_DEBUG, ACE_TEXT, cleanup_queue(), dump_iov(), EWOULDBLOCK, LM_DEBUG, send(), sent_byte_count_, ssize_t, and TAO_debug_level.

Referenced by drain_queue_i().

00829 {
00830   size_t byte_count = 0;
00831 
00832   // ... send the message ...
00833   ssize_t retval = -1;
00834 
00835 #ifdef ACE_HAS_SENDFILE
00836   if (this->mmap_allocator_)
00837     retval = this->sendfile (this->mmap_allocator_,
00838                              iov,
00839                              iovcnt,
00840                              byte_count);
00841   else
00842 #endif  /* ACE_HAS_SENDFILE */
00843     retval = this->send (iov, iovcnt, byte_count);
00844 
00845   if (TAO_debug_level == 5)
00846     {
00847       dump_iov (iov, iovcnt, this->id (),
00848                 byte_count, "drain_queue_helper");
00849     }
00850 
00851   // ... now we need to update the queue, removing elements
00852   // that have been sent, and updating the last element if it
00853   // was only partially sent ...
00854   this->cleanup_queue (byte_count);
00855   iovcnt = 0;
00856 
00857   if (retval == 0)
00858     {
00859       if (TAO_debug_level > 4)
00860         {
00861           ACE_DEBUG ((LM_DEBUG,
00862              ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_helper, ")
00863              ACE_TEXT ("send() returns 0\n"),
00864              this->id ()));
00865         }
00866       return -1;
00867     }
00868   else if (retval == -1)
00869     {
00870       if (TAO_debug_level > 4)
00871         {
00872           ACE_DEBUG ((LM_DEBUG,
00873              ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_helper, ")
00874              ACE_TEXT ("error during %p\n"),
00875              this->id (), ACE_TEXT ("send()")));
00876         }
00877 
00878       if (errno == EWOULDBLOCK || errno == EAGAIN)
00879         {
00880           return 0;
00881         }
00882 
00883       return -1;
00884     }
00885 
00886   // ... start over, how do we guarantee progress?  Because if
00887   // no bytes are sent send() can only return 0 or -1
00888 
00889   // Total no. of bytes sent for a send call
00890   this->sent_byte_count_ += byte_count;
00891 
00892   if (TAO_debug_level > 4)
00893     {
00894       ACE_DEBUG ((LM_DEBUG,
00895          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_helper, ")
00896          ACE_TEXT ("byte_count = %d, head_is_empty = %d\n"),
00897          this->id(), byte_count, (this->head_ == 0)));
00898     }
00899 
00900   return 1;
00901 }

int TAO_Transport::drain_queue_i (void)  [private]
 

Implement drain_queue() assuming the lock is held.

Definition at line 904 of file Transport.cpp.

References ACE_DEBUG, ACE_IOV_MAX, ACE_TEXT, ACE_Reactor::cancel_timer(), drain_queue_helper(), event_handler_i(), TAO_Queued_Message::fill_iov(), flush_timer_pending(), LM_DEBUG, TAO_Queued_Message::next(), ACE_Event_Handler::reactor(), reset_flush_timer(), sent_byte_count_, and TAO_debug_level.

Referenced by drain_queue(), TAO_Block_Flushing_Strategy::schedule_output(), send_message_block_chain_i(), and send_synch_message_helper_i().

00905 {
00906   // This is the vector used to send data, it must be declared outside
00907   // the loop because after the loop there may still be data to be
00908   // sent
00909   int iovcnt = 0;
00910 #if defined (ACE_INITIALIZE_MEMORY_BEFORE_USE)
00911   iovec iov[ACE_IOV_MAX] = { 0 , 0 };
00912 #else
00913   iovec iov[ACE_IOV_MAX];
00914 #endif /* ACE_INITIALIZE_MEMORY_BEFORE_USE */
00915 
00916   // We loop over all the elements in the queue ...
00917   TAO_Queued_Message *i = this->head_;
00918 
00919   // Reset the value so that the counting is done for each new send
00920   // call.
00921   this->sent_byte_count_ = 0;
00922 
00923   while (i != 0)
00924     {
00925       // ... each element fills the iovector ...
00926       i->fill_iov (ACE_IOV_MAX, iovcnt, iov);
00927 
00928       // ... the vector is full, no choice but to send some data out.
00929       // We need to loop because a single message can span multiple
00930       // IOV_MAX elements ...
00931       if (iovcnt == ACE_IOV_MAX)
00932         {
00933           int const retval =
00934             this->drain_queue_helper (iovcnt, iov);
00935 
00936           if (TAO_debug_level > 4)
00937             {
00938               ACE_DEBUG ((LM_DEBUG,
00939                  ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_i, ")
00940                  ACE_TEXT ("helper retval = %d\n"),
00941                  this->id (), retval));
00942             }
00943 
00944           if (retval != 1)
00945             {
00946               return retval;
00947             }
00948 
00949           i = this->head_;
00950           continue;
00951         }
00952       // ... notice that this line is only reached if there is still
00953       // room in the iovector ...
00954       i = i->next ();
00955     }
00956 
00957   if (iovcnt != 0)
00958     {
00959       int const retval = this->drain_queue_helper (iovcnt, iov);
00960 
00961       if (TAO_debug_level > 4)
00962         {
00963           ACE_DEBUG ((LM_DEBUG,
00964               ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_i, ")
00965               ACE_TEXT ("helper retval = %d\n"),
00966               this->id (), retval));
00967         }
00968 
00969       if (retval != 1)
00970         {
00971           return retval;
00972         }
00973     }
00974 
00975   if (this->head_ == 0)
00976     {
00977       if (this->flush_timer_pending ())
00978         {
00979           ACE_Event_Handler *eh = this->event_handler_i ();
00980           ACE_Reactor * const reactor = eh->reactor ();
00981           reactor->cancel_timer (this->flush_timer_id_);
00982           this->reset_flush_timer ();
00983         }
00984 
00985       return 1;
00986     }
00987 
00988   return 0;
00989 }

virtual ACE_Event_Handler* TAO_Transport::event_handler_i (void)  [pure virtual]
 

Normally a concrete TAO_Transport object has-a ACE_Event_Handler member that functions as an adapter between the ACE_Reactor framework and the TAO pluggable protocol framework. In all the protocols implemented so far this role is fulfilled by an instance of ACE_Svc_Handler.

Todo:
Since we only use limited functionality of ACE_Svc_Handler, we could probably implement a generic adapter class (TAO_Transport_Event_Handler or something). This would reduce footprint and simplify the process of implementing a pluggable protocol.

This method has to be renamed to event_handler()

Implemented in TAO_IIOP_Transport.

Referenced by add_reference(), cancel_output_i(), check_buffering_constraints_i(), drain_queue_i(), TAO::Transport_Cache_Manager::find_transport(), notify_reactor(), register_handler(), remove_reference(), schedule_output_i(), TAO_Connection_Handler::transport(), and TAO_Wait_On_Read::wait().

ACE_INLINE void TAO_Transport::first_request_sent ()
 

Set the state of the first_request_ flag to 0.

Definition at line 158 of file Transport.inl.

References first_request_.

Referenced by TAO_IIOP_Transport::send_request().

00159 {
00160   this->first_request_ = 0;
00161 }

ACE_INLINE int TAO_Transport::flush_timer_pending (void)  const [private]
 

Check if the flush timer is still pending.

Definition at line 108 of file Transport.inl.

References flush_timer_id_.

Referenced by check_buffering_constraints_i(), drain_queue_i(), and handle_timeout().

00109 {
00110   return this->flush_timer_id_ != -1;
00111 }

int TAO_Transport::format_queue_message (TAO_OutputCDR &stream)
 

Format and queue a message for stream.

Definition at line 481 of file Transport.cpp.

References ACE_OutputCDR::begin(), TAO_Pluggable_Messaging::format_message(), messaging_object(), and queue_message_i().

Referenced by TAO::Synch_Oneway_Invocation::remote_oneway().

00482 {
00483   if (this->messaging_object ()->format_message (stream) != 0)
00484     return -1;
00485 
00486   return this->queue_message_i (stream.begin());
00487 }

int TAO_Transport::generate_locate_request (TAO_Target_Specification &spec, TAO_Operation_Details &opdetails, TAO_OutputCDR &output)
 

This is a request for the transport object to write a LocateRequest header before it is sent out.

Definition at line 352 of file Transport.cpp.

References ACE_DEBUG, ACE_TEXT, TAO_Pluggable_Messaging::generate_locate_request_header(), LM_DEBUG, messaging_object(), and TAO_debug_level.

Referenced by TAO::LocateRequest_Invocation::invoke().

00356 {
00357   if (this->messaging_object ()->generate_locate_request_header (opdetails,
00358                                                                  spec,
00359                                                                  output)
00360        == -1)
00361     {
00362       if (TAO_debug_level > 0)
00363         {
00364           ACE_DEBUG ((LM_DEBUG,
00365                       ACE_TEXT ("TAO (%P|%t) - Transport[%d]::generate_locate_request, ")
00366                       ACE_TEXT ("error while marshalling the LocateRequest header\n"),
00367                       this->id ()));
00368         }
00369 
00370       return -1;
00371     }
00372 
00373   return 0;
00374 }

int TAO_Transport::generate_request_header (TAO_Operation_Details &opd, TAO_Target_Specification &spec, TAO_OutputCDR &msg)  [virtual]
 

This is a request for the transport object to write a request header before it sends out the request.

Reimplemented in TAO_IIOP_Transport.

Definition at line 377 of file Transport.cpp.

References ACE_DEBUG, ACE_TEXT, TAO_ORB_Core::codeset_manager(), first_request_, TAO_Pluggable_Messaging::generate_request_header(), TAO_Codeset_Manager::generate_service_context(), LM_DEBUG, messaging_object(), orb_core(), and TAO_debug_level.

Referenced by TAO_IIOP_Transport::generate_request_header(), and TAO::Remote_Invocation::write_header().

00381 {
00382   // codeset service context is only supposed to be sent in the first request
00383   // on a particular connection.
00384   if (this->first_request_)
00385     {
00386       TAO_Codeset_Manager * const csm = this->orb_core ()->codeset_manager ();
00387       if (csm)
00388         csm->generate_service_context (opdetails,*this);
00389     }
00390 
00391   if (this->messaging_object ()->generate_request_header (opdetails,
00392                                                           spec,
00393                                                           output) == -1)
00394     {
00395       if (TAO_debug_level > 0)
00396         {
00397         ACE_DEBUG ((LM_DEBUG,
00398                    ACE_TEXT ("(%P|%t) - Transport[%d]::generate_request_header, ")
00399                    ACE_TEXT ("error while marshalling the Request header\n"),
00400                       this->id()));
00401         }
00402 
00403       return -1;
00404     }
00405 
00406   return 0;
00407 }

int TAO_Transport::handle_input (TAO_Resume_Handle &rh, ACE_Time_Value *max_wait_time = 0, int block = 0)  [virtual]
 

Callback to read incoming data.

The ACE_Event_Handler adapter invokes this method as part of its handle_input() operation.

Todo:
the method name is confusing! Calling it handle_input() would probably make things easier to understand and follow!
Once a complete message is read, the Transport class delegates to the Messaging layer to invoke the right upcall (on the server) or the TAO_Reply_Dispatcher (on the client side).

Parameters:
max_wait_time In some cases the I/O is synchronous, e.g. a thread-per-connection server or when Wait_On_Read is enabled. In those cases a maximum read time can be specified.
block Is deprecated and ignored.

Definition at line 1329 of file Transport.cpp.

References ACE_DEBUG, ACE_ERROR, ACE_TEXT, handle_input_missing_data(), handle_input_parse_data(), incoming_message_stack_, LM_DEBUG, LM_ERROR, TAO_Queued_Data::missing_data_, process_queue_head(), TAO_debug_level, TAO_MISSING_DATA_UNDEFINED, and TAO::Incoming_Message_Stack::top().

Referenced by TAO_Connection_Handler::handle_input_internal(), TAO_Connection_Handler::svc_i(), and TAO_Wait_On_Read::wait().

01332 {
01333   if (TAO_debug_level > 3)
01334     {
01335       ACE_DEBUG ((LM_DEBUG,
01336          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input\n"),
01337          this->id ()));
01338     }
01339 
01340   // First try to process messages of the head of the incoming queue.
01341   int const retval = this->process_queue_head (rh);
01342 
01343   if (retval <= 0)
01344     {
01345       if (retval == -1)
01346         {
01347           if (TAO_debug_level > 2)
01348             {
01349               ACE_DEBUG ((LM_DEBUG,
01350                  ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input, ")
01351                  ACE_TEXT ("error while parsing the head of the queue\n"),
01352                  this->id()));
01353 
01354             }
01355           return -1;
01356         }
01357       else
01358         {
01359           // retval == 0
01360 
01361           // Processed a message in queue successfully. This
01362           // thread must return to thread-pool now.
01363           return 0;
01364         }
01365     }
01366 
01367   TAO_Queued_Data *q_data = 0;
01368 
01369   if (this->incoming_message_stack_.top (q_data) != -1
01370       && q_data->missing_data_ != TAO_MISSING_DATA_UNDEFINED)
01371     {
01372       /* PRE: q_data->missing_data_ > 0 as all QD on stack must be incomplete  */
01373       if (this->handle_input_missing_data (rh, max_wait_time, q_data) == -1)
01374         {
01375           if (TAO_debug_level > 0)
01376             {
01377               ACE_ERROR ((LM_ERROR,
01378                  ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input, ")
01379                  ACE_TEXT ("error consolidating incoming message\n"),
01380                  this->id ()));
01381             }
01382           return -1;
01383         }
01384     }
01385   else
01386     {
01387       if (this->handle_input_parse_data (rh, max_wait_time) == -1)
01388         {
01389           if (TAO_debug_level > 0)
01390             {
01391               ACE_ERROR ((LM_ERROR,
01392                  ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input, ")
01393                  ACE_TEXT ("error parsing incoming message\n"),
01394                  this->id ()));
01395             }
01396           return -1;
01397         }
01398     }
01399 
01400   return 0;
01401 }

int TAO_Transport::handle_input_missing_data (TAO_Resume_Handle &rh, ACE_Time_Value *max_wait_time, TAO_Queued_Data *q_data)  [private]
 

Invoked by the handle_input operation. It consolidates the message on top of incoming_message_stack. The amount of missing data is known, and the recv operation copies data directly into the message buffer, as much as a single recv invocation provides.

Definition at line 1548 of file Transport.cpp.

References ACE_DEBUG, ACE_TEXT, consolidate_process_message(), ACE_CDR::grow(), incoming_message_stack_, ACE_Message_Block::length(), LM_DEBUG, TAO_Queued_Data::missing_data_, TAO_Queued_Data::msg_block_, TAO::Incoming_Message_Stack::pop(), recv(), ACE_Message_Block::space(), ssize_t, TAO_debug_level, and ACE_Message_Block::wr_ptr().

Referenced by handle_input().

01551 {
01552   // paranoid check
01553   if (q_data == 0)
01554     {
01555       return -1;
01556     }
01557 
01558   if (TAO_debug_level > 3)
01559     {
01560       ACE_DEBUG ((LM_DEBUG,
01561          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_missing_data_message, ")
01562          ACE_TEXT ("enter (missing data == %d)\n"),
01563          this->id (), q_data->missing_data_));
01564     }
01565 
01566   size_t const recv_size = q_data->missing_data_;
01567 
01568   // make sure the message_block has enough space
01569   size_t const message_size =  recv_size
01570                                + q_data->msg_block_->length();
01571 
01572   if (q_data->msg_block_->space() < recv_size)
01573     {
01574       if (ACE_CDR::grow (q_data->msg_block_, message_size) == -1)
01575         {
01576           return -1;
01577         }
01578     }
01579 
01580   // Saving the size of the received buffer in case any one needs to
01581   // get the size of the message thats received in the
01582   // context. Obviously the value will be changed for each recv call
01583   // and the user is supposed to invoke the accessor only in the
01584   // invocation context to get meaningful information.
01585   this->recv_buffer_size_ = recv_size;
01586 
01587   // Read the message into the existing message block on heap
01588   ssize_t const n = this->recv (q_data->msg_block_->wr_ptr(),
01589                                 recv_size,
01590                                 max_wait_time);
01591 
01592 
01593   if (n <= 0)
01594     {
01595       return n;
01596     }
01597 
01598   if (TAO_debug_level > 3)
01599     {
01600       ACE_DEBUG ((LM_DEBUG,
01601          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_missing_data_message, ")
01602          ACE_TEXT ("read bytes %d\n"),
01603          this->id (), n));
01604     }
01605 
01606   q_data->msg_block_->wr_ptr(n);
01607   q_data->missing_data_ -= n;
01608 
01609   if (q_data->missing_data_ == 0)
01610     {
01611       // paranoid check
01612       if (this->incoming_message_stack_.pop (q_data) == -1)
01613         {
01614           return -1;
01615         }
01616 
01617       if (this->consolidate_process_message (q_data, rh) == -1)
01618         {
01619           return -1;
01620         }
01621     }
01622 
01623   return 0;
01624 }

int TAO_Transport::handle_input_parse_data (TAO_Resume_Handle &rh, ACE_Time_Value *max_wait_time) [private]
 

Invoked by handle_input(). It parses new messages from the input stream, or consolidates a message whose header has only been partially read and whose size is therefore still unknown. It parses as much data as a single recv() invocation provides.

Definition at line 1671 of file Transport.cpp.

References ACE_DEBUG, ACE_ERROR, ACE_TEXT, allocate_partial_message_block(), consolidate_enqueue_message(), TAO_Pluggable_Messaging::consolidate_node(), ACE_Message_Block::copy(), TAO_Queued_Data::duplicate(), ACE_CDR::grow(), handle_input_parse_extra_messages(), TAO_Pluggable_Messaging::header_length(), incoming_message_queue_, incoming_message_stack_, TAO_ORB_Core::input_cdr_dblock_allocator(), ACE_Message_Block::length(), LM_DEBUG, LM_ERROR, TAO_ORB_Core::locking_strategy(), ACE_CDR::mb_align(), ACE_OS::memset(), messaging_object(), TAO_Queued_Data::missing_data_, TAO_Queued_Data::more_fragments_, TAO_Queued_Data::msg_block_, TAO_Queued_Data::msg_type_, notify_reactor(), TAO_ORB_Core::orb_params(), TAO_Pluggable_Messaging::parse_next_message(), partial_message_, TAO::Incoming_Message_Stack::pop(), process_parsed_messages(), process_queue_head(), TAO::Incoming_Message_Stack::push(), TAO_Incoming_Message_Queue::queue_length(), ACE_Message_Block::rd_ptr(), recv(), ACE_Message_Block::reset(), TAO_Resume_Handle::set_flag(), TAO_ORB_Parameters::single_read_optimization(), ACE_Message_Block::space(), ssize_t, TAO_debug_level, TAO_MAXBUFSIZE, TAO_MISSING_DATA_UNDEFINED, TAO_PLUGGABLE_MESSAGE_FRAGMENT, TAO::Incoming_Message_Stack::top(), and ACE_Message_Block::wr_ptr().

Referenced by handle_input().

01673 {
01674 
01675   if (TAO_debug_level > 3)
01676     {
01677       ACE_DEBUG ((LM_DEBUG,
01678          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ")
01679          ACE_TEXT ("enter\n"),
01680          this->id ()));
01681     }
01682 
01683 
01684   // The buffer on the stack which will be used to hold the input
01685   // messages, ACE_CDR::MAX_ALIGNMENT compensates the
01686   // memory-alignment. This improves performance with SUN-Java-ORB-1.4
01687   // and higher that sends fragmented requests of size 1024 bytes.
01688   char buf [TAO_MAXBUFSIZE + ACE_CDR::MAX_ALIGNMENT];
01689 
01690 #if defined (ACE_INITIALIZE_MEMORY_BEFORE_USE)
01691   (void) ACE_OS::memset (buf,
01692                          '\0',
01693                          sizeof buf);
01694 #endif /* ACE_INITIALIZE_MEMORY_BEFORE_USE */
01695 
01696   // Create a data block
01697   ACE_Data_Block db (sizeof (buf),
01698                      ACE_Message_Block::MB_DATA,
01699                      buf,
01700                      this->orb_core_->input_cdr_buffer_allocator (),
01701                      this->orb_core_->locking_strategy (),
01702                      ACE_Message_Block::DONT_DELETE,
01703                      this->orb_core_->input_cdr_dblock_allocator ());
01704 
01705   // Create a message block
01706   ACE_Message_Block message_block (&db,
01707                                    ACE_Message_Block::DONT_DELETE,
01708                                    this->orb_core_->input_cdr_msgblock_allocator ());
01709 
01710 
01711   // Align the message block
01712   ACE_CDR::mb_align (&message_block);
01713 
01714   size_t recv_size = 0; // Note: unsigned integer
01715 
01716   // Pointer to newly parsed message
01717   TAO_Queued_Data *q_data = 0;
01718 
01719   // optimizing access of constants
01720   const size_t header_length =
01721             this->messaging_object ()->header_length ();
01722 
01723   // paranoid check
01724   if (header_length > message_block.space ())
01725     {
01726       return -1;
01727     }
01728 
01729   if (this->orb_core_->orb_params ()->single_read_optimization ())
01730     {
01731       recv_size =
01732         message_block.space ();
01733     }
01734   else
01735     {
01736       // Single read optimization has been de-activated. That means
01737       // that we need to read from transport the GIOP header first
01738       // before the payload. This codes first checks the incoming
01739       // stack for partial messages which needs to be
01740       // consolidated. Otherwise we are in new cycle, reading complete
01741       // GIOP header of new incoming message.
01742       if (this->incoming_message_stack_.top (q_data) != -1
01743            && q_data->missing_data_ == TAO_MISSING_DATA_UNDEFINED)
01744         {
01745           // There is a partial message on incoming_message_stack_
01746           // whose length is unknown so far. We need to consolidate
01747           // the GIOP header to get to know the payload size,
01748           recv_size = header_length - q_data->msg_block_->length ();
01749         }
01750       else
01751         {
01752           // Read amount of data forming GIOP header of new incoming
01753           // message.
01754           recv_size = header_length;
01755         }
01756       // POST: 0 <= recv_size <= header_length
01757     }
01758   // POST: 0 <= recv_size <= message_block->space ()
01759 
01760   // If we have a partial message, copy it into our message block and
01761   // clear out the partial message.
01762   if (this->partial_message_ != 0 && this->partial_message_->length () > 0)
01763     {
01764       // (*) Copy back the partial message into current read-buffer,
01765       // verify that the read-strategy of "recv_size" bytes is not
01766       // exceeded. The latter check guarantees that recv_size does not
01767       // roll-over and keeps in range
01768       // 0<=recv_size<=message_block->space()
01769       if (this->partial_message_->length () <= recv_size &&
01770           message_block.copy (this->partial_message_->rd_ptr (),
01771                               this->partial_message_->length ()) == 0)
01772         {
01773 
01774           recv_size -= this->partial_message_->length ();
01775           this->partial_message_->reset ();
01776         }
01777       else
01778         {
01779           return -1;
01780         }
01781     }
01782   // POST: 0 <= recv_size <= buffer_space
01783 
01784   if (0 >= recv_size) // paranoid: the check above (*) guarantees recv_size>=0
01785     {
01786       // This event would cause endless looping, trying frequently to
01787       // read zero bytes from stream.  This might happen, if TAOs
01788       // protocol implementation is not correct and tries to read data
01789       // beyond header without "single_read_optimazation" being
01790       // activated.
01791       if (TAO_debug_level > 0)
01792         {
01793           ACE_ERROR ((LM_ERROR,
01794              ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ")
01795              ACE_TEXT ("Error - endless loop detection, closing connection"),
01796              this->id ()));
01797         }
01798       return -1;
01799     }
01800 
01801   // Saving the size of the received buffer in case any one needs to
01802   // get the size of the message thats received in the
01803   // context. Obviously the value will be changed for each recv call
01804   // and the user is supposed to invoke the accessor only in the
01805   // invocation context to get meaningful information.
01806   this->recv_buffer_size_ = recv_size;
01807 
01808   // Read the message into the  message block that we have created on
01809   // the stack.
01810   const ssize_t n = this->recv (message_block.wr_ptr (),
01811                                 recv_size,
01812                                 max_wait_time);
01813 
01814   // If there is an error return to the reactor..
01815   if (n <= 0)
01816     {
01817       return n;
01818     }
01819 
01820   if (TAO_debug_level > 3)
01821     {
01822       ACE_DEBUG ((LM_DEBUG,
01823          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ")
01824          ACE_TEXT ("read %d bytes\n"),
01825          this->id (), n));
01826     }
01827 
01828   // Set the write pointer in the stack buffer
01829   message_block.wr_ptr (n);
01830 
01831   //
01832   // STACK PROCESSING OR MESSAGE CONSOLIDATION
01833   //
01834 
01835   // PRE: data in buffer is aligned && message_block.length() > 0
01836 
01837   if (this->incoming_message_stack_.top (q_data) != -1
01838       && q_data->missing_data_ == TAO_MISSING_DATA_UNDEFINED)
01839     {
01840       //
01841       // MESSAGE CONSOLIDATION
01842       //
01843 
01844       // Partial message on incoming_message_stack_ needs to be
01845       // consolidated.  The message header could not be parsed so far
01846       // and therefor the message size is unknown yet. Consolidating
01847       // the message destroys the memory alignment of succeeding
01848       // messages sharing the buffer, for that reason consolidation
01849       // and stack based processing are mutial exclusive.
01850       if (this->messaging_object ()->consolidate_node (q_data,
01851                                                        message_block) == -1)
01852         {
01853            if (TAO_debug_level > 0)
01854             {
01855                 ACE_ERROR ((LM_ERROR,
01856                    ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ")
01857                    ACE_TEXT ("error consolidating message from input buffer\n"),
01858                    this->id () ));
01859              }
01860            return -1;
01861         }
01862 
01863       // Complete message are to be enqueued and later processed
01864       if (q_data->missing_data_ == 0)
01865         {
01866           if (this->incoming_message_stack_.pop (q_data) == -1)
01867             {
01868               return -1;
01869             }
01870 
01871           if (this->consolidate_enqueue_message (q_data) == -1)
01872             {
01873               return -1;
01874             }
01875         }
01876 
01877       if (message_block.length () > 0
01878           && this->handle_input_parse_extra_messages (message_block) == -1)
01879         {
01880           return -1;
01881         }
01882 
01883       // In any case try to process the enqueued messages
01884       if (this->process_queue_head (rh) == -1)
01885         {
01886           return -1;
01887         }
01888     }
01889   else
01890     {
01891       //
01892       // STACK PROCESSING (critical path)
01893       //
01894 
01895       // Process the first message in buffer on stack
01896 
01897       // (PRE: first message resides in aligned memory) Make a node of
01898       // the message-block..
01899 
01900       TAO_Queued_Data qd (&message_block,
01901                           this->orb_core_->transport_message_buffer_allocator ());
01902 
01903       size_t mesg_length  = 0;
01904 
01905       if (this->messaging_object ()->parse_next_message (message_block,
01906                                                          qd,
01907                                                          mesg_length) == -1
01908           || (qd.missing_data_ == 0
01909               && mesg_length > message_block.length ()) )
01910         {
01911           // extracting message failed
01912           return -1;
01913         }
01914       // POST: qd.missing_data_ == 0 --> mesg_length <= message_block.length()
01915       // This prevents seeking rd_ptr behind the wr_ptr
01916 
01917       if (qd.missing_data_ != 0 ||
01918           qd.more_fragments_   ||
01919           qd.msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT)
01920         {
01921           if (qd.missing_data_ == 0)
01922             {
01923               // Dealing with a fragment
01924               TAO_Queued_Data *nqd =
01925                 TAO_Queued_Data::duplicate (qd);
01926 
01927               if (nqd == 0)
01928                 {
01929                   return -1;
01930                 }
01931 
01932               // mark the end of message in new buffer
01933               char* end_mark = nqd->msg_block_->rd_ptr ()
01934                              + mesg_length;
01935               nqd->msg_block_->wr_ptr (end_mark);
01936 
01937               // move the read pointer forward in old buffer
01938               message_block.rd_ptr (mesg_length);
01939 
01940               // enqueue the message
01941               if (this->consolidate_enqueue_message (nqd) == -1)
01942                 {
01943                   return -1;
01944                 }
01945 
01946               if (message_block.length () > 0
01947                   && this->handle_input_parse_extra_messages (message_block) == -1)
01948                 {
01949                   return -1;
01950                 }
01951 
01952               // In any case try to process the enqueued messages
01953               if (this->process_queue_head (rh) == -1)
01954                 {
01955                   return -1;
01956                 }
01957             }
01958           else if (qd.missing_data_ != TAO_MISSING_DATA_UNDEFINED)
01959             {
01960               // Incomplete message, must be the last one in buffer
01961 
01962               if (qd.missing_data_ != TAO_MISSING_DATA_UNDEFINED &&
01963                   qd.missing_data_ > message_block.space ())
01964                 {
01965                   // Re-Allocate correct size on heap
01966                   if (ACE_CDR::grow (qd.msg_block_,
01967                                      message_block.length ()
01968                                      + qd.missing_data_) == -1)
01969                     {
01970                       return -1;
01971                     }
01972                 }
01973 
01974               TAO_Queued_Data *nqd =
01975                 TAO_Queued_Data::duplicate (qd);
01976 
01977               if (nqd == 0)
01978                 {
01979                   return -1;
01980                 }
01981 
01982               // move read-pointer to end of buffer
01983               message_block.rd_ptr (message_block.length());
01984 
01985               this->incoming_message_stack_.push (nqd);
01986             }
01987         }
01988       else
01989         {
01990           //
01991           // critical path
01992           //
01993 
01994           // We cant process the message on stack right now. First we
01995           // have got to parse extra messages from message_block,
01996           // putting them into queue.  When this is done we can return
01997           // to process this message, and notifying other threads to
01998           // process the messages in queue.
01999 
02000           char * end_marker = message_block.rd_ptr ()
02001                             + mesg_length;
02002 
02003           if (message_block.length () > mesg_length)
02004             {
02005               // There are more message in data stream to be parsed.
02006               // Safe the rd_ptr to restore later.
02007               char *rd_ptr_stack_mesg = message_block.rd_ptr ();
02008 
02009               // Skip parsed message, jump to next message in buffer
02010               // PRE: mesg_length <= message_block.length ()
02011               message_block.rd_ptr (mesg_length);
02012 
02013               // Extract remaining messages and enqueue them for later
02014               // heap processing
02015               if (this->handle_input_parse_extra_messages (message_block) == -1)
02016                 {
02017                   return -1;
02018                 }
02019 
02020               // correct the end_marker
02021               end_marker = message_block.rd_ptr ();
02022 
02023               // Restore rd_ptr
02024               message_block.rd_ptr (rd_ptr_stack_mesg);
02025             }
02026 
02027           // The following if-else has been copied from
02028           // process_queue_head().  While process_queue_head()
02029           // processes message on heap, here we will process a message
02030           // on stack.
02031 
02032           // Now that we have one message on stack to be processed,
02033           // check whether we have one more message in the queue...
02034           if (this->incoming_message_queue_.queue_length () > 0)
02035             {
02036               if (TAO_debug_level > 0)
02037                 {
02038                   ACE_DEBUG ((LM_DEBUG,
02039                      ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ")
02040                      ACE_TEXT ("notify reactor\n"),
02041                      this->id ()));
02042 
02043                 }
02044 
02045               const int retval = this->notify_reactor ();
02046 
02047               if (retval == 1)
02048                 {
02049                   // Let the class know that it doesn't need to resume  the
02050                   // handle..
02051                   rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_LEAVE_SUSPENDED);
02052                 }
02053               else if (retval < 0)
02054                 return -1;
02055             }
02056           else
02057             {
02058               // As there are no further messages in queue just resume
02059               // the handle. Set the flag incase someone had reset the flag..
02060               rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_RESUMABLE);
02061             }
02062 
02063           // PRE: incoming_message_queue is empty
02064           if (this->process_parsed_messages (&qd,
02065                                              rh) == -1)
02066             {
02067               return -1;
02068             }
02069 
02070           // move the rd_ptr tp position of end_marker
02071           message_block.rd_ptr (end_marker);
02072         }
02073     }
02074 
02075   // Now that all cases have been processed, there might be kept some data
02076   // in buffer that needs to be safed for next "handle_input" invocations.
02077    if (message_block.length () > 0)
02078      {
02079        if (this->partial_message_ == 0)
02080          {
02081            this->allocate_partial_message_block ();
02082          }
02083 
02084        if (this->partial_message_ != 0 &&
02085            this->partial_message_->copy (message_block.rd_ptr (),
02086                                          message_block.length ()) == 0)
02087          {
02088            message_block.rd_ptr (message_block.length ());
02089          }
02090        else
02091          {
02092            return -1;
02093          }
02094      }
02095 
02096    return 0;
02097 }

int TAO_Transport::handle_input_parse_extra_messages (ACE_Message_Block &message_block) [private]
 

Is invoked by handle_input_parse_data. Parses all messages remaining in message_block.

Definition at line 1628 of file Transport.cpp.

References consolidate_enqueue_message(), TAO_Pluggable_Messaging::extract_next_message(), incoming_message_stack_, ACE_Message_Block::length(), messaging_object(), TAO_Queued_Data::missing_data_, and TAO::Incoming_Message_Stack::push().

Referenced by handle_input_parse_data().

01629 {
01630 
01631   // store buffer status of last extraction: -1 parse error, 0
01632   // incomplete message header in buffer, 1 complete messages header
01633   // parsed
01634   int buf_status = 0;
01635 
01636   TAO_Queued_Data *q_data = 0;     // init
01637 
01638   // parse buffer until all messages have been extracted, consolidate
01639   // and enqueue complete messages, if the last message being parsed
01640   // has missin data, it is stays on top of incoming_message_stack.
01641   while (message_block.length () > 0 &&
01642          (buf_status = this->messaging_object ()->extract_next_message
01643           (message_block, q_data)) != -1 &&
01644          q_data != 0) // paranoid check
01645     {
01646       if (q_data->missing_data_ == 0)
01647         {
01648           if (this->consolidate_enqueue_message (q_data) == -1)
01649             {
01650               return -1;
01651             }
01652         }
01653       else  // incomplete message read, probably the last message in buffer
01654         {
01655           // can not fail
01656           this->incoming_message_stack_.push (q_data);
01657         }
01658 
01659       q_data = 0; // reset
01660     } // while
01661 
01662   if (buf_status == -1)
01663     {
01664       return -1;
01665     }
01666 
01667   return 0;
01668 }

int TAO_Transport::handle_output void   ) 
 

Callback method to reactively drain the outgoing data queue.

Definition at line 453 of file Transport.cpp.

References ACE_DEBUG, ACE_TEXT, drain_queue(), LM_DEBUG, and TAO_debug_level.

Referenced by TAO_Block_Flushing_Strategy::flush_message(), TAO_Block_Flushing_Strategy::flush_transport(), and TAO_Connection_Handler::handle_output_eh().

00454 {
00455   if (TAO_debug_level > 3)
00456     {
00457       ACE_DEBUG ((LM_DEBUG,
00458                   ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_output\n"),
00459                   this->id ()));
00460     }
00461 
00462   // The flushing strategy (potentially via the Reactor) wants to send
00463   // more data, first check if there is a current message that needs
00464   // more sending...
00465   int const retval = this->drain_queue ();
00466 
00467   if (TAO_debug_level > 3)
00468     {
00469       ACE_DEBUG ((LM_DEBUG,
00470                   ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_output, ")
00471                   ACE_TEXT ("drain_queue returns %d/%d\n"),
00472                   this->id (),
00473                   retval, errno));
00474     }
00475 
00476   // Any errors are returned directly to the Reactor
00477   return retval;
00478 }

int TAO_Transport::handle_timeout const ACE_Time_Value current_time,
const void *  act
 

Parameters:
current_time The current time as reported from the Reactor
act The Asynchronous Completion Token. Currently it is interpreted as follows:
  • If the ACT is the address of this->current_deadline_ the queueing timeout has expired and the queue should start flushing.
Returns:
Returns 0 if there are no problems, -1 if there is an error

Todo:
In the future this function could be used to expire messages (oneways) that have been sitting for too long on the queue.

Definition at line 775 of file Transport.cpp.

References ACE_DEBUG, ACE_TEXT, current_deadline_, flush_timer_pending(), TAO_ORB_Core::flushing_strategy(), LM_DEBUG, orb_core(), reset_flush_timer(), TAO_Flushing_Strategy::schedule_output(), and TAO_debug_level.

Referenced by TAO_Transport_Timer::handle_timeout().

00777 {
00778   if (TAO_debug_level > 6)
00779     {
00780       ACE_DEBUG ((LM_DEBUG,
00781          ACE_TEXT ("TAO (%P|%t) - TAO_Transport[%d]::handle_timeout, ")
00782          ACE_TEXT ("timer expired\n"),
00783          this->id ()));
00784     }
00785 
00786   /// This is the only legal ACT in the current configuration....
00787   if (act != &this->current_deadline_)
00788     {
00789       return -1;
00790     }
00791 
00792   if (this->flush_timer_pending ())
00793     {
00794       // The timer is always a oneshot timer, so mark is as not
00795       // pending.
00796       this->reset_flush_timer ();
00797 
00798       TAO_Flushing_Strategy *flushing_strategy =
00799         this->orb_core ()->flushing_strategy ();
00800       (void) flushing_strategy->schedule_output (this);
00801     }
00802 
00803   return 0;
00804 }

ACE_INLINE void TAO_Transport::id size_t  id  ) 
 

Definition at line 92 of file Transport.inl.

00093 {
00094   this->id_ = id;
00095 }

ACE_INLINE size_t TAO_Transport::id void   )  const
 

Set and Get the identifier for this transport instance.

If not set, this will return an integer representation of the "this" pointer of the instance on which it is called.

Definition at line 86 of file Transport.inl.

Referenced by TAO::Transport_Cache_Manager::bind_i(), TAO_Connection_Handler::close_connection_eh(), TAO_IIOP_Connector::complete_connection(), TAO_Connector::connect(), TAO::Transport_Cache_Manager::find_transport(), TAO_Connection_Handler::handle_input_internal(), TAO_Connector::parallel_connect(), TAO_GIOP_Message_Lite::parse_reply(), TAO_GIOP_Message_Base::process_reply_message(), TAO::Transport_Cache_Manager::purge(), TAO_GIOP_Utils::read_buffer(), TAO_GIOP_Message_Base::send_close_connection(), TAO_GIOP_Message_Lite::send_error(), TAO_GIOP_Message_Base::send_error(), TAO_GIOP_Message_Lite::send_reply_exception(), TAO_Connector::wait_for_connection_completion(), and TAO_Leader_Follower::wait_for_event().

00087 {
00088   return this->id_;
00089 }

bool TAO_Transport::idle_after_reply void   ) 
 

Request is sent and the reply is received. Idle the transport now.

Definition at line 239 of file Transport.cpp.

References TAO_Transport_Mux_Strategy::idle_after_reply(), and tms().

00240 {
00241   return this->tms ()->idle_after_reply ();
00242 }

bool TAO_Transport::idle_after_send void   ) 
 

The request has just been sent, but the reply has not been received yet. Idle the transport now.

Definition at line 233 of file Transport.cpp.

References TAO_Transport_Mux_Strategy::idle_after_send(), and tms().

00234 {
00235   return this->tms ()->idle_after_send ();
00236 }

ACE_INLINE bool TAO_Transport::is_connected void   )  const
 

Is this transport really connected.

Definition at line 164 of file Transport.inl.

References ACE_GUARD_RETURN, and is_connected_.

Referenced by TAO_IIOP_Connector::complete_connection(), TAO_Connector::connect(), TAO::Synch_Oneway_Invocation::remote_oneway(), and TAO_Connector::wait_for_connection_completion().

00165 {
00166   ACE_GUARD_RETURN (ACE_Lock,
00167                     ace_mon,
00168                     *this->handler_lock_,
00169                     false);
00170 
00171   return this->is_connected_;
00172 }

ACE_INLINE CORBA::Boolean TAO_Transport::is_tcs_set  )  const
 

Return true if the tcs has been set.

Definition at line 152 of file Transport.inl.

References tcs_set_.

00153 {
00154   return tcs_set_;
00155 }

int TAO_Transport::make_idle void   ) 
 

Cache management.

Definition at line 429 of file Transport.cpp.

References ACE_DEBUG, ACE_TEXT, LM_DEBUG, TAO::Transport_Cache_Manager::make_idle(), TAO_debug_level, and transport_cache_manager().

Referenced by TAO_Exclusive_TMS::idle_after_reply(), TAO_Muxed_TMS::idle_after_send(), TAO_IIOP_Connection_Handler::process_listen_point_list(), and TAO::Profile_Transport_Resolver::~Profile_Transport_Resolver().

00430 {
00431   if (TAO_debug_level > 3)
00432     {
00433       ACE_DEBUG ((LM_DEBUG,
00434                   ACE_TEXT ("TAO (%P|%t) - Transport[%d]::make_idle\n"),
00435                   this->id ()));
00436     }
00437 
00438   return this->transport_cache_manager ().make_idle (this->cache_map_entry_);
00439 }

virtual int TAO_Transport::messaging_init CORBA::Octet  major,
CORBA::Octet  minor
[pure virtual]
 

Initialising the messaging object. This would be used by the connector side. On the acceptor side the connection handler would take care of the messaging objects.

Implemented in TAO_IIOP_Transport.

virtual TAO_Pluggable_Messaging* TAO_Transport::messaging_object void   )  [pure virtual]
 

Return the messaging object that is used to format the data that needs to be sent.

Implemented in TAO_IIOP_Transport.

Referenced by allocate_partial_message_block(), consolidate_enqueue_message(), consolidate_process_message(), format_queue_message(), TAO_On_Demand_Fragmentation_Strategy::fragment(), generate_locate_request(), generate_request_header(), handle_input_parse_data(), handle_input_parse_extra_messages(), out_stream(), process_parsed_messages(), and send_connection_closed_notifications_i().

int TAO_Transport::notify_reactor void   )  [private]
 

Definition at line 2277 of file Transport.cpp.

References ACE_DEBUG, ACE_TEXT, event_handler_i(), TAO_Wait_Strategy::is_registered(), LM_DEBUG, ACE_Reactor::notify(), orb_core(), TAO_ORB_Core::reactor(), and TAO_debug_level.

Referenced by handle_input_parse_data(), and process_queue_head().

02278 {
02279   if (!this->ws_->is_registered ())
02280     {
02281       return 0;
02282     }
02283 
02284   ACE_Event_Handler *eh = this->event_handler_i ();
02285 
02286   // Get the reactor associated with the event handler
02287   ACE_Reactor *reactor = this->orb_core ()->reactor ();
02288 
02289   if (TAO_debug_level > 0)
02290     {
02291       ACE_DEBUG ((LM_DEBUG,
02292          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::notify_reactor, ")
02293          ACE_TEXT ("notify to Reactor\n"),
02294          this->id ()));
02295     }
02296 
02297 
02298   // Send a notification to the reactor...
02299   const int retval = reactor->notify (eh,
02300                                       ACE_Event_Handler::READ_MASK);
02301 
02302   if (retval < 0 && TAO_debug_level > 2)
02303     {
02304       // @@todo: need to think about what is the action that
02305       // we can take when we get here.
02306       ACE_DEBUG ((LM_DEBUG,
02307          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::notify_reactor, ")
02308          ACE_TEXT ("notify to the reactor failed..\n"),
02309          this->id ()));
02310     }
02311 
02312   return 1;
02313 }

ACE_INLINE void TAO_Transport::opened_as TAO::Connection_Role   ) 
 

Definition at line 51 of file Transport.inl.

References opening_connection_role_.

00052 {
00053   this->opening_connection_role_ = role;
00054 }

ACE_INLINE TAO::Connection_Role TAO_Transport::opened_as void   )  const
 

Methods dealing with the role of the connection, e.g., CLIENT or SERVER. See CORBA 2.6 Specification, Section 15.5.1 for origin of definitions.

Definition at line 45 of file Transport.inl.

References opening_connection_role_.

Referenced by TAO::Wait_On_LF_No_Upcall::can_process_upcalls(), TAO_Connector::connect(), and TAO_IIOP_Connection_Handler::open().

00046 {
00047   return this->opening_connection_role_;
00048 }

void TAO_Transport::operator= const TAO_Transport  )  [private]
 

ACE_INLINE TAO_ORB_Core * TAO_Transport::orb_core void   )  const
 

Access the ORB that owns this connection.

Definition at line 14 of file Transport.inl.

Referenced by TAO::Wait_On_LF_No_Upcall::can_process_upcalls(), TAO_Connection_Handler::close_connection_eh(), drain_queue(), TAO::Transport_Cache_Manager::find_transport(), TAO_Reactive_Flushing_Strategy::flush_message(), TAO_Leader_Follower_Flushing_Strategy::flush_message(), TAO_Reactive_Flushing_Strategy::flush_transport(), TAO_Leader_Follower_Flushing_Strategy::flush_transport(), generate_request_header(), TAO_IIOP_Transport::generate_request_header(), handle_timeout(), TAO::Nested_Upcall_Guard::Nested_Upcall_Guard(), notify_reactor(), post_open(), send_asynchronous_message_i(), send_reply_message_i(), send_synchronous_message_i(), TAO_IIOP_Transport::set_bidir_context_info(), TAO_Wait_On_Read::wait(), TAO_Wait_On_Reactor::wait(), TAO_Wait_On_Leader_Follower::wait(), and TAO::Nested_Upcall_Guard::~Nested_Upcall_Guard().

00015 {
00016   return this->orb_core_;
00017 }

TAO_OutputCDR & TAO_Transport::out_stream void   ) 
 

Accessor for the output CDR stream.

Definition at line 2364 of file Transport.cpp.

References messaging_object(), and TAO_Pluggable_Messaging::out_stream().

Referenced by TAO::LocateRequest_Invocation::invoke(), and TAO::Synch_Oneway_Invocation::remote_oneway().

02365 {
02366   return this->messaging_object ()->out_stream ();
02367 }

bool TAO_Transport::post_connect_hook void   )  [virtual]
 

Hooks that can be overridden in concrete transports.

These hooks are invoked just after connection establishment (or after a connection is fetched from cache). The return value signifies whether the invoker should proceed with post connection establishment activities. Protocols like SSLIOP need this to verify whether connections already established have valid certificates. There are no pre_connect_hooks () since the transport doesn't exist before a connection establishment. :-)

Note:
The methods are intentionally not declared const.

Definition at line 287 of file Transport.cpp.

Referenced by TAO_Connector::connect().

00288 {
00289   return true;
00290 }

bool TAO_Transport::post_open (size_t id)
 

Perform all the actions when this transport gets opened.

Definition at line 2370 of file Transport.cpp.

References ACE_ERROR, ACE_GUARD_RETURN, ACE_TEXT, close_connection(), TAO_ORB_Core::flushing_strategy(), is_connected_, LM_ERROR, orb_core(), purge_entry(), queue_is_empty_i(), TAO_Wait_Strategy::register_handler(), TAO_Flushing_Strategy::schedule_output(), TAO_debug_level, and wait_strategy().

Referenced by TAO_IIOP_Connection_Handler::open().

02371 {
02372   this->id_ = id;
02373 
02374   {
02375     ACE_GUARD_RETURN (ACE_Lock,
02376                       ace_mon,
02377                       *this->handler_lock_,
02378                       false);
02379     this->is_connected_ = true;
02380   }
02381 
02382   // When we have data in our outgoing queue schedule ourselves
02383   // for output
02384   if (this->queue_is_empty_i ())
02385     return true;
02386 
02387   // If the wait strategy wants us to be registered with the reactor
02388   // then we do so. If registeration is required and it succeeds,
02389   // #REFCOUNT# becomes two.
02390   if (this->wait_strategy ()->register_handler () == 0)
02391     {
02392       TAO_Flushing_Strategy *flushing_strategy =
02393         this->orb_core ()->flushing_strategy ();
02394       (void) flushing_strategy->schedule_output (this);
02395     }
02396   else
02397     {
02398       // Registration failures.
02399 
02400       // Purge from the connection cache, if we are not in the cache, this
02401       // just does nothing.
02402       (void) this->purge_entry ();
02403 
02404       // Close the handler.
02405       (void) this->close_connection ();
02406 
02407       if (TAO_debug_level > 0)
02408         ACE_ERROR ((LM_ERROR,
02409            ACE_TEXT ("TAO (%P|%t) - Transport[%d]::post_connect , ")
02410            ACE_TEXT ("could not register the transport ")
02411            ACE_TEXT ("in the reactor.\n"),
02412            this->id ()));
02413 
02414       return false;
02415     }
02416 
02417   return true;
02418 }

int TAO_Transport::process_parsed_messages TAO_Queued_Data qd,
TAO_Resume_Handle rh
[protected]
 

Process the message by sending it to the higher layers of the ORB.

Definition at line 2101 of file Transport.cpp.

References ACE_DEBUG, ACE_ERROR, ACE_TEXT, TAO_Pluggable_Messaging::discard_fragmented_message(), LM_DEBUG, LM_ERROR, messaging_object(), TAO_Queued_Data::missing_data_, TAO_Queued_Data::msg_type_, TAO_Pluggable_Messaging::process_reply_message(), TAO_Pluggable_Messaging::process_request_message(), TAO_Resume_Handle::resume_handle(), TAO_debug_level, TAO_PLUGGABLE_MESSAGE_CANCELREQUEST, TAO_PLUGGABLE_MESSAGE_CLOSECONNECTION, TAO_PLUGGABLE_MESSAGE_LOCATEREPLY, TAO_PLUGGABLE_MESSAGE_LOCATEREQUEST, TAO_PLUGGABLE_MESSAGE_MESSAGERROR, TAO_PLUGGABLE_MESSAGE_REPLY, TAO_PLUGGABLE_MESSAGE_REQUEST, and TAO_Pluggable_Message_Type.

Referenced by consolidate_process_message(), handle_input_parse_data(), and process_queue_head().

02103 {
02104   if (TAO_debug_level > 7)
02105     {
02106       ACE_DEBUG ((LM_DEBUG,
02107          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ")
02108          ACE_TEXT ("entering (missing data == %d)\n"),
02109          this->id(), qd->missing_data_));
02110     }
02111 
02112   // Get the <message_type> that we have received
02113   const TAO_Pluggable_Message_Type t = qd->msg_type_;
02114 
02115   // int result = 0;
02116 
02117   if (t == TAO_PLUGGABLE_MESSAGE_CLOSECONNECTION)
02118     {
02119       if (TAO_debug_level > 0)
02120         ACE_DEBUG ((LM_DEBUG,
02121            ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ")
02122            ACE_TEXT ("received CloseConnection message - %m\n"),
02123            this->id()));
02124 
02125       // Return a "-1" so that the next stage can take care of
02126       // closing connection and the necessary memory management.
02127       return -1;
02128     }
02129   else if (t == TAO_PLUGGABLE_MESSAGE_REQUEST ||
02130            t == TAO_PLUGGABLE_MESSAGE_LOCATEREQUEST)
02131     {
02132       // Let us resume the handle before we go ahead to process the
02133       // request. This will open up the handle for other threads.
02134       rh.resume_handle ();
02135 
02136       if (this->messaging_object ()->process_request_message (
02137             this,
02138             qd) == -1)
02139         {
02140           // Return a "-1" so that the next stage can take care of
02141           // closing connection and the necessary memory management.
02142           return -1;
02143         }
02144     }
02145   else if (t == TAO_PLUGGABLE_MESSAGE_REPLY ||
02146            t == TAO_PLUGGABLE_MESSAGE_LOCATEREPLY)
02147     {
02148       rh.resume_handle ();
02149 
02150       TAO_Pluggable_Reply_Params params (this);
02151 
02152       if (this->messaging_object ()->process_reply_message (params,
02153                                                             qd) == -1)
02154         {
02155           if (TAO_debug_level > 0)
02156             ACE_DEBUG ((LM_DEBUG,
02157                ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ")
02158                ACE_TEXT ("error in process_reply_message - %m\n"),
02159                this->id ()));
02160 
02161           return -1;
02162         }
02163 
02164     }
02165   else if (t == TAO_PLUGGABLE_MESSAGE_CANCELREQUEST)
02166     {
02167       // The associated request might be incomplpete residing
02168       // fragmented in messaging object. We must make sure the
02169       // resources allocated by fragments are released.
02170 
02171       if (this->messaging_object ()->discard_fragmented_message (qd) == -1)
02172         {
02173           if (TAO_debug_level > 0)
02174             {
02175               ACE_ERROR ((LM_ERROR,
02176                  ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ")
02177                  ACE_TEXT ("error processing CancelRequest\n"),
02178                  this->id ()));
02179             }
02180         }
02181 
02182       // We are not able to cancel requests being processed already;
02183       // this is declared as optional feature by CORBA, and TAO does
02184       // not support this currently.
02185 
02186       // Just continue processing, CancelRequest does not mean to cut
02187       // off the connection.
02188     }
02189   else if (t == TAO_PLUGGABLE_MESSAGE_MESSAGERROR)
02190     {
02191       if (TAO_debug_level > 0)
02192         {
02193           ACE_ERROR ((LM_ERROR,
02194              ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ")
02195              ACE_TEXT ("received MessageError, closing connection\n"),
02196              this->id ()));
02197         }
02198       return -1;
02199     }
02200 
02201   // If not, just return back..
02202   return 0;
02203 }

int TAO_Transport::process_queue_head TAO_Resume_Handle rh  )  [private]
 

Definition at line 2206 of file Transport.cpp.

References ACE_DEBUG, ACE_TEXT, TAO_Incoming_Message_Queue::dequeue_head(), incoming_message_queue_, LM_DEBUG, notify_reactor(), process_parsed_messages(), TAO_Incoming_Message_Queue::queue_length(), TAO_Queued_Data::release(), TAO_Resume_Handle::set_flag(), and TAO_debug_level.

Referenced by handle_input(), and handle_input_parse_data().

02207 {
02208   if (TAO_debug_level > 3)
02209     {
02210       ACE_DEBUG ((LM_DEBUG,
02211          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_queue_head, %d enqueued\n"),
02212          this->id (), this->incoming_message_queue_.queue_length () ));
02213     }
02214 
02215   // See if  message in queue ...
02216   if (this->incoming_message_queue_.queue_length () > 0)
02217     {
02218       // Get the message on the head of the queue..
02219       TAO_Queued_Data *qd =
02220         this->incoming_message_queue_.dequeue_head ();
02221 
02222       if (TAO_debug_level > 3)
02223         {
02224           ACE_DEBUG ((LM_DEBUG,
02225              ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_queue_head, ")
02226              ACE_TEXT ("the size of the queue is [%d]\n"),
02227              this->id (),
02228              this->incoming_message_queue_.queue_length()));
02229         }
02230       // Now that we have pulled out out one message out of the queue,
02231       // check whether we have one more message in the queue...
02232       if (this->incoming_message_queue_.queue_length () > 0)
02233         {
02234           if (TAO_debug_level > 0)
02235             {
02236               ACE_DEBUG ((LM_DEBUG,
02237                  ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_queue_head, ")
02238                  ACE_TEXT ("notify reactor\n"),
02239                  this->id ()));
02240 
02241             }
02242 
02243           const int retval = this->notify_reactor ();
02244 
02245           if (retval == 1)
02246             {
02247               // Let the class know that it doesn't need to resume  the
02248               // handle..
02249               rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_LEAVE_SUSPENDED);
02250             }
02251           else if (retval < 0)
02252             return -1;
02253         }
02254       else
02255         {
02256           // As we are ready to process the last message just resume
02257           // the handle. Set the flag incase someone had reset the flag..
02258           rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_RESUMABLE);
02259         }
02260 
02261       // Process the message...
02262       if (this->process_parsed_messages (qd, rh) == -1)
02263         {
02264           return -1;
02265         }
02266 
02267       // Delete the Queued_Data..
02268       TAO_Queued_Data::release (qd);
02269 
02270       return 0;
02271     }
02272 
02273   return 1;
02274 }

bool TAO_Transport::provide_blockable_handler TAO::Connection_Handler_Set handlers  ) 
 

Called by the cache when the ORB is shutting down.

Parameters:
handlers The TAO_Connection_Handler_Set into which the transport should place its handler if the transport has RW strategy on.
Returns:
true indicates a handler was added to the handler set. false indicates that the transport did not have a blockable handler that could be added.

Definition at line 219 of file Transport.cpp.

References add_reference(), TAO::Connection_Handler_Set, ACE_Unbounded_Set< T >::insert(), TAO_Wait_Strategy::non_blocking(), and opening_connection_role_.

00220 {
00221   if (this->ws_->non_blocking () ||
00222       this->opening_connection_role_ == TAO::TAO_SERVER_ROLE)
00223     return false;
00224 
00225   (void) this->add_reference ();
00226 
00227   h.insert (this->connection_handler_i ());
00228 
00229   return true;
00230 }

void TAO_Transport::provide_handler TAO::Connection_Handler_Set handlers  ) 
 

Add the event handler to the handlers set.

Called by the cache when the cache is closing.

Parameters:
handlers The TAO_Connection_Handler_Set into which the transport should place its handler

Definition at line 211 of file Transport.cpp.

References add_reference(), TAO::Connection_Handler_Set, and ACE_Unbounded_Set< T >::insert().

00212 {
00213   (void) this->add_reference ();
00214 
00215   handlers.insert (this->connection_handler_i ());
00216 }

int TAO_Transport::purge_entry void   ) 
 

Cache management.

Definition at line 423 of file Transport.cpp.

References TAO::Transport_Cache_Manager::purge_entry(), and transport_cache_manager().

Referenced by TAO_Connection_Handler::close_connection_eh(), TAO_IIOP_Connector::complete_connection(), TAO_Connector::connect(), post_open(), recache_transport(), and ~TAO_Transport().

00424 {
00425   return this->transport_cache_manager ().purge_entry (this->cache_map_entry_);
00426 }

ACE_INLINE void TAO_Transport::purging_order unsigned long  value  ) 
 

Definition at line 76 of file Transport.inl.

References purging_order_.

00077 {
00078   // This should only be called by the Transport Cache Manager when
00079   // it is holding it's lock.
00080   // The transport should still be here since the cache manager still
00081   // has a reference to it.
00082   this->purging_order_ = value;
00083 }

ACE_INLINE unsigned long TAO_Transport::purging_order void   )  const
 

Get and Set the purging order. The purging strategy uses the set version to set the purging order.

Definition at line 70 of file Transport.inl.

References purging_order_.

Referenced by TAO_LRU_Connection_Purging_Strategy::update_item().

00071 {
00072   return this->purging_order_;
00073 }

ACE_INLINE int TAO_Transport::queue_is_empty void   ) 
 

Check if there are messages pending in the queue.

Returns:
1 if the queue is empty

Definition at line 98 of file Transport.inl.

References ACE_GUARD_RETURN, and queue_is_empty_i().

Referenced by TAO_Reactive_Flushing_Strategy::flush_transport(), TAO_Leader_Follower_Flushing_Strategy::flush_transport(), and TAO_Block_Flushing_Strategy::flush_transport().

00099 {
00100   ACE_GUARD_RETURN (ACE_Lock,
00101                     ace_mon,
00102                     *this->handler_lock_,
00103                     -1);
00104   return this->queue_is_empty_i ();
00105 }

int TAO_Transport::queue_is_empty_i void   )  [private]
 

Check if there are messages pending in the queue.

This version assumes that the lock is already held. Use with care!

Returns:
1 if the queue is empty

Definition at line 715 of file Transport.cpp.

Referenced by post_open(), queue_is_empty(), and TAO_Block_Flushing_Strategy::schedule_output().

00716 {
00717   return (this->head_ == 0);
00718 }

int TAO_Transport::queue_message_i const ACE_Message_Block message_block  )  [protected]
 

Queue a message for message_block.

Definition at line 1308 of file Transport.cpp.

References ACE_NEW_RETURN, and TAO_Queued_Message::push_back().

Referenced by format_queue_message(), and send_asynchronous_message_i().

01309 {
01310   TAO_Queued_Message *queued_message = 0;
01311   ACE_NEW_RETURN (queued_message,
01312                   TAO_Asynch_Queued_Message (message_block,
01313                                              this->orb_core_,
01314                                              0,
01315                                              1),
01316                   -1);
01317   queued_message->push_back (this->head_, this->tail_);
01318 
01319   return 0;
01320 }

int TAO_Transport::recache_transport (TAO_Transport_Descriptor_Interface *desc)
 

Recache ourselves in the cache.

Todo:
Ideally the following should be inline.

purge_entry has a return value, use it

Definition at line 412 of file Transport.cpp.

References TAO::Transport_Cache_Manager::cache_transport(), purge_entry(), and transport_cache_manager().

Referenced by TAO_IIOP_Connection_Handler::process_listen_point_list().

00413 {
00414   // First purge our entry
00415   this->purge_entry ();
00416 
00417   // Then add ourselves to the cache
00418   return this->transport_cache_manager ().cache_transport (desc,
00419                                                            this);
00420 }

virtual ssize_t TAO_Transport::recv char *  buffer,
size_t  len,
const ACE_Time_Value timeout = 0
[pure virtual]
 

Read len bytes into buffer.

This method serializes on handler_lock_, guaranteeing that only one thread can execute it on the same instance concurrently.

Parameters:
buffer ORB allocated buffer where the data should be placed.
timeout The ACE_Time_Value * argument is just a placeholder for now. It is not clear that this is the best place to specify it. The actual timeout values will be kept in the Policies.

Implemented in TAO_IIOP_Transport.

Referenced by handle_input_missing_data(), handle_input_parse_data(), TAO_GIOP_Utils::read_buffer(), and TAO_GIOP_Utils::read_bytes_input().

ACE_INLINE size_t TAO_Transport::recv_buffer_size void   )  const
 

Accessor to recv_buffer_size_.

Definition at line 181 of file Transport.inl.

00182 {
00183   return this->recv_buffer_size_;
00184 }

int TAO_Transport::register_handler void   )  [virtual]
 

Register the handler with the reactor.

Register the handler with the reactor. This method is used by the Wait_On_Reactor strategy. The transport must register its event handler with the ORB's Reactor.

Todo:
I think this method is pretty much useless, the connections are *always* registered with the Reactor, except in thread-per-connection mode. In that case putting the connection in the Reactor would produce unpredictable results anyway.

Definition at line 299 of file Transport.cpp.

References ACE_DEBUG, ACE_GUARD_RETURN, ACE_TEXT, event_handler_i(), TAO_Wait_Strategy::is_registered(), LM_DEBUG, ACE_Event_Handler::reactor(), TAO_ORB_Core::reactor(), ACE_Reactor::register_handler(), and TAO_debug_level.

Referenced by TAO_Wait_On_Reactor::register_handler(), TAO_Wait_On_Leader_Follower::register_handler(), and TAO_Wait_On_Leader_Follower::sending_request().

00300 {
00301   if (TAO_debug_level > 4)
00302     {
00303       ACE_DEBUG ((LM_DEBUG,
00304                   ACE_TEXT ("TAO (%P|%t) - Transport[%d]::register_handler\n"),
00305                   this->id ()));
00306     }
00307 
00308   ACE_Reactor * const r = this->orb_core_->reactor ();
00309 
00310   // @@note: This should be okay since the register handler call will
00311   // not make a nested call into the transport.
00312   ACE_GUARD_RETURN (ACE_Lock,
00313                     ace_mon,
00314                     *this->handler_lock_,
00315                     false);
00316 
00317   if (r == this->event_handler_i ()->reactor ())
00318     {
00319       return 0;
00320     }
00321 
00322   // Set the flag in the Connection Handler and in the Wait Strategy
00323   // @@Maybe we should set these flags after registering with the
00324   // reactor. What if the  registration fails???
00325   this->ws_->is_registered (true);
00326 
00327   // Register the handler with the reactor
00328   return r->register_handler (this->event_handler_i (),
00329                               ACE_Event_Handler::READ_MASK);
00330 }

ACE_Event_Handler::Reference_Count TAO_Transport::remove_reference void   ) 
 

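Remove a reference to this transport by forwarding to the event handler's remove_reference() and returning that call's result.

Note: the description originally attached to this member ("Initialising the messaging object. This would be used by the connector side. On the acceptor side the connection handler would take care of the messaging objects.") does not match the code below and appears to be a misplaced comment belonging to another member.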

Definition at line 2358 of file Transport.cpp.

References event_handler_i(), and ACE_Event_Handler::remove_reference().

Referenced by TAO_Connection_Handler::close_handler(), TAO::Cache_IntId::operator=(), TAO::Transport_Cache_Manager::purge(), TAO_Asynch_Reply_Dispatcher_Base::transport(), TAO::Cache_IntId::~Cache_IntId(), TAO::Profile_Transport_Resolver::~Profile_Transport_Resolver(), TAO_Asynch_Reply_Dispatcher_Base::~TAO_Asynch_Reply_Dispatcher_Base(), and TAO_Thread_Per_Connection_Handler::~TAO_Thread_Per_Connection_Handler().

02359 {
02360   return this->event_handler_i ()->remove_reference ();
02361 }

void TAO_Transport::report_invalid_event_handler const char *  caller  )  [private]
 

Print out error messages if the event handler is not valid.

Definition at line 1106 of file Transport.cpp.

References ACE_DEBUG, ACE_TEXT, ACE_TEXT_CHAR_TO_TCHAR, LM_DEBUG, and TAO_debug_level.

01107 {
01108   if (TAO_debug_level > 0)
01109     {
01110       ACE_DEBUG ((LM_DEBUG,
01111          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::report_invalid_event_handler")
01112          ACE_TEXT ("(%s) no longer associated with handler [tag=%d]\n"),
01113          this->id (), ACE_TEXT_CHAR_TO_TCHAR (caller), this->tag_));
01114     }
01115 }

ACE_INLINE void TAO_Transport::reset_flush_timer void   )  [private]
 

The flush timer expired or was explicitly cancelled, mark it as not pending

Definition at line 114 of file Transport.inl.

References current_deadline_, and flush_timer_id_.

Referenced by drain_queue_i(), and handle_timeout().

00115 {
00116   this->flush_timer_id_ = -1;
00117   this->current_deadline_ = ACE_Time_Value::zero;
00118 }

int TAO_Transport::schedule_output_i void   )  [private]
 

Schedule handle_output() callbacks.

Definition at line 722 of file Transport.cpp.

References ACE_DEBUG, ACE_TEXT, event_handler_i(), ACE_Reactor::find_handler(), ACE_Event_Handler::get_handle(), LM_DEBUG, ACE_Event_Handler::reactor(), ACE_Event_Handler::remove_reference(), ACE_Reactor::schedule_wakeup(), and TAO_debug_level.

Referenced by TAO_Reactive_Flushing_Strategy::schedule_output(), and TAO_Leader_Follower_Flushing_Strategy::schedule_output().

00723 {
00724   ACE_Event_Handler *eh = this->event_handler_i ();
00725   ACE_Reactor *reactor = eh->reactor ();
00726 
00727   // Check to see if our event handler is still registered with the
00728   // reactor.  It's possible for another thread to have run close_connection()
00729   // since we last used the event handler.
00730   ACE_Event_Handler * const found = reactor->find_handler (eh->get_handle ());
00731   if (found != eh)
00732     {
00733       if(TAO_debug_level > 3)
00734         {
00735           ACE_DEBUG ((LM_DEBUG,
00736                       "TAO (%P|%t) - Transport[%d]::schedule_output_i "
00737                       "event handler not found in reactor, returning -1\n",
00738                       this->id ()));
00739         }
00740       if (found)
00741         {
00742           found->remove_reference ();
00743         }
00744       return -1;
00745     }
00746   found->remove_reference ();
00747 
00748   if (TAO_debug_level > 3)
00749     {
00750       ACE_DEBUG ((LM_DEBUG,
00751          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::schedule_output_i\n"),
00752          this->id ()));
00753     }
00754 
00755   return reactor->schedule_wakeup (eh, ACE_Event_Handler::WRITE_MASK);
00756 }

virtual ssize_t TAO_Transport::send iovec iov,
int  iovcnt,
size_t &  bytes_transferred,
const ACE_Time_Value timeout = 0
[pure virtual]
 

Write the complete Message_Block chain to the connection.

This method serializes on handler_lock_, guaranteeing that only one thread can execute it on the same instance concurrently.

Often the implementation simply forwards the arguments to the underlying ACE_Svc_Handler class, using the code factored out into ACE.

Be careful with protocols that perform non-trivial transformations of the data, such as SSLIOP or protocols that compress the stream.

Parameters:
iov contains the data that must be sent.
timeout is the maximum time that the application is willing to wait for the data to be sent, useful in platforms that implement timed writes. The timeout value is obtained from the policies set by the application.
bytes_transferred should return the total number of bytes successfully transferred before the connection blocked. This is required because in some platforms and/or protocols multiple system calls may be required to send the chain of message blocks. The first few calls can work successfully, but the final one can fail or signal a flow control situation (via EAGAIN). In this case the ORB expects the function to return -1, errno to be appropriately set and this argument to return the number of bytes already on the OS I/O subsystem.
This call can also fail if the transport instance is no longer associated with a connection (e.g., the connection handler closed down). In that case, it returns -1 and sets errno to ENOENT.

Implemented in TAO_IIOP_Transport.

Referenced by drain_queue_helper().

int TAO_Transport::send_asynchronous_message_i TAO_Stub stub,
const ACE_Message_Block message_block,
ACE_Time_Value max_wait_time
[private]
 

Send an asynchronous message, i.e. do not block until the message is on the wire

Definition at line 1161 of file Transport.cpp.

References ACE_DEBUG, ACE_ERROR, ACE_GUARD_RETURN, ACE_TEXT, check_buffering_constraints_i(), ACE_Message_Block::cont(), ETIME, EWOULDBLOCK, TAO_Flushing_Strategy::flush_transport(), TAO_ORB_Core::flushing_strategy(), ACE_Message_Block::length(), LM_DEBUG, LM_ERROR, TAO::Transport_Queueing_Strategy::must_queue(), orb_core(), queue_message_i(), TAO_Flushing_Strategy::schedule_output(), send_message_block_chain_i(), ssize_t, TAO_debug_level, ACE_Message_Block::total_length(), and TAO_Stub::transport_queueing_strategy().

Referenced by send_message_shared_i().

01164 {
01165   // Let's figure out if the message should be queued without trying
01166   // to send first:
01167   bool try_sending_first = true;
01168 
01169   const bool queue_empty = (this->head_ == 0);
01170 
01171   if (!queue_empty)
01172     {
01173       try_sending_first = false;
01174     }
01175   else if (stub->transport_queueing_strategy ().must_queue (queue_empty))
01176     {
01177       try_sending_first = false;
01178     }
01179 
01180   if (try_sending_first)
01181     {
01182       ssize_t n = 0;
01183       size_t byte_count = 0;
01184       // ... in this case we must try to send the message first ...
01185 
01186       const size_t total_length = message_block->total_length ();
01187 
01188       if (TAO_debug_level > 6)
01189         {
01190           ACE_DEBUG ((LM_DEBUG,
01191              ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ")
01192              ACE_TEXT ("trying to send the message (ml = %d)\n"),
01193              this->id (), total_length));
01194         }
01195 
01196       // @@ I don't think we want to hold the mutex here, however if
01197       // we release it we need to recheck the status of the transport
01198       // after we return... once I understand the final form for this
01199       // code I will re-visit this decision
01200       n = this->send_message_block_chain_i (message_block,
01201                                             byte_count,
01202                                             max_wait_time);
01203       if (n == -1)
01204         {
01205           // ... if this is just an EWOULDBLOCK we must schedule the
01206           // message for later, if it is ETIME we still have to send
01207           // the complete message, because cutting off the message at
01208           // this point will destroy the synchronization with the
01209           // server ...
01210           if (errno != EWOULDBLOCK && errno != ETIME)
01211             {
01212               if (TAO_debug_level > 0)
01213                 {
01214                   ACE_ERROR ((LM_ERROR,
01215                      ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ")
01216                      ACE_TEXT ("fatal error in ")
01217                      ACE_TEXT ("send_message_block_chain_i - %m\n"),
01218                      this->id ()));
01219                 }
01220               return -1;
01221             }
01222         }
01223 
01224       // ... let's figure out if the complete message was sent ...
01225       if (total_length == byte_count)
01226         {
01227           // Done, just return.  Notice that there are no allocations
01228           // or copies up to this point (though some fancy calling
01229           // back and forth).
01230           // This is the common case for the critical path, it should
01231           // be fast.
01232           return 0;
01233         }
01234 
01235       if (TAO_debug_level > 6)
01236         {
01237           ACE_DEBUG ((LM_DEBUG,
01238              ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ")
01239              ACE_TEXT ("partial send %d / %d bytes\n"),
01240              this->id (), byte_count, total_length));
01241         }
01242 
01243       // ... part of the data was sent, need to figure out what piece
01244       // of the message block chain must be queued ...
01245       while (message_block != 0 && message_block->length () == 0)
01246         {
01247           message_block = message_block->cont ();
01248         }
01249 
01250       // ... at least some portion of the message block chain should
01251       // remain ...
01252     }
01253 
01254   // ... either the message must be queued or we need to queue it
01255   // because it was not completely sent out ...
01256 
01257   if (TAO_debug_level > 6)
01258     {
01259       ACE_DEBUG ((LM_DEBUG,
01260          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ")
01261          ACE_TEXT ("message is queued\n"),
01262          this->id ()));
01263     }
01264 
01265   if (this->queue_message_i(message_block) == -1)
01266   {
01267     if (TAO_debug_level > 0)
01268     ACE_DEBUG ((LM_DEBUG,
01269        ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ")
01270        ACE_TEXT ("cannot queue message for ")
01271        ACE_TEXT (" - %m\n"),
01272        this->id ()));
01273     return -1;
01274   }
01275 
01276   // ... if the queue is full we need to activate the output on the
01277   // queue ...
01278   bool must_flush = false;
01279   const bool constraints_reached =
01280     this->check_buffering_constraints_i (stub,
01281                                          must_flush);
01282 
01283   // ... but we also want to activate it if the message was partially
01284   // sent.... Plus, when we use the blocking flushing strategy the
01285   // queue is flushed as a side-effect of 'schedule_output()'
01286 
01287   TAO_Flushing_Strategy *flushing_strategy =
01288     this->orb_core ()->flushing_strategy ();
01289 
01290   if (constraints_reached || try_sending_first)
01291     {
01292       (void) flushing_strategy->schedule_output (this);
01293     }
01294 
01295   if (must_flush)
01296     {
01297       typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK;
01298       TAO_REVERSE_LOCK reverse (*this->handler_lock_);
01299       ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1);
01300 
01301       (void) flushing_strategy->flush_transport (this);
01302     }
01303 
01304   return 0;
01305 }

void TAO_Transport::send_connection_closed_notifications void   ) 
 

Notify all the components inside a Transport when the underlying connection is closed.

Definition at line 1118 of file Transport.cpp.

References ACE_GUARD, TAO_Transport_Mux_Strategy::connection_closed(), send_connection_closed_notifications_i(), and tms().

Referenced by TAO_Connection_Handler::close_connection_eh().

01119 {
01120   {
01121     ACE_MT (ACE_GUARD (ACE_Lock, guard, *this->handler_lock_));
01122 
01123     this->send_connection_closed_notifications_i ();
01124   }
01125 
01126   this->tms ()->connection_closed ();
01127 }

void TAO_Transport::send_connection_closed_notifications_i void   )  [private]
 

Assume the lock is held.

Definition at line 1130 of file Transport.cpp.

References cleanup_queue_i(), messaging_object(), and TAO_Pluggable_Messaging::reset().

Referenced by send_connection_closed_notifications().

01131 {
01132   this->cleanup_queue_i ();
01133 
01134   this->messaging_object ()->reset ();
01135 }

virtual int TAO_Transport::send_message (TAO_OutputCDR &stream,
TAO_Stub *stub = 0,
int message_semantics = TAO_Transport::TAO_TWOWAY_REQUEST,
ACE_Time_Value *max_time_wait = 0)
[pure virtual]
 

Once the ORB is prepared to receive a reply (see send_request() above), and all the arguments have been marshaled the CDR stream must be 'formatted', i.e. the message_size field in the GIOP header can finally be set to the proper value.

Implemented in TAO_IIOP_Transport.

Referenced by TAO_On_Demand_Fragmentation_Strategy::fragment(), TAO_GIOP_Message_Lite::make_send_locate_reply(), TAO_GIOP_Message_Base::make_send_locate_reply(), TAO_GIOP_Message_Lite::process_request(), TAO_GIOP_Message_Base::process_request(), TAO_ServerRequest::send_cached_reply(), TAO_ServerRequest::send_no_exception_reply(), TAO_GIOP_Message_Lite::send_reply_exception(), TAO_GIOP_Message_Base::send_reply_exception(), TAO_ServerRequest::tao_send_reply(), and TAO_ServerRequest::tao_send_reply_exception().

int TAO_Transport::send_message_block_chain (const ACE_Message_Block * message_block,
size_t & bytes_transferred,
ACE_Time_Value * max_wait_time = 0)
 

Send a message block chain.

Definition at line 490 of file Transport.cpp.

References ACE_GUARD_RETURN, and send_message_block_chain_i().

Referenced by TAO_GIOP_Message_Base::send_close_connection(), TAO_GIOP_Message_Lite::send_error(), and TAO_GIOP_Message_Base::send_error().

00493 {
00494   ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1);
00495 
00496   return this->send_message_block_chain_i (mb,
00497                                            bytes_transferred,
00498                                            max_wait_time);
00499 }

int TAO_Transport::send_message_block_chain_i const ACE_Message_Block message_block,
size_t &  bytes_transferred,
ACE_Time_Value max_wait_time
 

Send a message block chain, assuming the lock is held.

Definition at line 502 of file Transport.cpp.

References drain_queue_i(), TAO_Synch_Queued_Message::message_length(), TAO_Queued_Message::push_back(), TAO_Queued_Message::remove_from_list(), and ACE_Message_Block::total_length().

Referenced by send_asynchronous_message_i(), and send_message_block_chain().

00505 {
00506   size_t const total_length = mb->total_length ();
00507 
00508   // We are going to block, so there is no need to clone
00509   // the message block.
00510   TAO_Synch_Queued_Message synch_message (mb,
00511                                           this->orb_core_);
00512 
00513   synch_message.push_back (this->head_, this->tail_);
00514 
00515   int const n = this->drain_queue_i ();
00516 
00517   if (n == -1)
00518     {
00519       synch_message.remove_from_list (this->head_, this->tail_);
00520       return -1; // Error while sending...
00521     }
00522   else if (n == 1)
00523     {
00524       bytes_transferred = total_length;
00525       return 1;  // Empty queue, message was sent..
00526     }
00527 
00528   // Remove the temporary message from the queue...
00529   synch_message.remove_from_list (this->head_, this->tail_);
00530 
00531   bytes_transferred =
00532     total_length - synch_message.message_length ();
00533 
00534   return 0;
00535 }

int TAO_Transport::send_message_shared (TAO_Stub * stub,
int message_semantics,
const ACE_Message_Block * message_block,
ACE_Time_Value * max_wait_time
) [virtual]
 

Send the contents of message_block.

Parameters:
stub The object reference used for this operation, useful to obtain the current policies.
message_semantics If this is set to TAO_TWOWAY_REQUEST, this method will block until the operation is completely written on the wire. If it is set to any other value, this operation may return before the message is completely written.
message_block The CDR encapsulation of the GIOP message that must be sent. The message may consist of multiple Message Blocks chained through the cont() field.
max_wait_time The maximum time that the operation can block, used in the implementation of timeouts.

Reimplemented in TAO_IIOP_Transport.

Definition at line 261 of file Transport.cpp.

References ACE_GUARD_RETURN, close_connection(), and send_message_shared_i().

00265 {
00266   int result = 0;
00267 
00268   {
00269     ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1);
00270 
00271     result =
00272       this->send_message_shared_i (stub, message_semantics,
00273                                    message_block, max_wait_time);
00274   }
00275 
00276   if (result == -1)
00277     {
00278       this->close_connection ();
00279     }
00280 
00281   return result;
00282 }

int TAO_Transport::send_message_shared_i TAO_Stub stub,
int  message_semantics,
const ACE_Message_Block message_block,
ACE_Time_Value max_wait_time
[protected]
 

Implement send_message_shared() assuming the handler_lock_ is held.

Definition at line 1138 of file Transport.cpp.

References send_asynchronous_message_i(), send_reply_message_i(), send_synchronous_message_i(), TAO_ONEWAY_REQUEST, TAO_REPLY, and TAO_TWOWAY_REQUEST.

Referenced by send_message_shared(), and TAO_IIOP_Transport::send_message_shared().

01142 {
01143   switch (message_semantics)
01144     {
01145       case TAO_Transport::TAO_TWOWAY_REQUEST:
01146         return this->send_synchronous_message_i (message_block,
01147                                                  max_wait_time);
01148       case TAO_Transport::TAO_REPLY:
01149         return this->send_reply_message_i (message_block,
01150                                            max_wait_time);
01151       case TAO_Transport::TAO_ONEWAY_REQUEST:
01152         return this->send_asynchronous_message_i (stub,
01153                                                   message_block,
01154                                                   max_wait_time);
01155     }
01156 
01157   return -1;
01158 }

int TAO_Transport::send_reply_message_i const ACE_Message_Block message_block,
ACE_Time_Value max_wait_time
[private]
 

Send a reply message, i.e. do not block until the message is on the wire, but just return after adding it to the queue.

Definition at line 630 of file Transport.cpp.

References ACE_DEBUG, ACE_TEXT, TAO_Synch_Queued_Message::clone(), TAO_Queued_Message::destroy(), TAO_ORB_Core::flushing_strategy(), LM_DEBUG, orb_core(), TAO_Queued_Message::push_back(), TAO_Queued_Message::remove_from_list(), TAO_Flushing_Strategy::schedule_output(), send_synch_message_helper_i(), and TAO_debug_level.

Referenced by send_message_shared_i().

00632 {
00633   // Dont clone now.. We could be sent in one shot!
00634   TAO_Synch_Queued_Message synch_message (mb, this->orb_core_);
00635 
00636   synch_message.push_back (this->head_,
00637                            this->tail_);
00638 
00639   int const n =
00640     this->send_synch_message_helper_i (synch_message,
00641                                        max_wait_time);
00642 
00643   if (n == -1 || n == 1)
00644     {
00645       return n;
00646     }
00647 
00648   if (TAO_debug_level > 3)
00649     {
00650       ACE_DEBUG ((LM_DEBUG,
00651          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_reply_message_i, ")
00652          ACE_TEXT ("preparing to add to queue before leaving \n"),
00653          this->id ()));
00654     }
00655 
00656   // Till this point we shouldn't have any copying and that is the
00657   // point anyway. Now, remove the node from the list
00658   synch_message.remove_from_list (this->head_,
00659                                   this->tail_);
00660 
00661   // Clone the node that we have.
00662   TAO_Queued_Message *msg =
00663     synch_message.clone (this->orb_core_->transport_message_buffer_allocator ());
00664 
00665   // Stick it in the queue
00666   msg->push_back (this->head_,
00667                   this->tail_);
00668 
00669   TAO_Flushing_Strategy *flushing_strategy =
00670     this->orb_core ()->flushing_strategy ();
00671 
00672   int result = flushing_strategy->schedule_output (this);
00673 
00674   if (result == -1)
00675     {
00676       if (TAO_debug_level > 5)
00677         {
00678           ACE_DEBUG ((LM_DEBUG, "TAO (%P|%t) - Transport[%d]::send_reply_"
00679                       "message_i dequeuing msg due to schedule_output "
00680                       "failure\n", this->id ()));
00681         }
00682       msg->remove_from_list (this->head_, this->tail_);
00683       msg->destroy ();
00684     }
00685 
00686   return 1;
00687 }

virtual int TAO_Transport::send_request (TAO_Stub * stub,
TAO_ORB_Core * orb_core,
TAO_OutputCDR & stream,
int message_semantics,
ACE_Time_Value * max_time_wait
) [pure virtual]
 

Preparing the ORB to receive the reply only once the request is completely sent opens the system to some subtle race conditions: suppose the ORB is running in a multi-threaded configuration, and thread A makes a request while thread B is using the Reactor to process all incoming requests. Thread A could be implemented as follows: 1) send the request 2) set up the ORB to receive the reply 3) wait for the reply

but in this case thread B may receive the reply between step (1) and (2), and drop it as an invalid or unexpected message. Consequently the correct implementation is: 1) setup the ORB to receive the reply 2) send the request 3) wait for the reply

The following method encapsulates this idiom.

Todo:
This is generic code, it should be factored out into the Transport class.

Implemented in TAO_IIOP_Transport.

Referenced by TAO::Remote_Invocation::send_message().

int TAO_Transport::send_synch_message_helper_i TAO_Synch_Queued_Message s,
ACE_Time_Value max_wait_time
[private]
 

A helper method used by send_synchronous_message_i() and send_reply_message_i(). Reusable code that could be used by both the methods.

Definition at line 690 of file Transport.cpp.

References TAO_Synch_Queued_Message::all_data_sent(), drain_queue_i(), and TAO_Queued_Message::remove_from_list().

Referenced by send_reply_message_i(), and send_synchronous_message_i().

00692 {
00693   // @@todo: Need to send timeouts for writing..
00694   int const n = this->drain_queue_i ();
00695 
00696   if (n == -1)
00697     {
00698       synch_message.remove_from_list (this->head_, this->tail_);
00699       return -1; // Error while sending...
00700     }
00701   else if (n == 1)
00702     {
00703       return 1;  // Empty queue, message was sent..
00704     }
00705 
00706   if (synch_message.all_data_sent ())
00707     {
00708       return 1;
00709     }
00710 
00711   return 0;
00712 }

int TAO_Transport::send_synchronous_message_i const ACE_Message_Block message_block,
ACE_Time_Value max_wait_time
[private]
 

Send a synchronous message, i.e. block until the message is on the wire

Definition at line 538 of file Transport.cpp.

References ACE_ERROR, ACE_GUARD_RETURN, ACE_NEW_RETURN, ACE_TEXT, TAO_Synch_Queued_Message::current_block(), ETIME, TAO_Flushing_Strategy::flush_message(), TAO_ORB_Core::flushing_strategy(), LM_ERROR, orb_core(), TAO_Queued_Message::push_back(), TAO_Queued_Message::push_front(), TAO_Queued_Message::remove_from_list(), TAO_Flushing_Strategy::schedule_output(), send_synch_message_helper_i(), and TAO_debug_level.

Referenced by send_message_shared_i().

00540 {
00541   // We are going to block, so there is no need to clone
00542   // the message block.
00543   TAO_Synch_Queued_Message synch_message (mb, this->orb_core_);
00544 
00545   // Push synch_message on to the back of the queue.
00546   synch_message.push_back (this->head_, this->tail_);
00547 
00548   int const n =
00549     this->send_synch_message_helper_i (synch_message,
00550                                        max_wait_time);
00551 
00552   if (n == -1 || n == 1)
00553     {
00554       return n;
00555     }
00556 
00557   // @todo: Check for timeouts!
00558   // if (max_wait_time != 0 && errno == ETIME) return -1;
00559   TAO_Flushing_Strategy *flushing_strategy =
00560     this->orb_core ()->flushing_strategy ();
00561   (void) flushing_strategy->schedule_output (this);
00562 
00563   // Release the mutex, other threads may modify the queue as we
00564   // block for a long time writing out data.
00565   int result;
00566   {
00567     typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK;
00568     TAO_REVERSE_LOCK reverse (*this->handler_lock_);
00569     ACE_GUARD_RETURN (TAO_REVERSE_LOCK,
00570                       ace_mon,
00571                       reverse,
00572                       -1);
00573 
00574     result = flushing_strategy->flush_message (this,
00575                                                &synch_message,
00576                                                max_wait_time);
00577   }
00578 
00579   if (result == -1)
00580     {
00581       synch_message.remove_from_list (this->head_, this->tail_);
00582 
00583       if (errno == ETIME)
00584         {
00585           if (this->head_ == &synch_message)
00586             {
00587               // This is a timeout, there is only one nasty case: the
00588               // message has been partially sent!  We simply cannot take
00589               // the message out of the queue, because that would corrupt
00590               // the connection.
00591               //
00592               // What we do is replace the queued message with an
00593               // asynchronous message, that contains only what remains of
00594               // the timed out request.  If you think about sending
00595               // CancelRequests in this case: there is no much point in
00596               // doing that: the receiving ORB would probably ignore it,
00597               // and figuring out the request ID would be a bit of a
00598               // nightmare.
00599               //
00600 
00601               synch_message.remove_from_list (this->head_, this->tail_);
00602               TAO_Queued_Message *queued_message = 0;
00603               ACE_NEW_RETURN (queued_message,
00604                               TAO_Asynch_Queued_Message (
00605                                   synch_message.current_block (),
00606                                   this->orb_core_,
00607                                   0,
00608                                   1),
00609                               -1);
00610               queued_message->push_front (this->head_, this->tail_);
00611             }
00612         }
00613 
00614       if (TAO_debug_level > 0)
00615         {
00616           ACE_ERROR ((LM_ERROR,
00617              ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_synchronous_message_i, ")
00618              ACE_TEXT ("error while flushing message - %m\n"),
00619              this->id ()));
00620         }
00621 
00622       return -1;
00623     }
00624 
00625   return 1;
00626 }

ACE_INLINE size_t TAO_Transport::sent_byte_count void   )  const
 

Accessor to sent_byte_count_.

Definition at line 187 of file Transport.inl.

References sent_byte_count_.

00188 {
00189   return this->sent_byte_count_;
00190 }

TAO_BEGIN_VERSIONED_NAMESPACE_DECL ACE_INLINE CORBA::ULong TAO_Transport::tag void   )  const
 

Return the protocol tag.

The OMG assigns unique tags (a 32-bit unsigned number) to each protocol. New protocol tags can be obtained free of charge from the OMG, check the documents in corbafwd.h for more details.

Definition at line 8 of file Transport.inl.

00009 {
00010   return this->tag_;
00011 }

int TAO_Transport::tear_listen_point_list TAO_InputCDR cdr  )  [virtual]
 

Extracts the list of listen points from the cdr stream. The list would have the protocol specific details of the ListenPoints

Reimplemented in TAO_IIOP_Transport.

Definition at line 255 of file Transport.cpp.

References ACE_NOTSUP_RETURN.

Referenced by TAO_GIOP_Message_Generator_Parser_12::process_bidir_context().

00256 {
00257   ACE_NOTSUP_RETURN (-1);
00258 }

ACE_INLINE TAO_Transport_Mux_Strategy * TAO_Transport::tms void   )  const
 

Get the TAO_Transport_Mux_Strategy used by this object.

The role of the TAO_Transport_Mux_Strategy is described in more detail in that class' documentation. Suffice it to say that the class is used to control how many threads can have pending requests over the same connection. Multiplexing multiple threads over the same connection conserves resources and is almost required for AMI, but having only one pending request per connection is more efficient and reduces the possibilities of priority inversions.

Definition at line 20 of file Transport.inl.

Referenced by idle_after_reply(), idle_after_send(), TAO::LocateRequest_Invocation_Adapter::invoke(), TAO::Invocation_Adapter::invoke_remote_i(), TAO_GIOP_Message_Lite::parse_reply(), TAO_GIOP_Message_Base::process_reply_message(), and send_connection_closed_notifications().

00021 {
00022   return tms_;
00023 }

TAO::Transport_Cache_Manager & TAO_Transport::transport_cache_manager void   )  [private]
 

Helper method that returns the Transport Cache Manager.

Definition at line 2316 of file Transport.cpp.

References TAO_ORB_Core::lane_resources(), and TAO_Thread_Lane_Resources::transport_cache().

Referenced by make_idle(), purge_entry(), recache_transport(), and update_transport().

02317 {
02318   return this->orb_core_->lane_resources ().transport_cache ();
02319 }

int TAO_Transport::update_transport void   ) 
 

Cache management.

Definition at line 442 of file Transport.cpp.

References transport_cache_manager(), and TAO::Transport_Cache_Manager::update_entry().

Referenced by TAO_Connection_Handler::handle_input_internal(), TAO_Connection_Handler::handle_output_eh(), and TAO_Connection_Handler::svc_i().

00443 {
00444   return this->transport_cache_manager ().update_entry (this->cache_map_entry_);
00445 }

ACE_INLINE TAO_Wait_Strategy * TAO_Transport::wait_strategy void   )  const
 

Return the TAO_Wait_Strategy used by this object.

The role of the TAO_Wait_Strategy is described in more detail in that class' documentation. Suffice it to say that the ORB can wait for a reply by blocking on read(), by using the Reactor to wait for multiple events concurrently, or by using the Leader/Followers protocol.

Definition at line 27 of file Transport.inl.

Referenced by TAO_Connection_Handler::close_connection_eh(), TAO_IIOP_Connector::complete_connection(), TAO_Connector::connect(), TAO::Transport_Cache_Manager::find_transport(), TAO_Connection_Handler::handle_input_eh(), TAO_IIOP_Connection_Handler::open(), and post_open().

00028 {
00029   return this->ws_;
00030 }

ACE_INLINE void TAO_Transport::wchar_translator TAO_Codeset_Translator_Base  ) 
 

CodeSet negotiation - Set the wchar codeset translator factory.

Definition at line 144 of file Transport.inl.

References tcs_set_.

00145 {
00146   this->wchar_translator_ = tf;
00147   this->tcs_set_ = 1;
00148 }

ACE_INLINE TAO_Codeset_Translator_Base * TAO_Transport::wchar_translator void   )  const
 

CodeSet Negotiation - Get the wchar codeset translator factory.

Definition at line 131 of file Transport.inl.

00132 {
00133   return this->wchar_translator_;
00134 }


Friends And Related Function Documentation

friend class TAO_Block_Flushing_Strategy [friend]
 

This class needs privileged access to TAO_Transport internals.

Definition at line 785 of file Transport.h.

friend class TAO_Leader_Follower_Flushing_Strategy [friend]
 

Definition at line 803 of file Transport.h.

friend class TAO_Reactive_Flushing_Strategy [friend]
 

These classes need privileged access to:

Definition at line 802 of file Transport.h.

friend class TAO_Thread_Per_Connection_Handler [friend]
 

Needs privileged access to event_handler_i ()

Definition at line 807 of file Transport.h.


Member Data Documentation

int TAO_Transport::bidirectional_flag_ [protected]
 

Have we sent any info on bidirectional information or have we received any info regarding making the connection served by this transport bidirectional. The flag is used as follows: + We don't want to send the bidirectional context info more than once on the connection. Why? Waste of marshalling and demarshalling time on the client. + On the server side -- once a client that has established the connection asks the server to use the connection both ways, we *don't* want the server to pack service info to the client. That is not allowed. We need a flag to prevent such things from happening.

The value of this flag will be 0 if the client sends info and 1 if the server receives the info.

Definition at line 958 of file Transport.h.

Referenced by bidirectional_flag().

TAO::Transport_Cache_Manager::HASH_MAP_ENTRY* TAO_Transport::cache_map_entry_ [protected]
 

Our entry in the cache. We don't own this. It is here for our convenience. We cannot just change things around.

Definition at line 930 of file Transport.h.

Referenced by cache_map_entry().

TAO_Codeset_Translator_Base* TAO_Transport::char_translator_ [private]
 

Additional member values required to support codeset translation.

@Phil, I think it would be nice if we could think of a way to do the following. We have been trying to use the transport for marking about translator factories and such! IMHO this is a wrong encapsulation, i.e. trying to populate the transport object with these details. We should probably have a class something like TAO_Message_Property or TAO_Message_Translator or whatever (I am sure you get the idea) and encapsulate all these details. Coupling these seems odd. If I have to be more cynical, we can move this to the connection_handler, and it may make more sense with the DSCP stuff around there. Do you agree?

Definition at line 1033 of file Transport.h.

ACE_Time_Value TAO_Transport::current_deadline_ [protected]
 

The queue will start draining no later than this deadline, *if* the deadline is set.

Definition at line 975 of file Transport.h.

Referenced by check_buffering_constraints_i(), handle_timeout(), and reset_flush_timer().

CORBA::Boolean TAO_Transport::first_request_ [private]
 

First_request_ is true until the first request is sent or received. This is necessary since codeset context information is necessary only on the first request. After that, the translators are fixed for the life of the connection.

Definition at line 1045 of file Transport.h.

Referenced by first_request_sent(), and generate_request_header().

long TAO_Transport::flush_timer_id_ [protected]
 

The timer ID.

Definition at line 978 of file Transport.h.

Referenced by check_buffering_constraints_i(), flush_timer_pending(), and reset_flush_timer().

ACE_Lock* TAO_Transport::handler_lock_ [mutable, protected]
 

This is an ACE_Lock that gets initialized from TAO_ORB_Core::resource_factory()->create_cached_connection_lock(). This way, one can use a lock appropriate for the type of system, i.e., a null lock for single-threaded systems, and a real lock for multi-threaded systems.

Definition at line 992 of file Transport.h.

Referenced by ~TAO_Transport().

TAO_Queued_Message* TAO_Transport::head_ [protected]
 

Implement the outgoing data queue.

Definition at line 963 of file Transport.h.

size_t TAO_Transport::id_ [protected]
 

A unique identifier for the transport.

This never *never* changes over the lifespan, so we don't have to worry about locking it.

HINT: Protocol-specific transports that use connection handler might choose to set this to the handle for their connection.

Definition at line 1002 of file Transport.h.

TAO_Incoming_Message_Queue TAO_Transport::incoming_message_queue_ [protected]
 

Queue of the consolidated, incoming messages.

Definition at line 967 of file Transport.h.

Referenced by consolidate_enqueue_message(), handle_input_parse_data(), and process_queue_head().

TAO::Incoming_Message_Stack TAO_Transport::incoming_message_stack_ [protected]
 

Stack of incoming fragments, consolidated messages are going to be enqueued in "incoming_message_queue_"

Definition at line 971 of file Transport.h.

Referenced by handle_input(), handle_input_missing_data(), handle_input_parse_data(), and handle_input_parse_extra_messages().

bool TAO_Transport::is_connected_ [protected]
 

Is this transport really connected or not. In the case of oneways with the SYNC_NONE policy, we don't wait until the connection is ready; instead, we buffer the requests in this transport until the connection is ready.

Definition at line 1016 of file Transport.h.

Referenced by is_connected(), post_open(), and ~TAO_Transport().

TAO::Connection_Role TAO_Transport::opening_connection_role_ [protected]
 

Definition at line 960 of file Transport.h.

Referenced by opened_as(), and provide_blockable_handler().

TAO_ORB_Core* const TAO_Transport::orb_core_ [protected]
 

Global orbcore resource.

Definition at line 926 of file Transport.h.

Referenced by TAO_GIOP_Message_Lite::process_locate_request(), TAO_GIOP_Message_Base::process_locate_request(), TAO_GIOP_Message_Lite::process_request(), and TAO_GIOP_Message_Base::process_request().

ACE_Message_Block* TAO_Transport::partial_message_ [private]
 

Holds the partial GIOP message (if there is one).

Definition at line 1048 of file Transport.h.

Referenced by allocate_partial_message_block(), and handle_input_parse_data().

unsigned long TAO_Transport::purging_order_ [protected]
 

Used by the LRU, LFU and FIFO Connection Purging Strategies.

Definition at line 1005 of file Transport.h.

Referenced by purging_order().

size_t TAO_Transport::recv_buffer_size_ [protected]
 

Size of the buffer received.

Definition at line 1008 of file Transport.h.

size_t TAO_Transport::sent_byte_count_ [protected]
 

Number of bytes sent.

Definition at line 1011 of file Transport.h.

Referenced by drain_queue_helper(), drain_queue_i(), and sent_byte_count().

CORBA::ULong const TAO_Transport::tag_ [protected]
 

IOP protocol tag.

Definition at line 923 of file Transport.h.

TAO_Queued_Message* TAO_Transport::tail_ [protected]
 

Definition at line 964 of file Transport.h.

CORBA::Boolean TAO_Transport::tcs_set_ [private]
 

The tcs_set_ flag indicates that negotiation has occurred and so the translators are correct, since a null translator is valid if both ends are using the same codeset, whatever that codeset might be.

Definition at line 1039 of file Transport.h.

Referenced by char_translator(), is_tcs_set(), and wchar_translator().

TAO_Transport_Mux_Strategy* TAO_Transport::tms_ [protected]
 

Strategy to decide whether multiple requests can be sent over the same connection or the connection is exclusive for a request.

Definition at line 934 of file Transport.h.

TAO_Transport_Timer TAO_Transport::transport_timer_ [protected]
 

The adapter used to receive timeout callbacks from the Reactor.

Definition at line 981 of file Transport.h.

TAO_Codeset_Translator_Base* TAO_Transport::wchar_translator_ [private]
 

Definition at line 1034 of file Transport.h.

TAO_Wait_Strategy* TAO_Transport::ws_ [protected]
 

Strategy for waiting for the reply after sending the request.

Definition at line 937 of file Transport.h.


The documentation for this class was generated from the following files:
Generated on Thu Nov 9 12:24:06 2006 for TAO by doxygen 1.3.6