Generic definitions for the Transport class. More...
#include <Transport.h>
Classes | |
struct | Drain_Result |
Public Types | |
enum | Drain_Result_Enum { DR_ERROR = -1, DR_OK = 0, DR_QUEUE_EMPTY = 1, DR_WOULDBLOCK = 2 } |
Public Member Functions | |
TAO_Transport (CORBA::ULong tag, TAO_ORB_Core *orb_core, size_t input_cdr_size=ACE_CDR::DEFAULT_BUFSIZE) | |
Default creator, requires the tag value be supplied. | |
virtual | ~TAO_Transport (void) |
Destructor. | |
CORBA::ULong | tag (void) const |
Return the protocol tag. | |
TAO_ORB_Core * | orb_core (void) const |
Access the ORB that owns this connection. | |
TAO_Transport_Mux_Strategy * | tms (void) const |
Get the TAO_Tranport_Mux_Strategy used by this object. | |
TAO_Wait_Strategy * | wait_strategy (void) const |
Return the TAO_Wait_Strategy used by this object. | |
Drain_Result | handle_output (TAO::Transport::Drain_Constraints const &c) |
Callback method to reactively drain the outgoing data queue. | |
int | bidirectional_flag (void) const |
Get the bidirectional flag. | |
void | bidirectional_flag (int flag) |
Set the bidirectional flag. | |
void | cache_map_entry (TAO::Transport_Cache_Manager::HASH_MAP_ENTRY *entry) |
Set the Cache Map entry. | |
TAO::Transport_Cache_Manager::HASH_MAP_ENTRY * | cache_map_entry (void) |
Get the Cache Map entry. | |
size_t | id (void) const |
Set and Get the identifier for this transport instance. | |
void | id (size_t id) |
TAO::Connection_Role | opened_as (void) const |
void | opened_as (TAO::Connection_Role) |
unsigned long | purging_order (void) const |
void | purging_order (unsigned long value) |
bool | queue_is_empty (void) |
Check if there are messages pending in the queue. | |
bool | register_if_necessary (void) |
Register with the reactor via the wait strategy. | |
void | provide_handler (TAO::Connection_Handler_Set &handlers) |
Added event handler to the handlers set. | |
bool | provide_blockable_handler (TAO::Connection_Handler_Set &handlers) |
virtual int | register_handler (void) |
Register the handler with the reactor. | |
virtual ssize_t | send (iovec *iov, int iovcnt, size_t &bytes_transferred, ACE_Time_Value const *timeout)=0 |
Write the complete Message_Block chain to the connection. | |
virtual ssize_t | recv (char *buffer, size_t len, const ACE_Time_Value *timeout=0)=0 |
Read len bytes from into buf. | |
Control connection lifecycle | |
bool | idle_after_send (void) |
bool | idle_after_reply (void) |
virtual void | close_connection (void) |
Call the implementation method after obtaining the lock. | |
Template methods | |
The Transport class uses the Template Method Pattern to implement the protocol specific functionality. Implementors of a pluggable protocol should override the following methods with the semantics documented below. | |
class | TAO_Reactive_Flushing_Strategy |
class | TAO_Leader_Follower_Flushing_Strategy |
class | TAO_Thread_Per_Connection_Handler |
CORBA::ULong const | tag_ |
IOP protocol tag. | |
TAO_ORB_Core *const | orb_core_ |
Global orbcore resource. | |
TAO::Transport_Cache_Manager::HASH_MAP_ENTRY * | cache_map_entry_ |
TAO_Transport_Mux_Strategy * | tms_ |
TAO_Wait_Strategy * | ws_ |
Strategy for waiting for the reply after sending the request. | |
int | bidirectional_flag_ |
TAO::Connection_Role | opening_connection_role_ |
TAO_Queued_Message * | head_ |
Implement the outgoing data queue. | |
TAO_Queued_Message * | tail_ |
TAO_Incoming_Message_Queue | incoming_message_queue_ |
Queue of the consolidated, incoming messages.. | |
TAO::Incoming_Message_Stack | incoming_message_stack_ |
ACE_Time_Value | current_deadline_ |
long | flush_timer_id_ |
The timer ID. | |
TAO_Transport_Timer | transport_timer_ |
The adapter used to receive timeout callbacks from the Reactor. | |
ACE_Lock * | handler_lock_ |
size_t | id_ |
A unique identifier for the transport. | |
unsigned long | purging_order_ |
Used by the LRU, LFU and FIFO Connection Purging Strategies. | |
size_t | recv_buffer_size_ |
Size of the buffer received. | |
size_t | sent_byte_count_ |
Number of bytes sent. | |
bool | is_connected_ |
TAO_GIOP_Message_Base * | messaging_object_ |
Our messaging object. | |
TAO_Codeset_Translator_Base * | char_translator_ |
Additional member values required to support codeset translation. | |
TAO_Codeset_Translator_Base * | wchar_translator_ |
CORBA::Boolean | tcs_set_ |
bool | first_request_ |
ACE_Message_Block * | partial_message_ |
Holds the partial GIOP message (if there is one). | |
TAO::Transport::Stats * | stats_ |
Statistics. | |
bool | flush_in_post_open_ |
Indicate that flushing needs to be done in post_open(). | |
TAO_SYNCH_MUTEX | output_cdr_mutex_ |
lock for synchronizing Transport OutputCDR access | |
void | messaging_init (TAO_GIOP_Message_Version const &version) |
virtual int | tear_listen_point_list (TAO_InputCDR &cdr) |
virtual bool | post_connect_hook (void) |
Hooks that can be overridden in concrete transports. | |
ACE_Event_Handler::Reference_Count | add_reference (void) |
Memory management routines. | |
ACE_Event_Handler::Reference_Count | remove_reference (void) |
TAO_GIOP_Message_Base * | messaging_object (void) |
virtual ACE_Event_Handler * | event_handler_i (void)=0 |
bool | is_connected (void) const |
Is this transport really connected. | |
bool | post_open (size_t id) |
Perform all the actions when this transport get opened. | |
void | pre_close (void) |
do what needs to be done when closing the transport | |
TAO_Connection_Handler * | connection_handler (void) |
Get the connection handler for this transport. | |
TAO_OutputCDR & | out_stream (void) |
Accessor for the output CDR stream. | |
TAO_SYNCH_MUTEX & | output_cdr_lock (void) |
Accessor for synchronizing Transport OutputCDR access. | |
void | set_flush_in_post_open (void) |
Set the flush in post open flag. | |
bool | can_be_purged (void) |
Can the transport be purged? | |
virtual void | set_bidir_context_info (TAO_Operation_Details &opdetails) |
int | generate_locate_request (TAO_Target_Specification &spec, TAO_Operation_Details &opdetails, TAO_OutputCDR &output) |
virtual int | generate_request_header (TAO_Operation_Details &opd, TAO_Target_Specification &spec, TAO_OutputCDR &msg) |
int | recache_transport (TAO_Transport_Descriptor_Interface *desc) |
Recache ourselves in the cache. | |
virtual int | handle_input (TAO_Resume_Handle &rh, ACE_Time_Value *max_wait_time=0) |
Callback to read incoming data. | |
virtual int | send_request (TAO_Stub *stub, TAO_ORB_Core *orb_core, TAO_OutputCDR &stream, TAO_Message_Semantics message_semantics, ACE_Time_Value *max_time_wait)=0 |
virtual int | send_message (TAO_OutputCDR &stream, TAO_Stub *stub=0, TAO_Message_Semantics message_semantics=TAO_TWOWAY_REQUEST, ACE_Time_Value *max_time_wait=0)=0 |
virtual int | send_message_shared (TAO_Stub *stub, TAO_Message_Semantics message_semantics, const ACE_Message_Block *message_block, ACE_Time_Value *max_wait_time) |
Sent the contents of message_block. | |
int | format_queue_message (TAO_OutputCDR &stream, ACE_Time_Value *max_wait_time, TAO_Stub *stub) |
int | send_message_block_chain (const ACE_Message_Block *message_block, size_t &bytes_transferred, ACE_Time_Value *max_wait_time=0) |
int | send_message_block_chain_i (const ACE_Message_Block *message_block, size_t &bytes_transferred, TAO::Transport::Drain_Constraints const &dc) |
Send a message block chain, assuming the lock is held. | |
int | purge_entry (void) |
Cache management. | |
int | make_idle (void) |
Cache management. | |
int | update_transport (void) |
Cache management. | |
int | handle_timeout (const ACE_Time_Value ¤t_time, const void *act) |
size_t | recv_buffer_size (void) const |
Accessor to recv_buffer_size_. | |
size_t | sent_byte_count (void) const |
Accessor to sent_byte_count_. | |
TAO_Codeset_Translator_Base * | char_translator (void) const |
CodeSet Negotiation - Get the char codeset translator factory. | |
TAO_Codeset_Translator_Base * | wchar_translator (void) const |
CodeSet Negotiation - Get the wchar codeset translator factory. | |
void | char_translator (TAO_Codeset_Translator_Base *) |
CodeSet negotiation - Set the char codeset translator factory. | |
void | wchar_translator (TAO_Codeset_Translator_Base *) |
CodeSet negotiation - Set the wchar codeset translator factory. | |
void | assign_translators (TAO_InputCDR *, TAO_OutputCDR *) |
void | clear_translators (TAO_InputCDR *, TAO_OutputCDR *) |
CORBA::Boolean | is_tcs_set () const |
Return true if the tcs has been set. | |
void | first_request_sent (bool flag=false) |
Set the state of the first_request_ to flag. | |
bool | first_request () const |
Get the first request flag. | |
void | send_connection_closed_notifications (void) |
TAO::Transport::Stats * | stats (void) const |
Transport statistics. | |
virtual TAO_Connection_Handler * | connection_handler_i (void)=0 |
int | process_parsed_messages (TAO_Queued_Data *qd, TAO_Resume_Handle &rh) |
int | send_message_shared_i (TAO_Stub *stub, TAO_Message_Semantics message_semantics, const ACE_Message_Block *message_block, ACE_Time_Value *max_wait_time) |
int | queue_message_i (const ACE_Message_Block *message_block, ACE_Time_Value *max_wait_time, bool back=true) |
ACE_Time_Value const * | io_timeout (TAO::Transport::Drain_Constraints const &dc) const |
Re-factor computation of I/O timeouts based on operation timeouts. Depending on the wait strategy, we need to timeout I/O operations or not. For example, if we are using a non-blocking strategy, we want to pass 0 to all I/O operations, and rely on the ACE_NONBLOCK settings on the underlying sockets. However, for blocking strategies we want to pass the operation timeouts, to respect the application level policies. | |
TAO::Transport_Cache_Manager & | transport_cache_manager (void) |
Helper method that returns the Transport Cache Manager. | |
Drain_Result | drain_queue (TAO::Transport::Drain_Constraints const &dc) |
Send some of the data in the queue. | |
Drain_Result | drain_queue_i (TAO::Transport::Drain_Constraints const &dc) |
Implement drain_queue() assuming the lock is held. | |
bool | queue_is_empty_i (void) const |
Check if there are messages pending in the queue. | |
Drain_Result | drain_queue_helper (int &iovcnt, iovec iov[], TAO::Transport::Drain_Constraints const &dc) |
A helper routine used in drain_queue_i(). | |
int | schedule_output_i (void) |
Schedule handle_output() callbacks. | |
int | cancel_output_i (void) |
Cancel handle_output() callbacks. | |
void | cleanup_queue (size_t byte_count) |
Cleanup the queue. | |
void | cleanup_queue_i () |
Cleanup the complete queue. | |
int | check_buffering_constraints_i (TAO_Stub *stub, bool &must_flush) |
Check if the buffering constraints have been reached. | |
int | send_synchronous_message_i (const ACE_Message_Block *message_block, ACE_Time_Value *max_wait_time) |
int | send_reply_message_i (const ACE_Message_Block *message_block, ACE_Time_Value *max_wait_time) |
int | send_asynchronous_message_i (TAO_Stub *stub, const ACE_Message_Block *message_block, ACE_Time_Value *max_wait_time) |
int | send_synch_message_helper_i (TAO_Synch_Queued_Message &s, ACE_Time_Value *max_wait_time) |
int | flush_timer_pending (void) const |
Check if the flush timer is still pending. | |
void | reset_flush_timer (void) |
void | report_invalid_event_handler (const char *caller) |
Print out error messages if the event handler is not valid. | |
int | handle_input_missing_data (TAO_Resume_Handle &rh, ACE_Time_Value *max_wait_time, TAO_Queued_Data *q_data) |
int | handle_input_parse_data (TAO_Resume_Handle &rh, ACE_Time_Value *max_wait_time) |
int | handle_input_parse_extra_messages (ACE_Message_Block &message_block) |
int | consolidate_enqueue_message (TAO_Queued_Data *qd) |
int | consolidate_process_message (TAO_Queued_Data *qd, TAO_Resume_Handle &rh) |
int | process_queue_head (TAO_Resume_Handle &rh) |
int | notify_reactor (void) |
void | send_connection_closed_notifications_i (void) |
Assume the lock is held. | |
void | allocate_partial_message_block (void) |
bool | using_blocking_io_for_synch_messages () const |
bool | using_blocking_io_for_asynch_messages () const |
Generic definitions for the Transport class.
The transport object is created in the Service handler constructor and deleted in the Service Handler's destructor!!
The main responsability of a Transport object is to encapsulate a connection, and provide a transport independent way to send and receive data. Since TAO is heavily based on the Reactor for all if not all its I/O the Transport class is usually implemented with a helper Connection Handler that adapts the generic Transport interface to the Reactor types.
One of the responsibilities of the TAO_Transport class is to send out GIOP messages as efficiently as possible. In most cases messages are put out in FIFO order, the transport object will put out the message using a single system call and return control to the application. However, for oneways and AMI requests it may be more efficient (or required if the SYNC_NONE policy is in effect) to queue the messages until a large enough data set is available. Another reason to queue is that some applications cannot block for I/O, yet they want to send messages so large that a single write() operation would not be able to cope with them. In such cases we need to queue the data and use the Reactor to drain the queue.
Therefore, the Transport class may need to use a queue to temporarily hold the messages, and, in some configurations, it may need to use the Reactor to concurrently drain such queues.
TAO provides explicit policies to send 'urgent' messages. Such messages may put at the head of the queue. However, they cannot be sent immediately because the transport may already be sending another message in a reactive fashion.
Consequently, the Transport must also know if the head of the queue has been partially sent. In that case new messages can only follow the head. Only once the head is completely sent we can start sending new messages.
One or more threads can be blocked waiting for the connection to completely send the message. The thread should return as soon as its message has been sent, so a per-thread condition is required. This suggest that simply using a ACE_Message_Queue would not be enough: there is a significant amount of ancillary information, to keep on each message that the Message_Block class does not provide room for.
Blocking I/O is still attractive for some applications. First, my eliminating the Reactor overhead performance is improved when sending large blocks of data. Second, using the Reactor to send out data opens the door for nested upcalls, yet some applications cannot deal with the reentrancy issues in this case.
Some or all messages could have a timeout period attached to them. The timeout source could either be some high-level policy or maybe some strategy to prevent denial of service attacks. In any case the timeouts are per-message, and later messages could have shorter timeouts. In fact, some kind of scheduling (such as EDF) could be required in a few applications.
The outgoing data path consist in several components:
The Transport object provides a single method to send request messages (send_request_message ()).
One of the main responsibilities of the transport is to read and process the incoming GIOP message as quickly and efficiently as possible. There are other forces that needs to be given due consideration. They are
The messages should be checked for validity and the right information should be sent to the higher layer for processing. The process of doing a sanity check and preparing the messages for the higher layers of the ORB are done by the messaging protocol.
To keep things as efficient as possible for medium sized requests, it would be good to minimise data copying and locking along the incoming path ie. from the time of reading the data from the handle to the application. We achieve this by creating a buffer on stack and reading the data from the handle into the buffer. We then pass the same data block (the buffer is encapsulated into a data block) to the higher layers of the ORB. The problems stem from the following (a) Data is bigger than the buffer that we have on stack (b) Transports like TCP do not guarantee availability of the whole chunk of data in one shot. Data could trickle in byte by byte. (c) Single read gives multiple messages
We solve the problems as follows
(a) First do a read with the buffer on stack. Query the underlying messaging object whether the message has any incomplete portion. If so, data will be copied into new buffer being able to hold full message and is queued; succeeding events will read data from socket and write directly into this buffer. Otherwise, if if the message in local buffer is complete, we free the handle and then send the message to the higher layers of the ORB for processing.
(b) If buffer with incomplete message has been enqueued, while trying to do the above, the reactor will call us back when the handle becomes read ready. The read-operation will copy data directly into the enqueued buffer. If the message has bee read completely the message is sent to the higher layers of the ORB for processing.
(c) If we get multiple messages (possible if the client connected to the server sends oneways or AMI requests), we parse and split the messages. Every message is put in the queue. Once the messages are queued, the thread picks up one message to send to the higher layers of the ORB. Before doing that, if it finds more messages, it sends a notify to the reactor without resuming the handle. The next thread picks up a message from the queue and processes that. Once the queue is drained the last thread resumes the handle.
We could use the outgoing path of the ORB to send replies. This would allow us to reuse most of the code in the outgoing data path. We were doing this till TAO-1.2.3. We run in to problems. When writing the reply the ORB gets flow controlled, and the ORB tries to flush the message by going into the reactor. This resulted in unnecessary nesting. The thread that gets into the Reactor could potentially handle other messages (incoming or outgoing) and the stack starts growing leading to crashes.
The solution that we (plan to) adopt is pretty straight forward. The thread sending replies will not block to send the replies but queue the replies and return to the Reactor. (Note the careful usages of the terms "blocking in the Reactor" as opposed to "return back to the Reactor".
See Also:
Definition at line 321 of file Transport.h.
Definition at line 367 of file Transport.h.
{ DR_ERROR = -1, DR_OK = 0, DR_QUEUE_EMPTY = 1, // used internally, not returned from drain_queue() DR_WOULDBLOCK = 2 };
TAO_Transport::TAO_Transport | ( | CORBA::ULong | tag, | |
TAO_ORB_Core * | orb_core, | |||
size_t | input_cdr_size = ACE_CDR::DEFAULT_BUFSIZE | |||
) |
Default creator, requires the tag value be supplied.
Definition at line 129 of file Transport.cpp.
: tag_ (tag) , orb_core_ (orb_core) , cache_map_entry_ (0) , tms_ (0) , ws_ (0) , bidirectional_flag_ (-1) , opening_connection_role_ (TAO::TAO_UNSPECIFIED_ROLE) , head_ (0) , tail_ (0) , incoming_message_queue_ (orb_core) , current_deadline_ (ACE_Time_Value::zero) , flush_timer_id_ (-1) , transport_timer_ (this) , handler_lock_ (orb_core->resource_factory ()->create_cached_connection_lock ()) , id_ ((size_t) this) , purging_order_ (0) , recv_buffer_size_ (0) , sent_byte_count_ (0) , is_connected_ (false) , messaging_object_ (0) , char_translator_ (0) , wchar_translator_ (0) , tcs_set_ (0) , first_request_ (true) , partial_message_ (0) #if TAO_HAS_SENDFILE == 1 // The ORB has been configured to use the MMAP allocator, meaning // we could/should use sendfile() to send data. Cast once rather // here rather than during each send. This assumes that all // TAO_OutputCDR instances are using the same TAO_MMAP_Allocator // instance as the underlying output CDR buffer allocator. , mmap_allocator_ ( dynamic_cast<TAO_MMAP_Allocator *> ( orb_core->output_cdr_buffer_allocator ())) #endif /* TAO_HAS_SENDFILE==1 */ #if TAO_HAS_TRANSPORT_CURRENT == 1 , stats_ (0) #endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */ , flush_in_post_open_ (false) { ACE_NEW (this->messaging_object_, TAO_GIOP_Message_Base (orb_core, this, input_cdr_size)); TAO_Client_Strategy_Factory *cf = this->orb_core_->client_factory (); // Create WS now. this->ws_ = cf->create_wait_strategy (this); // Create TMS now. this->tms_ = cf->create_transport_mux_strategy (this); #if TAO_HAS_TRANSPORT_CURRENT == 1 // Allocate stats ACE_NEW_THROW_EX (this->stats_, TAO::Transport::Stats, CORBA::NO_MEMORY ()); #endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */ /* * Hook to add code that initializes components that * belong to the concrete protocol implementation. * Further additions to this Transport class will * need to add code *before* this hook. */ //@@ TAO_TRANSPORT_SPL_CONSTRUCTOR_ADD_HOOK }
TAO_Transport::~TAO_Transport | ( | void | ) | [virtual] |
Destructor.
Definition at line 202 of file Transport.cpp.
{ delete this->messaging_object_; delete this->ws_; delete this->tms_; delete this->handler_lock_; if (!this->is_connected_) { // When we have a not connected transport we could have buffered // messages on this transport which we have to cleanup now. this->cleanup_queue_i(); } // Release the partial message block, however we may // have never allocated one. ACE_Message_Block::release (this->partial_message_); // By the time the destructor is reached here all the connection stuff // *must* have been cleaned up. // The following assert is needed for the test "Bug_2494_Regression". // See the bugzilla bug #2494 for details. ACE_ASSERT (this->queue_is_empty_i ()); ACE_ASSERT (this->cache_map_entry_ == 0); #if TAO_HAS_TRANSPORT_CURRENT == 1 delete this->stats_; #endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */ /* * Hook to add code that cleans up components * belong to the concrete protocol implementation. * Further additions to this Transport class will * need to add code *before* this hook. */ //@@ TAO_TRANSPORT_SPL_DESTRUCTOR_ADD_HOOK }
ACE_Event_Handler::Reference_Count TAO_Transport::add_reference | ( | void | ) |
Memory management routines.
Definition at line 2646 of file Transport.cpp.
{ return this->event_handler_i ()->add_reference (); }
void TAO_Transport::allocate_partial_message_block | ( | void | ) | [private] |
Allocate a partial message block and store it in our partial_message_ data member.
Definition at line 2769 of file Transport.cpp.
{ if (this->partial_message_ == 0) { // This value must be at least large enough to hold a GIOP message // header plus a GIOP fragment header size_t const partial_message_size = this->messaging_object ()->header_length (); // + this->messaging_object ()->fragment_header_length (); // deprecated, conflicts with not-single_read_opt. ACE_NEW (this->partial_message_, ACE_Message_Block (partial_message_size)); } }
void TAO_Transport::assign_translators | ( | TAO_InputCDR * | inp, | |
TAO_OutputCDR * | outp | |||
) |
Use the Transport's codeset factories to set the translator for input and output CDRs.
Definition at line 2616 of file Transport.cpp.
{ if (this->char_translator_) { this->char_translator_->assign (inp); this->char_translator_->assign (outp); } if (this->wchar_translator_) { this->wchar_translator_->assign (inp); this->wchar_translator_->assign (outp); } }
int TAO_Transport::bidirectional_flag | ( | void | ) | const |
Get the bidirectional flag.
Definition at line 39 of file Transport.inl.
{ return this->bidirectional_flag_; }
void TAO_Transport::bidirectional_flag | ( | int | flag | ) |
Set the bidirectional flag.
Definition at line 45 of file Transport.inl.
{ this->bidirectional_flag_ = flag; }
void TAO_Transport::cache_map_entry | ( | TAO::Transport_Cache_Manager::HASH_MAP_ENTRY * | entry | ) |
Set the Cache Map entry.
Definition at line 69 of file Transport.inl.
{ // Sync with TAO_Transport::purge_entry() ACE_GUARD (ACE_Lock, ace_mon, *this->handler_lock_); this->cache_map_entry_ = entry; }
TAO::Transport_Cache_Manager::HASH_MAP_ENTRY * TAO_Transport::cache_map_entry | ( | void | ) |
Get the Cache Map entry.
Definition at line 63 of file Transport.inl.
{ return this->cache_map_entry_; }
bool TAO_Transport::can_be_purged | ( | void | ) |
Can the transport be purged?
Definition at line 495 of file Transport.cpp.
{ return !this->tms_->has_request (); }
int TAO_Transport::cancel_output_i | ( | void | ) | [private] |
Cancel handle_output() callbacks.
Definition at line 858 of file Transport.cpp.
{ ACE_Event_Handler * const eh = this->event_handler_i (); ACE_Reactor *const reactor = eh->reactor (); if (TAO_debug_level > 3) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cancel_output_i\n"), this->id ())); } return reactor->cancel_wakeup (eh, ACE_Event_Handler::WRITE_MASK); }
void TAO_Transport::char_translator | ( | TAO_Codeset_Translator_Base * | tf | ) |
CodeSet negotiation - Set the char codeset translator factory.
Definition at line 151 of file Transport.inl.
{ this->char_translator_ = tf; this->tcs_set_ = 1; }
TAO_Codeset_Translator_Base * TAO_Transport::char_translator | ( | void | ) | const |
CodeSet Negotiation - Get the char codeset translator factory.
Definition at line 139 of file Transport.inl.
{ return this->char_translator_; }
int TAO_Transport::check_buffering_constraints_i | ( | TAO_Stub * | stub, | |
bool & | must_flush | |||
) | [private] |
Check if the buffering constraints have been reached.
Definition at line 1223 of file Transport.cpp.
{ // First let's compute the size of the queue: size_t msg_count = 0; size_t total_bytes = 0; for (TAO_Queued_Message *i = this->head_; i != 0; i = i->next ()) { ++msg_count; total_bytes += i->message_length (); } bool set_timer = false; ACE_Time_Value new_deadline; TAO::Transport_Queueing_Strategy *queue_strategy = stub->transport_queueing_strategy (); bool constraints_reached = true; if (queue_strategy) { constraints_reached = queue_strategy->buffering_constraints_reached (stub, msg_count, total_bytes, must_flush, this->current_deadline_, set_timer, new_deadline); } else { must_flush = false; } // ... set the new timer, also cancel any previous timers ... if (set_timer) { ACE_Event_Handler *eh = this->event_handler_i (); ACE_Reactor * const reactor = eh->reactor (); this->current_deadline_ = new_deadline; ACE_Time_Value delay = new_deadline - ACE_OS::gettimeofday (); if (this->flush_timer_pending ()) { reactor->cancel_timer (this->flush_timer_id_); } this->flush_timer_id_ = reactor->schedule_timer (&this->transport_timer_, &this->current_deadline_, delay); } return constraints_reached; }
void TAO_Transport::cleanup_queue | ( | size_t | byte_count | ) | [private] |
Cleanup the queue.
Exactly byte_count bytes have been sent, the queue must be cleaned up as potentially several messages have been completely sent out. It leaves on head_ the next message to send out.
Definition at line 1176 of file Transport.cpp.
{ while (!this->queue_is_empty_i () && byte_count > 0) { TAO_Queued_Message *i = this->head_; if (TAO_debug_level > 4) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue, ") ACE_TEXT ("byte_count = %d\n"), this->id (), byte_count)); } // Update the state of the first message i->bytes_transferred (byte_count); if (TAO_debug_level > 4) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue, ") ACE_TEXT ("after transfer, bc = %d, all_sent = %d, ml = %d\n"), this->id (), byte_count, i->all_data_sent (), i->message_length ())); } // ... if all the data was sent the message must be removed from // the queue... if (i->all_data_sent ()) { i->remove_from_list (this->head_, this->tail_); i->destroy (); } else if (byte_count == 0) { // If we have sent out a full message block, but we are not // finished with this message, we need to do something with the // message block chain held by our output stream. If we don't, // another thread can attempt to service this transport and end // up resetting the output stream which will release the // message that we haven't finished sending. i->copy_if_necessary (this->out_stream ().begin ()); } } }
void TAO_Transport::cleanup_queue_i | ( | ) | [private] |
Cleanup the complete queue.
Definition at line 1133 of file Transport.cpp.
{ if (TAO_debug_level > 4) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue_i, ") ACE_TEXT ("cleaning up complete queue\n"), this->id ())); } size_t byte_count = 0; int msg_count = 0; // Cleanup all messages while (!this->queue_is_empty_i ()) { TAO_Queued_Message *i = this->head_; if (TAO_debug_level > 4) { byte_count += i->message_length(); ++msg_count; } // @@ This is a good point to insert a flag to indicate that a // CloseConnection message was successfully received. i->state_changed (TAO_LF_Event::LFS_CONNECTION_CLOSED, this->orb_core_->leader_follower ()); i->remove_from_list (this->head_, this->tail_); i->destroy (); } if (TAO_debug_level > 4) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue_i, ") ACE_TEXT ("discarded %d messages, %u bytes.\n"), this->id (), msg_count, byte_count)); } }
void TAO_Transport::clear_translators | ( | TAO_InputCDR * | inp, | |
TAO_OutputCDR * | outp | |||
) |
It is necessary to clear the codeset translator when a CDR stream is used for more than one GIOP message. This is required since the header must not be translated, whereas the body must be.
Definition at line 2631 of file Transport.cpp.
{ if (inp) { inp->char_translator (0); inp->wchar_translator (0); } if (outp) { outp->char_translator (0); outp->wchar_translator (0); } }
void TAO_Transport::close_connection | ( | void | ) | [virtual] |
Call the implementation method after obtaining the lock.
Definition at line 359 of file Transport.cpp.
{ this->connection_handler_i ()->close_connection (); }
TAO_Connection_Handler * TAO_Transport::connection_handler | ( | void | ) |
Get the connection handler for this transport.
Definition at line 191 of file Transport.inl.
{ return this->connection_handler_i(); }
virtual TAO_Connection_Handler* TAO_Transport::connection_handler_i | ( | void | ) | [protected, pure virtual] |
These classes need privileged access to:
Implemented in TAO_IIOP_Transport, TAO_DIOP_Transport, and TAO_UIOP_Transport.
int TAO_Transport::consolidate_enqueue_message | ( | TAO_Queued_Data * | qd | ) | [private] |
Definition at line 1790 of file Transport.cpp.
{ // consolidate message on top of stack, only for fragmented messages // paranoid check if (q_data->missing_data () != 0) { return -1; } if (q_data->more_fragments () || q_data->msg_type () == GIOP::Fragment) { TAO_Queued_Data *new_q_data = 0; switch (this->messaging_object()->consolidate_fragmented_message (q_data, new_q_data)) { case -1: // error return -1; case 0: // returning consolidated message in new_q_data if (!new_q_data) { if (TAO_debug_level > 0) { ACE_ERROR ((LM_ERROR, ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_enqueue_message, ") ACE_TEXT ("error, consolidated message is NULL\n"), this->id ())); } return -1; } if (this->incoming_message_queue_.enqueue_tail (new_q_data) != 0) { TAO_Queued_Data::release (new_q_data); return -1; } break; case 1: // fragment has been stored in messaging_oject() break; } } else { if (this->incoming_message_queue_.enqueue_tail (q_data) != 0) { TAO_Queued_Data::release (q_data); return -1; } } return 0; // success }
int TAO_Transport::consolidate_process_message | ( | TAO_Queued_Data * | qd, | |
TAO_Resume_Handle & | rh | |||
) | [private] |
Definition at line 1703 of file Transport.cpp.
{ // paranoid check if (q_data->missing_data () != 0) { if (TAO_debug_level > 0) { ACE_ERROR ((LM_ERROR, ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ") ACE_TEXT ("missing data\n"), this->id ())); } return -1; } if (q_data->more_fragments () || q_data->msg_type () == GIOP::Fragment) { // consolidate message on top of stack, only for fragmented messages TAO_Queued_Data *new_q_data = 0; switch (this->messaging_object()->consolidate_fragmented_message (q_data, new_q_data)) { case -1: // error return -1; case 0: // returning consolidated message in q_data if (!new_q_data) { if (TAO_debug_level > 0) { ACE_ERROR ((LM_ERROR, ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ") ACE_TEXT ("error, consolidated message is NULL\n"), this->id ())); } return -1; } if (this->process_parsed_messages (new_q_data, rh) == -1) { TAO_Queued_Data::release (new_q_data); if (TAO_debug_level > 0) { ACE_ERROR ((LM_ERROR, ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ") ACE_TEXT ("error processing consolidated message\n"), this->id ())); } return -1; } TAO_Queued_Data::release (new_q_data); break; case 1: // fragment has been stored in messaging_oject() break; } } else { if (this->process_parsed_messages (q_data, rh) == -1) { TAO_Queued_Data::release (q_data); if (TAO_debug_level > 0) { ACE_ERROR ((LM_ERROR, ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ") ACE_TEXT ("error processing message\n"), this->id ())); } return -1; } TAO_Queued_Data::release (q_data); } return 0; }
TAO_Transport::Drain_Result TAO_Transport::drain_queue | ( | TAO::Transport::Drain_Constraints const & | dc | ) | [private] |
Send some of the data in the queue.
As the outgoing data is drained this method is invoked to send as much of the current message as possible.
Definition at line 915 of file Transport.cpp.
{ ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, DR_ERROR); Drain_Result const retval = this->drain_queue_i (dc); if (retval == DR_QUEUE_EMPTY) { // ... there is no current message or it was completely // sent, cancel output... TAO_Flushing_Strategy *flushing_strategy = this->orb_core ()->flushing_strategy (); flushing_strategy->cancel_output (this); return DR_OK; } return retval; }
TAO_Transport::Drain_Result TAO_Transport::drain_queue_helper | ( | int & | iovcnt, | |
iovec | iov[], | |||
TAO::Transport::Drain_Constraints const & | dc | |||
) | [private] |
A helper routine used in drain_queue_i().
Definition at line 936 of file Transport.cpp.
{ // As a side-effect, this decrements the timeout() pointed-to value by // the time used in this function. That might be important as there are // potentially long running system calls invoked from here. ACE_Countdown_Time countdown(dc.timeout()); size_t byte_count = 0; // ... send the message ... ssize_t retval = -1; #if TAO_HAS_SENDFILE == 1 if (this->mmap_allocator_) retval = this->sendfile (this->mmap_allocator_, iov, iovcnt, byte_count, dc); else #endif /* TAO_HAS_SENDFILE==1 */ retval = this->send (iov, iovcnt, byte_count, this->io_timeout (dc)); if (TAO_debug_level == 5) { dump_iov (iov, iovcnt, this->id (), byte_count, ACE_TEXT("drain_queue_helper")); } if (retval == 0) { if (TAO_debug_level > 4) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_helper, ") ACE_TEXT ("send() returns 0\n"), this->id ())); } return DR_ERROR; } else if (retval == -1) { if (TAO_debug_level > 4) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_helper, ") ACE_TEXT ("error during send() (errno: %d) - %m\n"), this->id (), ACE_ERRNO_GET)); } if (errno == EWOULDBLOCK || errno == EAGAIN) { return DR_WOULDBLOCK; } return DR_ERROR; } // ... now we need to update the queue, removing elements // that have been sent, and updating the last element if it // was only partially sent ... this->cleanup_queue (byte_count); iovcnt = 0; // ... start over, how do we guarantee progress? Because if // no bytes are sent send() can only return 0 or -1 // Total no. of bytes sent for a send call this->sent_byte_count_ += byte_count; if (TAO_debug_level > 4) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_helper, ") ACE_TEXT ("byte_count = %d, head_is_empty = %d\n"), this->id(), byte_count, this->queue_is_empty_i ())); } return DR_QUEUE_EMPTY; // drain_queue_i will check if the queue is actually empty }
TAO_Transport::Drain_Result TAO_Transport::drain_queue_i | ( | TAO::Transport::Drain_Constraints const & | dc | ) | [private] |
Implement drain_queue() assuming the lock is held.
Definition at line 1021 of file Transport.cpp.
{ // This is the vector used to send data, it must be declared outside // the loop because after the loop there may still be data to be // sent int iovcnt = 0; #if defined (ACE_INITIALIZE_MEMORY_BEFORE_USE) iovec iov[ACE_IOV_MAX] = { { 0 , 0 } }; #else iovec iov[ACE_IOV_MAX]; #endif /* ACE_INITIALIZE_MEMORY_BEFORE_USE */ // We loop over all the elements in the queue ... TAO_Queued_Message *i = this->head_; // Reset the value so that the counting is done for each new send // call. this->sent_byte_count_ = 0; // Avoid calling this expensive function each time through the loop. Instead // we'll assume that the time is unlikely to change much during the loop. // If we are forced to send in the loop then we'll recompute the time. ACE_Time_Value now = ACE_High_Res_Timer::gettimeofday_hr (); while (i != 0) { if (i->is_expired (now)) { if (TAO_debug_level > 3) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_i, ") ACE_TEXT ("Discarding expired queued message.\n"), this->id ())); } TAO_Queued_Message *next = i->next (); i->state_changed (TAO_LF_Event::LFS_TIMEOUT, this->orb_core_->leader_follower ()); i->remove_from_list (this->head_, this->tail_); i->destroy (); i = next; continue; } // ... each element fills the iovector ... i->fill_iov (ACE_IOV_MAX, iovcnt, iov); // ... the vector is full, no choice but to send some data out. // We need to loop because a single message can span multiple // IOV_MAX elements ... if (iovcnt == ACE_IOV_MAX) { Drain_Result const retval = this->drain_queue_helper (iovcnt, iov, dc); if (TAO_debug_level > 4) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_i, ") ACE_TEXT ("helper retval = %d\n"), this->id (), static_cast<int> (retval.dre_))); } if (retval != DR_QUEUE_EMPTY) { return retval; } now = ACE_High_Res_Timer::gettimeofday_hr (); i = this->head_; continue; } // ... notice that this line is only reached if there is still // room in the iovector ... i = i->next (); } if (iovcnt != 0) { Drain_Result const retval = this->drain_queue_helper (iovcnt, iov, dc); if (TAO_debug_level > 4) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_i, ") ACE_TEXT ("helper retval = %d\n"), this->id (), static_cast<int> (retval.dre_))); } if (retval != DR_QUEUE_EMPTY) { return retval; } } if (this->queue_is_empty_i ()) { if (this->flush_timer_pending ()) { ACE_Event_Handler *eh = this->event_handler_i (); ACE_Reactor * const reactor = eh->reactor (); reactor->cancel_timer (this->flush_timer_id_); this->reset_flush_timer (); } return DR_QUEUE_EMPTY; } return DR_OK; }
virtual ACE_Event_Handler* TAO_Transport::event_handler_i | ( | void | ) | [pure virtual] |
Return the event handler used to receive notifications from the Reactor. Normally a concrete TAO_Transport object has-a ACE_Event_Handler member that functions as an adapter between the ACE_Reactor framework and the TAO pluggable protocol framework. In all the protocols implemented so far this role is fullfilled by an instance of ACE_Svc_Handler.
Implemented in TAO_IIOP_Transport, TAO_DIOP_Transport, and TAO_UIOP_Transport.
bool TAO_Transport::first_request | ( | void | ) | const |
Get the first request flag.
Definition at line 178 of file Transport.inl.
{ return this->first_request_; }
void TAO_Transport::first_request_sent | ( | bool | flag = false |
) |
Set the state of the first_request_ to flag.
Definition at line 172 of file Transport.inl.
{ this->first_request_ = flag; }
int TAO_Transport::flush_timer_pending | ( | void | ) | const [private] |
Check if the flush timer is still pending.
Definition at line 116 of file Transport.inl.
{ return this->flush_timer_id_ != -1; }
int TAO_Transport::format_queue_message | ( | TAO_OutputCDR & | stream, | |
ACE_Time_Value * | max_wait_time, | |||
TAO_Stub * | stub | |||
) |
Format and queue a message for stream
max_wait_time | The maximum time that the operation can block, used in the implementation of timeouts. |
Definition at line 557 of file Transport.cpp.
{ if (this->messaging_object ()->format_message (stream, stub) != 0) return -1; return this->queue_message_i (stream.begin (), max_wait_time); }
int TAO_Transport::generate_locate_request | ( | TAO_Target_Specification & | spec, | |
TAO_Operation_Details & | opdetails, | |||
TAO_OutputCDR & | output | |||
) |
This is a request for the transport object to write a LocateRequest header before it is sent out.
Definition at line 419 of file Transport.cpp.
{ if (this->messaging_object ()->generate_locate_request_header (opdetails, spec, output) == -1) { if (TAO_debug_level > 0) { ACE_ERROR ((LM_ERROR, ACE_TEXT ("TAO (%P|%t) - Transport[%d]::generate_locate_request, ") ACE_TEXT ("error while marshalling the LocateRequest header\n"), this->id ())); } return -1; } return 0; }
int TAO_Transport::generate_request_header | ( | TAO_Operation_Details & | opd, | |
TAO_Target_Specification & | spec, | |||
TAO_OutputCDR & | msg | |||
) | [virtual] |
This is a request for the transport object to write a request header before it sends out the request
Definition at line 443 of file Transport.cpp.
{ if (this->messaging_object ()->generate_request_header (opdetails, spec, output) == -1) { if (TAO_debug_level > 0) { ACE_ERROR ((LM_ERROR, ACE_TEXT ("TAO (%P|%t) - Transport[%d]::generate_request_header, ") ACE_TEXT ("error while marshalling the Request header\n"), this->id())); } return -1; } return 0; }
int TAO_Transport::handle_input | ( | TAO_Resume_Handle & | rh, | |
ACE_Time_Value * | max_wait_time = 0 | |||
) | [virtual] |
Callback to read incoming data.
The ACE_Event_Handler adapter invokes this method as part of its handle_input() operation.
Once a complete message is read the Transport class delegates on the Messaging layer to invoke the right upcall (on the server) or the TAO_Reply_Dispatcher (on the client side).
max_wait_time | In some cases the I/O is synchronous, e.g. a thread-per-connection server or when Wait_On_Read is enabled. In those cases a maximum read time can be specified. |
Reimplemented in TAO_DIOP_Transport.
Definition at line 1629 of file Transport.cpp.
{ if (TAO_debug_level > 3) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input\n"), this->id ())); } // First try to process messages of the head of the incoming queue. int const retval = this->process_queue_head (rh); if (retval <= 0) { if (retval == -1) { if (TAO_debug_level > 2) { ACE_ERROR ((LM_ERROR, ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input, ") ACE_TEXT ("error while parsing the head of the queue\n"), this->id())); } return -1; } else { // retval == 0 // Processed a message in queue successfully. This // thread must return to thread-pool now. return 0; } } TAO_Queued_Data *q_data = 0; if (this->incoming_message_stack_.top (q_data) != -1 && q_data->missing_data () != TAO_MISSING_DATA_UNDEFINED) { /* PRE: q_data->missing_data_ > 0 as all QD on stack must be incomplete */ if (this->handle_input_missing_data (rh, max_wait_time, q_data) == -1) { if (TAO_debug_level > 0) { ACE_ERROR ((LM_ERROR, ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input, ") ACE_TEXT ("error consolidating incoming message\n"), this->id ())); } return -1; } } else { if (this->handle_input_parse_data (rh, max_wait_time) == -1) { if (TAO_debug_level > 0) { ACE_ERROR ((LM_ERROR, ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input, ") ACE_TEXT ("error parsing incoming message\n"), this->id ())); } return -1; } } return 0; }
int TAO_Transport::handle_input_missing_data | ( | TAO_Resume_Handle & | rh, | |
ACE_Time_Value * | max_wait_time, | |||
TAO_Queued_Data * | q_data | |||
) | [private] |
Is invoked by handle_input operation. It consolidate message on top of incoming_message_stack. The amount of missing data is known and recv operation copies data directly into message buffer, as much as a single recv-invocation provides.
Definition at line 1847 of file Transport.cpp.
{ // paranoid check if (q_data == 0) { return -1; } if (TAO_debug_level > 3) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_missing_data_message, ") ACE_TEXT ("enter (missing data == %d)\n"), this->id (), q_data->missing_data ())); } size_t const recv_size = q_data->missing_data (); if (q_data->msg_block ()->space() < recv_size) { // make sure the message_block has enough space size_t const message_size = recv_size + q_data->msg_block ()->length(); if (ACE_CDR::grow (q_data->msg_block (), message_size) == -1) { return -1; } } // Saving the size of the received buffer in case any one needs to // get the size of the message thats received in the // context. Obviously the value will be changed for each recv call // and the user is supposed to invoke the accessor only in the // invocation context to get meaningful information. this->recv_buffer_size_ = recv_size; // Read the message into the existing message block on heap ssize_t const n = this->recv (q_data->msg_block ()->wr_ptr(), recv_size, max_wait_time); if (n <= 0) { return n; } if (TAO_debug_level > 3) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_missing_data_message, ") ACE_TEXT ("read bytes %d\n"), this->id (), n)); } q_data->msg_block ()->wr_ptr(n); q_data->missing_data (q_data->missing_data () - n); if (q_data->missing_data () == 0) { // paranoid check if (this->incoming_message_stack_.pop (q_data) == -1) { return -1; } if (this->consolidate_process_message (q_data, rh) == -1) { return -1; } } return 0; }
int TAO_Transport::handle_input_parse_data | ( | TAO_Resume_Handle & | rh, | |
ACE_Time_Value * | max_wait_time | |||
) | [private] |
Is invoked by handle_input operation. It parses new messages from input stream or consolidates messages whose header has been partially read, the message size being unknown so far. It parses as much data as a single recv-invocation provides.
Definition at line 1968 of file Transport.cpp.
{ if (TAO_debug_level > 3) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ") ACE_TEXT ("enter\n"), this->id ())); } // The buffer on the stack which will be used to hold the input // messages, ACE_CDR::MAX_ALIGNMENT compensates the // memory-alignment. This improves performance with SUN-Java-ORB-1.4 // and higher that sends fragmented requests of size 1024 bytes. char buf [TAO_MAXBUFSIZE + ACE_CDR::MAX_ALIGNMENT]; #if defined (ACE_INITIALIZE_MEMORY_BEFORE_USE) (void) ACE_OS::memset (buf, '\0', sizeof buf); #endif /* ACE_INITIALIZE_MEMORY_BEFORE_USE */ // Create a data block ACE_Data_Block db (sizeof (buf), ACE_Message_Block::MB_DATA, buf, this->orb_core_->input_cdr_buffer_allocator (), this->orb_core_->locking_strategy (), ACE_Message_Block::DONT_DELETE, this->orb_core_->input_cdr_dblock_allocator ()); // Create a message block ACE_Message_Block message_block (&db, ACE_Message_Block::DONT_DELETE, this->orb_core_->input_cdr_msgblock_allocator ()); // Align the message block ACE_CDR::mb_align (&message_block); size_t recv_size = 0; // Note: unsigned integer // Pointer to newly parsed message TAO_Queued_Data *q_data = 0; // Optimizing access of constants size_t const header_length = this->messaging_object ()->header_length (); // Paranoid check if (header_length > message_block.space ()) { return -1; } if (this->orb_core_->orb_params ()->single_read_optimization ()) { recv_size = message_block.space (); } else { // Single read optimization has been de-activated. That means // that we need to read from transport the GIOP header first // before the payload. This codes first checks the incoming // stack for partial messages which needs to be // consolidated. Otherwise we are in new cycle, reading complete // GIOP header of new incoming message. if (this->incoming_message_stack_.top (q_data) != -1 && q_data->missing_data () == TAO_MISSING_DATA_UNDEFINED) { // There is a partial message on incoming_message_stack_ // whose length is unknown so far. We need to consolidate // the GIOP header to get to know the payload size, recv_size = header_length - q_data->msg_block ()->length (); } else { // Read amount of data forming GIOP header of new incoming // message. recv_size = header_length; } // POST: 0 <= recv_size <= header_length } // POST: 0 <= recv_size <= message_block->space () // If we have a partial message, copy it into our message block and // clear out the partial message. if (this->partial_message_ != 0 && this->partial_message_->length () > 0) { // (*) Copy back the partial message into current read-buffer, // verify that the read-strategy of "recv_size" bytes is not // exceeded. The latter check guarantees that recv_size does not // roll-over and keeps in range // 0<=recv_size<=message_block->space() if (this->partial_message_->length () <= recv_size && message_block.copy (this->partial_message_->rd_ptr (), this->partial_message_->length ()) == 0) { recv_size -= this->partial_message_->length (); this->partial_message_->reset (); } else { return -1; } } // POST: 0 <= recv_size <= buffer_space if (0 >= recv_size) // paranoid: the check above (*) guarantees recv_size>=0 { // This event would cause endless looping, trying frequently to // read zero bytes from stream. This might happen, if TAOs // protocol implementation is not correct and tries to read data // beyond header without "single_read_optimazation" being // activated. if (TAO_debug_level > 0) { ACE_ERROR ((LM_ERROR, ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ") ACE_TEXT ("Error - endless loop detection, closing connection"), this->id ())); } return -1; } // Saving the size of the received buffer in case any one needs to // get the size of the message thats received in the // context. Obviously the value will be changed for each recv call // and the user is supposed to invoke the accessor only in the // invocation context to get meaningful information. this->recv_buffer_size_ = recv_size; // Read the message into the message block that we have created on // the stack. ssize_t const n = this->recv (message_block.wr_ptr (), recv_size, max_wait_time); // If there is an error return to the reactor.. if (n <= 0) { return n; } if (TAO_debug_level > 3) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ") ACE_TEXT ("read %d bytes\n"), this->id (), n)); } // Set the write pointer in the stack buffer message_block.wr_ptr (n); // // STACK PROCESSING OR MESSAGE CONSOLIDATION // // PRE: data in buffer is aligned && message_block.length() > 0 if (this->incoming_message_stack_.top (q_data) != -1 && q_data->missing_data () == TAO_MISSING_DATA_UNDEFINED) { // // MESSAGE CONSOLIDATION // // Partial message on incoming_message_stack_ needs to be // consolidated. The message header could not be parsed so far // and therefor the message size is unknown yet. Consolidating // the message destroys the memory alignment of succeeding // messages sharing the buffer, for that reason consolidation // and stack based processing are mutial exclusive. if (this->messaging_object ()->consolidate_node (q_data, message_block) == -1) { if (TAO_debug_level > 0) { ACE_ERROR ((LM_ERROR, ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ") ACE_TEXT ("error consolidating message from input buffer\n"), this->id () )); } return -1; } // Complete message are to be enqueued and later processed if (q_data->missing_data () == 0) { if (this->incoming_message_stack_.pop (q_data) == -1) { return -1; } if (this->consolidate_enqueue_message (q_data) == -1) { return -1; } } if (message_block.length () > 0 && this->handle_input_parse_extra_messages (message_block) == -1) { return -1; } // In any case try to process the enqueued messages if (this->process_queue_head (rh) == -1) { return -1; } } else { // // STACK PROCESSING (critical path) // // Process the first message in buffer on stack // (PRE: first message resides in aligned memory) Make a node of // the message-block.. TAO_Queued_Data qd (&message_block, this->orb_core_->transport_message_buffer_allocator ()); size_t mesg_length = 0; if (this->messaging_object ()->parse_next_message (qd, mesg_length) == -1 || (qd.missing_data () == 0 && mesg_length > message_block.length ()) ) { // extracting message failed return -1; } // POST: qd.missing_data_ == 0 --> mesg_length <= message_block.length() // This prevents seeking rd_ptr behind the wr_ptr if (qd.missing_data () != 0 || qd.more_fragments () || qd.msg_type () == GIOP::Fragment) { if (qd.missing_data () == 0) { // Dealing with a fragment TAO_Queued_Data *nqd = TAO_Queued_Data::duplicate (qd); if (nqd == 0) { return -1; } // mark the end of message in new buffer char* end_mark = nqd->msg_block ()->rd_ptr () + mesg_length; nqd->msg_block ()->wr_ptr (end_mark); // move the read pointer forward in old buffer message_block.rd_ptr (mesg_length); // enqueue the message if (this->consolidate_enqueue_message (nqd) == -1) { return -1; } if (message_block.length () > 0 && this->handle_input_parse_extra_messages (message_block) == -1) { return -1; } // In any case try to process the enqueued messages if (this->process_queue_head (rh) == -1) { return -1; } } else if (qd.missing_data () != TAO_MISSING_DATA_UNDEFINED) { // Incomplete message, must be the last one in buffer if (qd.missing_data () != TAO_MISSING_DATA_UNDEFINED && qd.missing_data () > message_block.space ()) { // Re-Allocate correct size on heap if (ACE_CDR::grow (qd.msg_block (), message_block.length () + qd.missing_data ()) == -1) { return -1; } } TAO_Queued_Data *nqd = TAO_Queued_Data::duplicate (qd); if (nqd == 0) { return -1; } // move read-pointer to end of buffer message_block.rd_ptr (message_block.length()); this->incoming_message_stack_.push (nqd); } } else { // // critical path // // We cant process the message on stack right now. First we // have got to parse extra messages from message_block, // putting them into queue. When this is done we can return // to process this message, and notifying other threads to // process the messages in queue. char * end_marker = message_block.rd_ptr () + mesg_length; if (message_block.length () > mesg_length) { // There are more message in data stream to be parsed. // Safe the rd_ptr to restore later. char *rd_ptr_stack_mesg = message_block.rd_ptr (); // Skip parsed message, jump to next message in buffer // PRE: mesg_length <= message_block.length () message_block.rd_ptr (mesg_length); // Extract remaining messages and enqueue them for later // heap processing if (this->handle_input_parse_extra_messages (message_block) == -1) { return -1; } // correct the wr_ptr using the end_marker to point to the // end of the first message else the code after this will // see the full stream with all the messages message_block.wr_ptr (end_marker); // Restore rd_ptr message_block.rd_ptr (rd_ptr_stack_mesg); } // The following if-else has been copied from // process_queue_head(). While process_queue_head() // processes message on heap, here we will process a message // on stack. // Now that we have one message on stack to be processed, // check whether we have one more message in the queue... if (this->incoming_message_queue_.queue_length () > 0) { if (TAO_debug_level > 0) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ") ACE_TEXT ("notify reactor\n"), this->id ())); } int const retval = this->notify_reactor (); if (retval == 1) { // Let the class know that it doesn't need to resume the // handle.. rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_LEAVE_SUSPENDED); } else if (retval < 0) return -1; } else { // As there are no further messages in queue just resume // the handle. Set the flag incase someone had reset the flag.. rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_RESUMABLE); } // PRE: incoming_message_queue is empty if (this->process_parsed_messages (&qd, rh) == -1) { return -1; } // move the rd_ptr tp position of end_marker message_block.rd_ptr (end_marker); } } // Now that all cases have been processed, there might be kept some data // in buffer that needs to be safed for next "handle_input" invocations. if (message_block.length () > 0) { if (this->partial_message_ == 0) { this->allocate_partial_message_block (); } if (this->partial_message_ != 0 && this->partial_message_->copy (message_block.rd_ptr (), message_block.length ()) == 0) { message_block.rd_ptr (message_block.length ()); } else { return -1; } } return 0; }
int TAO_Transport::handle_input_parse_extra_messages | ( | ACE_Message_Block & | message_block | ) | [private] |
Is invoked by handle_input_parse_data. Parses all messages remaining in message_block.
Definition at line 1925 of file Transport.cpp.
{ // store buffer status of last extraction: -1 parse error, 0 // incomplete message header in buffer, 1 complete messages header // parsed int buf_status = 0; TAO_Queued_Data *q_data = 0; // init // parse buffer until all messages have been extracted, consolidate // and enqueue complete messages, if the last message being parsed // has missin data, it is stays on top of incoming_message_stack. while (message_block.length () > 0 && (buf_status = this->messaging_object ()->extract_next_message (message_block, q_data)) != -1 && q_data != 0) // paranoid check { if (q_data->missing_data () == 0) { if (this->consolidate_enqueue_message (q_data) == -1) { return -1; } } else // incomplete message read, probably the last message in buffer { // can not fail this->incoming_message_stack_.push (q_data); } q_data = 0; // reset } // while if (buf_status == -1) { return -1; } return 0; }
TAO_Transport::Drain_Result TAO_Transport::handle_output | ( | TAO::Transport::Drain_Constraints const & | c | ) |
Callback method to reactively drain the outgoing data queue.
Definition at line 525 of file Transport.cpp.
{ if (TAO_debug_level > 3) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_output") ACE_TEXT (" - block_on_io=%d, timeout=%d.%06d\n"), this->id (), dc.block_on_io(), dc.timeout() ? dc.timeout()->sec() : -1, dc.timeout() ? dc.timeout()->usec() : -1 )); } // The flushing strategy (potentially via the Reactor) wants to send // more data, first check if there is a current message that needs // more sending... Drain_Result const retval = this->drain_queue (dc); if (TAO_debug_level > 3) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_output, ") ACE_TEXT ("drain_queue returns %d/%d\n"), this->id (), static_cast<int> (retval.dre_), ACE_ERRNO_GET)); } // Any errors are returned directly to the Reactor return retval; }
int TAO_Transport::handle_timeout | ( | const ACE_Time_Value & | current_time, | |
const void * | act | |||
) |
The timeout callback, invoked when any of the timers related to this transport expire.
current_time | The current time as reported from the Reactor | |
act | The Asynchronous Completion Token. Currently it is interpreted as follows:
|
This is the only legal ACT in the current configuration....
Definition at line 874 of file Transport.cpp.
{ if (TAO_debug_level > 6) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_timeout, ") ACE_TEXT ("timer expired\n"), this->id ())); } /// This is the only legal ACT in the current configuration.... if (act != &this->current_deadline_) { return -1; } if (this->flush_timer_pending ()) { // The timer is always a oneshot timer, so mark is as not // pending. this->reset_flush_timer (); TAO_Flushing_Strategy *flushing_strategy = this->orb_core ()->flushing_strategy (); int const result = flushing_strategy->schedule_output (this); if (result == TAO_Flushing_Strategy::MUST_FLUSH) { typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK; TAO_REVERSE_LOCK reverse (*this->handler_lock_); ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1); if (flushing_strategy->flush_transport (this, 0) == -1) { return -1; } } } return 0; }
size_t TAO_Transport::id | ( | void | ) | const |
Set and Get the identifier for this transport instance.
If not set, this will return an integer representation of the this
pointer for the instance on which it's called.
Definition at line 94 of file Transport.inl.
{ return this->id_; }
void TAO_Transport::id | ( | size_t | id | ) |
Definition at line 100 of file Transport.inl.
{ this->id_ = id; }
bool TAO_Transport::idle_after_reply | ( | void | ) |
Request is sent and the reply is received. Idle the transport now.
Definition at line 273 of file Transport.cpp.
{ return this->tms ()->idle_after_reply (); }
bool TAO_Transport::idle_after_send | ( | void | ) |
Request has been just sent, but the reply is not received. Idle the transport now.
Definition at line 267 of file Transport.cpp.
{ return this->tms ()->idle_after_send (); }
ACE_Time_Value const * TAO_Transport::io_timeout | ( | TAO::Transport::Drain_Constraints const & | dc | ) | const [protected] |
Re-factor computation of I/O timeouts based on operation timeouts. Depending on the wait strategy, we need to timeout I/O operations or not. For example, if we are using a non-blocking strategy, we want to pass 0 to all I/O operations, and rely on the ACE_NONBLOCK settings on the underlying sockets. However, for blocking strategies we want to pass the operation timeouts, to respect the application level policies.
This function was introduced as part of the fixes for bug 3647.
Definition at line 2791 of file Transport.cpp.
{ if (dc.block_on_io()) { return dc.timeout(); } if (this->wait_strategy()->can_process_upcalls()) { return 0; } return dc.timeout(); }
bool TAO_Transport::is_connected | ( | void | ) | const |
Is this transport really connected.
Definition at line 184 of file Transport.inl.
{ ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, false); return this->is_connected_; }
CORBA::Boolean TAO_Transport::is_tcs_set | ( | void | ) | const |
Return true if the tcs has been set.
CodeSet negotiation.
Definition at line 166 of file Transport.inl.
{ return tcs_set_; }
int TAO_Transport::make_idle | ( | void | ) |
Cache management.
Definition at line 501 of file Transport.cpp.
{ if (TAO_debug_level > 3) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TAO (%P|%t) - Transport[%d]::make_idle\n"), this->id ())); } return this->transport_cache_manager ().make_idle (this->cache_map_entry_); }
void TAO_Transport::messaging_init | ( | TAO_GIOP_Message_Version const & | version | ) |
Initialising the messaging object. This would be used by the connector side. On the acceptor side the connection handler would take care of the messaging objects.
Definition at line 2670 of file Transport.cpp.
{ this->messaging_object ()->init (version.major, version.minor); }
TAO_GIOP_Message_Base * TAO_Transport::messaging_object | ( | void | ) |
Return the messaging object that is used to format the data that needs to be sent.
Definition at line 129 of file Transport.inl.
{ return this->messaging_object_; }
int TAO_Transport::notify_reactor | ( | void | ) | [private] |
These classes need privileged access to:
Definition at line 2573 of file Transport.cpp.
{ if (!this->ws_->is_registered ()) { return 0; } ACE_Event_Handler *eh = this->event_handler_i (); // Get the reactor associated with the event handler ACE_Reactor *reactor = this->orb_core ()->reactor (); if (TAO_debug_level > 0) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TAO (%P|%t) - Transport[%d]::notify_reactor, ") ACE_TEXT ("notify to Reactor\n"), this->id ())); } // Send a notification to the reactor... int const retval = reactor->notify (eh, ACE_Event_Handler::READ_MASK); if (retval < 0 && TAO_debug_level > 2) { // @todo: need to think about what is the action that // we can take when we get here. ACE_ERROR ((LM_ERROR, ACE_TEXT ("TAO (%P|%t) - Transport[%d]::notify_reactor, ") ACE_TEXT ("notify to the reactor failed..\n"), this->id ())); } return 1; }
TAO::Connection_Role TAO_Transport::opened_as | ( | void | ) | const |
Methods dealing with the role of the connection, e.g., CLIENT or SERVER. See CORBA 2.6 Specification, Section 15.5.1 for origin of definitions.
Definition at line 51 of file Transport.inl.
{ return this->opening_connection_role_; }
void TAO_Transport::opened_as | ( | TAO::Connection_Role | role | ) |
Definition at line 57 of file Transport.inl.
{ this->opening_connection_role_ = role; }
TAO_ORB_Core * TAO_Transport::orb_core | ( | void | ) | const |
Access the ORB that owns this connection.
Definition at line 20 of file Transport.inl.
{ return this->orb_core_; }
TAO_OutputCDR & TAO_Transport::out_stream | ( | void | ) |
Accessor for the output CDR stream.
Definition at line 2658 of file Transport.cpp.
{ return this->messaging_object ()->out_stream (); }
TAO_SYNCH_MUTEX & TAO_Transport::output_cdr_lock | ( | void | ) |
Accessor for synchronizing Transport OutputCDR access.
Definition at line 2664 of file Transport.cpp.
{ return this->output_cdr_mutex_; }
bool TAO_Transport::post_connect_hook | ( | void | ) | [virtual] |
Hooks that can be overridden in concrete transports.
These hooks are invoked just after connection establishment (or after a connection is fetched from cache). The return value signifies whether the invoker should proceed with post connection establishment activities. Protocols like SSLIOP need this to verify whether connections already established have valid certificates. There are no pre_connect_hooks () since the transport doesn't exist before a connection establishment. :-)
Definition at line 325 of file Transport.cpp.
{ return true; }
bool TAO_Transport::post_open | ( | size_t | id | ) |
Perform all the actions when this transport get opened.
Definition at line 2694 of file Transport.cpp.
{ if (TAO_debug_level > 9) ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TAO (%P|%t) - Transport::post_open, ") ACE_TEXT ("tport id changed from %d to %d\n"), this->id_, id)); this->id_ = id; // When we have data in our outgoing queue schedule ourselves // for output if (!this->queue_is_empty_i ()) { // If the wait strategy wants us to be registered with the reactor // then we do so. If registeration is required and it succeeds, // #REFCOUNT# becomes two. if (this->wait_strategy ()->register_handler () == 0) { if (this->flush_in_post_open_) { TAO_Flushing_Strategy *flushing_strategy = this->orb_core ()->flushing_strategy (); if (flushing_strategy == 0) throw CORBA::INTERNAL (); this->flush_in_post_open_ = false; (void) flushing_strategy->schedule_output (this); } } else { // Registration failures. // Purge from the connection cache, if we are not in the cache, this // just does nothing. (void) this->purge_entry (); // Close the handler. (void) this->close_connection (); if (TAO_debug_level > 0) { ACE_ERROR ((LM_ERROR, ACE_TEXT ("TAO (%P|%t) - Transport[%d]::post_open , ") ACE_TEXT ("could not register the transport ") ACE_TEXT ("in the reactor.\n"), this->id ())); } return false; } } { ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, false); this->is_connected_ = true; } if (TAO_debug_level > 9 && !this->cache_map_entry_) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TAO (%P|%t) - Transport[%d]::post_open") ACE_TEXT (", cache_map_entry_ is 0\n"), this->id_)); } this->transport_cache_manager ().mark_connected (this->cache_map_entry_, true); // update transport cache to make this entry available this->transport_cache_manager ().set_entry_state ( this->cache_map_entry_, TAO::ENTRY_IDLE_AND_PURGABLE); return true; }
void TAO_Transport::pre_close | ( | void | ) |
do what needs to be done when closing the transport
Definition at line 2676 of file Transport.cpp.
{ // @TODO: something needs to be done with is_connected_. Checking it is // guarded by a mutex, but setting it is not. Until the need for mutexed // protection is required, the transport cache is holding its own copy // of the is_connected_ flag, so that during cache lookups the cache // manager doesn't need to be burdened by the lock in is_connected(). this->is_connected_ = false; this->transport_cache_manager ().mark_connected (this->cache_map_entry_, false); this->purge_entry (); { ACE_MT (ACE_GUARD (ACE_Lock, guard, *this->handler_lock_)); this->cleanup_queue_i (); } }
int TAO_Transport::process_parsed_messages | ( | TAO_Queued_Data * | qd, | |
TAO_Resume_Handle & | rh | |||
) | [protected] |
Process the message by sending it to the higher layers of the ORB.
Definition at line 2387 of file Transport.cpp.
{ if (TAO_debug_level > 7) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ") ACE_TEXT ("entering (missing data == %d)\n"), this->id(), qd->missing_data ())); } #if TAO_HAS_TRANSPORT_CURRENT == 1 // Update stats, if any if (this->stats_ != 0) this->stats_->messages_received (qd->msg_block ()->length ()); #endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */ switch (qd->msg_type ()) { case GIOP::CloseConnection: { if (TAO_debug_level > 0) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ") ACE_TEXT ("received CloseConnection message - %m\n"), this->id())); } // Return a "-1" so that the next stage can take care of // closing connection and the necessary memory management. return -1; } break; case GIOP::Request: case GIOP::LocateRequest: { // Let us resume the handle before we go ahead to process the // request. This will open up the handle for other threads. rh.resume_handle (); if (this->messaging_object ()->process_request_message (this, qd) == -1) { // Return a "-1" so that the next stage can take care of // closing connection and the necessary memory management. return -1; } } break; case GIOP::Reply: case GIOP::LocateReply: { rh.resume_handle (); TAO_Pluggable_Reply_Params params (this); if (this->messaging_object ()->process_reply_message (params, qd) == -1) { if (TAO_debug_level > 0) { ACE_ERROR ((LM_ERROR, ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ") ACE_TEXT ("error in process_reply_message - %m\n"), this->id ())); } return -1; } } break; case GIOP::CancelRequest: { // The associated request might be incomplete residing // fragmented in messaging object. We must make sure the // resources allocated by fragments are released. if (this->messaging_object ()->discard_fragmented_message (qd) == -1) { if (TAO_debug_level > 0) { ACE_ERROR ((LM_ERROR, ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ") ACE_TEXT ("error processing CancelRequest\n"), this->id ())); } } // We are not able to cancel requests being processed already; // this is declared as optional feature by CORBA, and TAO does // not support this currently. // Just continue processing, CancelRequest does not mean to cut // off the connection. } break; case GIOP::MessageError: { if (TAO_debug_level > 0) { ACE_ERROR ((LM_ERROR, ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ") ACE_TEXT ("received MessageError, closing connection\n"), this->id ())); } return -1; } break; case GIOP::Fragment: { // Nothing to be done. } break; } // If not, just return back.. return 0; }
int TAO_Transport::process_queue_head | ( | TAO_Resume_Handle & | rh | ) | [private] |
These classes need privileged access to:
Definition at line 2506 of file Transport.cpp.
{ if (TAO_debug_level > 3) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_queue_head, %d enqueued\n"), this->id (), this->incoming_message_queue_.queue_length () )); } // See if message in queue ... if (this->incoming_message_queue_.queue_length () > 0) { // Get the message on the head of the queue.. TAO_Queued_Data *qd = this->incoming_message_queue_.dequeue_head (); if (TAO_debug_level > 3) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_queue_head, ") ACE_TEXT ("the size of the queue is [%d]\n"), this->id (), this->incoming_message_queue_.queue_length())); } // Now that we have pulled out out one message out of the queue, // check whether we have one more message in the queue... if (this->incoming_message_queue_.queue_length () > 0) { if (TAO_debug_level > 0) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_queue_head, ") ACE_TEXT ("notify reactor\n"), this->id ())); } int const retval = this->notify_reactor (); if (retval == 1) { // Let the class know that it doesn't need to resume the // handle.. rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_LEAVE_SUSPENDED); } else if (retval < 0) return -1; } else { // As we are ready to process the last message just resume // the handle. Set the flag incase someone had reset the flag.. rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_RESUMABLE); } // Process the message... int const retval = this->process_parsed_messages (qd, rh); // Delete the Queued_Data.. TAO_Queued_Data::release (qd); return retval; } return 1; }
bool TAO_Transport::provide_blockable_handler | ( | TAO::Connection_Handler_Set & | handlers | ) |
Add event handlers corresponding to transports that have RW wait strategy to the handlers set. Called by the cache when the ORB is shuting down.
handlers | The TAO_Connection_Handler_Set into which the transport should place its handler if the transport has RW strategy on. |
Definition at line 253 of file Transport.cpp.
{ if (this->ws_->non_blocking () || this->opening_connection_role_ == TAO::TAO_SERVER_ROLE) return false; (void) this->add_reference (); h.insert (this->connection_handler_i ()); return true; }
void TAO_Transport::provide_handler | ( | TAO::Connection_Handler_Set & | handlers | ) |
Added event handler to the handlers set.
Called by the cache when the cache is closing.
handlers | The TAO_Connection_Handler_Set into which the transport should place its handler |
Definition at line 245 of file Transport.cpp.
{ (void) this->add_reference (); handlers.insert (this->connection_handler_i ()); }
int TAO_Transport::purge_entry | ( | void | ) |
Cache management.
Definition at line 479 of file Transport.cpp.
{ // We must store our entry in a temporary and zero out the data member. // If there is only one reference count on us, we will end up causing // our own destruction. And we can not be holding a cache map entry if // that happens. TAO::Transport_Cache_Manager::HASH_MAP_ENTRY* entry = 0; { ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1); entry = this->cache_map_entry_; this->cache_map_entry_ = 0; } return this->transport_cache_manager ().purge_entry (entry); }
unsigned long TAO_Transport::purging_order | ( | void | ) | const |
Get and Set the purging order. The purging strategy uses the set version to set the purging order.
Definition at line 78 of file Transport.inl.
{ return this->purging_order_; }
void TAO_Transport::purging_order | ( | unsigned long | value | ) |
Definition at line 84 of file Transport.inl.
{ // This should only be called by the Transport Cache Manager when // it is holding it's lock. // The transport should still be here since the cache manager still // has a reference to it. this->purging_order_ = value; }
bool TAO_Transport::queue_is_empty | ( | void | ) |
Check if there are messages pending in the queue.
Definition at line 106 of file Transport.inl.
{ ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, false); return this->queue_is_empty_i (); }
bool TAO_Transport::queue_is_empty_i | ( | void | ) | const [private] |
Check if there are messages pending in the queue.
This version assumes that the lock is already held. Use with care!
Definition at line 8 of file Transport.inl.
{ return (this->head_ == 0); }
int TAO_Transport::queue_message_i | ( | const ACE_Message_Block * | message_block, | |
ACE_Time_Value * | max_wait_time, | |||
bool | back = true | |||
) | [protected] |
Queue a message for message_block
max_wait_time | The maximum time that the operation can block, used in the implementation of timeouts. | |
back | If true, the message will be pushed to the back of the queue. If false, the message will be pushed to the front of the queue. |
Definition at line 1601 of file Transport.cpp.
{ TAO_Queued_Message *queued_message = 0; ACE_NEW_RETURN (queued_message, TAO_Asynch_Queued_Message (message_block, this->orb_core_, max_wait_time, 0, true), -1); if (back) { queued_message->push_back (this->head_, this->tail_); } else { queued_message->push_front (this->head_, this->tail_); } return 0; }
int TAO_Transport::recache_transport | ( | TAO_Transport_Descriptor_Interface * | desc | ) |
Recache ourselves in the cache.
Ideally the following should be inline.
purge_entry has a return value, use it
Definition at line 469 of file Transport.cpp.
{ // First purge our entry this->purge_entry (); // Then add ourselves to the cache return this->transport_cache_manager ().cache_transport (desc, this); }
virtual ssize_t TAO_Transport::recv | ( | char * | buffer, | |
size_t | len, | |||
const ACE_Time_Value * | timeout = 0 | |||
) | [pure virtual] |
Read len bytes from into buf.
This method serializes on handler_lock_, guaranteeing that only thread can execute it on the same instance concurrently.
buffer | ORB allocated buffer where the data should be @ The ACE_Time_Value *s is just a place holder for now. It is not clear this this is the best place to specify this. The actual timeout values will be kept in the Policies. |
Implemented in TAO_IIOP_Transport, TAO_DIOP_Transport, and TAO_UIOP_Transport.
size_t TAO_Transport::recv_buffer_size | ( | void | ) | const |
Accessor to recv_buffer_size_.
Definition at line 197 of file Transport.inl.
{ return this->recv_buffer_size_; }
int TAO_Transport::register_handler | ( | void | ) | [virtual] |
Register the handler with the reactor.
Register the handler with the reactor. This method is used by the Wait_On_Reactor strategy. The transport must register its event handler with the ORB's Reactor.
Reimplemented in TAO_DIOP_Transport.
Definition at line 365 of file Transport.cpp.
{ if (TAO_debug_level > 4) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TAO (%P|%t) - Transport[%d]::register_handler\n"), this->id ())); } ACE_Reactor * const r = this->orb_core_->reactor (); // @@note: This should be okay since the register handler call will // not make a nested call into the transport. ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, false); if (r == this->event_handler_i ()->reactor ()) { return 0; } // Set the flag in the Connection Handler and in the Wait Strategy // @@Maybe we should set these flags after registering with the // reactor. What if the registration fails??? this->ws_->is_registered (true); // Register the handler with the reactor return r->register_handler (this->event_handler_i (), ACE_Event_Handler::READ_MASK); }
bool TAO_Transport::register_if_necessary | ( | void | ) |
Register with the reactor via the wait strategy.
Definition at line 331 of file Transport.cpp.
{ if (this->is_connected_ && this->wait_strategy ()->register_handler () == -1) { // Registration failures. if (TAO_debug_level > 0) { ACE_ERROR ((LM_ERROR, ACE_TEXT ("TAO (%P|%t) - Transport[%d]::register_if_necessary, ") ACE_TEXT ("could not register the transport ") ACE_TEXT ("in the reactor.\n"), this->id ())); } // Purge from the connection cache, if we are not in the cache, this // just does nothing. (void) this->purge_entry (); // Close the handler. (void) this->close_connection (); return false; } return true; }
ACE_Event_Handler::Reference_Count TAO_Transport::remove_reference | ( | void | ) |
These classes need privileged access to:
Definition at line 2652 of file Transport.cpp.
{ return this->event_handler_i ()->remove_reference (); }
void TAO_Transport::report_invalid_event_handler | ( | const char * | caller | ) | [private] |
Print out error messages if the event handler is not valid.
Definition at line 1282 of file Transport.cpp.
{ if (TAO_debug_level > 0) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TAO (%P|%t) - Transport[%d]::report_invalid_event_handler") ACE_TEXT ("(%C) no longer associated with handler [tag=%d]\n"), this->id (), caller, this->tag_)); } }
void TAO_Transport::reset_flush_timer | ( | void | ) | [private] |
The flush timer expired or was explicitly cancelled, mark it as not pending
Definition at line 122 of file Transport.inl.
{ this->flush_timer_id_ = -1; this->current_deadline_ = ACE_Time_Value::zero; }
int TAO_Transport::schedule_output_i | ( | void | ) | [private] |
Schedule handle_output() callbacks.
Definition at line 804 of file Transport.cpp.
{ ACE_Event_Handler * const eh = this->event_handler_i (); ACE_Reactor * const reactor = eh->reactor (); if (reactor == 0) { if (TAO_debug_level > 1) { ACE_ERROR ((LM_ERROR, ACE_TEXT ("TAO (%P|%t) - ") ACE_TEXT ("Transport[%d]::schedule_output_i, ") ACE_TEXT ("no reactor,") ACE_TEXT ("returning -1\n"), this->id ())); } return -1; } // Check to see if our event handler is still registered with the // reactor. It's possible for another thread to have run close_connection() // since we last used the event handler. ACE_Event_Handler * const found = reactor->find_handler (eh->get_handle ()); if (found) { found->remove_reference (); if (found != eh) { if (TAO_debug_level > 3) { ACE_ERROR ((LM_ERROR, ACE_TEXT ("TAO (%P|%t) - ") ACE_TEXT ("Transport[%d]::schedule_output_i ") ACE_TEXT ("event handler not found in reactor,") ACE_TEXT ("returning -1\n"), this->id ())); } return -1; } } if (TAO_debug_level > 3) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TAO (%P|%t) - Transport[%d]::schedule_output_i\n"), this->id ())); } return reactor->schedule_wakeup (eh, ACE_Event_Handler::WRITE_MASK); }
virtual ssize_t TAO_Transport::send | ( | iovec * | iov, | |
int | iovcnt, | |||
size_t & | bytes_transferred, | |||
ACE_Time_Value const * | timeout | |||
) | [pure virtual] |
Write the complete Message_Block chain to the connection.
This method serializes on handler_lock_, guaranteeing that only thread can execute it on the same instance concurrently.
Often the implementation simply forwards the arguments to the underlying ACE_Svc_Handler class. Using the code factored out into ACE.
Be careful with protocols that perform non-trivial transformations of the data, such as SSLIOP or protocols that compress the stream.
iov | contains the data that must be sent. | |
timeout | is the maximum time that the application is willing to wait for the data to be sent, useful in platforms that implement timed writes. The timeout value is obtained from the policies set by the application. | |
bytes_transferred | should return the total number of bytes successfully transferred before the connection blocked. This is required because in some platforms and/or protocols multiple system calls may be required to send the chain of message blocks. The first few calls can work successfully, but the final one can fail or signal a flow control situation (via EAGAIN). In this case the ORB expects the function to return -1, errno to be appropriately set and this argument to return the number of bytes already on the OS I/O subsystem. |
This call can also fail if the transport instance is no longer associated with a connection (e.g., the connection handler closed down). In that case, it returns -1 and sets errno to ENOENT
.
Implemented in TAO_IIOP_Transport, TAO_DIOP_Transport, and TAO_UIOP_Transport.
int TAO_Transport::send_asynchronous_message_i | ( | TAO_Stub * | stub, | |
const ACE_Message_Block * | message_block, | |||
ACE_Time_Value * | max_wait_time | |||
) | [private] |
Send an asynchronous message, i.e. do not block until the message is on the wire
Definition at line 1350 of file Transport.cpp.
{ // Let's figure out if the message should be queued without trying // to send first: bool try_sending_first = true; bool const queue_empty = this->queue_is_empty_i (); TAO::Transport_Queueing_Strategy *queue_strategy = stub->transport_queueing_strategy (); if (!queue_empty) { try_sending_first = false; } else if (queue_strategy) { if (queue_strategy->must_queue (queue_empty)) { try_sending_first = false; } } bool partially_sent = false; bool timeout_encountered = false; TAO::Transport::Drain_Constraints dc( max_wait_time, this->using_blocking_io_for_asynch_messages()); if (try_sending_first) { ssize_t n = 0; size_t byte_count = 0; // ... in this case we must try to send the message first ... size_t const total_length = message_block->total_length (); if (TAO_debug_level > 6) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ") ACE_TEXT ("trying to send the message (ml = %d)\n"), this->id (), total_length)); } // @@ I don't think we want to hold the mutex here, however if // we release it we need to recheck the status of the transport // after we return... once I understand the final form for this // code I will re-visit this decision n = this->send_message_block_chain_i (message_block, byte_count, dc); if (n == -1) { // ... if this is just an EWOULDBLOCK we must schedule the // message for later, if it is ETIME we still have to send // the complete message, because cutting off the message at // this point will destroy the synchronization with the // server ... if (errno != EWOULDBLOCK && errno != ETIME) { if (TAO_debug_level > 0) { ACE_ERROR ((LM_ERROR, ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ") ACE_TEXT ("fatal error in ") ACE_TEXT ("send_message_block_chain_i - %m\n"), this->id ())); } return -1; } } // ... let's figure out if the complete message was sent ... if (total_length == byte_count) { // Done, just return. Notice that there are no allocations // or copies up to this point (though some fancy calling // back and forth). // This is the common case for the critical path, it should // be fast. return 0; } if (byte_count > 0) { partially_sent = true; } // If it was partially sent, then push to front of queue and don't flush if (errno == ETIME) { timeout_encountered = true; if (byte_count == 0) { //This request has timed out and none of it was sent to the transport //We can't return -1 here, since that would end up closing the tranpsort if (TAO_debug_level > 2) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TAO (%P|%t) - ") ACE_TEXT ("Transport[%d]::send_asynchronous_message_i, ") ACE_TEXT ("timeout encountered before any bytes sent\n"), this->id ())); } throw ::CORBA::TIMEOUT ( CORBA::SystemException::_tao_minor_code ( TAO_TIMEOUT_SEND_MINOR_CODE, ETIME), CORBA::COMPLETED_NO); } } if (TAO_debug_level > 6) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ") ACE_TEXT ("partial send %d / %d bytes\n"), this->id (), byte_count, total_length)); } // ... part of the data was sent, need to figure out what piece // of the message block chain must be queued ... while (message_block != 0 && message_block->length () == 0) { message_block = message_block->cont (); } // ... at least some portion of the message block chain should // remain ... } // ... either the message must be queued or we need to queue it // because it was not completely sent out ... ACE_Time_Value *wait_time = (partially_sent ? 0: max_wait_time); if (this->queue_message_i (message_block, wait_time, !partially_sent) == -1) { if (TAO_debug_level > 0) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TAO (%P|%t) - Transport[%d]::") ACE_TEXT ("send_asynchronous_message_i, ") ACE_TEXT ("cannot queue message for - %m\n"), this->id ())); } return -1; } if (TAO_debug_level > 6) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ") ACE_TEXT ("message is queued\n"), this->id ())); } if (timeout_encountered && partially_sent) { //Must close down the transport here since we can't guarantee the //integrity of the GIOP stream (the next send may try to write to //the socket before looking at the queue). if (TAO_debug_level > 0) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TAO (%P|%t) - Transport[%d]::") ACE_TEXT ("send_asynchronous_message_i, ") ACE_TEXT ("timeout after partial send, closing.\n"), this->id ())); } return -1; } else if (!timeout_encountered) { // We can't flush if we have already encountered a timeout // ... if the queue is full we need to activate the output on the // queue ... bool must_flush = false; const bool constraints_reached = this->check_buffering_constraints_i (stub, must_flush); // ... but we also want to activate it if the message was partially // sent.... Plus, when we use the blocking flushing strategy the // queue is flushed as a side-effect of 'schedule_output()' TAO_Flushing_Strategy *flushing_strategy = this->orb_core ()->flushing_strategy (); if (constraints_reached || try_sending_first) { int const result = flushing_strategy->schedule_output (this); if (result == TAO_Flushing_Strategy::MUST_FLUSH) { must_flush = true; } } if (must_flush) { if (TAO_debug_level > 0) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TAO (%P|%t) - Transport[%d]::") ACE_TEXT ("send_asynchronous_message_i, ") ACE_TEXT ("flushing transport.\n"), this->id ())); } size_t sent_byte = sent_byte_count_; int ret = 0; { typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK; TAO_REVERSE_LOCK reverse (*this->handler_lock_); ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1); ret = flushing_strategy->flush_transport (this, max_wait_time); } if (ret == -1) { if (errno == ETIME) { if (sent_byte == sent_byte_count_) // if nothing was actually flushed { //This request has timed out and none of it was sent to the transport //We can't return -1 here, since that would end up closing the tranpsort if (TAO_debug_level > 2) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TAO (%P|%t) - ") ACE_TEXT ("Transport[%d]::send_asynchronous_message_i, ") ACE_TEXT ("2 timeout encountered before any bytes sent\n"), this->id ())); } throw ::CORBA::TIMEOUT (CORBA::SystemException::_tao_minor_code (TAO_TIMEOUT_SEND_MINOR_CODE, ETIME), CORBA::COMPLETED_NO); } } return -1; } } } return 0; }
void TAO_Transport::send_connection_closed_notifications | ( | void | ) |
Notify all the components inside a Transport when the underlying connection is closed.
Definition at line 1294 of file Transport.cpp.
{ { ACE_MT (ACE_GUARD (ACE_Lock, guard, *this->handler_lock_)); this->send_connection_closed_notifications_i (); } this->tms ()->connection_closed (); }
void TAO_Transport::send_connection_closed_notifications_i | ( | void | ) | [private] |
Assume the lock is held.
Definition at line 1306 of file Transport.cpp.
{ this->cleanup_queue_i (); }
virtual int TAO_Transport::send_message | ( | TAO_OutputCDR & | stream, | |
TAO_Stub * | stub = 0 , |
|||
TAO_Message_Semantics | message_semantics = TAO_TWOWAY_REQUEST , |
|||
ACE_Time_Value * | max_time_wait = 0 | |||
) | [pure virtual] |
This method formats the stream and then sends the message on the transport. Once the ORB is prepared to receive a reply (see send_request() above), and all the arguments have been marshaled the CDR stream must be 'formatted', i.e. the message_size field in the GIOP header can finally be set to the proper value.
Implemented in TAO_IIOP_Transport, TAO_DIOP_Transport, and TAO_UIOP_Transport.
int TAO_Transport::send_message_block_chain | ( | const ACE_Message_Block * | message_block, | |
size_t & | bytes_transferred, | |||
ACE_Time_Value * | max_wait_time = 0 | |||
) |
This is a very specialized interface to send a simple chain of messages through the Transport. The only place we use this interface is in GIOP_Message_Base.cpp, to send error messages (i.e., an indication that we received a malformed GIOP message,) and to close the connection.
Definition at line 568 of file Transport.cpp.
{ ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1); TAO::Transport::Drain_Constraints dc( max_wait_time, true); return this->send_message_block_chain_i (mb, bytes_transferred, dc); }
int TAO_Transport::send_message_block_chain_i | ( | const ACE_Message_Block * | message_block, | |
size_t & | bytes_transferred, | |||
TAO::Transport::Drain_Constraints const & | dc | |||
) |
Send a message block chain, assuming the lock is held.
Definition at line 583 of file Transport.cpp.
{ size_t const total_length = mb->total_length (); // We are going to block, so there is no need to clone // the message block. TAO_Synch_Queued_Message synch_message (mb, this->orb_core_); synch_message.push_back (this->head_, this->tail_); Drain_Result const n = this->drain_queue_i (dc); if (n == DR_ERROR) { synch_message.remove_from_list (this->head_, this->tail_); return -1; // Error while sending... } else if (n == DR_QUEUE_EMPTY) { bytes_transferred = total_length; return 1; // Empty queue, message was sent.. } // Remove the temporary message from the queue... synch_message.remove_from_list (this->head_, this->tail_); bytes_transferred = total_length - synch_message.message_length (); return 0; }
int TAO_Transport::send_message_shared | ( | TAO_Stub * | stub, | |
TAO_Message_Semantics | message_semantics, | |||
const ACE_Message_Block * | message_block, | |||
ACE_Time_Value * | max_wait_time | |||
) | [virtual] |
Sent the contents of message_block.
stub | The object reference used for this operation, useful to obtain the current policies. | |
message_semantics | If this is set to TAO_TWO_REQUEST this method will block until the operation is completely written on the wire. If it is set to other values this operation could return. | |
message_block | The CDR encapsulation of the GIOP message that must be sent. The message may consist of multiple Message Blocks chained through the cont() field. | |
max_wait_time | The maximum time that the operation can block, used in the implementation of timeouts. |
Definition at line 295 of file Transport.cpp.
{ int result = 0; { ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1); result = this->send_message_shared_i (stub, message_semantics, message_block, max_wait_time); } if (result == -1) { // The connection needs to be closed here. // In the case of a partially written message this is the only way to cleanup // the physical connection as well as the Transport. An EOF on the remote end // will cancel the partially received message. this->close_connection (); } return result; }
int TAO_Transport::send_message_shared_i | ( | TAO_Stub * | stub, | |
TAO_Message_Semantics | message_semantics, | |||
const ACE_Message_Block * | message_block, | |||
ACE_Time_Value * | max_wait_time | |||
) | [protected] |
Implement send_message_shared() assuming the handler_lock_ is held.
Definition at line 1312 of file Transport.cpp.
{ int ret = 0; #if TAO_HAS_TRANSPORT_CURRENT == 1 size_t const message_length = message_block->length (); #endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */ switch (message_semantics) { case TAO_TWOWAY_REQUEST: ret = this->send_synchronous_message_i (message_block, max_wait_time); break; case TAO_REPLY: ret = this->send_reply_message_i (message_block, max_wait_time); break; case TAO_ONEWAY_REQUEST: ret = this->send_asynchronous_message_i (stub, message_block, max_wait_time); break; } #if TAO_HAS_TRANSPORT_CURRENT == 1 // "Count" the message, only if no error was encountered. if (ret != -1 && this->stats_ != 0) this->stats_->messages_sent (message_length); #endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */ return ret; }
int TAO_Transport::send_reply_message_i | ( | const ACE_Message_Block * | message_block, | |
ACE_Time_Value * | max_wait_time | |||
) | [private] |
Send a reply message, i.e. do not block until the message is on the wire, but just return after adding them to the queue.
Definition at line 713 of file Transport.cpp.
{ // Dont clone now.. We could be sent in one shot! TAO_Synch_Queued_Message synch_message (mb, this->orb_core_); synch_message.push_back (this->head_, this->tail_); int const n = this->send_synch_message_helper_i (synch_message, max_wait_time); // What about partially sent messages. if (n == -1 || n == 1) { return n; } if (TAO_debug_level > 3) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_reply_message_i, ") ACE_TEXT ("preparing to add to queue before leaving\n"), this->id ())); } // Till this point we shouldn't have any copying and that is the // point anyway. Now, remove the node from the list synch_message.remove_from_list (this->head_, this->tail_); // Clone the node that we have. TAO_Queued_Message *msg = synch_message.clone (this->orb_core_->transport_message_buffer_allocator ()); // Stick it in the queue msg->push_back (this->head_, this->tail_); TAO_Flushing_Strategy *flushing_strategy = this->orb_core ()->flushing_strategy (); int const result = flushing_strategy->schedule_output (this); if (result == -1) { if (TAO_debug_level > 5) { ACE_DEBUG ((LM_DEBUG, "TAO (%P|%t) - Transport[%d]::send_reply_" "message_i, dequeuing msg due to schedule_output " "failure\n", this->id ())); } msg->remove_from_list (this->head_, this->tail_); msg->destroy (); } else if (result == TAO_Flushing_Strategy::MUST_FLUSH) { typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK; TAO_REVERSE_LOCK reverse (*this->handler_lock_); ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1); (void) flushing_strategy->flush_transport (this, 0); } return 1; }
virtual int TAO_Transport::send_request | ( | TAO_Stub * | stub, | |
TAO_ORB_Core * | orb_core, | |||
TAO_OutputCDR & | stream, | |||
TAO_Message_Semantics | message_semantics, | |||
ACE_Time_Value * | max_time_wait | |||
) | [pure virtual] |
Prepare the waiting and demuxing strategy to receive a reply for a new request. Preparing the ORB to receive the reply only once the request is completely sent opens the system to some subtle race conditions: suppose the ORB is running in a multi-threaded configuration, thread A makes a request while thread B is using the Reactor to process all incoming requests. Thread A could be implemented as follows: 1) send the request 2) setup the ORB to receive the reply 3) wait for the request
but in this case thread B may receive the reply between step (1) and (2), and drop it as an invalid or unexpected message. Consequently the correct implementation is: 1) setup the ORB to receive the reply 2) send the request 3) wait for the reply
The following method encapsulates this idiom.
Implemented in TAO_IIOP_Transport, TAO_DIOP_Transport, and TAO_UIOP_Transport.
int TAO_Transport::send_synch_message_helper_i | ( | TAO_Synch_Queued_Message & | s, | |
ACE_Time_Value * | max_wait_time | |||
) | [private] |
A helper method used by send_synchronous_message_i() and send_reply_message_i(). Reusable code that could be used by both the methods.
Definition at line 777 of file Transport.cpp.
{ TAO::Transport::Drain_Constraints dc( max_wait_time, this->using_blocking_io_for_synch_messages()); Drain_Result const n = this->drain_queue_i (dc); if (n == DR_ERROR) { synch_message.remove_from_list (this->head_, this->tail_); return -1; // Error while sending... } else if (n == DR_QUEUE_EMPTY) { return 1; // Empty queue, message was sent.. } if (synch_message.all_data_sent ()) { return 1; } return 0; }
int TAO_Transport::send_synchronous_message_i | ( | const ACE_Message_Block * | message_block, | |
ACE_Time_Value * | max_wait_time | |||
) | [private] |
Send a synchronous message, i.e. block until the message is on the wire
Definition at line 617 of file Transport.cpp.
{ // We are going to block, so there is no need to clone // the message block. size_t const total_length = mb->total_length (); TAO_Synch_Queued_Message synch_message (mb, this->orb_core_); synch_message.push_back (this->head_, this->tail_); int const result = this->send_synch_message_helper_i (synch_message, max_wait_time); if (result == -1 && errno == ETIME) { if (total_length == synch_message.message_length ()) //none was sent { if (TAO_debug_level > 2) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TAO (%P|%t) - ") ACE_TEXT ("Transport[%d]::send_synchronous_message_i, ") ACE_TEXT ("timeout encountered before any bytes sent\n"), this->id ())); } throw ::CORBA::TIMEOUT ( CORBA::SystemException::_tao_minor_code ( TAO_TIMEOUT_SEND_MINOR_CODE, ETIME), CORBA::COMPLETED_NO); } else { return -1; } } else if(result == -1 || result == 1) { return result; } TAO_Flushing_Strategy *flushing_strategy = this->orb_core ()->flushing_strategy (); if (flushing_strategy->schedule_output (this) == -1) { synch_message.remove_from_list (this->head_, this->tail_); if (TAO_debug_level > 0) { ACE_ERROR ((LM_ERROR, ACE_TEXT ("TAO (%P|%t) - Transport[%d]::") ACE_TEXT ("send_synchronous_message_i, ") ACE_TEXT ("error while scheduling flush - %m\n"), this->id ())); } return -1; } // No need to check for result == TAO_Flushing_Strategy::MUST_FLUSH, // because we're always going to flush anyway. // Release the mutex, other threads may modify the queue as we // block for a long time writing out data. int flush_result; { typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK; TAO_REVERSE_LOCK reverse (*this->handler_lock_); ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1); flush_result = flushing_strategy->flush_message (this, &synch_message, max_wait_time); } if (flush_result == -1) { synch_message.remove_from_list (this->head_, this->tail_); // We don't need to do anything special for the timeout case. // The connection is going to get closed and the Transport destroyed. // The only thing to do maybe is to empty the queue. if (TAO_debug_level > 0) { ACE_ERROR ((LM_ERROR, ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_synchronous_message_i, ") ACE_TEXT ("error while sending message - %m\n"), this->id ())); } return -1; } return 1; }
size_t TAO_Transport::sent_byte_count | ( | void | ) | const |
Accessor to sent_byte_count_.
Definition at line 203 of file Transport.inl.
{ return this->sent_byte_count_; }
void TAO_Transport::set_bidir_context_info | ( | TAO_Operation_Details & | opdetails | ) | [virtual] |
These classes need privileged access to:
Reimplemented in TAO_IIOP_Transport.
Definition at line 2786 of file Transport.cpp.
{ }
void TAO_Transport::set_flush_in_post_open | ( | void | ) |
Set the flush in post open flag.
Definition at line 209 of file Transport.inl.
{ this->flush_in_post_open_ = true; }
TAO::Transport::Stats* TAO_Transport::stats | ( | void | ) | const |
Transport statistics.
CORBA::ULong TAO_Transport::tag | ( | void | ) | const |
Return the protocol tag.
The OMG assigns unique tags (a 32-bit unsigned number) to each protocol. New protocol tags can be obtained free of charge from the OMG, check the documents in corbafwd.h for more details.
Definition at line 14 of file Transport.inl.
{ return this->tag_; }
int TAO_Transport::tear_listen_point_list | ( | TAO_InputCDR & | cdr | ) | [virtual] |
Extracts the list of listen points from the cdr stream. The list would have the protocol specific details of the ListenPoints
Reimplemented in TAO_IIOP_Transport.
Definition at line 289 of file Transport.cpp.
{ ACE_NOTSUP_RETURN (-1); }
TAO_Transport_Mux_Strategy * TAO_Transport::tms | ( | void | ) | const |
Get the TAO_Tranport_Mux_Strategy used by this object.
The role of the TAO_Transport_Mux_Strategy is described in more detail in that class' documentation. Enough is to say that the class is used to control how many threads can have pending requests over the same connection. Multiplexing multiple threads over the same connection conserves resources and is almost required for AMI, but having only one pending request per connection is more efficient and reduces the possibilities of priority inversions.
Definition at line 26 of file Transport.inl.
{ return tms_; }
TAO::Transport_Cache_Manager & TAO_Transport::transport_cache_manager | ( | void | ) | [private] |
Helper method that returns the Transport Cache Manager.
Definition at line 2610 of file Transport.cpp.
{ return this->orb_core_->lane_resources ().transport_cache (); }
int TAO_Transport::update_transport | ( | void | ) |
Cache management.
Definition at line 514 of file Transport.cpp.
{ return this->transport_cache_manager ().update_entry (this->cache_map_entry_); }
bool TAO_Transport::using_blocking_io_for_asynch_messages | ( | ) | const [private] |
Return true if blocking I/O should be used for sending asynchronous (AMI calls, non-blocking oneways, responses to operations, etc.) messages. This is determined based on the current flushing strategy.
Definition at line 2816 of file Transport.cpp.
{ return false; }
bool TAO_Transport::using_blocking_io_for_synch_messages | ( | ) | const [private] |
Return true if blocking I/O should be used for sending synchronous (two-way, reliable oneways, etc.) messages. This is determined based on the current flushing and waiting strategies.
Definition at line 2806 of file Transport.cpp.
{ if (this->wait_strategy()->can_process_upcalls()) { return false; } return true; }
TAO_Wait_Strategy * TAO_Transport::wait_strategy | ( | void | ) | const |
Return the TAO_Wait_Strategy used by this object.
The role of the TAO_Wait_Strategy is described in more detail in that class' documentation. Enough is to say that the ORB can wait for a reply blocking on read(), using the Reactor to wait for multiple events concurrently or using the Leader/Followers protocol.
Definition at line 33 of file Transport.inl.
{ return this->ws_; }
TAO_Codeset_Translator_Base * TAO_Transport::wchar_translator | ( | void | ) | const |
CodeSet Negotiation - Get the wchar codeset translator factory.
Definition at line 145 of file Transport.inl.
{ return this->wchar_translator_; }
void TAO_Transport::wchar_translator | ( | TAO_Codeset_Translator_Base * | tf | ) |
CodeSet negotiation - Set the wchar codeset translator factory.
Definition at line 158 of file Transport.inl.
{ this->wchar_translator_ = tf; this->tcs_set_ = 1; }
friend class TAO_Leader_Follower_Flushing_Strategy [friend] |
These classes need privileged access to:
Definition at line 948 of file Transport.h.
friend class TAO_Reactive_Flushing_Strategy [friend] |
These classes need privileged access to:
Definition at line 947 of file Transport.h.
friend class TAO_Thread_Per_Connection_Handler [friend] |
Needs priveleged access to event_handler_i ()
Definition at line 952 of file Transport.h.
int TAO_Transport::bidirectional_flag_ [protected] |
Use to check if bidirectional info has been synchronized with the peer. Have we sent any info on bidirectional information or have we received any info regarding making the connection served by this transport bidirectional. The flag is used as follows: + We dont want to send the bidirectional context info more than once on the connection. Why? Waste of marshalling and demarshalling time on the client. + On the server side -- once a client that has established the connection asks the server to use the connection both ways, we *dont* want the server to pack service info to the client. That is not allowed. We need a flag to prevent such a things from happening.
The value of this flag will be 0 if the client sends info and 1 if the server receives the info.
Definition at line 1113 of file Transport.h.
Our entry in the cache. We don't own this. It is here for our convenience. We cannot just change things around.
Definition at line 1085 of file Transport.h.
Additional member values required to support codeset translation.
@Phil, I think it would be nice if we could think of a way to do the following. We have been trying to use the transport for marking about translator factories and such! IMHO this is a wrong encapulation ie. trying to populate the transport object with these details. We should probably have a class something like TAO_Message_Property or TAO_Message_Translator or whatever (I am sure you get the idea) and encapsulate all these details. Coupling these seems odd. if I have to be more cynical we can move this to the connection_handler and it may more sense with the DSCP stuff around there. Do you agree?
Definition at line 1191 of file Transport.h.
ACE_Time_Value TAO_Transport::current_deadline_ [protected] |
The queue will start draining no later than <queeing_deadline_> if* the deadline is
Definition at line 1130 of file Transport.h.
bool TAO_Transport::first_request_ [private] |
First_request_ is true until the first request is sent or received. This is necessary since codeset context information is necessary only on the first request. After that, the translators are fixed for the life of the connection.
Definition at line 1203 of file Transport.h.
bool TAO_Transport::flush_in_post_open_ [private] |
Indicate that flushing needs to be done in post_open().
Definition at line 1223 of file Transport.h.
long TAO_Transport::flush_timer_id_ [protected] |
The timer ID.
Definition at line 1133 of file Transport.h.
ACE_Lock* TAO_Transport::handler_lock_ [mutable, protected] |
Lock that insures that activities that *might* use handler-related resources (such as a connection handler) get serialized. This is an ACE_Lock
that gets initialized from TAO_ORB_Core::resource_factory()->create_cached_connection_lock()
. This way, one can use a lock appropriate for the type of system, i.e., a null lock for single-threaded systems, and a real lock for multi-threaded systems.
Definition at line 1147 of file Transport.h.
TAO_Queued_Message* TAO_Transport::head_ [protected] |
Implement the outgoing data queue.
Definition at line 1118 of file Transport.h.
size_t TAO_Transport::id_ [protected] |
A unique identifier for the transport.
This never *never* changes over the lifespan, so we don't have to worry about locking it.
HINT: Protocol-specific transports that use connection handler might choose to set this to the handle for their connection.
Definition at line 1157 of file Transport.h.
Queue of the consolidated, incoming messages..
Definition at line 1122 of file Transport.h.
TAO::Incoming_Message_Stack TAO_Transport::incoming_message_stack_ [protected] |
Stack of incoming fragments, consolidated messages are going to be enqueued in "incoming_message_queue_"
Definition at line 1126 of file Transport.h.
bool TAO_Transport::is_connected_ [protected] |
Is this transport really connected or not. In case of oneways with SYNC_NONE Policy we don't wait until the connection is ready and we buffer the requests in this transport until the connection is ready
Definition at line 1171 of file Transport.h.
Our messaging object.
Definition at line 1176 of file Transport.h.
These classes need privileged access to:
Definition at line 1115 of file Transport.h.
TAO_ORB_Core* const TAO_Transport::orb_core_ [protected] |
Global orbcore resource.
Definition at line 1081 of file Transport.h.
TAO_SYNCH_MUTEX TAO_Transport::output_cdr_mutex_ [mutable, private] |
lock for synchronizing Transport OutputCDR access
Definition at line 1226 of file Transport.h.
ACE_Message_Block* TAO_Transport::partial_message_ [private] |
Holds the partial GIOP message (if there is one).
Definition at line 1206 of file Transport.h.
unsigned long TAO_Transport::purging_order_ [protected] |
Used by the LRU, LFU and FIFO Connection Purging Strategies.
Definition at line 1160 of file Transport.h.
size_t TAO_Transport::recv_buffer_size_ [protected] |
Size of the buffer received.
Definition at line 1163 of file Transport.h.
size_t TAO_Transport::sent_byte_count_ [protected] |
Number of bytes sent.
Definition at line 1166 of file Transport.h.
TAO::Transport::Stats* TAO_Transport::stats_ [private] |
Statistics.
Definition at line 1219 of file Transport.h.
CORBA::ULong const TAO_Transport::tag_ [protected] |
IOP protocol tag.
Definition at line 1078 of file Transport.h.
TAO_Queued_Message* TAO_Transport::tail_ [protected] |
These classes need privileged access to:
Definition at line 1119 of file Transport.h.
CORBA::Boolean TAO_Transport::tcs_set_ [private] |
The tcs_set_ flag indicates that negotiation has occured and so the translators are correct, since a null translator is valid if both ends are using the same codeset, whatever that codeset might be.
Definition at line 1197 of file Transport.h.
TAO_Transport_Mux_Strategy* TAO_Transport::tms_ [protected] |
Strategy to decide whether multiple requests can be sent over the same connection or the connection is exclusive for a request.
Definition at line 1089 of file Transport.h.
TAO_Transport_Timer TAO_Transport::transport_timer_ [protected] |
The adapter used to receive timeout callbacks from the Reactor.
Definition at line 1136 of file Transport.h.
These classes need privileged access to:
Definition at line 1192 of file Transport.h.
TAO_Wait_Strategy* TAO_Transport::ws_ [protected] |
Strategy for waiting for the reply after sending the request.
Definition at line 1092 of file Transport.h.