#include <Transport.h>
Inheritance diagram for TAO_Transport:


Template methods | |
| The Transport class uses the Template Method Pattern to implement the protocol specific functionality. Implementors of a pluggable protocol should override the following methods with the semantics documented below. | |
| enum | { TAO_ONEWAY_REQUEST = 0, TAO_TWOWAY_REQUEST = 1, TAO_REPLY } |
| virtual ACE_Event_Handler * | event_handler_i (void)=0 |
| bool | is_connected (void) const |
| Is this transport really connected. | |
| bool | post_open (size_t id) |
| Perform all the actions when this transport gets opened. | |
| void | pre_close (void) |
| do what needs to be done when closing the transport | |
| TAO_Connection_Handler * | connection_handler (void) |
| Get the connection handler for this transport. | |
| TAO_OutputCDR & | out_stream (void) |
| Accessor for the output CDR stream. | |
| int | generate_locate_request (TAO_Target_Specification &spec, TAO_Operation_Details &opdetails, TAO_OutputCDR &output) |
| virtual int | generate_request_header (TAO_Operation_Details &opd, TAO_Target_Specification &spec, TAO_OutputCDR &msg) |
| int | recache_transport (TAO_Transport_Descriptor_Interface *desc) |
| Recache ourselves in the cache. | |
| virtual int | handle_input (TAO_Resume_Handle &rh, ACE_Time_Value *max_wait_time=0) |
| Callback to read incoming data. | |
| virtual int | send_request (TAO_Stub *stub, TAO_ORB_Core *orb_core, TAO_OutputCDR &stream, int message_semantics, ACE_Time_Value *max_time_wait)=0 |
| virtual int | send_message (TAO_OutputCDR &stream, TAO_Stub *stub=0, int message_semantics=TAO_Transport::TAO_TWOWAY_REQUEST, ACE_Time_Value *max_time_wait=0)=0 |
| virtual int | send_message_shared (TAO_Stub *stub, int message_semantics, const ACE_Message_Block *message_block, ACE_Time_Value *max_wait_time) |
| Send the contents of message_block. | |
| int | format_queue_message (TAO_OutputCDR &stream, ACE_Time_Value *max_wait_time) |
| int | send_message_block_chain (const ACE_Message_Block *message_block, size_t &bytes_transferred, ACE_Time_Value *max_wait_time=0) |
| Send a message block chain. | |
| int | send_message_block_chain_i (const ACE_Message_Block *message_block, size_t &bytes_transferred, ACE_Time_Value *max_wait_time) |
| Send a message block chain, assuming the lock is held. | |
| int | purge_entry (void) |
| Cache management. | |
| int | make_idle (void) |
| Cache management. | |
| int | update_transport (void) |
| Cache management. | |
| int | handle_timeout (const ACE_Time_Value &current_time, const void *act) |
| size_t | recv_buffer_size (void) const |
| Accessor to recv_buffer_size_. | |
| size_t | sent_byte_count (void) const |
| Accessor to sent_byte_count_. | |
| TAO_Codeset_Translator_Base * | char_translator (void) const |
| CodeSet Negotiation - Get the char codeset translator factory. | |
| TAO_Codeset_Translator_Base * | wchar_translator (void) const |
| CodeSet Negotiation - Get the wchar codeset translator factory. | |
| void | char_translator (TAO_Codeset_Translator_Base *) |
| CodeSet negotiation - Set the char codeset translator factory. | |
| void | wchar_translator (TAO_Codeset_Translator_Base *) |
| CodeSet negotiation - Set the wchar codeset translator factory. | |
| void | assign_translators (TAO_InputCDR *, TAO_OutputCDR *) |
| void | clear_translators (TAO_InputCDR *, TAO_OutputCDR *) |
| CORBA::Boolean | is_tcs_set () const |
| Return true if the tcs has been set. | |
| void | first_request_sent () |
| Set the state of the first_request_ flag to 0. | |
| void | send_connection_closed_notifications (void) |
| TAO::Transport::Stats * | stats (void) const |
| Transport statistics. | |
| virtual TAO_Connection_Handler * | connection_handler_i (void)=0 |
| int | process_parsed_messages (TAO_Queued_Data *qd, TAO_Resume_Handle &rh) |
| int | send_message_shared_i (TAO_Stub *stub, int message_semantics, const ACE_Message_Block *message_block, ACE_Time_Value *max_wait_time) |
| int | queue_message_i (const ACE_Message_Block *message_block, ACE_Time_Value *max_wait_time) |
| CORBA::ULong const | tag_ |
| IOP protocol tag. | |
| TAO_ORB_Core *const | orb_core_ |
| Global orbcore resource. | |
| TAO::Transport_Cache_Manager::HASH_MAP_ENTRY * | cache_map_entry_ |
| TAO_Transport_Mux_Strategy * | tms_ |
| TAO_Wait_Strategy * | ws_ |
| Strategy for waiting for the reply after sending the request. | |
| int | bidirectional_flag_ |
| TAO::Connection_Role | opening_connection_role_ |
| TAO_Queued_Message * | head_ |
| Implement the outgoing data queue. | |
| TAO_Queued_Message * | tail_ |
| TAO_Incoming_Message_Queue | incoming_message_queue_ |
| Queue of the consolidated, incoming messages. | |
| TAO::Incoming_Message_Stack | incoming_message_stack_ |
| ACE_Time_Value | current_deadline_ |
| long | flush_timer_id_ |
| The timer ID. | |
| TAO_Transport_Timer | transport_timer_ |
| The adapter used to receive timeout callbacks from the Reactor. | |
| ACE_Lock * | handler_lock_ |
| size_t | id_ |
| A unique identifier for the transport. | |
| unsigned long | purging_order_ |
| Used by the LRU, LFU and FIFO Connection Purging Strategies. | |
| size_t | recv_buffer_size_ |
| Size of the buffer received. | |
| size_t | sent_byte_count_ |
| Number of bytes sent. | |
| bool | is_connected_ |
| TAO::Transport_Cache_Manager & | transport_cache_manager (void) |
| Helper method that returns the Transport Cache Manager. | |
| int | drain_queue (void) |
| Send some of the data in the queue. | |
| int | drain_queue_i (void) |
| Implement drain_queue() assuming the lock is held. | |
| bool | queue_is_empty_i (void) |
| Check if there are messages pending in the queue. | |
| int | drain_queue_helper (int &iovcnt, iovec iov[]) |
| A helper routine used in drain_queue_i(). | |
| int | schedule_output_i (void) |
| Schedule handle_output() callbacks. | |
| int | cancel_output_i (void) |
| Cancel handle_output() callbacks. | |
| void | cleanup_queue (size_t byte_count) |
| Cleanup the queue. | |
| void | cleanup_queue_i () |
| Cleanup the complete queue. | |
| int | check_buffering_constraints_i (TAO_Stub *stub, bool &must_flush) |
| Check if the buffering constraints have been reached. | |
| int | send_synchronous_message_i (const ACE_Message_Block *message_block, ACE_Time_Value *max_wait_time) |
| int | send_reply_message_i (const ACE_Message_Block *message_block, ACE_Time_Value *max_wait_time) |
| int | send_asynchronous_message_i (TAO_Stub *stub, const ACE_Message_Block *message_block, ACE_Time_Value *max_wait_time) |
| int | send_synch_message_helper_i (TAO_Synch_Queued_Message &s, ACE_Time_Value *max_wait_time) |
| int | flush_timer_pending (void) const |
| Check if the flush timer is still pending. | |
| void | reset_flush_timer (void) |
| void | report_invalid_event_handler (const char *caller) |
| Print out error messages if the event handler is not valid. | |
| int | handle_input_missing_data (TAO_Resume_Handle &rh, ACE_Time_Value *max_wait_time, TAO_Queued_Data *q_data) |
| int | handle_input_parse_data (TAO_Resume_Handle &rh, ACE_Time_Value *max_wait_time) |
| int | handle_input_parse_extra_messages (ACE_Message_Block &message_block) |
| int | consolidate_enqueue_message (TAO_Queued_Data *qd) |
| -1 error, otherwise 0 | |
| int | consolidate_process_message (TAO_Queued_Data *qd, TAO_Resume_Handle &rh) |
| -1 error, otherwise 0 | |
| int | process_queue_head (TAO_Resume_Handle &rh) |
| int | notify_reactor (void) |
| void | send_connection_closed_notifications_i (void) |
| Assume the lock is held. | |
| void | allocate_partial_message_block (void) |
| TAO_Transport (const TAO_Transport &) | |
| void | operator= (const TAO_Transport &) |
| TAO_Codeset_Translator_Base * | char_translator_ |
| Additional member values required to support codeset translation. | |
| TAO_Codeset_Translator_Base * | wchar_translator_ |
| CORBA::Boolean | tcs_set_ |
| CORBA::Boolean | first_request_ |
| ACE_Message_Block * | partial_message_ |
| Holds the partial GIOP message (if there is one). | |
| TAO::Transport::Stats * | stats_ |
| Statistics. | |
| class | TAO_Reactive_Flushing_Strategy |
| class | TAO_Leader_Follower_Flushing_Strategy |
| class | TAO_Thread_Per_Connection_Handler |
Public Types | |
Public Member Functions | |
| TAO_Transport (CORBA::ULong tag, TAO_ORB_Core *orb_core) | |
| Default creator, requires the tag value be supplied. | |
| virtual | ~TAO_Transport (void) |
| Destructor. | |
| CORBA::ULong | tag (void) const |
| Return the protocol tag. | |
| TAO_ORB_Core * | orb_core (void) const |
| Access the ORB that owns this connection. | |
| TAO_Transport_Mux_Strategy * | tms (void) const |
| Get the TAO_Transport_Mux_Strategy used by this object. | |
| TAO_Wait_Strategy * | wait_strategy (void) const |
| Return the TAO_Wait_Strategy used by this object. | |
| int | handle_output (void) |
| Callback method to reactively drain the outgoing data queue. | |
| int | bidirectional_flag (void) const |
| Get the bidirectional flag. | |
| void | bidirectional_flag (int flag) |
| Set the bidirectional flag. | |
| void | cache_map_entry (TAO::Transport_Cache_Manager::HASH_MAP_ENTRY *entry) |
| Set the Cache Map entry. | |
| TAO::Transport_Cache_Manager::HASH_MAP_ENTRY * | cache_map_entry (void) |
| Get the Cache Map entry. | |
| size_t | id (void) const |
| Set and Get the identifier for this transport instance. | |
| void | id (size_t id) |
| TAO::Connection_Role | opened_as (void) const |
| void | opened_as (TAO::Connection_Role) |
| unsigned long | purging_order (void) const |
| void | purging_order (unsigned long value) |
| bool | queue_is_empty (void) |
| Check if there are messages pending in the queue. | |
| void | provide_handler (TAO::Connection_Handler_Set &handlers) |
| Add the event handler to the handlers set. | |
| bool | provide_blockable_handler (TAO::Connection_Handler_Set &handlers) |
| virtual int | register_handler (void) |
| Register the handler with the reactor. | |
| virtual ssize_t | send (iovec *iov, int iovcnt, size_t &bytes_transferred, const ACE_Time_Value *timeout=0)=0 |
| Write the complete Message_Block chain to the connection. | |
| virtual ssize_t | recv (char *buffer, size_t len, const ACE_Time_Value *timeout=0)=0 |
| Read len bytes into buffer. | |
Control connection lifecycle | |
These methods are routed through the TMS object. The TMS strategies implement them correctly. | |
| bool | idle_after_send (void) |
| bool | idle_after_reply (void) |
| virtual void | close_connection (void) |
| Call the implementation method after obtaining the lock. | |
Template methods | |
The Transport class uses the Template Method Pattern to implement the protocol specific functionality. Implementors of a pluggable protocol should override the following methods with the semantics documented below. | |
| virtual int | messaging_init (CORBA::Octet major, CORBA::Octet minor)=0 |
| virtual int | tear_listen_point_list (TAO_InputCDR &cdr) |
| virtual bool | post_connect_hook (void) |
| Hooks that can be overridden in concrete transports. | |
| ACE_Event_Handler::Reference_Count | add_reference (void) |
| Memory management routines. | |
| ACE_Event_Handler::Reference_Count | remove_reference (void) |
| virtual TAO_Pluggable_Messaging * | messaging_object (void)=0 |
The transport object is created in the Service handler constructor and deleted in the Service Handler's destructor!!
The main responsibility of a Transport object is to encapsulate a connection, and provide a transport independent way to send and receive data. Since TAO is heavily based on the Reactor for most if not all of its I/O, the Transport class is usually implemented with a helper Connection Handler that adapts the generic Transport interface to the Reactor types.
One of the responsibilities of the TAO_Transport class is to send out GIOP messages as efficiently as possible. In most cases messages are put out in FIFO order, the transport object will put out the message using a single system call and return control to the application. However, for oneways and AMI requests it may be more efficient (or required if the SYNC_NONE policy is in effect) to queue the messages until a large enough data set is available. Another reason to queue is that some applications cannot block for I/O, yet they want to send messages so large that a single write() operation would not be able to cope with them. In such cases we need to queue the data and use the Reactor to drain the queue.
Therefore, the Transport class may need to use a queue to temporarily hold the messages, and, in some configurations, it may need to use the Reactor to concurrently drain such queues.
TAO provides explicit policies to send 'urgent' messages. Such messages may be put at the head of the queue. However, they cannot be sent immediately because the transport may already be sending another message in a reactive fashion.
Consequently, the Transport must also know if the head of the queue has been partially sent. In that case new messages can only follow the head. Only once the head is completely sent we can start sending new messages.
One or more threads can be blocked waiting for the connection to completely send the message. The thread should return as soon as its message has been sent, so a per-thread condition is required. This suggests that simply using an ACE_Message_Queue would not be enough: there is a significant amount of ancillary information to keep on each message that the Message_Block class does not provide room for.
Blocking I/O is still attractive for some applications. First, by eliminating the Reactor overhead, performance is improved when sending large blocks of data. Second, using the Reactor to send out data opens the door for nested upcalls, yet some applications cannot deal with the reentrancy issues in this case.
Some or all messages could have a timeout period attached to them. The timeout source could either be some high-level policy or maybe some strategy to prevent denial of service attacks. In any case the timeouts are per-message, and later messages could have shorter timeouts. In fact, some kind of scheduling (such as EDF) could be required in a few applications.
The outgoing data path consists of several components:
The Transport object provides a single method to send request messages (send_request_message ()).
One of the main responsibilities of the transport is to read and process the incoming GIOP message as quickly and efficiently as possible. There are other forces that need to be given due consideration. They are:
The messages should be checked for validity and the right information should be sent to the higher layer for processing. The process of doing a sanity check and preparing the messages for the higher layers of the ORB are done by the messaging protocol.
To keep things as efficient as possible for medium sized requests, it would be good to minimise data copying and locking along the incoming path, i.e. from the time of reading the data from the handle to the application. We achieve this by creating a buffer on stack and reading the data from the handle into the buffer. We then pass the same data block (the buffer is encapsulated into a data block) to the higher layers of the ORB. The problems stem from the following: (a) Data is bigger than the buffer that we have on stack (b) Transports like TCP do not guarantee availability of the whole chunk of data in one shot. Data could trickle in byte by byte. (c) Single read gives multiple messages
We solve the problems as follows
(a) First do a read with the buffer on stack. Query the underlying messaging object whether the message has any incomplete portion. If so, data will be copied into a new buffer able to hold the full message and is queued; succeeding events will read data from socket and write directly into this buffer. Otherwise, if the message in local buffer is complete, we free the handle and then send the message to the higher layers of the ORB for processing.
(b) If buffer with incomplete message has been enqueued, while trying to do the above, the reactor will call us back when the handle becomes read ready. The read-operation will copy data directly into the enqueued buffer. If the message has been read completely the message is sent to the higher layers of the ORB for processing.
(c) If we get multiple messages (possible if the client connected to the server sends oneways or AMI requests), we parse and split the messages. Every message is put in the queue. Once the messages are queued, the thread picks up one message to send to the higher layers of the ORB. Before doing that, if it finds more messages, it sends a notify to the reactor without resuming the handle. The next thread picks up a message from the queue and processes that. Once the queue is drained the last thread resumes the handle.
We could use the outgoing path of the ORB to send replies. This would allow us to reuse most of the code in the outgoing data path. We were doing this till TAO-1.2.3. We ran into problems. When writing the reply the ORB gets flow controlled, and the ORB tries to flush the message by going into the reactor. This resulted in unnecessary nesting. The thread that gets into the Reactor could potentially handle other messages (incoming or outgoing) and the stack starts growing leading to crashes.
The solution that we (plan to) adopt is pretty straightforward. The thread sending replies will not block to send the replies but queue the replies and return to the Reactor. (Note the careful usage of the terms "blocking in the Reactor" as opposed to "return back to the Reactor".)
See Also:
Definition at line 245 of file Transport.h.
|
|
Definition at line 597 of file Transport.h.
00598 {
00599 TAO_ONEWAY_REQUEST = 0,
00600 TAO_TWOWAY_REQUEST = 1,
00601 TAO_REPLY
00602 };
|
|
||||||||||||
|
Default creator, requires the tag value be supplied.
Definition at line 126 of file Transport.cpp. References ACE_NEW_THROW_EX, TAO_ORB_Core::client_factory(), TAO_Client_Strategy_Factory::create_transport_mux_strategy(), TAO_Client_Strategy_Factory::create_wait_strategy(), TAO_HAS_SENDFILE, and ws_.
00128 : tag_ (tag) 00129 , orb_core_ (orb_core) 00130 , cache_map_entry_ (0) 00131 , bidirectional_flag_ (-1) 00132 , opening_connection_role_ (TAO::TAO_UNSPECIFIED_ROLE) 00133 , head_ (0) 00134 , tail_ (0) 00135 , incoming_message_queue_ (orb_core) 00136 , current_deadline_ (ACE_Time_Value::zero) 00137 , flush_timer_id_ (-1) 00138 , transport_timer_ (this) 00139 , handler_lock_ (orb_core->resource_factory ()->create_cached_connection_lock ()) 00140 , id_ ((size_t) this) 00141 , purging_order_ (0) 00142 , recv_buffer_size_ (0) 00143 , sent_byte_count_ (0) 00144 , is_connected_ (false) 00145 , char_translator_ (0) 00146 , wchar_translator_ (0) 00147 , tcs_set_ (0) 00148 , first_request_ (1) 00149 , partial_message_ (0) 00150 #if TAO_HAS_SENDFILE == 1 00151 // The ORB has been configured to use the MMAP allocator, meaning 00152 // we could/should use sendfile() to send data. Cast once rather 00153 // here rather than during each send. This assumes that all 00154 // TAO_OutputCDR instances are using the same TAO_MMAP_Allocator 00155 // instance as the underlying output CDR buffer allocator. 00156 , mmap_allocator_ ( 00157 dynamic_cast<TAO_MMAP_Allocator *> ( 00158 orb_core->output_cdr_buffer_allocator ())) 00159 #endif /* TAO_HAS_SENDFILE==1 */ 00160 { 00161 TAO_Client_Strategy_Factory *cf = 00162 this->orb_core_->client_factory (); 00163 00164 // Create WS now. 00165 this->ws_ = cf->create_wait_strategy (this); 00166 00167 // Create TMS now. 00168 this->tms_ = cf->create_transport_mux_strategy (this); 00169 00170 #if TAO_HAS_TRANSPORT_CURRENT == 1 00171 // Allocate stats 00172 ACE_NEW_THROW_EX (this->stats_, 00173 TAO::Transport::Stats, 00174 CORBA::NO_MEMORY ()); 00175 #endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */ 00176 00177 /* 00178 * Hook to add code that initializes components that 00179 * belong to the concrete protocol implementation. 00180 * Further additions to this Transport class will 00181 * need to add code *before* this hook. 
00182 */ 00183 //@@ TAO_TRANSPORT_SPL_CONSTRUCTOR_ADD_HOOK 00184 } |
|
|
Destructor.
Definition at line 186 of file Transport.cpp. References ACE_ASSERT, cleanup_queue_i(), handler_lock_, purge_entry(), ACE_Message_Block::release(), stats_, and ws_.
00187 {
00188 delete this->ws_;
00189
00190 delete this->tms_;
00191
00192 delete this->handler_lock_;
00193
00194 if (!this->is_connected_)
00195 {
00196 // When we have a not connected transport we could have buffered
00197 // messages on this transport which we have to cleanup now.
00198 this->cleanup_queue_i();
00199
00200 // Cleanup our cache entry
00201 this->purge_entry();
00202 }
00203
00204 // Release the partial message block, however we may
00205 // have never allocated one.
00206 ACE_Message_Block::release (this->partial_message_);
00207
00208 // By the time the destructor is reached here all the connection stuff
00209 // *must* have been cleaned up.
00210
00211 // The following assert is needed for the test "Bug_2494_Regression".
00212 // See the bugzilla bug #2494 for details.
00213 ACE_ASSERT (this->head_ == 0);
00214 ACE_ASSERT (this->cache_map_entry_ == 0);
00215
00216 #if TAO_HAS_TRANSPORT_CURRENT == 1
00217 delete this->stats_;
00218 #endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */
00219
00220 /*
00221 * Hook to add code that cleans up components
00222 * belong to the concrete protocol implementation.
00223 * Further additions to this Transport class will
00224 * need to add code *before* this hook.
00225 */
00226 //@@ TAO_TRANSPORT_SPL_DESTRUCTOR_ADD_HOOK
00227 }
|
|
|
|
|
|
Memory management routines.
Definition at line 2484 of file Transport.cpp. References ACE_Event_Handler::add_reference(), and event_handler_i(). Referenced by TAO::Cache_IntId::Cache_IntId(), TAO_Connection_Handler::connection_pending(), TAO::Cache_IntId::operator=(), provide_blockable_handler(), provide_handler(), TAO::Transport_Cache_Manager::purge(), TAO_Thread_Per_Connection_Handler::TAO_Thread_Per_Connection_Handler(), and TAO_Asynch_Reply_Dispatcher_Base::transport().
02485 {
02486 return this->event_handler_i ()->add_reference ();
02487 }
|
|
|
Allocate a partial message block and store it in our partial_message_ data member. Definition at line 2558 of file Transport.cpp. References ACE_NEW, TAO_Pluggable_Messaging::header_length(), messaging_object(), and partial_message_. Referenced by handle_input_parse_data().
02559 {
02560 if (this->partial_message_ == 0)
02561 {
02562 // This value must be at least large enough to hold a GIOP message
02563 // header plus a GIOP fragment header
02564 size_t const partial_message_size =
02565 this->messaging_object ()->header_length ();
02566 // + this->messaging_object ()->fragment_header_length ();
02567 // deprecated, conflicts with not-single_read_opt.
02568
02569 ACE_NEW (this->partial_message_,
02570 ACE_Message_Block (partial_message_size));
02571 }
02572 }
|
|
||||||||||||
|
Use the Transport's codeset factories to set the translator for input and output CDRs. Definition at line 2454 of file Transport.cpp. References TAO_Codeset_Translator_Base::assign(). Referenced by TAO::LocateRequest_Invocation::check_reply(), TAO::Synch_Twoway_Invocation::check_reply_status(), TAO_GIOP_Message_Base::process_reply_message(), TAO_GIOP_Message_Base::process_request(), TAO_GIOP_Message_Base::process_request_message(), and TAO::Remote_Invocation::write_header().
02455 {
02456 if (this->char_translator_)
02457 {
02458 this->char_translator_->assign (inp);
02459 this->char_translator_->assign (outp);
02460 }
02461 if (this->wchar_translator_)
02462 {
02463 this->wchar_translator_->assign (inp);
02464 this->wchar_translator_->assign (outp);
02465 }
02466 }
|
|
|
Set the bidirectional flag.
Definition at line 39 of file Transport.inl. References bidirectional_flag_.
00040 {
00041 this->bidirectional_flag_ = flag;
00042 }
|
|
|
Get the bidirectional flag.
Definition at line 33 of file Transport.inl. References bidirectional_flag_. Referenced by TAO::Wait_On_LF_No_Upcall::can_process_upcalls(), TAO_IIOP_Transport::generate_request_header(), TAO_Muxed_TMS::request_id(), TAO_Exclusive_TMS::request_id(), and TAO_IIOP_Transport::tear_listen_point_list().
00034 {
00035 return this->bidirectional_flag_;
00036 }
|
|
|
Get the Cache Map entry.
Definition at line 57 of file Transport.inl. References cache_map_entry_.
00058 {
00059 return this->cache_map_entry_;
00060 }
|
|
|
Set the Cache Map entry.
Definition at line 63 of file Transport.inl. References cache_map_entry_, and TAO::Transport_Cache_Manager::HASH_MAP_ENTRY. Referenced by TAO::Transport_Cache_Manager::bind_i().
00065 {
00066 this->cache_map_entry_ = entry;
00067 }
|
|
|
Cancel handle_output() callbacks.
Definition at line 794 of file Transport.cpp. References ACE_DEBUG, ACE_TEXT, ACE_Reactor::cancel_wakeup(), event_handler_i(), LM_DEBUG, ACE_Event_Handler::reactor(), and TAO_debug_level. Referenced by TAO_Reactive_Flushing_Strategy::cancel_output(), and TAO_Leader_Follower_Flushing_Strategy::cancel_output().
00795 {
00796 ACE_Event_Handler * const eh = this->event_handler_i ();
00797 ACE_Reactor *const reactor = eh->reactor ();
00798
00799 if (TAO_debug_level > 3)
00800 {
00801 ACE_DEBUG ((LM_DEBUG,
00802 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cancel_output_i\n"),
00803 this->id ()));
00804 }
00805
00806 return reactor->cancel_wakeup (eh, ACE_Event_Handler::WRITE_MASK);
00807 }
|
|
|
CodeSet negotiation - Set the char codeset translator factory.
Definition at line 137 of file Transport.inl. References tcs_set_.
00138 {
00139 this->char_translator_ = tf;
00140 this->tcs_set_ = 1;
00141 }
|
|
|
CodeSet Negotiation - Get the char codeset translator factory.
Definition at line 125 of file Transport.inl.
00126 {
00127 return this->char_translator_;
00128 }
|
|
||||||||||||
|
Check if the buffering constraints have been reached.
Definition at line 1138 of file Transport.cpp. References TAO::Transport_Queueing_Strategy::buffering_constraints_reached(), ACE_Reactor::cancel_timer(), current_deadline_, event_handler_i(), flush_timer_id_, flush_timer_pending(), ACE_OS::gettimeofday(), TAO_Queued_Message::message_length(), TAO_Queued_Message::next(), ACE_Event_Handler::reactor(), ACE_Reactor::schedule_timer(), and TAO_Stub::transport_queueing_strategy(). Referenced by send_asynchronous_message_i().
01139 {
01140 // First let's compute the size of the queue:
01141 size_t msg_count = 0;
01142 size_t total_bytes = 0;
01143
01144 for (TAO_Queued_Message *i = this->head_; i != 0; i = i->next ())
01145 {
01146 ++msg_count;
01147 total_bytes += i->message_length ();
01148 }
01149
01150 bool set_timer = false;
01151 ACE_Time_Value new_deadline;
01152
01153 TAO::Transport_Queueing_Strategy *queue_strategy =
01154 stub->transport_queueing_strategy ();
01155
01156 bool constraints_reached = true;
01157
01158 if (queue_strategy)
01159 {
01160 constraints_reached =
01161 queue_strategy->buffering_constraints_reached (stub,
01162 msg_count,
01163 total_bytes,
01164 must_flush,
01165 this->current_deadline_,
01166 set_timer,
01167 new_deadline);
01168 }
01169 else
01170 {
01171 must_flush = false;
01172 }
01173
01174 // ... set the new timer, also cancel any previous timers ...
01175 if (set_timer)
01176 {
01177 ACE_Event_Handler *eh = this->event_handler_i ();
01178 ACE_Reactor * const reactor = eh->reactor ();
01179 this->current_deadline_ = new_deadline;
01180 ACE_Time_Value delay = new_deadline - ACE_OS::gettimeofday ();
01181
01182 if (this->flush_timer_pending ())
01183 {
01184 reactor->cancel_timer (this->flush_timer_id_);
01185 }
01186
01187 this->flush_timer_id_ =
01188 reactor->schedule_timer (&this->transport_timer_,
01189 &this->current_deadline_,
01190 delay);
01191 }
01192
01193 return constraints_reached;
01194 }
|
|
|
Cleanup the queue. Exactly byte_count bytes have been sent, the queue must be cleaned up as potentially several messages have been completely sent out. It leaves on head_ the next message to send out. Definition at line 1101 of file Transport.cpp. References ACE_DEBUG, ACE_TEXT, TAO_Queued_Message::all_data_sent(), TAO_Queued_Message::bytes_transferred(), TAO_Queued_Message::destroy(), LM_DEBUG, TAO_Queued_Message::message_length(), TAO_Queued_Message::remove_from_list(), and TAO_debug_level. Referenced by drain_queue_helper().
01102 {
01103 while (this->head_ != 0 && byte_count > 0)
01104 {
01105 TAO_Queued_Message *i = this->head_;
01106
01107 if (TAO_debug_level > 4)
01108 {
01109 ACE_DEBUG ((LM_DEBUG,
01110 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue, ")
01111 ACE_TEXT ("byte_count = %d\n"),
01112 this->id (), byte_count));
01113 }
01114
01115 // Update the state of the first message
01116 i->bytes_transferred (byte_count);
01117
01118 if (TAO_debug_level > 4)
01119 {
01120 ACE_DEBUG ((LM_DEBUG,
01121 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue, ")
01122 ACE_TEXT ("after transfer, bc = %d, all_sent = %d, ml = %d\n"),
01123 this->id (), byte_count, i->all_data_sent (),
01124 i->message_length ()));
01125 }
01126
01127 // ... if all the data was sent the message must be removed from
01128 // the queue...
01129 if (i->all_data_sent ())
01130 {
01131 i->remove_from_list (this->head_, this->tail_);
01132 i->destroy ();
01133 }
01134 }
01135 }
|
|
|
Cleanup the complete queue.
Definition at line 1058 of file Transport.cpp. References ACE_DEBUG, ACE_TEXT, TAO_Queued_Message::destroy(), LM_DEBUG, TAO_Queued_Message::message_length(), TAO_Queued_Message::remove_from_list(), TAO_LF_Event::state_changed(), and TAO_debug_level. Referenced by pre_close(), send_connection_closed_notifications_i(), and ~TAO_Transport().
01059 {
01060 if (TAO_debug_level > 4)
01061 {
01062 ACE_DEBUG ((LM_DEBUG,
01063 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue_i, ")
01064 ACE_TEXT ("cleaning up complete queue\n"),
01065 this->id ()));
01066 }
01067
01068 size_t byte_count = 0;
01069 int msg_count = 0;
01070
01071 // Cleanup all messages
01072 while (this->head_ != 0)
01073 {
01074 TAO_Queued_Message *i = this->head_;
01075
01076 if (TAO_debug_level > 4)
01077 {
01078 byte_count += i->message_length();
01079 ++msg_count;
01080 }
01081 // @@ This is a good point to insert a flag to indicate that a
01082 // CloseConnection message was successfully received.
01083 i->state_changed (TAO_LF_Event::LFS_CONNECTION_CLOSED,
01084 this->orb_core_->leader_follower ());
01085
01086 i->remove_from_list (this->head_, this->tail_);
01087
01088 i->destroy ();
01089 }
01090
01091 if (TAO_debug_level > 4)
01092 {
01093 ACE_DEBUG ((LM_DEBUG,
01094 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue_i, ")
01095 ACE_TEXT ("discarded %d messages, %u bytes.\n"),
01096 this->id (), msg_count, byte_count));
01097 }
01098 }
|
|
||||||||||||
|
It is necessary to clear the codeset translator when a CDR stream is used for more than one GIOP message. This is required since the header must not be translated, whereas the body must be. Definition at line 2469 of file Transport.cpp. References ACE_OutputCDR::char_translator(), ACE_InputCDR::char_translator(), ACE_OutputCDR::wchar_translator(), and ACE_InputCDR::wchar_translator(). Referenced by TAO::Remote_Invocation::write_header().
02470 {
02471 if (inp)
02472 {
02473 inp->char_translator (0);
02474 inp->wchar_translator (0);
02475 }
02476 if (outp)
02477 {
02478 outp->char_translator (0);
02479 outp->wchar_translator (0);
02480 }
02481 }
|
|
|
Call the implementation method after obtaining the lock.
Definition at line 312 of file Transport.cpp. References TAO_Connection_Handler::close_connection(), and connection_handler_i(). Referenced by TAO_IIOP_Connector::complete_connection(), TAO::LocateRequest_Invocation::invoke(), post_open(), TAO::Transport_Cache_Manager::purge(), TAO::Synch_Twoway_Invocation::remote_twoway(), TAO_GIOP_Message_Base::send_close_connection(), TAO::Remote_Invocation::send_message(), send_message_shared(), TAO_IIOP_Transport::send_message_shared(), TAO_Wait_On_Read::wait(), TAO::Synch_Twoway_Invocation::wait_for_reply(), and TransportCleanupGuard::~TransportCleanupGuard().
// Forwards to close_connection() on the handler returned by
// connection_handler_i(); that handler is dereferenced without a null check.
// NOTE(review): no lock is acquired in this body - confirm where the
// locking mentioned in the description actually happens.
00313 {
00314 this->connection_handler_i ()->close_connection ();
00315 }
|
|
|
Get the connection handler for this transport.
Definition at line 175 of file Transport.inl. References connection_handler_i(). Referenced by TAO_IIOP_Connector::complete_connection(), TAO::Remote_Invocation::send_message(), TAO_Connect_Strategy::wait(), and TAO_Connector::wait_for_connection_completion().
// Thin accessor: returns whatever connection_handler_i() yields, unchanged.
00176 {
00177 return this->connection_handler_i();
00178 }
|
|
|
Implemented in TAO_IIOP_Transport. Referenced by close_connection(), and connection_handler(). |
|
|
Returns -1 on error, otherwise 0.
Definition at line 1616 of file Transport.cpp. References ACE_ERROR, ACE_TEXT, TAO_Pluggable_Messaging::consolidate_fragmented_message(), TAO_Incoming_Message_Queue::enqueue_tail(), incoming_message_queue_, LM_ERROR, messaging_object(), TAO_Queued_Data::missing_data(), TAO_Queued_Data::more_fragments(), TAO_Queued_Data::msg_type(), TAO_Queued_Data::release(), TAO_debug_level, and TAO_PLUGGABLE_MESSAGE_FRAGMENT. Referenced by handle_input_parse_data(), and handle_input_parse_extra_messages().
// Queues a completely received message on incoming_message_queue_.
// A fragmented message (more_fragments() set, or message type
// TAO_PLUGGABLE_MESSAGE_FRAGMENT) is first handed to the messaging object
// for consolidation; any other message is enqueued as it is.
// Returns 0 on success and -1 on every failure path.
// NOTE(review): the early "return -1" paths (missing data, consolidation
// error, NULL result) do not release q_data here - confirm who owns it.
01617 {
01618 // consolidate message on top of stack, only for fragmented messages
01619
01620 // paranoid check: a message that still lacks data is rejected
01621 if (q_data->missing_data () != 0)
01622 {
01623 return -1;
01624 }
01625
01626 if (q_data->more_fragments () ||
01627 q_data->msg_type () == TAO_PLUGGABLE_MESSAGE_FRAGMENT)
01628 {
01629 TAO_Queued_Data *new_q_data = 0;
01630
01631 switch (this->messaging_object()->consolidate_fragmented_message (q_data, new_q_data))
01632 {
01633 case -1: // error
01634 return -1;
01635
01636 case 0: // returning consolidated message in new_q_data
01637 if (!new_q_data)
01638 {
01639 if (TAO_debug_level > 0)
01640 {
01641 ACE_ERROR ((LM_ERROR,
01642 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_enqueue_message, ")
01643 ACE_TEXT ("error, consolidated message is NULL\n"),
01644 this->id ()));
01645 }
01646 return -1;
01647 }
01648
01649 if (this->incoming_message_queue_.enqueue_tail (new_q_data) != 0)
01650 {
01651 // enqueue failed: release the consolidated message before bailing out
01652 TAO_Queued_Data::release (new_q_data);
01652 return -1;
01653 }
01654 break;
01655
01656 case 1: // fragment has been stored in messaging_object()
01657 break;
01658 }
01659 }
01660 else
01661 {
01662 if (this->incoming_message_queue_.enqueue_tail (q_data) != 0)
01663 {
01664 // enqueue failed: release the message before bailing out
01664 TAO_Queued_Data::release (q_data);
01665 return -1;
01666 }
01667 }
01668
01669 return 0; // success
01670 }
|
|
||||||||||||
|
Returns -1 on error, otherwise 0.
Definition at line 1529 of file Transport.cpp. References ACE_ERROR, ACE_TEXT, TAO_Pluggable_Messaging::consolidate_fragmented_message(), LM_ERROR, messaging_object(), TAO_Queued_Data::missing_data(), TAO_Queued_Data::more_fragments(), TAO_Queued_Data::msg_type(), process_parsed_messages(), TAO_Queued_Data::release(), TAO_debug_level, and TAO_PLUGGABLE_MESSAGE_FRAGMENT. Referenced by handle_input_missing_data().
// Processes a completely received message immediately instead of queueing it.
// A fragmented message (more_fragments() set, or message type
// TAO_PLUGGABLE_MESSAGE_FRAGMENT) is first consolidated by the messaging
// object; the message that ends up being passed to process_parsed_messages()
// is released afterwards whether processing succeeded or failed.
// Returns 0 on success and -1 on every failure path.
01531 {
01532 // paranoid check: a message that still lacks data is rejected
01533 if (q_data->missing_data () != 0)
01534 {
01535 if (TAO_debug_level > 0)
01536 {
01537 ACE_ERROR ((LM_ERROR,
01538 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ")
01539 ACE_TEXT ("missing data\n"),
01540 this->id ()));
01541 }
01542 return -1;
01543 }
01544
01545 if (q_data->more_fragments () ||
01546 q_data->msg_type () == TAO_PLUGGABLE_MESSAGE_FRAGMENT)
01547 {
01548 // consolidate message on top of stack, only for fragmented messages
01549 TAO_Queued_Data *new_q_data = 0;
01550
01551 switch (this->messaging_object()->consolidate_fragmented_message (q_data, new_q_data))
01552 {
01553 case -1: // error
01554 return -1;
01555
01556 case 0: // returning consolidated message in new_q_data
01557 if (!new_q_data)
01558 {
01559 if (TAO_debug_level > 0)
01560 {
01561 ACE_ERROR ((LM_ERROR,
01562 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ")
01563 ACE_TEXT ("error, consolidated message is NULL\n"),
01564 this->id ()));
01565 }
01566 return -1;
01567 }
01568
01569
01570 if (this->process_parsed_messages (new_q_data, rh) == -1)
01571 {
01572 TAO_Queued_Data::release (new_q_data);
01573
01574 if (TAO_debug_level > 0)
01575 {
01576 ACE_ERROR ((LM_ERROR,
01577 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ")
01578 ACE_TEXT ("error processing consolidated message\n"),
01579 this->id ()));
01580 }
01581 return -1;
01582 }
01583
01584 TAO_Queued_Data::release (new_q_data);
01585
01586 break;
01587
01588 case 1: // fragment has been stored in messaging_object()
01589 break;
01590 }
01591 }
01592 else
01593 {
01594 if (this->process_parsed_messages (q_data, rh) == -1)
01595 {
01596 TAO_Queued_Data::release (q_data);
01597
01598 if (TAO_debug_level > 0)
01599 {
01600 ACE_ERROR ((LM_ERROR,
01601 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ")
01602 ACE_TEXT ("error processing message\n"),
01603 this->id ()));
01604 }
01605 return -1;
01606 }
01607
01608 TAO_Queued_Data::release (q_data);
01609
01610 }
01611
01612 return 0;
01613 }
|
|
|
Send some of the data in the queue. As the outgoing data is drained this method is invoked to send as much of the current message as possible. Returns 0 if there is more data to send, -1 if there was an error and 1 if the message was completely sent. Definition at line 849 of file Transport.cpp. References ACE_GUARD_RETURN, TAO_Flushing_Strategy::cancel_output(), drain_queue_i(), TAO_ORB_Core::flushing_strategy(), and orb_core(). Referenced by handle_output().
// Guarded wrapper around drain_queue_i(): takes handler_lock_ through
// ACE_GUARD_RETURN (which is given -1 as its return value) and then drains.
// NOTE(review): a result of 1 from drain_queue_i() is mapped to 0 below and
// every other result is passed through unchanged, so this body never returns
// 1 - confirm against the documented "1 if the message was completely sent".
00850 {
00851 ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1);
00852 int const retval = this->drain_queue_i ();
00853
00854 if (retval == 1)
00855 {
00856 // ... there is no current message or it was completely
00857 // sent, cancel output...
00858 TAO_Flushing_Strategy *flushing_strategy =
00859 this->orb_core ()->flushing_strategy ();
00860
00861 flushing_strategy->cancel_output (this);
00862
00863 return 0;
00864 }
00865
00866 return retval;
00867 }
|
|
||||||||||||
|
A helper routine used in drain_queue_i().
Definition at line 870 of file Transport.cpp. References ACE_DEBUG, ACE_TEXT, cleanup_queue(), dump_iov(), EWOULDBLOCK, LM_DEBUG, send(), sent_byte_count_, ssize_t, and TAO_debug_level. Referenced by drain_queue_i().
// Performs one send of the iovcnt entries in iov: through sendfile() when
// TAO_HAS_SENDFILE == 1 and mmap_allocator_ is set, otherwise through send().
// Afterwards the queue is trimmed by the number of bytes written and iovcnt
// is reset to 0, regardless of the outcome of the send.
// Returns 1 when the send call reported progress, 0 when it failed with
// EWOULDBLOCK/EAGAIN, and -1 when it returned 0 or failed with another error.
00871 {
00872 size_t byte_count = 0;
00873
00874 // ... send the message ...
00875 ssize_t retval = -1;
00876
00877 #if TAO_HAS_SENDFILE == 1
00878 if (this->mmap_allocator_)
00879 retval = this->sendfile (this->mmap_allocator_,
00880 iov,
00881 iovcnt,
00882 byte_count);
00883 else
00884 #endif /* TAO_HAS_SENDFILE==1 */
00885 retval = this->send (iov, iovcnt, byte_count);
00886
00887 // NOTE(review): this gate is "== 5" while the other traces in this body
00887 // use "> 4" - confirm the dump is meant to be skipped above level 5.
00887 if (TAO_debug_level == 5)
00888 {
00889 dump_iov (iov, iovcnt, this->id (),
00890 byte_count, "drain_queue_helper");
00891 }
00892
00893 // ... now we need to update the queue, removing elements
00894 // that have been sent, and updating the last element if it
00895 // was only partially sent ...
00896 this->cleanup_queue (byte_count);
00897 iovcnt = 0;
00898
00899 if (retval == 0)
00900 {
00901 if (TAO_debug_level > 4)
00902 {
00903 ACE_DEBUG ((LM_DEBUG,
00904 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_helper, ")
00905 ACE_TEXT ("send() returns 0\n"),
00906 this->id ()));
00907 }
00908 return -1;
00909 }
00910 else if (retval == -1)
00911 {
00912 if (TAO_debug_level > 4)
00913 {
00914 ACE_DEBUG ((LM_DEBUG,
00915 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_helper, ")
00916 ACE_TEXT ("error during %p\n"),
00917 this->id (), ACE_TEXT ("send()")));
00918 }
00919
00920 // a would-block condition is reported as 0 rather than as an error
00920 if (errno == EWOULDBLOCK || errno == EAGAIN)
00921 {
00922 return 0;
00923 }
00924
00925 return -1;
00926 }
00927
00928 // ... start over, how do we guarantee progress? Because if
00929 // no bytes are sent send() can only return 0 or -1
00930
00931 // Total no. of bytes sent for a send call
00932 this->sent_byte_count_ += byte_count;
00933
00934 // NOTE(review): byte_count is a size_t but is formatted with %d below -
00934 // confirm the specifier against the logging format documentation.
00934 if (TAO_debug_level > 4)
00935 {
00936 ACE_DEBUG ((LM_DEBUG,
00937 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_helper, ")
00938 ACE_TEXT ("byte_count = %d, head_is_empty = %d\n"),
00939 this->id(), byte_count, (this->head_ == 0)));
00940 }
00941
00942 return 1;
00943 }
|
|
|
Implement drain_queue() assuming the lock is held.
Definition at line 946 of file Transport.cpp. References ACE_DEBUG, ACE_IOV_MAX, ACE_TEXT, ACE_Reactor::cancel_timer(), TAO_Queued_Message::destroy(), drain_queue_helper(), event_handler_i(), TAO_Queued_Message::fill_iov(), flush_timer_pending(), ACE_High_Res_Timer::gettimeofday_hr(), TAO_Queued_Message::is_expired(), LM_DEBUG, TAO_Queued_Message::next(), ACE_Event_Handler::reactor(), TAO_Queued_Message::remove_from_list(), reset_flush_timer(), sent_byte_count_, TAO_LF_Event::state_changed(), and TAO_debug_level. Referenced by drain_queue(), send_message_block_chain_i(), and send_synch_message_helper_i().
// Walks the outgoing queue starting at head_. Expired messages are signalled
// with LFS_TIMEOUT, unlinked and destroyed; the remaining ones fill an iovec
// array that is flushed through drain_queue_helper() each time it reaches
// ACE_IOV_MAX entries and once more after the loop if anything is left.
// Returns the helper's result when a flush does not return 1; otherwise 1 if
// head_ is empty afterwards (cancelling a pending flush timer) and 0 if not.
00947 {
00948 // This is the vector used to send data, it must be declared outside
00949 // the loop because after the loop there may still be data to be
00950 // sent
00951 int iovcnt = 0;
00952 #if defined (ACE_INITIALIZE_MEMORY_BEFORE_USE)
00953 iovec iov[ACE_IOV_MAX] = { 0 , 0 };
00954 #else
00955 iovec iov[ACE_IOV_MAX];
00956 #endif /* ACE_INITIALIZE_MEMORY_BEFORE_USE */
00957
00958 // We loop over all the elements in the queue ...
00959 TAO_Queued_Message *i = this->head_;
00960
00961 // Reset the value so that the counting is done for each new send
00962 // call.
00963 this->sent_byte_count_ = 0;
00964
00965 // Avoid calling this expensive function each time through the loop. Instead
00966 // we'll assume that the time is unlikely to change much during the loop.
00967 // If we are forced to send in the loop then we'll recompute the time.
00968 ACE_Time_Value now = ACE_High_Res_Timer::gettimeofday_hr ();
00969
00970 while (i != 0)
00971 {
00972 if (i->is_expired (now))
00973 {
00974 // NOTE(review): the format string below reads "(%P|%t -" without the
00974 // closing parenthesis used by every other trace - looks like a typo.
00974 if (TAO_debug_level > 3)
00975 {
00976 ACE_DEBUG ((LM_DEBUG,
00977 ACE_TEXT ("TAO (%P|%t - Transport[%d]::drain_queue_i, ")
00978 ACE_TEXT ("Discarding expired queued message.\n"),
00979 this->id ()));
00980 }
00981 // fetch the successor first: i is unlinked and destroyed below
00981 TAO_Queued_Message *next = i->next ();
00982 i->state_changed (TAO_LF_Event::LFS_TIMEOUT,
00983 this->orb_core_->leader_follower ());
00984 i->remove_from_list (this->head_, this->tail_);
00985 i->destroy ();
00986 i = next;
00987 continue;
00988 }
00989 // ... each element fills the iovector ...
00990 i->fill_iov (ACE_IOV_MAX, iovcnt, iov);
00991
00992 // ... the vector is full, no choice but to send some data out.
00993 // We need to loop because a single message can span multiple
00994 // IOV_MAX elements ...
00995 if (iovcnt == ACE_IOV_MAX)
00996 {
00997 int const retval =
00998 this->drain_queue_helper (iovcnt, iov);
00999
01000 now = ACE_High_Res_Timer::gettimeofday_hr ();
01001
01002 if (TAO_debug_level > 4)
01003 {
01004 ACE_DEBUG ((LM_DEBUG,
01005 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_i, ")
01006 ACE_TEXT ("helper retval = %d\n"),
01007 this->id (), retval));
01008 }
01009
01010 if (retval != 1)
01011 {
01012 return retval;
01013 }
01014
01015 // restart from the current head after the helper has sent data
01015 i = this->head_;
01016 continue;
01017 }
01018 // ... notice that this line is only reached if there is still
01019 // room in the iovector ...
01020 i = i->next ();
01021 }
01022
01023 // flush whatever was collected but did not fill the vector
01023 if (iovcnt != 0)
01024 {
01025 int const retval = this->drain_queue_helper (iovcnt, iov);
01026
01027 if (TAO_debug_level > 4)
01028 {
01029 ACE_DEBUG ((LM_DEBUG,
01030 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_i, ")
01031 ACE_TEXT ("helper retval = %d\n"),
01032 this->id (), retval));
01033 }
01034
01035 if (retval != 1)
01036 {
01037 return retval;
01038 }
01039 }
01040
01041 if (this->head_ == 0)
01042 {
01043 // queue is empty: a still-pending flush timer is cancelled and reset
01043 if (this->flush_timer_pending ())
01044 {
01045 ACE_Event_Handler *eh = this->event_handler_i ();
01046 ACE_Reactor * const reactor = eh->reactor ();
01047 reactor->cancel_timer (this->flush_timer_id_);
01048 this->reset_flush_timer ();
01049 }
01050
01051 return 1;
01052 }
01053
01054 return 0;
01055 }
|
|
|
Normally a concrete TAO_Transport object has-a ACE_Event_Handler member that functions as an adapter between the ACE_Reactor framework and the TAO pluggable protocol framework. In all the protocols implemented so far this role is fulfilled by an instance of ACE_Svc_Handler.
Implemented in TAO_IIOP_Transport. Referenced by add_reference(), cancel_output_i(), check_buffering_constraints_i(), drain_queue_i(), TAO::Transport_Cache_Manager::find_transport(), notify_reactor(), register_handler(), remove_reference(), schedule_output_i(), TAO_Connection_Handler::transport(), and TAO_Wait_On_Read::wait(). |
|
|
Set the state of the first_request_ flag to 0.
Definition at line 158 of file Transport.inl. References first_request_. Referenced by TAO_IIOP_Transport::send_request().
// Clears first_request_ by assigning 0 to it; nothing else is touched.
00159 {
00160 this->first_request_ = 0;
00161 }
|
|
|
Check if the flush timer is still pending.
Definition at line 108 of file Transport.inl. References flush_timer_id_. Referenced by check_buffering_constraints_i(), drain_queue_i(), and handle_timeout().
// True when flush_timer_id_ holds anything other than -1, i.e. -1 is the
// value this check treats as "no timer pending".
00109 {
00110 return this->flush_timer_id_ != -1;
00111 }
|
|
||||||||||||
|
Format and queue a message for stream
Definition at line 499 of file Transport.cpp. References ACE_OutputCDR::begin(), TAO_Pluggable_Messaging::format_message(), messaging_object(), and queue_message_i(). Referenced by TAO::Synch_Oneway_Invocation::remote_oneway().
// Lets the messaging object format the message in stream, then queues the
// stream's first message block through queue_message_i().
// Returns -1 if formatting fails, otherwise the result of queue_message_i().
00501 {
00502 if (this->messaging_object ()->format_message (stream) != 0)
00503 return -1;
00504
00505 return this->queue_message_i (stream.begin (), max_wait_time);
00506 }
|
|
||||||||||||||||
|
This is a request for the transport object to write a LocateRequest header before it is sent out. Definition at line 371 of file Transport.cpp. References ACE_DEBUG, ACE_TEXT, TAO_Pluggable_Messaging::generate_locate_request_header(), LM_DEBUG, messaging_object(), and TAO_debug_level. Referenced by TAO::LocateRequest_Invocation::invoke().
// Delegates to the messaging object to marshal a LocateRequest header for
// opdetails/spec into output. Returns 0 on success; on a -1 from the
// messaging object it logs (when TAO_debug_level > 0) and returns -1.
00375 {
00376 if (this->messaging_object ()->generate_locate_request_header (opdetails,
00377 spec,
00378 output) == -1)
00379 {
00380 if (TAO_debug_level > 0)
00381 {
00382 ACE_DEBUG ((LM_DEBUG,
00383 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::generate_locate_request, ")
00384 ACE_TEXT ("error while marshalling the LocateRequest header\n"),
00385 this->id ()));
00386 }
00387
00388 return -1;
00389 }
00390
00391 return 0;
00392 }
|
|
||||||||||||||||
|
This is a request for the transport object to write a request header before it sends out the request. Reimplemented in TAO_IIOP_Transport. Definition at line 395 of file Transport.cpp. References ACE_DEBUG, ACE_TEXT, TAO_ORB_Core::codeset_manager(), first_request_, TAO_Pluggable_Messaging::generate_request_header(), TAO_Codeset_Manager::generate_service_context(), LM_DEBUG, messaging_object(), orb_core(), and TAO_debug_level. Referenced by TAO_IIOP_Transport::generate_request_header(), and TAO::Remote_Invocation::write_header().
// Marshals a Request header for opdetails/spec into output through the
// messaging object. While first_request_ is set, the codeset manager (if the
// ORB core has one) is first asked to add its service context to opdetails.
// This body only reads first_request_; it does not clear it.
// Returns 0 on success and -1 if the messaging object reports -1.
00399 {
00400 // codeset service context is only supposed to be sent in the first request
00401 // on a particular connection.
00402 if (this->first_request_)
00403 {
00404 TAO_Codeset_Manager * const csm = this->orb_core ()->codeset_manager ();
00405 if (csm)
00406 csm->generate_service_context (opdetails,*this);
00407 }
00408
00409 if (this->messaging_object ()->generate_request_header (opdetails,
00410 spec,
00411 output) == -1)
00412 {
00413 // NOTE(review): this trace starts with "(%P|%t)" and lacks the "TAO "
00413 // prefix used by the other traces - confirm whether that is intended.
00413 if (TAO_debug_level > 0)
00414 {
00415 ACE_DEBUG ((LM_DEBUG,
00416 ACE_TEXT ("(%P|%t) - Transport[%d]::generate_request_header, ")
00417 ACE_TEXT ("error while marshalling the Request header\n"),
00418 this->id()));
00419 }
00420
00421 return -1;
00422 }
00423
00424 return 0;
00425 }
|
|
||||||||||||
|
Callback to read incoming data. The ACE_Event_Handler adapter invokes this method as part of its handle_input() operation.
Definition at line 1455 of file Transport.cpp. References ACE_DEBUG, ACE_ERROR, ACE_TEXT, handle_input_missing_data(), handle_input_parse_data(), incoming_message_stack_, LM_DEBUG, LM_ERROR, TAO_Queued_Data::missing_data(), process_queue_head(), TAO_debug_level, TAO_MISSING_DATA_UNDEFINED, and TAO::Incoming_Message_Stack::top(). Referenced by TAO_Connection_Handler::handle_input_internal(), TAO_Connection_Handler::svc_i(), and TAO_Wait_On_Read::wait().
// Input callback. It first gives process_queue_head() a chance: a result of
// -1 is returned as an error and 0 ends the call successfully. Only a
// positive result falls through to reading: if the top of
// incoming_message_stack_ has a known amount of missing data the read goes
// through handle_input_missing_data(), otherwise through
// handle_input_parse_data(). Returns 0 on success and -1 on any failure.
01457 {
01458 if (TAO_debug_level > 3)
01459 {
01460 ACE_DEBUG ((LM_DEBUG,
01461 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input\n"),
01462 this->id ()));
01463 }
01464
01465 // First try to process messages of the head of the incoming queue.
01466 int const retval = this->process_queue_head (rh);
01467
01468 if (retval <= 0)
01469 {
01470 if (retval == -1)
01471 {
01472 if (TAO_debug_level > 2)
01473 {
01474 ACE_DEBUG ((LM_DEBUG,
01475 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input, ")
01476 ACE_TEXT ("error while parsing the head of the queue\n"),
01477 this->id()));
01478
01479 }
01480 return -1;
01481 }
01482 else
01483 {
01484 // retval == 0
01485
01486 // Processed a message in queue successfully. This
01487 // thread must return to thread-pool now.
01488 return 0;
01489 }
01490 }
01491
01492 TAO_Queued_Data *q_data = 0;
01493
01494 // top() != -1 means the stack yielded an entry in q_data
01494 if (this->incoming_message_stack_.top (q_data) != -1
01495 && q_data->missing_data () != TAO_MISSING_DATA_UNDEFINED)
01496 {
01497 /* PRE: q_data->missing_data_ > 0 as all QD on stack must be incomplete */
01498 if (this->handle_input_missing_data (rh, max_wait_time, q_data) == -1)
01499 {
01500 if (TAO_debug_level > 0)
01501 {
01502 ACE_ERROR ((LM_ERROR,
01503 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input, ")
01504 ACE_TEXT ("error consolidating incoming message\n"),
01505 this->id ()));
01506 }
01507 return -1;
01508 }
01509 }
01510 else
01511 {
01512 if (this->handle_input_parse_data (rh, max_wait_time) == -1)
01513 {
01514 if (TAO_debug_level > 0)
01515 {
01516 ACE_ERROR ((LM_ERROR,
01517 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input, ")
01518 ACE_TEXT ("error parsing incoming message\n"),
01519 this->id ()));
01520 }
01521 return -1;
01522 }
01523 }
01524
01525 return 0;
01526 }
|
|
||||||||||||||||
|
Is invoked by the handle_input operation. It consolidates the message on top of incoming_message_stack. The amount of missing data is known and the recv operation copies data directly into the message buffer, as much as a single recv-invocation provides. Definition at line 1673 of file Transport.cpp. References ACE_DEBUG, ACE_TEXT, consolidate_process_message(), ACE_CDR::grow(), incoming_message_stack_, ACE_Message_Block::length(), LM_DEBUG, TAO_Queued_Data::missing_data(), TAO_Queued_Data::msg_block(), TAO::Incoming_Message_Stack::pop(), recv(), ACE_Message_Block::space(), ssize_t, TAO_debug_level, and ACE_Message_Block::wr_ptr(). Referenced by handle_input().
// Reads up to q_data->missing_data() bytes straight into q_data's message
// block (growing the block first if it lacks space) and lowers the missing
// count by the bytes received. Once nothing is missing, the entry is popped
// from incoming_message_stack_ and passed to consolidate_process_message().
// Returns 0 on success, -1 on failure, or recv()'s own result when that
// result is <= 0.
// NOTE(review): both traces below name the function
// "handle_input_missing_data_message" and print size/ssize values with %d -
// confirm against the real function name and the logging format rules.
01676 {
01677 // paranoid check
01678 if (q_data == 0)
01679 {
01680 return -1;
01681 }
01682
01683 if (TAO_debug_level > 3)
01684 {
01685 ACE_DEBUG ((LM_DEBUG,
01686 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_missing_data_message, ")
01687 ACE_TEXT ("enter (missing data == %d)\n"),
01688 this->id (), q_data->missing_data ()));
01689 }
01690
01691 size_t const recv_size = q_data->missing_data ();
01692
01693 if (q_data->msg_block ()->space() < recv_size)
01694 {
01695 // make sure the message_block has enough space
01696 size_t const message_size = recv_size + q_data->msg_block ()->length();
01697
01698 if (ACE_CDR::grow (q_data->msg_block (), message_size) == -1)
01699 {
01700 return -1;
01701 }
01702 }
01703
01704 // Saving the size of the received buffer in case any one needs to
01705 // get the size of the message that is received in the
01706 // context. Obviously the value will be changed for each recv call
01707 // and the user is supposed to invoke the accessor only in the
01708 // invocation context to get meaningful information.
01709 this->recv_buffer_size_ = recv_size;
01710
01711 // Read the message into the existing message block on heap
01712 ssize_t const n = this->recv (q_data->msg_block ()->wr_ptr(),
01713 recv_size,
01714 max_wait_time);
01715
01716
01717 // a zero or negative recv() result is handed straight back to the caller
01717 if (n <= 0)
01718 {
01719 return n;
01720 }
01721
01722 if (TAO_debug_level > 3)
01723 {
01724 ACE_DEBUG ((LM_DEBUG,
01725 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_missing_data_message, ")
01726 ACE_TEXT ("read bytes %d\n"),
01727 this->id (), n));
01728 }
01729
01730 // advance the write pointer and shrink the outstanding byte count by n
01730 q_data->msg_block ()->wr_ptr(n);
01731 q_data->missing_data (q_data->missing_data () - n);
01732
01733 if (q_data->missing_data () == 0)
01734 {
01735 // paranoid check
01736 if (this->incoming_message_stack_.pop (q_data) == -1)
01737 {
01738 return -1;
01739 }
01740
01741 if (this->consolidate_process_message (q_data, rh) == -1)
01742 {
01743 return -1;
01744 }
01745 }
01746
01747 return 0;
01748 }
|
|
||||||||||||
|
Is invoked by handle_input operation. It parses new messages from input stream or consolidates messages whose header has been partially read, the message size being unknown so far. It parses as much data as a single recv-invocation provides. Definition at line 1795 of file Transport.cpp. References ACE_DEBUG, ACE_ERROR, ACE_TEXT, allocate_partial_message_block(), consolidate_enqueue_message(), TAO_Pluggable_Messaging::consolidate_node(), ACE_Message_Block::copy(), TAO_Queued_Data::duplicate(), ACE_CDR::grow(), handle_input_parse_extra_messages(), TAO_Pluggable_Messaging::header_length(), incoming_message_queue_, incoming_message_stack_, TAO_ORB_Core::input_cdr_dblock_allocator(), ACE_Message_Block::length(), LM_DEBUG, LM_ERROR, TAO_ORB_Core::locking_strategy(), ACE_CDR::mb_align(), ACE_OS::memset(), messaging_object(), TAO_Queued_Data::missing_data(), TAO_Queued_Data::more_fragments(), TAO_Queued_Data::msg_block(), TAO_Queued_Data::msg_type(), notify_reactor(), TAO_ORB_Core::orb_params(), TAO_Pluggable_Messaging::parse_next_message(), partial_message_, TAO::Incoming_Message_Stack::pop(), process_parsed_messages(), process_queue_head(), TAO::Incoming_Message_Stack::push(), TAO_Incoming_Message_Queue::queue_length(), ACE_Message_Block::rd_ptr(), recv(), ACE_Message_Block::reset(), TAO_Resume_Handle::set_flag(), TAO_ORB_Parameters::single_read_optimization(), ACE_Message_Block::space(), ssize_t, TAO_debug_level, TAO_MAXBUFSIZE, TAO_MISSING_DATA_UNDEFINED, TAO_PLUGGABLE_MESSAGE_FRAGMENT, TAO::Incoming_Message_Stack::top(), and ACE_Message_Block::wr_ptr(). Referenced by handle_input().
01797 {
01798
01799 if (TAO_debug_level > 3)
01800 {
01801 ACE_DEBUG ((LM_DEBUG,
01802 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ")
01803 ACE_TEXT ("enter\n"),
01804 this->id ()));
01805 }
01806
01807
01808 // The buffer on the stack which will be used to hold the input
01809 // messages, ACE_CDR::MAX_ALIGNMENT compensates the
01810 // memory-alignment. This improves performance with SUN-Java-ORB-1.4
01811 // and higher that sends fragmented requests of size 1024 bytes.
01812 char buf [TAO_MAXBUFSIZE + ACE_CDR::MAX_ALIGNMENT];
01813
01814 #if defined (ACE_INITIALIZE_MEMORY_BEFORE_USE)
01815 (void) ACE_OS::memset (buf,
01816 '\0',
01817 sizeof buf);
01818 #endif /* ACE_INITIALIZE_MEMORY_BEFORE_USE */
01819
01820 // Create a data block
01821 ACE_Data_Block db (sizeof (buf),
01822 ACE_Message_Block::MB_DATA,
01823 buf,
01824 this->orb_core_->input_cdr_buffer_allocator (),
01825 this->orb_core_->locking_strategy (),
01826 ACE_Message_Block::DONT_DELETE,
01827 this->orb_core_->input_cdr_dblock_allocator ());
01828
01829 // Create a message block
01830 ACE_Message_Block message_block (&db,
01831 ACE_Message_Block::DONT_DELETE,
01832 this->orb_core_->input_cdr_msgblock_allocator ());
01833
01834
01835 // Align the message block
01836 ACE_CDR::mb_align (&message_block);
01837
01838 size_t recv_size = 0; // Note: unsigned integer
01839
01840 // Pointer to newly parsed message
01841 TAO_Queued_Data *q_data = 0;
01842
01843 // optimizing access of constants
01844 size_t const header_length = this->messaging_object ()->header_length ();
01845
01846 // paranoid check
01847 if (header_length > message_block.space ())
01848 {
01849 return -1;
01850 }
01851
01852 if (this->orb_core_->orb_params ()->single_read_optimization ())
01853 {
01854 recv_size = message_block.space ();
01855 }
01856 else
01857 {
01858 // Single read optimization has been de-activated. That means
01859 // that we need to read from transport the GIOP header first
01860 // before the payload. This codes first checks the incoming
01861 // stack for partial messages which needs to be
01862 // consolidated. Otherwise we are in new cycle, reading complete
01863 // GIOP header of new incoming message.
01864 if (this->incoming_message_stack_.top (q_data) != -1
01865 && q_data->missing_data () == TAO_MISSING_DATA_UNDEFINED)
01866 {
01867 // There is a partial message on incoming_message_stack_
01868 // whose length is unknown so far. We need to consolidate
01869 // the GIOP header to get to know the payload size,
01870 recv_size = header_length - q_data->msg_block ()->length ();
01871 }
01872 else
01873 {
01874 // Read amount of data forming GIOP header of new incoming
01875 // message.
01876 recv_size = header_length;
01877 }
01878 // POST: 0 <= recv_size <= header_length
01879 }
01880 // POST: 0 <= recv_size <= message_block->space ()
01881
01882 // If we have a partial message, copy it into our message block and
01883 // clear out the partial message.
01884 if (this->partial_message_ != 0 && this->partial_message_->length () > 0)
01885 {
01886 // (*) Copy back the partial message into current read-buffer,
01887 // verify that the read-strategy of "recv_size" bytes is not
01888 // exceeded. The latter check guarantees that recv_size does not
01889 // roll-over and keeps in range
01890 // 0<=recv_size<=message_block->space()
01891 if (this->partial_message_->length () <= recv_size &&
01892 message_block.copy (this->partial_message_->rd_ptr (),
01893 this->partial_message_->length ()) == 0)
01894 {
01895
01896 recv_size -= this->partial_message_->length ();
01897 this->partial_message_->reset ();
01898 }
01899 else
01900 {
01901 return -1;
01902 }
01903 }
01904 // POST: 0 <= recv_size <= buffer_space
01905
01906 if (0 >= recv_size) // paranoid: the check above (*) guarantees recv_size>=0
01907 {
01908 // This event would cause endless looping, trying frequently to
01909 // read zero bytes from stream. This might happen, if TAOs
01910 // protocol implementation is not correct and tries to read data
01911 // beyond header without "single_read_optimazation" being
01912 // activated.
01913 if (TAO_debug_level > 0)
01914 {
01915 ACE_ERROR ((LM_ERROR,
01916 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ")
01917 ACE_TEXT ("Error - endless loop detection, closing connection"),
01918 this->id ()));
01919 }
01920 return -1;
01921 }
01922
01923 // Saving the size of the received buffer in case any one needs to
01924 // get the size of the message thats received in the
01925 // context. Obviously the value will be changed for each recv call
01926 // and the user is supposed to invoke the accessor only in the
01927 // invocation context to get meaningful information.
01928 this->recv_buffer_size_ = recv_size;
01929
01930 // Read the message into the message block that we have created on
01931 // the stack.
01932 ssize_t const n = this->recv (message_block.wr_ptr (),
01933 recv_size,
01934 max_wait_time);
01935
01936 // If there is an error return to the reactor..
01937 if (n <= 0)
01938 {
01939 return n;
01940 }
01941
01942 if (TAO_debug_level > 3)
01943 {
01944 ACE_DEBUG ((LM_DEBUG,
01945 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ")
01946 ACE_TEXT ("read %d bytes\n"),
01947 this->id (), n));
01948 }
01949
01950 // Set the write pointer in the stack buffer
01951 message_block.wr_ptr (n);
01952
01953 //
01954 // STACK PROCESSING OR MESSAGE CONSOLIDATION
01955 //
01956
01957 // PRE: data in buffer is aligned && message_block.length() > 0
01958
01959 if (this->incoming_message_stack_.top (q_data) != -1
01960 && q_data->missing_data () == TAO_MISSING_DATA_UNDEFINED)
01961 {
01962 //
01963 // MESSAGE CONSOLIDATION
01964 //
01965
01966 // Partial message on incoming_message_stack_ needs to be
01967 // consolidated. The message header could not be parsed so far
01968 // and therefor the message size is unknown yet. Consolidating
01969 // the message destroys the memory alignment of succeeding
01970 // messages sharing the buffer, for that reason consolidation
01971 // and stack based processing are mutial exclusive.
01972 if (this->messaging_object ()->consolidate_node (q_data,
01973 message_block) == -1)
01974 {
01975 if (TAO_debug_level > 0)
01976 {
01977 ACE_ERROR ((LM_ERROR,
01978 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ")
01979 ACE_TEXT ("error consolidating message from input buffer\n"),
01980 this->id () ));
01981 }
01982 return -1;
01983 }
01984
01985 // Complete message are to be enqueued and later processed
01986 if (q_data->missing_data () == 0)
01987 {
01988 if (this->incoming_message_stack_.pop (q_data) == -1)
01989 {
01990 return -1;
01991 }
01992
01993 if (this->consolidate_enqueue_message (q_data) == -1)
01994 {
01995 return -1;
01996 }
01997 }
01998
01999 if (message_block.length () > 0
02000 && this->handle_input_parse_extra_messages (message_block) == -1)
02001 {
02002 return -1;
02003 }
02004
02005 // In any case try to process the enqueued messages
02006 if (this->process_queue_head (rh) == -1)
02007 {
02008 return -1;
02009 }
02010 }
02011 else
02012 {
02013 //
02014 // STACK PROCESSING (critical path)
02015 //
02016
02017 // Process the first message in buffer on stack
02018
02019 // (PRE: first message resides in aligned memory) Make a node of
02020 // the message-block..
02021
02022 TAO_Queued_Data qd (&message_block,
02023 this->orb_core_->transport_message_buffer_allocator ());
02024
02025 size_t mesg_length = 0;
02026
02027 if (this->messaging_object ()->parse_next_message (qd,
02028 mesg_length) == -1
02029 || (qd.missing_data () == 0
02030 && mesg_length > message_block.length ()) )
02031 {
02032 // extracting message failed
02033 return -1;
02034 }
02035 // POST: qd.missing_data_ == 0 --> mesg_length <= message_block.length()
02036 // This prevents seeking rd_ptr behind the wr_ptr
02037
02038 if (qd.missing_data () != 0 ||
02039 qd.more_fragments () ||
02040 qd.msg_type () == TAO_PLUGGABLE_MESSAGE_FRAGMENT)
02041 {
02042 if (qd.missing_data () == 0)
02043 {
02044 // Dealing with a fragment
02045 TAO_Queued_Data *nqd =
02046 TAO_Queued_Data::duplicate (qd);
02047
02048 if (nqd == 0)
02049 {
02050 return -1;
02051 }
02052
02053 // mark the end of message in new buffer
02054 char* end_mark = nqd->msg_block ()->rd_ptr ()
02055 + mesg_length;
02056 nqd->msg_block ()->wr_ptr (end_mark);
02057
02058 // move the read pointer forward in old buffer
02059 message_block.rd_ptr (mesg_length);
02060
02061 // enqueue the message
02062 if (this->consolidate_enqueue_message (nqd) == -1)
02063 {
02064 return -1;
02065 }
02066
02067 if (message_block.length () > 0
02068 && this->handle_input_parse_extra_messages (message_block) == -1)
02069 {
02070 return -1;
02071 }
02072
02073 // In any case try to process the enqueued messages
02074 if (this->process_queue_head (rh) == -1)
02075 {
02076 return -1;
02077 }
02078 }
02079 else if (qd.missing_data () != TAO_MISSING_DATA_UNDEFINED)
02080 {
02081 // Incomplete message, must be the last one in buffer
02082
02083 if (qd.missing_data () != TAO_MISSING_DATA_UNDEFINED &&
02084 qd.missing_data () > message_block.space ())
02085 {
02086 // Re-Allocate correct size on heap
02087 if (ACE_CDR::grow (qd.msg_block (),
02088 message_block.length ()
02089 + qd.missing_data ()) == -1)
02090 {
02091 return -1;
02092 }
02093 }
02094
02095 TAO_Queued_Data *nqd =
02096 TAO_Queued_Data::duplicate (qd);
02097
02098 if (nqd == 0)
02099 {
02100 return -1;
02101 }
02102
02103 // move read-pointer to end of buffer
02104 message_block.rd_ptr (message_block.length());
02105
02106 this->incoming_message_stack_.push (nqd);
02107 }
02108 }
02109 else
02110 {
02111 //
02112 // critical path
02113 //
02114
02115 // We cant process the message on stack right now. First we
02116 // have got to parse extra messages from message_block,
02117 // putting them into queue. When this is done we can return
02118 // to process this message, and notifying other threads to
02119 // process the messages in queue.
02120
02121 char * end_marker = message_block.rd_ptr ()
02122 + mesg_length;
02123
02124 if (message_block.length () > mesg_length)
02125 {
02126 // There are more message in data stream to be parsed.
02127 // Safe the rd_ptr to restore later.
02128 char *rd_ptr_stack_mesg = message_block.rd_ptr ();
02129
02130 // Skip parsed message, jump to next message in buffer
02131 // PRE: mesg_length <= message_block.length ()
02132 message_block.rd_ptr (mesg_length);
02133
02134 // Extract remaining messages and enqueue them for later
02135 // heap processing
02136 if (this->handle_input_parse_extra_messages (message_block) == -1)
02137 {
02138 return -1;
02139 }
02140
02141 // correct the end_marker
02142 end_marker = message_block.rd_ptr ();
02143
02144 // Restore rd_ptr
02145 message_block.rd_ptr (rd_ptr_stack_mesg);
02146 }
02147
02148 // The following if-else has been copied from
02149 // process_queue_head(). While process_queue_head()
02150 // processes message on heap, here we will process a message
02151 // on stack.
02152
02153 // Now that we have one message on stack to be processed,
02154 // check whether we have one more message in the queue...
02155 if (this->incoming_message_queue_.queue_length () > 0)
02156 {
02157 if (TAO_debug_level > 0)
02158 {
02159 ACE_DEBUG ((LM_DEBUG,
02160 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ")
02161 ACE_TEXT ("notify reactor\n"),
02162 this->id ()));
02163
02164 }
02165
02166 const int retval = this->notify_reactor ();
02167
02168 if (retval == 1)
02169 {
02170 // Let the class know that it doesn't need to resume the
02171 // handle..
02172 rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_LEAVE_SUSPENDED);
02173 }
02174 else if (retval < 0)
02175 return -1;
02176 }
02177 else
02178 {
02179 // As there are no further messages in queue just resume
02180 // the handle. Set the flag incase someone had reset the flag..
02181 rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_RESUMABLE);
02182 }
02183
02184 // PRE: incoming_message_queue is empty
02185 if (this->process_parsed_messages (&qd,
02186 rh) == -1)
02187 {
02188 return -1;
02189 }
02190
02191 // move the rd_ptr tp position of end_marker
02192 message_block.rd_ptr (end_marker);
02193 }
02194 }
02195
02196 // Now that all cases have been processed, there might be kept some data
02197 // in buffer that needs to be safed for next "handle_input" invocations.
02198 if (message_block.length () > 0)
02199 {
02200 if (this->partial_message_ == 0)
02201 {
02202 this->allocate_partial_message_block ();
02203 }
02204
02205 if (this->partial_message_ != 0 &&
02206 this->partial_message_->copy (message_block.rd_ptr (),
02207 message_block.length ()) == 0)
02208 {
02209 message_block.rd_ptr (message_block.length ());
02210 }
02211 else
02212 {
02213 return -1;
02214 }
02215 }
02216
02217 return 0;
02218 }
|
|
|
Is invoked by handle_input_parse_data. Parses all messages remaining in message_block. Definition at line 1752 of file Transport.cpp. References consolidate_enqueue_message(), TAO_Pluggable_Messaging::extract_next_message(), incoming_message_stack_, ACE_Message_Block::length(), messaging_object(), TAO_Queued_Data::missing_data(), and TAO::Incoming_Message_Stack::push(). Referenced by handle_input_parse_data().
01753 { // Parses every message still in message_block: complete messages are consolidated and enqueued, an incomplete one is pushed onto incoming_message_stack_.
01754 // Returns -1 on a parse or enqueue error, 0 otherwise.
01755 // store buffer status of last extraction: -1 parse error, 0
01756 // incomplete message header in buffer, 1 complete message header
01757 // parsed
01758 int buf_status = 0;
01759
01760 TAO_Queued_Data *q_data = 0; // init
01761
01762 // parse buffer until all messages have been extracted, consolidate
01763 // and enqueue complete messages; if the last message being parsed
01764 // has missing data, it stays on top of incoming_message_stack.
01765 while (message_block.length () > 0 &&
01766 (buf_status = this->messaging_object ()->extract_next_message
01767 (message_block, q_data)) != -1 &&
01768 q_data != 0) // paranoid check
01769 {
01770 if (q_data->missing_data () == 0)
01771 {
01772 if (this->consolidate_enqueue_message (q_data) == -1)
01773 {
01774 return -1;
01775 }
01776 }
01777 else // incomplete message read, probably the last message in buffer
01778 {
01779 // can not fail
01780 this->incoming_message_stack_.push (q_data);
01781 }
01782
01783 q_data = 0; // reset
01784 } // while
01785
01786 if (buf_status == -1) // the last extraction reported a parse error
01787 {
01788 return -1;
01789 }
01790
01791 return 0;
01792 }
|
|
|
Callback method to reactively drain the outgoing data queue.
Definition at line 471 of file Transport.cpp. References ACE_DEBUG, ACE_TEXT, drain_queue(), LM_DEBUG, and TAO_debug_level. Referenced by TAO_Block_Flushing_Strategy::flush_message(), TAO_Block_Flushing_Strategy::flush_transport(), and TAO_Connection_Handler::handle_output_eh().
00472 { // Reactive callback: drains the outgoing queue and hands the result of drain_queue () straight back to the caller.
00473 if (TAO_debug_level > 3)
00474 {
00475 ACE_DEBUG ((LM_DEBUG,
00476 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_output\n"),
00477 this->id ()));
00478 }
00479
00480 // The flushing strategy (potentially via the Reactor) wants to send
00481 // more data, first check if there is a current message that needs
00482 // more sending...
00483 int const retval = this->drain_queue ();
00484 // The trace below prints the result together with the current errno.
00485 if (TAO_debug_level > 3)
00486 {
00487 ACE_DEBUG ((LM_DEBUG,
00488 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_output, ")
00489 ACE_TEXT ("drain_queue returns %d/%d\n"),
00490 this->id (),
00491 retval, errno));
00492 }
00493
00494 // Any errors are returned directly to the Reactor
00495 return retval;
00496 }
|
|
||||||||||||
|
Definition at line 810 of file Transport.cpp. References ACE_DEBUG, ACE_GUARD_RETURN, ACE_TEXT, current_deadline_, flush_timer_pending(), TAO_Flushing_Strategy::flush_transport(), TAO_ORB_Core::flushing_strategy(), LM_DEBUG, orb_core(), reset_flush_timer(), TAO_Flushing_Strategy::schedule_output(), and TAO_debug_level. Referenced by TAO_Transport_Timer::handle_timeout().
00812 { // Timer callback: validates the ACT, clears the pending flush timer and schedules output (flushing immediately when the strategy answers MUST_FLUSH).
00813 if (TAO_debug_level > 6)
00814 {
00815 ACE_DEBUG ((LM_DEBUG,
00816 ACE_TEXT ("TAO (%P|%t) - TAO_Transport[%d]::handle_timeout, ")
00817 ACE_TEXT ("timer expired\n"),
00818 this->id ()));
00819 }
00820
00821 /// This is the only legal ACT in the current configuration....
00822 if (act != &this->current_deadline_)
00823 {
00824 return -1;
00825 }
00826
00827 if (this->flush_timer_pending ())
00828 {
00829 // The timer is always a oneshot timer, so mark it as not
00830 // pending.
00831 this->reset_flush_timer ();
00832
00833 TAO_Flushing_Strategy *flushing_strategy =
00834 this->orb_core ()->flushing_strategy ();
00835 int const result = flushing_strategy->schedule_output (this);
00836 if (result == TAO_Flushing_Strategy::MUST_FLUSH)
00837 {
00838 typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK; // NOTE(review): presumably handler_lock_ is held by the caller and is released through the reverse lock while flushing — confirm
00839 TAO_REVERSE_LOCK reverse (*this->handler_lock_);
00840 ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1);
00841 (void) flushing_strategy->flush_transport (this); // result deliberately ignored
00842 }
00843 }
00844
00845 return 0;
00846 }
|
|
|
Definition at line 92 of file Transport.inl.
00093 { // Setter: store the caller-supplied identifier in id_.
00094 this->id_ = id;
00095 }
|
|
|
Set and Get the identifier for this transport instance.
If not set, this will return an integer representation of the `this` pointer for the instance on which it is called. Definition at line 86 of file Transport.inl. Referenced by TAO::Transport_Cache_Manager::bind_i(), TAO_Connection_Handler::close_connection_eh(), TAO_IIOP_Connector::complete_connection(), TAO_Connector::connect(), TAO::Transport_Cache_Manager::find_transport(), TAO_Connection_Handler::handle_input_internal(), TAO_Connector::parallel_connect(), TAO_GIOP_Message_Base::process_reply_message(), TAO::Transport_Cache_Manager::purge(), TAO_GIOP_Message_Base::send_close_connection(), TAO_GIOP_Message_Base::send_error(), TAO_Connector::wait_for_connection_completion(), and TAO_Leader_Follower::wait_for_event().
00087 { // Getter: return the value currently held in id_.
00088 return this->id_;
00089 }
|
|
|
Request is sent and the reply is received. Idle the transport now. Definition at line 258 of file Transport.cpp. References TAO_Transport_Mux_Strategy::idle_after_reply(), and tms(). Referenced by TAO::LocateRequest_Invocation::invoke(), and TAO::Synch_Twoway_Invocation::remote_twoway().
00259 { // Delegates to the transport mux strategy returned by tms () and passes its result back.
00260 return this->tms ()->idle_after_reply ();
00261 }
|
|
|
Request has been just sent, but the reply is not received. Idle the transport now. Definition at line 252 of file Transport.cpp. References TAO_Transport_Mux_Strategy::idle_after_send(), and tms(). Referenced by TAO::LocateRequest_Invocation::invoke(), and TAO::Synch_Twoway_Invocation::remote_twoway().
00253 { // Delegates to the transport mux strategy returned by tms () and passes its result back.
00254 return this->tms ()->idle_after_send ();
00255 }
|
|
|
Is this transport really connected.
Definition at line 164 of file Transport.inl. References ACE_GUARD_RETURN. Referenced by TAO::Cache_IntId::Cache_IntId(), TAO_IIOP_Connector::complete_connection(), TAO_Connector::connect(), TAO::Synch_Oneway_Invocation::remote_oneway(), and TAO_Connector::wait_for_connection_completion().
00165 { // Reads is_connected_ while holding handler_lock_.
00166 ACE_GUARD_RETURN (ACE_Lock,
00167 ace_mon,
00168 *this->handler_lock_,
00169 false); // value reported when the guard cannot acquire the lock
00170
00171 return this->is_connected_;
00172 }
|
|
|
Return true if the tcs has been set.
Definition at line 152 of file Transport.inl. References tcs_set_.
00153 { // Returns the tcs_set_ flag unchanged.
00154 return tcs_set_;
00155 }
|
|
|
Cache management.
Definition at line 447 of file Transport.cpp. References ACE_DEBUG, ACE_TEXT, LM_DEBUG, TAO::Transport_Cache_Manager::make_idle(), TAO_debug_level, and transport_cache_manager(). Referenced by TAO_Exclusive_TMS::idle_after_reply(), TAO_Muxed_TMS::idle_after_send(), and TAO_IIOP_Connection_Handler::process_listen_point_list().
00448 { // Traces the call (debug level > 3), then asks the transport cache manager to mark this transport's cache entry idle.
00449 if (TAO_debug_level > 3)
00450 {
00451 ACE_DEBUG ((LM_DEBUG,
00452 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::make_idle\n"),
00453 this->id ()));
00454 }
00455
00456 return this->transport_cache_manager ().make_idle (this->cache_map_entry_);
00457 }
|
|
||||||||||||
|
Initialising the messaging object. This would be used by the connector side. On the acceptor side the connection handler would take care of the messaging objects. Implemented in TAO_IIOP_Transport. |
|
|
Return the messaging object that is used to format the data that needs to be sent. Implemented in TAO_IIOP_Transport. Referenced by allocate_partial_message_block(), consolidate_enqueue_message(), consolidate_process_message(), format_queue_message(), TAO_On_Demand_Fragmentation_Strategy::fragment(), generate_locate_request(), generate_request_header(), handle_input_parse_data(), handle_input_parse_extra_messages(), out_stream(), process_parsed_messages(), and send_connection_closed_notifications_i(). |
|
|
Definition at line 2410 of file Transport.cpp. References ACE_DEBUG, ACE_TEXT, event_handler_i(), TAO_Wait_Strategy::is_registered(), LM_DEBUG, ACE_Reactor::notify(), orb_core(), TAO_ORB_Core::reactor(), TAO_debug_level, and ws_. Referenced by handle_input_parse_data(), and process_queue_head().
02411 { // Sends a READ_MASK notification for this transport's event handler to the ORB core's reactor. Returns 0 when the wait strategy is not registered, 1 otherwise.
02412 if (!this->ws_->is_registered ())
02413 {
02414 return 0;
02415 }
02416
02417 ACE_Event_Handler *eh = this->event_handler_i ();
02418
02419 // Get the reactor of the ORB core
02420 ACE_Reactor *reactor = this->orb_core ()->reactor ();
02421
02422 if (TAO_debug_level > 0)
02423 {
02424 ACE_DEBUG ((LM_DEBUG,
02425 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::notify_reactor, ")
02426 ACE_TEXT ("notify to Reactor\n"),
02427 this->id ()));
02428 }
02429
02430
02431 // Send a notification to the reactor...
02432 int const retval = reactor->notify (eh, ACE_Event_Handler::READ_MASK);
02433
02434 if (retval < 0 && TAO_debug_level > 2)
02435 {
02436 // @todo: need to think about what is the action that
02437 // we can take when we get here.
02438 ACE_DEBUG ((LM_DEBUG,
02439 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::notify_reactor, ")
02440 ACE_TEXT ("notify to the reactor failed..\n"),
02441 this->id ()));
02442 }
02443 // NOTE(review): 1 is returned even when notify () failed, so a negative value never reaches the callers.
02444 return 1;
02445 }
|
|
|
Definition at line 51 of file Transport.inl. References opening_connection_role_.
00052 { // Setter: remember the given role in opening_connection_role_.
00053 this->opening_connection_role_ = role;
00054 }
|
|
|
Methods dealing with the role of the connection, e.g., CLIENT or SERVER. See CORBA 2.6 Specification, Section 15.5.1 for origin of definitions. Definition at line 45 of file Transport.inl. References opening_connection_role_. Referenced by TAO::Wait_On_LF_No_Upcall::can_process_upcalls(), TAO_Connector::connect(), and TAO_IIOP_Connection_Handler::open().
00046 { // Getter: return the role stored in opening_connection_role_.
00047 return this->opening_connection_role_;
00048 }
|
|
|
|
|
|
|
Accessor for the output CDR stream.
Definition at line 2496 of file Transport.cpp. References messaging_object(), and TAO_Pluggable_Messaging::out_stream(). Referenced by TAO::LocateRequest_Invocation::invoke(), TAO::Synch_Oneway_Invocation::remote_oneway(), and TAO::Synch_Twoway_Invocation::remote_twoway().
02497 { // Forwards to the out_stream () accessor of the messaging object.
02498 return this->messaging_object ()->out_stream ();
02499 }
|
|
|
Hooks that can be overridden in concrete transports. These hooks are invoked just after connection establishment (or after a connection is fetched from cache). The return value signifies whether the invoker should proceed with post connection establishment activities. Protocols like SSLIOP need this to verify whether connections already established have valid certificates. There are no pre_connect_hooks () since the transport doesn't exist before a connection establishment. :-)
Definition at line 306 of file Transport.cpp. Referenced by TAO_Connector::connect().
00307 { // Default implementation: unconditionally reports true.
00308 return true;
00309 }
|
|
|
Perform all the actions needed when this transport gets opened.
Definition at line 2513 of file Transport.cpp. References ACE_ERROR, ACE_GUARD_RETURN, ACE_TEXT, close_connection(), LM_ERROR, purge_entry(), queue_is_empty_i(), TAO_Wait_Strategy::register_handler(), TAO_debug_level, and wait_strategy(). Referenced by TAO_IIOP_Connection_Handler::open().
02514 { // Records the id, marks the transport connected and, if output is already queued, registers with the wait strategy. Returns false if the lock cannot be taken or that registration fails.
02515 this->id_ = id;
02516
02517 {
02518 ACE_GUARD_RETURN (ACE_Lock,
02519 ace_mon,
02520 *this->handler_lock_,
02521 false);
02522 this->is_connected_ = true;
02523 }
02524 // NOTE(review): queue_is_empty_i () below runs after the guard above has gone out of scope — confirm that is safe.
02525 // Only when there is data in our outgoing queue do we need to
02526 // register below; with an empty queue we are done.
02527 if (this->queue_is_empty_i ())
02528 return true;
02529
02530 // If the wait strategy wants us to be registered with the reactor
02531 // then we do so. If registration is required and it succeeds,
02532 // #REFCOUNT# becomes two.
02533 if (this->wait_strategy ()->register_handler () != 0)
02534 {
02535 // Registration failures.
02536
02537 // Purge from the connection cache, if we are not in the cache, this
02538 // just does nothing.
02539 (void) this->purge_entry ();
02540
02541 // Close the handler.
02542 (void) this->close_connection ();
02543
02544 if (TAO_debug_level > 0) // NOTE(review): the message below names post_connect although this listing is post_open
02545 ACE_ERROR ((LM_ERROR,
02546 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::post_connect , ")
02547 ACE_TEXT ("could not register the transport ")
02548 ACE_TEXT ("in the reactor.\n"),
02549 this->id ()));
02550
02551 return false;
02552 }
02553
02554 return true;
02555 }
|
|
|
Do what needs to be done when closing the transport.
Definition at line 2502 of file Transport.cpp. References ACE_GUARD, cleanup_queue_i(), and purge_entry(). Referenced by TAO_Connection_Handler::close_connection_eh().
02503 { // Marks the transport disconnected, purges its cache entry, then runs cleanup_queue_i () under handler_lock_.
02504 this->is_connected_ = false; // NOTE(review): written before handler_lock_ is taken below, whereas is_connected () reads this flag under that lock — confirm this is intentional
02505 this->purge_entry ();
02506 {
02507 ACE_MT (ACE_GUARD (ACE_Lock, guard, *this->handler_lock_));
02508 this->cleanup_queue_i ();
02509 }
02510 }
|
|
||||||||||||
|
Process the message by sending it to the higher layers of the ORB. Definition at line 2222 of file Transport.cpp. References ACE_DEBUG, ACE_ERROR, ACE_TEXT, TAO_Pluggable_Messaging::discard_fragmented_message(), ACE_Message_Block::length(), LM_DEBUG, LM_ERROR, TAO::Transport::Stats::messages_received(), messaging_object(), TAO_Queued_Data::missing_data(), TAO_Queued_Data::msg_block(), TAO_Queued_Data::msg_type(), TAO_Pluggable_Messaging::process_reply_message(), TAO_Pluggable_Messaging::process_request_message(), TAO_Resume_Handle::resume_handle(), stats_, TAO_debug_level, TAO_PLUGGABLE_MESSAGE_CANCELREQUEST, TAO_PLUGGABLE_MESSAGE_CLOSECONNECTION, TAO_PLUGGABLE_MESSAGE_FRAGMENT, TAO_PLUGGABLE_MESSAGE_LOCATEREPLY, TAO_PLUGGABLE_MESSAGE_LOCATEREQUEST, TAO_PLUGGABLE_MESSAGE_MESSAGERROR, TAO_PLUGGABLE_MESSAGE_REPLY, and TAO_PLUGGABLE_MESSAGE_REQUEST. Referenced by consolidate_process_message(), handle_input_parse_data(), and process_queue_head().
02224 { // Dispatches the parsed message qd according to qd->msg_type (). Returns -1 when the connection has to be closed or processing failed, 0 otherwise.
02225 if (TAO_debug_level > 7)
02226 {
02227 ACE_DEBUG ((LM_DEBUG,
02228 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ")
02229 ACE_TEXT ("entering (missing data == %d)\n"),
02230 this->id(), qd->missing_data ()));
02231 }
02232
02233 #if TAO_HAS_TRANSPORT_CURRENT == 1
02234 // Update stats, if any
02235 if (this->stats_ != 0)
02236 this->stats_->messages_received (qd->msg_block ()->length ());
02237 #endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */
02238 // Requests and replies resume the handle before being processed; the other message types leave rh untouched.
02239 switch (qd->msg_type ())
02240 {
02241 case TAO_PLUGGABLE_MESSAGE_CLOSECONNECTION:
02242 {
02243 if (TAO_debug_level > 0)
02244 ACE_DEBUG ((LM_DEBUG,
02245 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ")
02246 ACE_TEXT ("received CloseConnection message - %m\n"),
02247 this->id()));
02248
02249 // Return a "-1" so that the next stage can take care of
02250 // closing connection and the necessary memory management.
02251 return -1;
02252 }
02253 break; // not reached: the case above always returns
02254 case TAO_PLUGGABLE_MESSAGE_REQUEST:
02255 case TAO_PLUGGABLE_MESSAGE_LOCATEREQUEST:
02256 {
02257 // Let us resume the handle before we go ahead to process the
02258 // request. This will open up the handle for other threads.
02259 rh.resume_handle ();
02260
02261 if (this->messaging_object ()->process_request_message (
02262 this,
02263 qd) == -1)
02264 {
02265 // Return a "-1" so that the next stage can take care of
02266 // closing connection and the necessary memory management.
02267 return -1;
02268 }
02269 }
02270 break;
02271 case TAO_PLUGGABLE_MESSAGE_REPLY:
02272 case TAO_PLUGGABLE_MESSAGE_LOCATEREPLY:
02273 {
02274 rh.resume_handle (); // same as for requests: resume before processing
02275
02276 TAO_Pluggable_Reply_Params params (this);
02277
02278 if (this->messaging_object ()->process_reply_message (params,
02279 qd) == -1)
02280 {
02281 if (TAO_debug_level > 0)
02282 ACE_DEBUG ((LM_DEBUG,
02283 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ")
02284 ACE_TEXT ("error in process_reply_message - %m\n"),
02285 this->id ()));
02286
02287 return -1;
02288 }
02289
02290 }
02291 break;
02292 case TAO_PLUGGABLE_MESSAGE_CANCELREQUEST:
02293 {
02294 // The associated request might be incomplete residing
02295 // fragmented in messaging object. We must make sure the
02296 // resources allocated by fragments are released.
02297 if (this->messaging_object ()->discard_fragmented_message (qd) == -1)
02298 {
02299 if (TAO_debug_level > 0)
02300 {
02301 ACE_ERROR ((LM_ERROR,
02302 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ")
02303 ACE_TEXT ("error processing CancelRequest\n"),
02304 this->id ()));
02305 }
02306 }
02307
02308 // We are not able to cancel requests being processed already;
02309 // this is declared as optional feature by CORBA, and TAO does
02310 // not support this currently.
02311
02312 // Just continue processing, CancelRequest does not mean to cut
02313 // off the connection.
02314 }
02315 break;
02316 case TAO_PLUGGABLE_MESSAGE_MESSAGERROR:
02317 {
02318 if (TAO_debug_level > 0)
02319 {
02320 ACE_ERROR ((LM_ERROR,
02321 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ")
02322 ACE_TEXT ("received MessageError, closing connection\n"),
02323 this->id ()));
02324 }
02325 return -1;
02326 }
02327 break; // not reached: the case above always returns
02328 case TAO_PLUGGABLE_MESSAGE_FRAGMENT:
02329 {
02330 // Nothing to be done.
02331 }
02332 break;
02333 }
02334
02335 // No case above reported an error: report success.
02336 return 0;
02337 }
|
|
|
Definition at line 2340 of file Transport.cpp. References ACE_DEBUG, ACE_TEXT, TAO_Incoming_Message_Queue::dequeue_head(), incoming_message_queue_, LM_DEBUG, notify_reactor(), process_parsed_messages(), TAO_Incoming_Message_Queue::queue_length(), TAO_Queued_Data::release(), TAO_Resume_Handle::set_flag(), and TAO_debug_level. Referenced by handle_input(), and handle_input_parse_data().
02341 { // Processes at most one message from incoming_message_queue_. Returns 1 if the queue was empty, 0 after processing a message, -1 on failure.
02342 if (TAO_debug_level > 3)
02343 {
02344 ACE_DEBUG ((LM_DEBUG,
02345 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_queue_head, %d enqueued\n"),
02346 this->id (), this->incoming_message_queue_.queue_length () ));
02347 }
02348
02349 // See if message in queue ...
02350 if (this->incoming_message_queue_.queue_length () > 0)
02351 {
02352 // Get the message on the head of the queue..
02353 TAO_Queued_Data *qd =
02354 this->incoming_message_queue_.dequeue_head ();
02355
02356 if (TAO_debug_level > 3)
02357 {
02358 ACE_DEBUG ((LM_DEBUG,
02359 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_queue_head, ")
02360 ACE_TEXT ("the size of the queue is [%d]\n"),
02361 this->id (),
02362 this->incoming_message_queue_.queue_length()));
02363 }
02364 // Now that we have pulled one message out of the queue,
02365 // check whether we have one more message in the queue...
02366 if (this->incoming_message_queue_.queue_length () > 0)
02367 {
02368 if (TAO_debug_level > 0)
02369 {
02370 ACE_DEBUG ((LM_DEBUG,
02371 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_queue_head, ")
02372 ACE_TEXT ("notify reactor\n"),
02373 this->id ()));
02374 }
02375
02376 int const retval = this->notify_reactor ();
02377
02378 if (retval == 1)
02379 {
02380 // Let the class know that it doesn't need to resume the
02381 // handle..
02382 rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_LEAVE_SUSPENDED);
02383 }
02384 else if (retval < 0)
02385 return -1;
02386 }
02387 else
02388 {
02389 // As we are ready to process the last message just resume
02390 // the handle. Set the flag in case someone had reset the flag..
02391 rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_RESUMABLE);
02392 }
02393
02394 // Process the message...
02395 if (this->process_parsed_messages (qd, rh) == -1)
02396 {
02397 return -1; // NOTE(review): qd is not released on this path — confirm who reclaims it
02398 }
02399
02400 // Delete the Queued_Data..
02401 TAO_Queued_Data::release (qd);
02402
02403 return 0;
02404 }
02405
02406 return 1;
02407 }
|
|
|
Called by the cache when the ORB is shutting down.
Definition at line 238 of file Transport.cpp. References add_reference(), TAO::Connection_Handler_Set, ACE_Unbounded_Set< T >::insert(), TAO_Wait_Strategy::non_blocking(), opening_connection_role_, and ws_.
00239 { // Adds this transport's connection handler to h and returns true, unless the wait strategy is non-blocking or the transport was opened in the server role (then returns false and h is untouched).
00240 if (this->ws_->non_blocking () ||
00241 this->opening_connection_role_ == TAO::TAO_SERVER_ROLE)
00242 return false;
00243
00244 (void) this->add_reference (); // take a reference on behalf of the set entry inserted below
00245
00246 h.insert (this->connection_handler_i ());
00247
00248 return true;
00249 }
|
|
|
Add the event handler to the handlers set. Called by the cache when the cache is closing.
Definition at line 230 of file Transport.cpp. References add_reference(), TAO::Connection_Handler_Set, and ACE_Unbounded_Set< T >::insert().
00231 { // Takes a reference on this transport and inserts its connection handler into handlers.
00232 (void) this->add_reference ();
00233
00234 handlers.insert (this->connection_handler_i ());
00235 }
|
|
|
Cache management.
Definition at line 441 of file Transport.cpp. References TAO::Transport_Cache_Manager::purge_entry(), and transport_cache_manager(). Referenced by TAO_Connection_Handler::close_handler(), TAO_IIOP_Connector::complete_connection(), TAO_Connector::connect(), post_open(), pre_close(), recache_transport(), ~TAO_Transport(), and TransportCleanupGuard::~TransportCleanupGuard().
00442 { // Asks the transport cache manager to purge this transport's cache entry and returns its result.
00443 return this->transport_cache_manager ().purge_entry (this->cache_map_entry_);
00444 }
|
|
|
Definition at line 76 of file Transport.inl. References purging_order_.
00077 { // Setter: store value in purging_order_.
00078 // This should only be called by the Transport Cache Manager when
00079 // it is holding its lock.
00080 // The transport should still be here since the cache manager still
00081 // has a reference to it.
00082 this->purging_order_ = value;
00083 }
|
|
|
Get and Set the purging order. The purging strategy uses the set version to set the purging order. Definition at line 70 of file Transport.inl. References purging_order_. Referenced by TAO_LRU_Connection_Purging_Strategy::update_item().
00071 { // Getter: return the value stored in purging_order_.
00072 return this->purging_order_;
00073 }
|
|
|
Check if there are messages pending in the queue.
Definition at line 98 of file Transport.inl. References ACE_GUARD_RETURN, and queue_is_empty_i(). Referenced by TAO_Reactive_Flushing_Strategy::flush_transport(), TAO_Leader_Follower_Flushing_Strategy::flush_transport(), and TAO_Block_Flushing_Strategy::flush_transport().
00099 { // Locking wrapper: acquires handler_lock_, then defers to queue_is_empty_i ().
00100 ACE_GUARD_RETURN (ACE_Lock,
00101 ace_mon,
00102 *this->handler_lock_,
00103 false); // value reported when the guard cannot acquire the lock
00104 return this->queue_is_empty_i ();
00105 }
|
|
|
Check if there are messages pending in the queue. This version assumes that the lock is already held. Use with care!
Definition at line 747 of file Transport.cpp. Referenced by post_open(), and queue_is_empty().
00748 { // The queue counts as empty exactly when head_ is null.
00749 return (this->head_ == 0);
00750 }
|
|
||||||||||||
|
Queue a message for message_block
Definition at line 1432 of file Transport.cpp. References ACE_NEW_RETURN, and TAO_Queued_Message::push_back(). Referenced by format_queue_message(), and send_asynchronous_message_i().
01434 { // Wraps message_block in a newly allocated TAO_Asynch_Queued_Message and pushes it onto the list delimited by head_ and tail_. Returns 0 on success.
01435 TAO_Queued_Message *queued_message = 0;
01436 ACE_NEW_RETURN (queued_message,
01437 TAO_Asynch_Queued_Message (message_block,
01438 this->orb_core_,
01439 max_wait_time,
01440 0,
01441 true),
01442 -1); // value handed to ACE_NEW_RETURN for the allocation-failure case
01443 queued_message->push_back (this->head_, this->tail_);
01444
01445 return 0;
01446 }
|
|
|
Recache ourselves in the cache.
Definition at line 430 of file Transport.cpp. References TAO::Transport_Cache_Manager::cache_transport(), purge_entry(), and transport_cache_manager(). Referenced by TAO_IIOP_Connection_Handler::process_listen_point_list().
00431 { // Replaces this transport's cache entry: purge the current one, then cache the transport again under desc.
00432 // First purge our entry
00433 this->purge_entry (); // result is not checked
00434
00435 // Then add ourselves to the cache
00436 return this->transport_cache_manager ().cache_transport (desc,
00437 this);
00438 }
|
|
||||||||||||||||
|
Read len bytes into buf. This method serializes on handler_lock_, guaranteeing that only one thread can execute it on the same instance at a time.
Implemented in TAO_IIOP_Transport. Referenced by handle_input_missing_data(), and handle_input_parse_data(). |
|
|
Accessor to recv_buffer_size_.
Definition at line 181 of file Transport.inl.
00182 { // Returns the value held in recv_buffer_size_.
00183 return this->recv_buffer_size_;
00184 }
|
|
|
Register the handler with the reactor. This method is used by the Wait_On_Reactor strategy. The transport must register its event handler with the ORB's Reactor.
Definition at line 318 of file Transport.cpp. References ACE_DEBUG, ACE_GUARD_RETURN, ACE_TEXT, event_handler_i(), TAO_Wait_Strategy::is_registered(), LM_DEBUG, ACE_Event_Handler::reactor(), TAO_ORB_Core::reactor(), ACE_Reactor::register_handler(), TAO_debug_level, and ws_. Referenced by TAO_Wait_On_Reactor::register_handler(), TAO_Wait_On_Leader_Follower::register_handler(), and TAO_Wait_On_Leader_Follower::sending_request().
00319 { // Registers this transport's event handler with the ORB core's reactor for READ events; returns 0 at once if the handler already uses that reactor.
00320 if (TAO_debug_level > 4)
00321 {
00322 ACE_DEBUG ((LM_DEBUG,
00323 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::register_handler\n"),
00324 this->id ()));
00325 }
00326
00327 ACE_Reactor * const r = this->orb_core_->reactor ();
00328
00329 // @@note: This should be okay since the register handler call will
00330 // not make a nested call into the transport.
00331 ACE_GUARD_RETURN (ACE_Lock,
00332 ace_mon,
00333 *this->handler_lock_,
00334 false); // NOTE(review): this failure value equals the 0 returned below for the already-registered case — confirm that is intended
00335
00336 if (r == this->event_handler_i ()->reactor ())
00337 {
00338 return 0;
00339 }
00340
00341 // Set the flag in the Connection Handler and in the Wait Strategy
00342 // @@Maybe we should set these flags after registering with the
00343 // reactor. What if the registration fails???
00344 this->ws_->is_registered (true);
00345
00346 // Register the handler with the reactor
00347 return r->register_handler (this->event_handler_i (),
00348 ACE_Event_Handler::READ_MASK);
00349 }
|
|
|
Initialising the messaging object. This would be used by the connector side. On the acceptor side the connection handler would take care of the messaging objects. Definition at line 2490 of file Transport.cpp. References event_handler_i(), and ACE_Event_Handler::remove_reference(). Referenced by TAO_Connection_Handler::cancel_pending_connection(), TAO_Connection_Handler::close_handler(), TAO::Cache_IntId::operator=(), TAO::Transport_Cache_Manager::purge(), TAO_Asynch_Reply_Dispatcher_Base::transport(), TAO::Cache_IntId::~Cache_IntId(), TAO_Asynch_Reply_Dispatcher_Base::~TAO_Asynch_Reply_Dispatcher_Base(), TAO_Thread_Per_Connection_Handler::~TAO_Thread_Per_Connection_Handler(), and TransportCleanupGuard::~TransportCleanupGuard().
02491 { // Forwards to remove_reference () on the underlying event handler and returns its result.
02492 return this->event_handler_i ()->remove_reference ();
02493 }
|
|
|
Print out error messages if the event handler is not valid.
Definition at line 1197 of file Transport.cpp. References ACE_DEBUG, ACE_TEXT, ACE_TEXT_CHAR_TO_TCHAR, LM_DEBUG, and TAO_debug_level.
01198 { // Diagnostic only: when TAO_debug_level > 0, logs the transport id, the caller name and tag_.
01199 if (TAO_debug_level > 0)
01200 {
01201 ACE_DEBUG ((LM_DEBUG,
01202 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::report_invalid_event_handler")
01203 ACE_TEXT ("(%s) no longer associated with handler [tag=%d]\n"),
01204 this->id (), ACE_TEXT_CHAR_TO_TCHAR (caller), this->tag_));
01205 }
01206 }
|
|
|
The flush timer expired or was explicitly cancelled; mark it as not pending. Definition at line 114 of file Transport.inl. References current_deadline_, and flush_timer_id_. Referenced by drain_queue_i(), and handle_timeout().
00115 { // Resets both the flush timer id and the current deadline.
00116 this->flush_timer_id_ = -1; // presumably -1 is the "no timer" value tested by flush_timer_pending () — confirm
00117 this->current_deadline_ = ACE_Time_Value::zero;
00118 }
|
|
|
Schedule handle_output() callbacks.
Definition at line 754 of file Transport.cpp. References ACE_DEBUG, ACE_TEXT, event_handler_i(), ACE_Reactor::find_handler(), ACE_Event_Handler::get_handle(), LM_DEBUG, ACE_Event_Handler::reactor(), ACE_Event_Handler::remove_reference(), ACE_Reactor::schedule_wakeup(), and TAO_debug_level. Referenced by TAO_Reactive_Flushing_Strategy::schedule_output(), and TAO_Leader_Follower_Flushing_Strategy::schedule_output().
00755 { // Asks the event handler's reactor to wake the handler for WRITE events. Returns -1 if there is no reactor or the handler is no longer the one registered for its handle.
00756 ACE_Event_Handler * const eh = this->event_handler_i ();
00757 ACE_Reactor * const reactor = eh->reactor ();
00758
00759 if (reactor == 0)
00760 return -1;
00761
00762 // Check to see if our event handler is still registered with the
00763 // reactor. It's possible for another thread to have run close_connection()
00764 // since we last used the event handler.
00765 ACE_Event_Handler * const found = reactor->find_handler (eh->get_handle ());
00766 if (found != eh)
00767 {
00768 if(TAO_debug_level > 3)
00769 {
00770 ACE_DEBUG ((LM_DEBUG,
00771 "TAO (%P|%t) - Transport[%d]::schedule_output_i "
00772 "event handler not found in reactor, returning -1\n",
00773 this->id ()));
00774 }
00775 if (found) // NOTE(review): find_handler () apparently hands back a counted reference, hence the remove_reference () on both paths — confirm
00776 {
00777 found->remove_reference ();
00778 }
00779 return -1;
00780 }
00781 found->remove_reference ();
00782
00783 if (TAO_debug_level > 3)
00784 {
00785 ACE_DEBUG ((LM_DEBUG,
00786 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::schedule_output_i\n"),
00787 this->id ()));
00788 }
00789
00790 return reactor->schedule_wakeup (eh, ACE_Event_Handler::WRITE_MASK);
00791 }
|
|
||||||||||||||||||||
|
Write the complete Message_Block chain to the connection. This method serializes on handler_lock_, guaranteeing that only one thread can execute it on the same instance at a time. Often the implementation simply forwards the arguments to the underlying ACE_Svc_Handler class, using the code factored out into ACE. Be careful with protocols that perform non-trivial transformations of the data, such as SSLIOP or protocols that compress the stream.
ENOENT.
Implemented in TAO_IIOP_Transport. Referenced by drain_queue_helper(). |
|
||||||||||||||||
|
Send an asynchronous message, i.e. do not block until the message is on the wire Definition at line 1269 of file Transport.cpp. References ACE_DEBUG, ACE_ERROR, ACE_GUARD_RETURN, ACE_TEXT, check_buffering_constraints_i(), ACE_Message_Block::cont(), ETIME, EWOULDBLOCK, TAO_Flushing_Strategy::flush_transport(), TAO_ORB_Core::flushing_strategy(), ACE_Message_Block::length(), LM_DEBUG, LM_ERROR, TAO::Transport_Queueing_Strategy::must_queue(), orb_core(), queue_message_i(), TAO_Flushing_Strategy::schedule_output(), send_message_block_chain_i(), ssize_t, TAO_debug_level, ACE_Message_Block::total_length(), and TAO_Stub::transport_queueing_strategy(). Referenced by send_message_shared_i().
01272 {
01273 // Let's figure out if the message should be queued without trying
01274 // to send first:
01275 bool try_sending_first = true;
01276
01277 bool const queue_empty = (this->head_ == 0);
01278
01279 TAO::Transport_Queueing_Strategy *queue_strategy =
01280 stub->transport_queueing_strategy ();
01281
01282 if (!queue_empty)
01283 {
01284 try_sending_first = false;
01285 }
01286 else if (queue_strategy)
01287 {
01288 if (queue_strategy->must_queue (queue_empty))
01289 {
01290 try_sending_first = false;
01291 }
01292 }
01293
01294 if (try_sending_first)
01295 {
01296 ssize_t n = 0;
01297 size_t byte_count = 0;
01298 // ... in this case we must try to send the message first ...
01299
01300 size_t const total_length = message_block->total_length ();
01301
01302 if (TAO_debug_level > 6)
01303 {
01304 ACE_DEBUG ((LM_DEBUG,
01305 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ")
01306 ACE_TEXT ("trying to send the message (ml = %d)\n"),
01307 this->id (), total_length));
01308 }
01309
01310 // @@ I don't think we want to hold the mutex here, however if
01311 // we release it we need to recheck the status of the transport
01312 // after we return... once I understand the final form for this
01313 // code I will re-visit this decision
01314 n = this->send_message_block_chain_i (message_block,
01315 byte_count,
01316 max_wait_time);
01317 if (n == -1)
01318 {
01319 // ... if this is just an EWOULDBLOCK we must schedule the
01320 // message for later, if it is ETIME we still have to send
01321 // the complete message, because cutting off the message at
01322 // this point will destroy the synchronization with the
01323 // server ...
01324 if (errno != EWOULDBLOCK && errno != ETIME)
01325 {
01326 if (TAO_debug_level > 0)
01327 {
01328 ACE_ERROR ((LM_ERROR,
01329 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ")
01330 ACE_TEXT ("fatal error in ")
01331 ACE_TEXT ("send_message_block_chain_i - %m\n"),
01332 this->id ()));
01333 }
01334 return -1;
01335 }
01336 }
01337
01338 // ... let's figure out if the complete message was sent ...
01339 if (total_length == byte_count)
01340 {
01341 // Done, just return. Notice that there are no allocations
01342 // or copies up to this point (though some fancy calling
01343 // back and forth).
01344 // This is the common case for the critical path, it should
01345 // be fast.
01346 return 0;
01347 }
01348
01349 // If it was partially sent, then we can't allow a timeout
01350 if (byte_count > 0)
01351 max_wait_time = 0;
01352
01353 if (TAO_debug_level > 6)
01354 {
01355 ACE_DEBUG ((LM_DEBUG,
01356 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ")
01357 ACE_TEXT ("partial send %d / %d bytes\n"),
01358 this->id (), byte_count, total_length));
01359 }
01360
01361 // ... part of the data was sent, need to figure out what piece
01362 // of the message block chain must be queued ...
01363 while (message_block != 0 && message_block->length () == 0)
01364 {
01365 message_block = message_block->cont ();
01366 }
01367
01368 // ... at least some portion of the message block chain should
01369 // remain ...
01370 }
01371
01372 // ... either the message must be queued or we need to queue it
01373 // because it was not completely sent out ...
01374
01375 if (TAO_debug_level > 6)
01376 {
01377 ACE_DEBUG ((LM_DEBUG,
01378 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ")
01379 ACE_TEXT ("message is queued\n"),
01380 this->id ()));
01381 }
01382
01383 if (this->queue_message_i (message_block, max_wait_time) == -1)
01384 {
01385 if (TAO_debug_level > 0)
01386 {
01387 ACE_DEBUG ((LM_DEBUG,
01388 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::")
01389 ACE_TEXT ("send_asynchronous_message_i, ")
01390 ACE_TEXT ("cannot queue message for - %m\n"),
01391 this->id ()));
01392 }
01393 return -1;
01394 }
01395
01396 // ... if the queue is full we need to activate the output on the
01397 // queue ...
01398 bool must_flush = false;
01399 const bool constraints_reached =
01400 this->check_buffering_constraints_i (stub,
01401 must_flush);
01402
01403 // ... but we also want to activate it if the message was partially
01404 // sent.... Plus, when we use the blocking flushing strategy the
01405 // queue is flushed as a side-effect of 'schedule_output()'
01406
01407 TAO_Flushing_Strategy *flushing_strategy =
01408 this->orb_core ()->flushing_strategy ();
01409
01410 if (constraints_reached || try_sending_first)
01411 {
01412 int const result = flushing_strategy->schedule_output (this);
01413 if (result == TAO_Flushing_Strategy::MUST_FLUSH)
01414 {
01415 must_flush = true;
01416 }
01417 }
01418
01419 if (must_flush)
01420 {
01421 typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK;
01422 TAO_REVERSE_LOCK reverse (*this->handler_lock_);
01423 ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1);
01424
01425 (void) flushing_strategy->flush_transport (this);
01426 }
01427
01428 return 0;
01429 }
|
|
|
Notify all the components inside a Transport when the underlying connection is closed. Definition at line 1209 of file Transport.cpp. References ACE_GUARD, TAO_Transport_Mux_Strategy::connection_closed(), send_connection_closed_notifications_i(), and tms(). Referenced by TAO_Connection_Handler::close_connection_eh().
01210 {
01211 {
01212 ACE_MT (ACE_GUARD (ACE_Lock, guard, *this->handler_lock_));
01213
01214 this->send_connection_closed_notifications_i ();
01215 }
01216
01217 this->tms ()->connection_closed ();
01218 }
|
|
|
Assume the lock is held.
Definition at line 1221 of file Transport.cpp. References cleanup_queue_i(), messaging_object(), and TAO_Pluggable_Messaging::reset(). Referenced by send_connection_closed_notifications().
01222 {
01223 this->cleanup_queue_i ();
01224
01225 this->messaging_object ()->reset ();
01226 }
|
|
||||||||||||||||||||
|
Once the ORB is prepared to receive a reply (see send_request() above), and all the arguments have been marshaled, the CDR stream must be 'formatted', i.e. the message_size field in the GIOP header can finally be set to the proper value. Implemented in TAO_IIOP_Transport. Referenced by TAO_On_Demand_Fragmentation_Strategy::fragment(), TAO_GIOP_Message_Base::make_send_locate_reply(), TAO_GIOP_Message_Base::process_request(), and TAO_GIOP_Message_Base::send_reply_exception(). |
|
||||||||||||||||
|
Send a message block chain.
Definition at line 509 of file Transport.cpp. References ACE_GUARD_RETURN, and send_message_block_chain_i(). Referenced by TAO_GIOP_Message_Base::send_close_connection(), and TAO_GIOP_Message_Base::send_error().
00512 {
00513 ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1);
00514
00515 return this->send_message_block_chain_i (mb,
00516 bytes_transferred,
00517 max_wait_time);
00518 }
|
|
||||||||||||||||
|
Send a message block chain, assuming the lock is held.
Definition at line 521 of file Transport.cpp. References drain_queue_i(), TAO_Synch_Queued_Message::message_length(), TAO_Queued_Message::push_back(), TAO_Queued_Message::remove_from_list(), and ACE_Message_Block::total_length(). Referenced by send_asynchronous_message_i(), and send_message_block_chain().
00524 {
00525 size_t const total_length = mb->total_length ();
00526
00527 // We are going to block, so there is no need to clone
00528 // the message block.
00529 TAO_Synch_Queued_Message synch_message (mb, this->orb_core_);
00530
00531 synch_message.push_back (this->head_, this->tail_);
00532
00533 int const n = this->drain_queue_i ();
00534
00535 if (n == -1)
00536 {
00537 synch_message.remove_from_list (this->head_, this->tail_);
00538 return -1; // Error while sending...
00539 }
00540 else if (n == 1)
00541 {
00542 bytes_transferred = total_length;
00543 return 1; // Empty queue, message was sent..
00544 }
00545
00546 // Remove the temporary message from the queue...
00547 synch_message.remove_from_list (this->head_, this->tail_);
00548
00549 bytes_transferred = total_length - synch_message.message_length ();
00550
00551 return 0;
00552 }
|
|
||||||||||||||||||||
|
Send the contents of message_block.
Reimplemented in TAO_IIOP_Transport. Definition at line 280 of file Transport.cpp. References ACE_GUARD_RETURN, close_connection(), and send_message_shared_i().
00284 {
00285 int result = 0;
00286
00287 {
00288 ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1);
00289
00290 result =
00291 this->send_message_shared_i (stub, message_semantics,
00292 message_block, max_wait_time);
00293 }
00294
00295 if (result == -1)
00296 {
00297 this->close_connection ();
00298 }
00299
00300 return result;
00301 }
|
|
||||||||||||||||||||
|
Implement send_message_shared() assuming the handler_lock_ is held. Definition at line 1229 of file Transport.cpp. References ACE_Message_Block::length(), TAO::Transport::Stats::messages_sent(), send_asynchronous_message_i(), send_reply_message_i(), send_synchronous_message_i(), stats_, TAO_ONEWAY_REQUEST, TAO_REPLY, and TAO_TWOWAY_REQUEST. Referenced by send_message_shared(), and TAO_IIOP_Transport::send_message_shared().
01233 {
01234 int ret = 0;
01235
01236 #if TAO_HAS_TRANSPORT_CURRENT == 1
01237 size_t const message_length = message_block->length ();
01238 #endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */
01239
01240 switch (message_semantics)
01241 {
01242 case TAO_Transport::TAO_TWOWAY_REQUEST:
01243 ret = this->send_synchronous_message_i (message_block,
01244 max_wait_time);
01245 break;
01246
01247 case TAO_Transport::TAO_REPLY:
01248 ret = this->send_reply_message_i (message_block,
01249 max_wait_time);
01250 break;
01251
01252 case TAO_Transport::TAO_ONEWAY_REQUEST:
01253 ret = this->send_asynchronous_message_i (stub,
01254 message_block,
01255 max_wait_time);
01256 break;
01257 }
01258
01259 #if TAO_HAS_TRANSPORT_CURRENT == 1
01260 // "Count" the message, only if no error was encountered.
01261 if (ret != -1 && this->stats_ != 0)
01262 this->stats_->messages_sent (message_length);
01263 #endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */
01264
01265 return ret;
01266 }
|
|
||||||||||||
|
Send a reply message, i.e. do not block until the message is on the wire, but just return after adding it to the queue. Definition at line 657 of file Transport.cpp. References ACE_DEBUG, ACE_GUARD_RETURN, ACE_TEXT, TAO_Synch_Queued_Message::clone(), TAO_Queued_Message::destroy(), TAO_Flushing_Strategy::flush_message(), TAO_ORB_Core::flushing_strategy(), LM_DEBUG, orb_core(), TAO_Queued_Message::push_back(), TAO_Queued_Message::remove_from_list(), TAO_Flushing_Strategy::schedule_output(), send_synch_message_helper_i(), and TAO_debug_level. Referenced by send_message_shared_i().
00659 {
00660 // Dont clone now.. We could be sent in one shot!
00661 TAO_Synch_Queued_Message synch_message (mb, this->orb_core_);
00662
00663 synch_message.push_back (this->head_,
00664 this->tail_);
00665
00666 int const n =
00667 this->send_synch_message_helper_i (synch_message,
00668 max_wait_time);
00669
00670 if (n == -1 || n == 1)
00671 {
00672 return n;
00673 }
00674
00675 if (TAO_debug_level > 3)
00676 {
00677 ACE_DEBUG ((LM_DEBUG,
00678 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_reply_message_i, ")
00679 ACE_TEXT ("preparing to add to queue before leaving\n"),
00680 this->id ()));
00681 }
00682
00683 // Till this point we shouldn't have any copying and that is the
00684 // point anyway. Now, remove the node from the list
00685 synch_message.remove_from_list (this->head_, this->tail_);
00686
00687 // Clone the node that we have.
00688 TAO_Queued_Message *msg =
00689 synch_message.clone (this->orb_core_->transport_message_buffer_allocator ());
00690
00691 // Stick it in the queue
00692 msg->push_back (this->head_, this->tail_);
00693
00694 TAO_Flushing_Strategy *flushing_strategy =
00695 this->orb_core ()->flushing_strategy ();
00696
00697 int const result = flushing_strategy->schedule_output (this);
00698
00699 if (result == -1)
00700 {
00701 if (TAO_debug_level > 5)
00702 {
00703 ACE_DEBUG ((LM_DEBUG, "TAO (%P|%t) - Transport[%d]::send_reply_"
00704 "message_i, dequeuing msg due to schedule_output "
00705 "failure\n", this->id ()));
00706 }
00707 msg->remove_from_list (this->head_, this->tail_);
00708 msg->destroy ();
00709 }
00710 else if (result == TAO_Flushing_Strategy::MUST_FLUSH)
00711 {
00712 typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK;
00713 TAO_REVERSE_LOCK reverse (*this->handler_lock_);
00714 ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1);
00715 (void) flushing_strategy->flush_message(this, msg, 0);
00716 }
00717
00718 return 1;
00719 }
|
|
||||||||||||||||||||||||
|
Preparing the ORB to receive the reply only once the request is completely sent opens the system to some subtle race conditions. Suppose the ORB is running in a multi-threaded configuration: thread A makes a request while thread B is using the Reactor to process all incoming requests. Thread A could be implemented as follows: 1) send the request, 2) set up the ORB to receive the reply, 3) wait for the reply. In this case, however, thread B may receive the reply between steps (1) and (2), and drop it as an invalid or unexpected message. Consequently the correct implementation is: 1) set up the ORB to receive the reply, 2) send the request, 3) wait for the reply. The following method encapsulates this idiom.
Implemented in TAO_IIOP_Transport. Referenced by TAO::Remote_Invocation::send_message(). |
|
||||||||||||
|
A helper method used by send_synchronous_message_i() and send_reply_message_i(). Reusable code that could be used by both the methods. Definition at line 722 of file Transport.cpp. References TAO_Synch_Queued_Message::all_data_sent(), drain_queue_i(), and TAO_Queued_Message::remove_from_list(). Referenced by send_reply_message_i(), and send_synchronous_message_i().
00724 {
00725 // @todo: Need to send timeouts for writing..
00726 int const n = this->drain_queue_i ();
00727
00728 if (n == -1)
00729 {
00730 synch_message.remove_from_list (this->head_, this->tail_);
00731 return -1; // Error while sending...
00732 }
00733 else if (n == 1)
00734 {
00735 return 1; // Empty queue, message was sent..
00736 }
00737
00738 if (synch_message.all_data_sent ())
00739 {
00740 return 1;
00741 }
00742
00743 return 0;
00744 }
|
|
||||||||||||
|
Send a synchronous message, i.e. block until the message is on the wire. Definition at line 555 of file Transport.cpp. References ACE_ERROR, ACE_GUARD_RETURN, ACE_NEW_RETURN, ACE_TEXT, TAO_Synch_Queued_Message::current_block(), ETIME, TAO_Flushing_Strategy::flush_message(), TAO_ORB_Core::flushing_strategy(), LM_ERROR, TAO_Synch_Queued_Message::message_length(), orb_core(), TAO_Queued_Message::push_back(), TAO_Queued_Message::push_front(), TAO_Queued_Message::remove_from_list(), TAO_Flushing_Strategy::schedule_output(), send_synch_message_helper_i(), and TAO_debug_level. Referenced by send_message_shared_i().
00557 {
00558 // We are going to block, so there is no need to clone
00559 // the message block.
00560 TAO_Synch_Queued_Message synch_message (mb, this->orb_core_);
00561 size_t const message_length = synch_message.message_length ();
00562
00563 synch_message.push_back (this->head_, this->tail_);
00564
00565 int const n = this->send_synch_message_helper_i (synch_message,
00566 0 /*ignored*/);
00567 if (n == -1 || n == 1)
00568 {
00569 return n;
00570 }
00571
00572 // @todo: Check for timeouts!
00573 // if (max_wait_time != 0 && errno == ETIME) return -1;
00574 TAO_Flushing_Strategy *flushing_strategy =
00575 this->orb_core ()->flushing_strategy ();
00576 int result = flushing_strategy->schedule_output (this);
00577 if (result == -1)
00578 {
00579 synch_message.remove_from_list (this->head_, this->tail_);
00580 if (TAO_debug_level > 0)
00581 {
00582 ACE_ERROR ((LM_ERROR,
00583 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::")
00584 ACE_TEXT ("send_synchronous_message_i, ")
00585 ACE_TEXT ("error while scheduling flush - %m\n"),
00586 this->id ()));
00587 }
00588 return -1;
00589 }
00590
00591 // No need to check for result == TAO_Flushing_Strategy::MUST_FLUSH,
00592 // because we're always going to flush anyway.
00593
00594 // Release the mutex, other threads may modify the queue as we
00595 // block for a long time writing out data.
00596 {
00597 typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK;
00598 TAO_REVERSE_LOCK reverse (*this->handler_lock_);
00599 ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1);
00600
00601 result = flushing_strategy->flush_message (this,
00602 &synch_message,
00603 max_wait_time);
00604 }
00605
00606 if (result == -1)
00607 {
00608 synch_message.remove_from_list (this->head_, this->tail_);
00609
00610 if (errno == ETIME)
00611 {
00612 // If partially sent, then we must queue the remainder.
00613 if (message_length != synch_message.message_length ())
00614 {
00615 // This is a timeout, there is only one nasty case: the
00616 // message has been partially sent! We simply cannot take
00617 // the message out of the queue, because that would corrupt
00618 // the connection.
00619 //
00620 // What we do is replace the queued message with an
00621 // asynchronous message, that contains only what remains of
00622 // the timed out request. If you think about sending
00623 // CancelRequests in this case: there is no much point in
00624 // doing that: the receiving ORB would probably ignore it,
00625 // and figuring out the request ID would be a bit of a
00626 // nightmare.
00627 //
00628 TAO_Queued_Message *queued_message = 0;
00629 ACE_NEW_RETURN (queued_message,
00630 TAO_Asynch_Queued_Message (
00631 synch_message.current_block (),
00632 this->orb_core_,
00633 0, // no timeout
00634 0,
00635 true),
00636 -1);
00637 queued_message->push_front (this->head_, this->tail_);
00638 }
00639 }
00640
00641 if (TAO_debug_level > 0)
00642 {
00643 ACE_ERROR ((LM_ERROR,
00644 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_synchronous_message_i, ")
00645 ACE_TEXT ("error while flushing message - %m\n"),
00646 this->id ()));
00647 }
00648
00649 return -1;
00650 }
00651
00652 return 1;
00653 }
|
|
|
Accessor to sent_byte_count_.
Definition at line 187 of file Transport.inl. References sent_byte_count_.
00188 {
00189 return this->sent_byte_count_;
00190 }
|
|
|
Transport statistics.
|
|
|
Return the protocol tag. The OMG assigns unique tags (a 32-bit unsigned number) to each protocol. New protocol tags can be obtained free of charge from the OMG, check the documents in corbafwd.h for more details. Definition at line 8 of file Transport.inl.
00009 {
00010 return this->tag_;
00011 }
|
|
|
Extracts the list of listen points from the CDR stream. The list would have the protocol-specific details of the ListenPoints. Reimplemented in TAO_IIOP_Transport. Definition at line 274 of file Transport.cpp. References ACE_NOTSUP_RETURN. Referenced by TAO_GIOP_Message_Generator_Parser_12::process_bidir_context().
00275 {
00276 ACE_NOTSUP_RETURN (-1);
00277 }
|
|
|
Get the TAO_Transport_Mux_Strategy used by this object. The role of the TAO_Transport_Mux_Strategy is described in more detail in that class' documentation. Suffice it to say that the class is used to control how many threads can have pending requests over the same connection. Multiplexing multiple threads over the same connection conserves resources and is almost required for AMI, but having only one pending request per connection is more efficient and reduces the possibilities of priority inversions. Definition at line 20 of file Transport.inl. Referenced by idle_after_reply(), idle_after_send(), TAO::LocateRequest_Invocation_Adapter::invoke(), TAO::LocateRequest_Invocation::invoke(), TAO::Invocation_Adapter::invoke_remote_i(), TAO_GIOP_Message_Base::process_reply_message(), TAO::Synch_Twoway_Invocation::remote_twoway(), and send_connection_closed_notifications().
00021 {
00022 return tms_;
00023 }
|
|
|
Helper method that returns the Transport Cache Manager.
Definition at line 2448 of file Transport.cpp. References TAO_ORB_Core::lane_resources(), and TAO_Thread_Lane_Resources::transport_cache(). Referenced by make_idle(), purge_entry(), recache_transport(), and update_transport().
02449 {
02450 return this->orb_core_->lane_resources ().transport_cache ();
02451 }
|
|
|
Cache management.
Definition at line 460 of file Transport.cpp. References transport_cache_manager(), and TAO::Transport_Cache_Manager::update_entry(). Referenced by TAO_Connection_Handler::handle_input_internal(), TAO_Connection_Handler::handle_output_eh(), and TAO_Connection_Handler::svc_i().
00461 {
00462 return this->transport_cache_manager ().update_entry (this->cache_map_entry_);
00463 }
|
|
|
Return the TAO_Wait_Strategy used by this object. The role of the TAO_Wait_Strategy is described in more detail in that class' documentation. Suffice it to say that the ORB can wait for a reply blocking on read(), using the Reactor to wait for multiple events concurrently or using the Leader/Followers protocol. Definition at line 27 of file Transport.inl. References ws_. Referenced by TAO_Connection_Handler::close_connection_eh(), TAO_IIOP_Connector::complete_connection(), TAO_Connector::connect(), TAO::Transport_Cache_Manager::find_transport(), TAO_Connection_Handler::handle_input_eh(), TAO_IIOP_Connection_Handler::open(), post_open(), and TAO::Synch_Twoway_Invocation::wait_for_reply().
00028 {
00029 return this->ws_;
00030 }
|
|
|
CodeSet negotiation - Set the wchar codeset translator factory.
Definition at line 144 of file Transport.inl. References tcs_set_.
00145 {
00146 this->wchar_translator_ = tf;
00147 this->tcs_set_ = 1;
00148 }
|
|
|
CodeSet Negotiation - Get the wchar codeset translator factory.
Definition at line 131 of file Transport.inl.
00132 {
00133 return this->wchar_translator_;
00134 }
|
|
|
Definition at line 808 of file Transport.h. |
|
|
These classes need privileged access to: Definition at line 807 of file Transport.h. |
|
|
Needs privileged access to event_handler_i(). Definition at line 812 of file Transport.h. |
|
|
Have we sent any info on bidirectional information or have we received any info regarding making the connection served by this transport bidirectional? The flag is used as follows: + We don't want to send the bidirectional context info more than once on the connection. Why? It wastes marshalling and demarshalling time on the client. + On the server side -- once a client that has established the connection asks the server to use the connection both ways, we *don't* want the server to pack service info to the client. That is not allowed. We need a flag to prevent such things from happening. The value of this flag will be 0 if the client sends info and 1 if the server receives the info. Definition at line 963 of file Transport.h. Referenced by bidirectional_flag(). |
|
|
Our entry in the cache. We don't own this. It is here for our convenience. We cannot just change things around. Definition at line 935 of file Transport.h. Referenced by cache_map_entry(). |
|
|
Additional member values required to support codeset translation. @Phil, I think it would be nice if we could think of a way to do the following. We have been trying to use the transport for marking about translator factories and such! IMHO this is the wrong encapsulation, i.e. trying to populate the transport object with these details. We should probably have a class something like TAO_Message_Property or TAO_Message_Translator or whatever (I am sure you get the idea) and encapsulate all these details. Coupling these seems odd. If I have to be more cynical, we can move this to the connection_handler, and it may make more sense with the DSCP stuff around there. Do you agree? Definition at line 1038 of file Transport.h. |
|
|
The queue will start draining no later than if* the deadline is Definition at line 980 of file Transport.h. Referenced by check_buffering_constraints_i(), handle_timeout(), and reset_flush_timer(). |
|
|
First_request_ is true until the first request is sent or received. This is necessary since codeset context information is necessary only on the first request. After that, the translators are fixed for the life of the connection. Definition at line 1050 of file Transport.h. Referenced by first_request_sent(), and generate_request_header(). |
|
|
The timer ID.
Definition at line 983 of file Transport.h. Referenced by check_buffering_constraints_i(), flush_timer_pending(), and reset_flush_timer(). |
|
|
This is an Definition at line 997 of file Transport.h. Referenced by ~TAO_Transport(). |
|
|
Implement the outgoing data queue.
Definition at line 968 of file Transport.h. |
|
|
A unique identifier for the transport. This never *never* changes over the lifespan, so we don't have to worry about locking it. HINT: Protocol-specific transports that use connection handler might choose to set this to the handle for their connection. Definition at line 1007 of file Transport.h. |
|
|
Queue of the consolidated, incoming messages..
Definition at line 972 of file Transport.h. Referenced by consolidate_enqueue_message(), handle_input_parse_data(), and process_queue_head(). |
|
|
Stack of incoming fragments, consolidated messages are going to be enqueued in "incoming_message_queue_" Definition at line 976 of file Transport.h. Referenced by handle_input(), handle_input_missing_data(), handle_input_parse_data(), and handle_input_parse_extra_messages(). |
|
|
Is this transport really connected or not. In case of oneways with SYNC_NONE Policy we don't wait until the connection is ready and we buffer the requests in this transport until the connection is ready Definition at line 1021 of file Transport.h. |
|
|
Definition at line 965 of file Transport.h. Referenced by opened_as(), and provide_blockable_handler(). |
|
|
Global orbcore resource.
Definition at line 931 of file Transport.h. Referenced by TAO_GIOP_Message_Base::process_locate_request(), and TAO_GIOP_Message_Base::process_request(). |
|
|
Holds the partial GIOP message (if there is one).
Definition at line 1053 of file Transport.h. Referenced by allocate_partial_message_block(), and handle_input_parse_data(). |
|
|
Used by the LRU, LFU and FIFO Connection Purging Strategies.
Definition at line 1010 of file Transport.h. Referenced by purging_order(). |
|
|
Size of the buffer received.
Definition at line 1013 of file Transport.h. |
|
|
Number of bytes sent.
Definition at line 1016 of file Transport.h. Referenced by drain_queue_helper(), drain_queue_i(), and sent_byte_count(). |
|
|
Statistics.
Definition at line 1066 of file Transport.h. Referenced by process_parsed_messages(), send_message_shared_i(), and ~TAO_Transport(). |
|
|
IOP protocol tag.
Definition at line 928 of file Transport.h. |
|
|
Definition at line 969 of file Transport.h. |
|
|
The tcs_set_ flag indicates that negotiation has occurred and so the translators are correct, since a null translator is valid if both ends are using the same codeset, whatever that codeset might be. Definition at line 1044 of file Transport.h. Referenced by char_translator(), is_tcs_set(), and wchar_translator(). |
|
|
Strategy to decide whether multiple requests can be sent over the same connection or the connection is exclusive for a request. Definition at line 939 of file Transport.h. |
|
|
The adapter used to receive timeout callbacks from the Reactor.
Definition at line 986 of file Transport.h. |
|
|
Definition at line 1039 of file Transport.h. |
|
|
Strategy for waiting for the reply after sending the request.
Definition at line 942 of file Transport.h. Referenced by notify_reactor(), provide_blockable_handler(), register_handler(), TAO_Transport(), wait_strategy(), and ~TAO_Transport(). |
1.3.6