Transport.h

Go to the documentation of this file.
00001 // -*- C++ -*-
00002 
00003 //=============================================================================
00004 /**
00005  *  @file Transport.h
00006  *
00007  *  $Id: Transport.h 79388 2007-08-17 16:05:00Z wilsond $
00008  *
00009  *  Define the interface for the Transport component in TAO's
00010  *  pluggable protocol framework.
00011  *
00012  *  @author  Fred Kuhns <fredk@cs.wustl.edu>
00013  */
00014 //=============================================================================
00015 
00016 #ifndef TAO_TRANSPORT_H
00017 #define TAO_TRANSPORT_H
00018 
00019 #include /**/ "ace/pre.h"
00020 
00021 #include "tao/Transport_Cache_Manager.h"
00022 
00023 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00024 # pragma once
00025 #endif /* ACE_LACKS_PRAGMA_ONCE */
00026 
00027 #include "tao/Transport_Timer.h"
00028 #include "tao/Incoming_Message_Queue.h"
00029 #include "tao/Incoming_Message_Stack.h"
00030 #include "ace/Time_Value.h"
00031 #include "ace/Basic_Stats.h"
00032 
00033 struct iovec;
00034 
00035 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00036 
00037 class TAO_ORB_Core;
00038 class TAO_Target_Specification;
00039 class TAO_Operation_Details;
00040 class TAO_Transport_Mux_Strategy;
00041 class TAO_Wait_Strategy;
00042 class TAO_Connection_Handler;
00043 class TAO_Pluggable_Messaging;
00044 class TAO_Codeset_Translator_Base;
00045 
00046 class TAO_Queued_Message;
00047 class TAO_Synch_Queued_Message;
00048 class TAO_Resume_Handle;
00049 class TAO_Stub;
00050 class TAO_MMAP_Allocator;
00051 
00052 namespace TAO
00053 {
00054   /**
00055    * Role a connection was opened as. @note Should this be in the TAO
00056    * namespace? It seems like a candidate to live in the transport.
00057    */
00058   enum Connection_Role
00059     {
00060       TAO_UNSPECIFIED_ROLE = 0,
00061       TAO_SERVER_ROLE = 1,
00062       TAO_CLIENT_ROLE = 2
00063     };
00064 
00065   namespace Transport
00066   {
00067     /// Transport-level statistics. Initially introduced to support
00068     /// the "Transport Current" functionality.
00069     class Stats;
00070   }
00071 }
00072 
00073 /*
00074  * Specialization hook for the TAO's transport implementation.
00075  */
00076 //@@ TAO_TRANSPORT_SPL_INCLUDE_FORWARD_DECL_ADD_HOOK
00077 
00078 /**
00079  * @class TAO_Transport
00080  *
00081  * @brief Generic definitions for the Transport class.
00082  *
00083  * The transport object is created in the Service handler
00084  * constructor and deleted in the Service Handler's destructor!!
00085  *
00086  * The main responsibility of a Transport object is to encapsulate a
00087  * connection, and provide a transport independent way to send and
00088  * receive data.  Since TAO is heavily based on the Reactor for most if
00089  * not all its I/O the Transport class is usually implemented with a
00090  * helper Connection Handler that adapts the generic Transport
00091  * interface to the Reactor types.
00092  *
00093  * <H3>The outgoing data path:</H3>
00094  *
00095  * One of the responsibilities of the TAO_Transport class is to send
00096  * out GIOP messages as efficiently as possible.  In most cases
00097  * messages are put out in FIFO order, the transport object will put
00098  * out the message using a single system call and return control to
00099  * the application.  However, for oneways and AMI requests it may be
00100  * more efficient (or required if the SYNC_NONE policy is in effect)
00101  * to queue the messages until a large enough data set is available.
00102  * Another reason to queue is that some applications cannot block for
00103  * I/O, yet they want to send messages so large that a single write()
00104  * operation would not be able to cope with them.  In such cases we
00105  * need to queue the data and use the Reactor to drain the queue.
00106  *
00107  * Therefore, the Transport class may need to use a queue to
00108  * temporarily hold the messages, and, in some configurations, it may
00109  * need to use the Reactor to concurrently drain such queues.
00110  *
00111  * <H4>Out of order messages:</H4> TAO provides explicit policies to
00112  * send 'urgent' messages.  Such messages may be put at the head of the
00113  * queue. However, they cannot be sent immediately because the
00114  * transport may already be sending another message in a reactive
00115  * fashion.
00116  *
00117  * Consequently, the Transport must also know if the head of the queue
00118  * has been partially sent.  In that case new messages can only follow
00119  * the head. Only once the head is completely sent we can start
00120  * sending new messages.
00121  *
00122  * <H4>Waiting threads:</H4> One or more threads can be blocked
00123  * waiting for the connection to completely send the message.
00124  * The thread should return as soon as its message has been sent, so a
00125  * per-thread condition is required.  This suggests that simply using an
00126  * ACE_Message_Queue would not be enough:  there is a significant
00127  * amount of ancillary information to keep on each message that the
00128  * Message_Block class does not provide room for.
00129  *
00130  * Blocking I/O is still attractive for some applications.  First, by
00131  * eliminating the Reactor overhead, performance is improved when
00132  * sending large blocks of data.  Second, using the Reactor to send
00133  * out data opens the door for nested upcalls, yet some applications
00134  * cannot deal with the reentrancy issues in this case.
00135  *
00136  * <H4>Timeouts:</H4> Some or all messages could have a timeout period
00137  * attached to them.  The timeout source could either be some
00138  * high-level policy or maybe some strategy to prevent denial of
00139  * service attacks.  In any case the timeouts are per-message, and
00140  * later messages could have shorter timeouts.
00141  * In fact, some kind of scheduling (such as EDF) could be required in
00142  * a few applications.
00143  *
00144  * <H4>Conclusions:</H4> The outgoing data path consists of several
00145  * components:
00146  *
00147  * - A queue of pending messages
00148  * - A message currently being transmitted
00149  * - A per-transport 'send strategy' to choose between blocking on
00150  *   write, blocking on the reactor or blocking on leader/follower.
00151  * - A per-message 'waiting object'
00152  * - A per-message timeout
00153  *
00154  * The Transport object provides a single method to send request
00155  * messages (send_request_message ()).
00156  *
00157  * <H3>The incoming data path:</H3>
00158  *
00159  * One of the main responsibilities of the transport is to read and
00160  * process the incoming GIOP message as quickly and efficiently as
00161  * possible. There are other forces that need to be given due
00162  * consideration. They are:
00163  *  - Multiple threads should be able to traverse along the same data
00164  *    path but should not be able to read from the same handle at the
00165  *    same time ie. the handle should not be shared between threads at
00166  *    any instant.
00167  *  - Reads on the handle could give one or more messages.
00168  *  - Minimise locking and copying overhead when trying to attack the
00169  *    above.
00170  *
00171  * <H3> Parsing messages (GIOP) & processing the message:</H3>
00172  *
00173  * The messages should be checked for validity and the right
00174  * information should be sent to the higher layer for processing. The
00175  * process of doing a sanity check and preparing the messages for the
00176  * higher layers of the ORB are done by the messaging protocol.
00177  *
00178  * <H3> Design forces and Challenges </H3>
00179  *
00180  * To keep things as efficient as possible for medium sized requests,
00181  * it would be good to minimise data copying and locking along the
00182  * incoming path ie. from the time of reading the data from the handle
00183  * to the application. We achieve this by creating a buffer on stack
00184  * and reading the data from the handle into the buffer. We then pass
00185  * the same data block (the buffer is encapsulated into a data block)
00186  * to the higher layers of the ORB. The problems stem from the
00187  * following
00188  *  (a) Data is bigger than the buffer that we have on stack
00189  *  (b) Transports like TCP do not guarantee availability of the whole
00190  *      chunk of data in one shot. Data could trickle in byte by byte.
00191  *  (c) Single read gives multiple messages
00192  *
00193  * We solve the problems as follows
00194  *
00195  *   (a) First do a read with the buffer on stack. Query the underlying
00196  *       messaging object whether the message has any incomplete
00197  *       portion. If so, data will be copied into new buffer being able
00198  *       to hold full message and is queued; succeeding events will read
00199  *       data from socket and write directly into this buffer.
00200  *       Otherwise, if the message in local buffer is complete, we free
00201  *       the handle and then send the message to the higher layers of the
00202  *       ORB for processing.
00203  *
00204  *   (b) If buffer with incomplete message has been enqueued, while trying
00205  *       to do the above, the reactor will call us back when the handle
00206  *       becomes read ready. The read-operation will copy data directly
00207  *       into the enqueued buffer.  If the message has been read completely
00208  *       the message is sent to the higher layers of the ORB for processing.
00209  *
00210  *   (c) If we get multiple messages (possible if the client connected
00211  *       to the server sends oneways or AMI requests), we parse and
00212  *       split the messages. Every message is put in the queue. Once
00213  *       the messages are queued, the thread picks up one message to
00214  *       send to the higher layers of the ORB. Before doing that, if
00215  *       it finds more messages, it sends a notify to the reactor
00216  *       without resuming the handle. The next thread picks up a
00217  *       message from the queue and processes that. Once the queue
00218  *       is drained the last thread resumes the handle.
00219  *
00220  * <H3> Sending Replies </H3>
00221  *
00222  * We could use the outgoing path of the ORB to send replies. This
00223  * would allow us to reuse most of the code in the outgoing data
00224  * path. We were doing this till TAO-1.2.3. We ran into
00225  * problems. When writing the reply the ORB gets flow controlled, and the
00226  * ORB tries to flush the message by going into the reactor. This
00227  * resulted in unnecessary nesting. The thread that gets into the
00228  * Reactor could potentially handle other messages (incoming or
00229  * outgoing) and the stack starts growing leading to crashes.
00230  *
00231  * <H4> Solution to the nesting problem </H4>
00232  *
00233  * The solution that we (plan to) adopt is pretty straight
00234  * forward. The thread sending replies will not block to send the
00235  * replies but queue the replies and return to the Reactor. (Note the
00236  * careful usage of the terms "blocking in the Reactor" as opposed to
00237  * "return back to the Reactor".)
00238  *
00239  *
00240  * <B>See Also:</B>
00241  *
00242  * https://svn.dre.vanderbilt.edu/viewvc/Middleware/trunk/TAO/docs/pluggable_protocols/index.html?revision=HEAD
00243  *
00244  */
00245 class TAO_Export TAO_Transport
00246 {
00247 public:
00248 
00249   /// Default creator, requires the tag value be supplied.
00250   TAO_Transport (CORBA::ULong tag, TAO_ORB_Core *orb_core);
00251 
00252   /// Destructor
00253   virtual ~TAO_Transport (void);
00254 
00255   /// Return the protocol tag.
00256   /**
00257    * The OMG assigns unique tags (a 32-bit unsigned number) to each
00258    * protocol. New protocol tags can be obtained free of charge from
00259    * the OMG, check the documents in corbafwd.h for more details.
00260    */
00261   CORBA::ULong tag (void) const;
00262 
00263   /// Access the ORB that owns this connection.
00264   TAO_ORB_Core *orb_core (void) const;
00265 
00266   /// Get the TAO_Transport_Mux_Strategy used by this object.
00267   /**
00268    * The role of the TAO_Transport_Mux_Strategy is described in more
00269    * detail in that class' documentation.  Suffice it to say that the
00270    * class is used to control how many threads can have pending
00271    * requests over the same connection. Multiplexing multiple threads
00272    * over the same connection conserves resources and is almost
00273    * required for AMI, but having only one pending request per
00274    * connection is more efficient and reduces the possibilities of
00275    * priority inversions.
00276    */
00277   TAO_Transport_Mux_Strategy *tms (void) const;
00278 
00279   /// Return the TAO_Wait_Strategy used by this object.
00280   /**
00281    * The role of the TAO_Wait_Strategy is described in more detail in
00282    * that class' documentation. Suffice it to say that the ORB can wait
00283    * for a reply blocking on read(), using the Reactor to wait for
00284    * multiple events concurrently or using the Leader/Followers
00285    * protocol.
00286    */
00287   TAO_Wait_Strategy *wait_strategy (void) const;
00288 
00289   /// Callback method to reactively drain the outgoing data queue
00290   int handle_output (void);
00291 
00292   /// Get the bidirectional flag
00293   int bidirectional_flag (void) const;
00294 
00295   /// Set the bidirectional flag
00296   void bidirectional_flag (int flag);
00297 
00298   /// Set the Cache Map entry
00299   void cache_map_entry (TAO::Transport_Cache_Manager::HASH_MAP_ENTRY *entry);
00300 
00301   /// Get the Cache Map entry
00302   TAO::Transport_Cache_Manager::HASH_MAP_ENTRY *cache_map_entry (void);
00303 
00304   /// Set and Get the identifier for this transport instance.
00305   /**
00306    * If not set, this will return an integer representation of
00307    * the <code>this</code> pointer for the instance on which
00308    * it's called.
00309    */
00310   size_t id (void) const;
00311   void id (size_t id);
00312 
00313   /**
00314    * Methods dealing with the role of the connection, e.g., CLIENT or SERVER.
00315    * See CORBA 2.6 Specification, Section 15.5.1 for origin of definitions.
00316    */
00317   TAO::Connection_Role opened_as (void) const;
00318   void opened_as (TAO::Connection_Role);
00319 
00320   /// Get and Set the purging order. The purging strategy uses the set
00321   /// version to set the purging order.
00322   unsigned long purging_order (void) const;
00323   void purging_order(unsigned long value);
00324 
00325   /// Check if there are messages pending in the queue
00326   /**
00327    * @return true if the queue is empty
00328    */
00329   bool queue_is_empty (void);
00330 
00331   /// Add event handler to the handlers set.
00332   /**
00333    * Called by the cache when the cache is closing.
00334    *
00335    * @param handlers The TAO_Connection_Handler_Set into which the
00336    *        transport should place its handler
00337    */
00338   void provide_handler (TAO::Connection_Handler_Set &handlers);
00339 
00340   /// Add event handlers corresponding to transports that have RW wait
00341   /// strategy to the handlers set.
00342   /**
00343    * Called by the cache when the ORB is shutting down.
00344    *
00345    * @param handlers The TAO_Connection_Handler_Set into which the
00346    *        transport should place its handler if the transport has RW
00347    *        strategy on.
00348    *
00349    * @return true indicates a handler was added to the handler set.
00350    *         false indicates that the transport did not have a
00351    *         blockable handler that could be added.
00352    */
00353   bool provide_blockable_handler (TAO::Connection_Handler_Set &handlers);
00354 
00355   /// Register the handler with the reactor.
00356   /**
00357    * Register the handler with the reactor. This method is used by the
00358    * Wait_On_Reactor strategy. The transport must register its event
00359    * handler with the ORB's Reactor.
00360    *
00361    * @todo I think this method is pretty much useless, the
00362    * connections are *always* registered with the Reactor, except in
00363    * thread-per-connection mode.  In that case putting the connection
00364    * in the Reactor would produce unpredictable results anyway.
00365    */
00366   virtual int register_handler (void);
00367 
00368   /// Write the complete Message_Block chain to the connection.
00369   /**
00370    * This method serializes on handler_lock_, guaranteeing that only
00371    * one thread can execute it on the same instance concurrently.
00372    *
00373    * Often the implementation simply forwards the arguments to the
00374    * underlying ACE_Svc_Handler class. Using the code factored out
00375    * into ACE.
00376    *
00377    * Be careful with protocols that perform non-trivial
00378    * transformations of the data, such as SSLIOP or protocols that
00379    * compress the stream.
00380    *
00381    * @param iov contains the data that must be sent.
00382    *
00383    * @param timeout is the maximum time that the application is
00384    * willing to wait for the data to be sent, useful in platforms that
00385    * implement timed writes.
00386    * The timeout value is obtained from the policies set by the
00387    * application.
00388    *
00389    * @param bytes_transferred should return the total number of bytes
00390    * successfully transferred before the connection blocked.  This is
00391    * required because in some platforms and/or protocols multiple
00392    * system calls may be required to send the chain of message
00393    * blocks.  The first few calls can work successfully, but the final
00394    * one can fail or signal a flow control situation (via EAGAIN).
00395    * In this case the ORB expects the function to return -1, errno to
00396    * be appropriately set and this argument to return the number of
00397    * bytes already on the OS I/O subsystem.
00398    *
00399    * This call can also fail if the transport instance is no longer
00400    * associated with a connection (e.g., the connection handler closed
00401    * down).  In that case, it returns -1 and sets errno to
00402    * <code>ENOENT</code>.
00403    */
00404   virtual ssize_t send (iovec *iov,
00405                         int iovcnt,
00406                         size_t &bytes_transferred,
00407                         const ACE_Time_Value *timeout = 0) = 0;
00408 
00409 #if TAO_HAS_SENDFILE == 1
00410   /// Send data through zero-copy write mechanism, if available.
00411   /**
00412    * This method sends the data in the I/O vector through the platform
00413    * sendfile() function to perform a zero-copy write, if available.
00414    * Otherwise, the default fallback implementation simply delegates
00415    * to the TAO_Transport::send() method.
00416    *
00417    * @note This method is best used when sending very large blocks of
00418    *       data.
00419    */
00420   virtual ssize_t sendfile (TAO_MMAP_Allocator * allocator,
00421                             iovec * iov,
00422                             int iovcnt,
00423                             size_t &bytes_transferred,
00424                             ACE_Time_Value const * timeout = 0);
00425 #endif  /* TAO_HAS_SENDFILE==1 */
00426 
00427 
00428   /// Read len bytes into buffer.
00429   /**
00430    * This method serializes on handler_lock_, guaranteeing that only
00431    * one thread can execute it on the same instance concurrently.
00432    *
00433    * @param buffer ORB allocated buffer where the data should be placed.
00434    * @@ The ACE_Time_Value *s is just a place holder for now.  It is
00435    * not clear that this is the best place to specify this.  The actual
00436    * timeout values will be kept in the Policies.
00437    */
00438   virtual ssize_t recv (char *buffer,
00439                         size_t len,
00440                         const ACE_Time_Value *timeout = 0) = 0;
00441 
00442   /**
00443    * @name Control connection lifecycle
00444    *
00445    * These methods are routed through the TMS object. The TMS
00446    * strategies implement them correctly.
00447    */
00448   //@{
00449 
00450   /// Request has been just sent, but the reply is not received. Idle
00451   /// the transport now.
00452   bool idle_after_send (void);
00453 
00454   /// Request is sent and the reply is received. Idle the transport
00455   /// now.
00456   bool idle_after_reply (void);
00457 
00458   /// Call the implementation method after obtaining the lock.
00459   virtual void close_connection (void);
00460 
00461   //@}
00462 
00463   /** @name Template methods
00464    *
00465    * The Transport class uses the Template Method Pattern to implement
00466    * the protocol specific functionality.
00467    * Implementors of a pluggable protocol should override the
00468    * following methods with the semantics documented below.
00469    */
00470   /**
00471    * Initialising the messaging object. This would be used by the
00472    * connector side. On the acceptor side the connection handler
00473    * would take care of the messaging objects.
00474    */
00475   virtual int messaging_init (CORBA::Octet major, CORBA::Octet minor) = 0;
00476 
00477   /// Extracts the list of listen points from the @a cdr stream. The
00478   /// list would have the protocol specific details of the
00479   /// ListenPoints
00480   virtual int tear_listen_point_list (TAO_InputCDR &cdr);
00481 
00482   /// Hooks that can be overridden in concrete transports.
00483   /**
00484    * These hooks are invoked just after connection establishment (or
00485    * after a connection is fetched from cache). The
00486    * return value signifies whether the invoker should proceed  with
00487    * post connection establishment activities. Protocols like SSLIOP
00488    * need this to verify whether connections already established have
00489    * valid certificates. There are no pre_connect_hooks () since the
00490    * transport doesn't exist before a connection establishment. :-)
00491    *
00492    * @note The methods are not made const for a reason.
00493    */
00494   virtual bool post_connect_hook (void);
00495 
00496   /// Memory management routines.
00497   /*
00498    * Forwards to event handler.
00499    */
00500   ACE_Event_Handler::Reference_Count add_reference (void);
00501   ACE_Event_Handler::Reference_Count remove_reference (void);
00502 
00503   /// Return the messaging object that is used to format the data that
00504   /// needs to be sent.
00505   virtual TAO_Pluggable_Messaging * messaging_object (void) = 0;
00506 
00507   /** @name Template methods
00508    *
00509    * The Transport class uses the Template Method Pattern to implement
00510    * the protocol specific functionality.
00511    * Implementors of a pluggable protocol should override the
00512    * following methods with the semantics documented below.
00513    */
00514   //@{
00515 
00516   /// Return the event handler used to receive notifications from the
00517   /// Reactor.
00518   /**
00519    * Normally a concrete TAO_Transport object has-a ACE_Event_Handler
00520    * member that functions as an adapter between the ACE_Reactor
00521    * framework and the TAO pluggable protocol framework.
00522    * In all the protocols implemented so far this role is fulfilled
00523    * by an instance of ACE_Svc_Handler.
00524    *
00525    * @todo Since we only use a limited functionality of
00526    * ACE_Svc_Handler we could probably implement a generic
00527    * adapter class (TAO_Transport_Event_Handler or something), this
00528    * will reduce footprint and simplify the process of implementing a
00529    * pluggable protocol.
00530    *
00531    * @todo This method has to be renamed to event_handler()
00532    */
00533   virtual ACE_Event_Handler * event_handler_i (void) = 0;
00534 
00535   /// Is this transport really connected
00536   bool is_connected (void) const;
00537 
00538   /// Perform all the actions when this transport gets opened
00539   bool post_open (size_t id);
00540 
00541   /// do what needs to be done when closing the transport
00542   void pre_close (void);
00543 
00544   /// Get the connection handler for this transport
00545   TAO_Connection_Handler * connection_handler (void);
00546 
00547   /// Accessor for the output CDR stream
00548   TAO_OutputCDR &out_stream (void);
00549 
00550   /*
00551    * Specialization hook to add public methods from
00552    * concrete transport implementations to TAO's transport
00553    * class
00554    */
00555   //@@ TAO_TRANSPORT_SPL_PUBLIC_METHODS_ADD_HOOK
00556 
00557 protected:
00558 
00559   virtual TAO_Connection_Handler * connection_handler_i (void) = 0;
00560 
00561 public:
00562 
00563   /// This is a request for the transport object to write a
00564   /// LocateRequest header before it is sent out.
00565   int generate_locate_request (TAO_Target_Specification &spec,
00566                                TAO_Operation_Details &opdetails,
00567                                TAO_OutputCDR &output);
00568 
00569   /// This is a request for the transport object to write a request
00570   /// header before it sends out the request
00571   virtual int generate_request_header (TAO_Operation_Details &opd,
00572                                        TAO_Target_Specification &spec,
00573                                        TAO_OutputCDR &msg);
00574 
00575   /// Recache ourselves in the cache
00576   int recache_transport (TAO_Transport_Descriptor_Interface* desc);
00577 
00578   /// Callback to read incoming data
00579   /**
00580    * The ACE_Event_Handler adapter invokes this method as part of its
00581    * handle_input() operation.
00582    *
00583    * @todo the method name is confusing! Calling it handle_input()
00584    * would probably make things easier to understand and follow!
00585    *
00586    * Once a complete message is read the Transport class delegates on
00587    * the Messaging layer to invoke the right upcall (on the server) or
00588    * the TAO_Reply_Dispatcher (on the client side).
00589    *
00590    * @param max_wait_time In some cases the I/O is synchronous, e.g. a
00591    * thread-per-connection server or when Wait_On_Read is enabled.  In
00592    * those cases a maximum read time can be specified.
00593    */
00594   virtual int handle_input (TAO_Resume_Handle &rh,
00595                             ACE_Time_Value *max_wait_time = 0);
00596 
00597   enum
00598     {
00599       TAO_ONEWAY_REQUEST = 0,
00600       TAO_TWOWAY_REQUEST = 1,
00601       TAO_REPLY
00602     };
00603 
00604   /// Prepare the waiting and demuxing strategy to receive a reply for
00605   /// a new request.
00606   /**
00607    * Preparing the ORB to receive the reply only once the request is
00608    * completely sent opens the system to some subtle race conditions:
00609    * suppose the ORB is running in a multi-threaded configuration,
00610    * thread A makes a request while thread B is using the Reactor to
00611    * process all incoming requests.
00612    * Thread A could be implemented as follows:
00613    * 1) send the request
00614    * 2) setup the ORB to receive the reply
00615    * 3) wait for the reply
00616    *
00617    * but in this case thread B may receive the reply between step (1)
00618    * and (2), and drop it as an invalid or unexpected message.
00619    * Consequently the correct implementation is:
00620    * 1) setup the ORB to receive the reply
00621    * 2) send the request
00622    * 3) wait for the reply
00623    *
00624    * The following method encapsulates this idiom.
00625    *
00626    * @todo This is generic code, it should be factored out into the
00627    * Transport class.
00628    */
00629   // @nolock b/c this calls send_or_buffer
00630   virtual int send_request (TAO_Stub *stub,
00631                             TAO_ORB_Core *orb_core,
00632                             TAO_OutputCDR &stream,
00633                             int message_semantics,
00634                             ACE_Time_Value *max_time_wait) = 0;
00635 
00636   /// This method formats the stream and then sends the message on the
00637   /// transport.
00638   /**
00639    * Once the ORB is prepared to receive a reply (see send_request()
00640    * above), and all the arguments have been marshaled the CDR stream
00641    * must be 'formatted', i.e. the message_size field in the GIOP
00642    * header can finally be set to the proper value.
00643    *
00644    */
00645   virtual int send_message (TAO_OutputCDR &stream,
00646                             TAO_Stub *stub = 0,
00647                             int message_semantics = TAO_Transport::TAO_TWOWAY_REQUEST,
00648                             ACE_Time_Value *max_time_wait = 0) = 0;
00649 
00650   /// Send the contents of @a message_block
00651   /**
00652    * @param stub The object reference used for this operation, useful
00653    *             to obtain the current policies.
00654    * @param message_semantics If this is set to TAO_TWOWAY_REQUEST
00655    *        this method will block until the operation is completely
00656    *        written on the wire. If it is set to other values this
00657    *        operation could return.
00658    * @param message_block The CDR encapsulation of the GIOP message
00659    *             that must be sent.  The message may consist of
00660    *             multiple Message Blocks chained through the cont()
00661    *             field.
00662    * @param max_wait_time The maximum time that the operation can
00663    *             block, used in the implementation of timeouts.
00664    */
00665   virtual int send_message_shared (TAO_Stub *stub,
00666                                    int message_semantics,
00667                                    const ACE_Message_Block *message_block,
00668                                    ACE_Time_Value *max_wait_time);
00669 
00670 protected:
00671 
00672   /// Process the message by sending it to the higher layers of the
00673   /// ORB.
00674   int process_parsed_messages (TAO_Queued_Data *qd,
00675                                TAO_Resume_Handle &rh);
00676 
00677   /// Implement send_message_shared() assuming the handler_lock_ is
00678   /// held.
00679   int send_message_shared_i (TAO_Stub *stub,
00680                              int message_semantics,
00681                              const ACE_Message_Block *message_block,
00682                              ACE_Time_Value *max_wait_time);
00683 
00684   /// Queue a message for @a message_block
00685   /// @param max_wait_time The maximum time that the operation can
00686   ///            block, used in the implementation of timeouts.
00687   int queue_message_i (const ACE_Message_Block *message_block,
00688                        ACE_Time_Value *max_wait_time);
00689 
00690 public:
00691   /// Format and queue a message for @a stream
00692   /// @param max_wait_time The maximum time that the operation can
00693   ///            block, used in the implementation of timeouts.
00694   int format_queue_message (TAO_OutputCDR &stream,
00695                             ACE_Time_Value *max_wait_time);
00696 
00697   /// Send a message block chain.
00698   int send_message_block_chain (const ACE_Message_Block *message_block,
00699                                 size_t &bytes_transferred,
00700                                 ACE_Time_Value *max_wait_time = 0);
00701 
00702   /// Send a message block chain, assuming the lock is held
00703   int send_message_block_chain_i (const ACE_Message_Block *message_block,
00704                                   size_t &bytes_transferred,
00705                                   ACE_Time_Value *max_wait_time);
00706   /// Cache management
00707   int purge_entry (void);
00708 
00709   /// Cache management
00710   int make_idle (void);
00711 
00712   /// Cache management
00713   int update_transport (void);
00714 
00715   /// The timeout callback, invoked when any of the timers related to
00716   /// this transport expire.
00717   /**
00718    * @param current_time The current time as reported from the Reactor
00719    * @param act The Asynchronous Completion Token.  Currently it is
00720    *            interpreted as follows:
00721    * - If the ACT is the address of this->current_deadline_ the
00722    *   queueing timeout has expired and the queue should start
00723    *   flushing.
00724    *
00725    * @return Returns 0 if there are no problems, -1 if there is an
00726    *         error
00727    *
00728    * @todo In the future this function could be used to expire
00729    *       messages (oneways) that have been sitting for too long on
00730    *       the queue.
00731    */
00732   int handle_timeout (const ACE_Time_Value &current_time, const void* act);
00733 
00734   /// Accessor to recv_buffer_size_
00735   size_t recv_buffer_size (void) const;
00736 
00737   /// Accessor to sent_byte_count_
00738   size_t sent_byte_count (void) const;
00739 
00740   /// CodeSet Negotiation - Get the char codeset translator factory
00741   TAO_Codeset_Translator_Base *char_translator (void) const;
00742 
00743   /// CodeSet Negotiation - Get the wchar codeset translator factory
00744   TAO_Codeset_Translator_Base *wchar_translator (void) const;
00745 
00746   /// CodeSet negotiation - Set the char codeset translator factory
00747   void char_translator (TAO_Codeset_Translator_Base *);
00748 
00749   /// CodeSet negotiation - Set the wchar codeset translator factory
00750   void wchar_translator (TAO_Codeset_Translator_Base *);
00751 
00752   /// Use the Transport's codeset factories to set the translator for input
00753   /// and output CDRs.
00754   void assign_translators (TAO_InputCDR *, TAO_OutputCDR *);
00755 
00756   /// It is necessary to clear the codeset translator when a CDR stream
00757   /// is used for more than one GIOP message. This is required since the
00758   /// header must not be translated, whereas the body must be.
00759   void clear_translators (TAO_InputCDR *, TAO_OutputCDR *);
00760 
00761   /// Return true if the tcs has been set
00762   CORBA::Boolean is_tcs_set() const;
00763 
00764   /// Set the state of the first_request_ flag to 0
00765   void first_request_sent();
00766 
00767   /// Notify all the components inside a Transport when the underlying
00768   /// connection is closed.
00769   void send_connection_closed_notifications (void);
00770 
00771   /// Transport statistics
00772   TAO::Transport::Stats* stats (void) const;
00773 
00774 private:
00775 
00776   /// Helper method that returns the Transport Cache Manager.
00777   TAO::Transport_Cache_Manager &transport_cache_manager (void);
00778 
00779   /// Send some of the data in the queue.
00780   /**
00781    * As the outgoing data is drained this method is invoked to send as
00782    * much of the current message as possible.
00783    *
00784    * Returns 0 if there is more data to send, -1 if there was an error
00785    * and 1 if the message was completely sent.
00786    */
00787   int drain_queue (void);
00788 
00789   /// Implement drain_queue() assuming the lock is held
00790   int drain_queue_i (void);
00791 
00792   /// Check if there are messages pending in the queue
00793   /**
00794    * This version assumes that the lock is already held.  Use with
00795    * care!
00796    *
00797    * @return true if the queue is empty
00798    */
00799   bool queue_is_empty_i (void);
00800 
00801   /// A helper routine used in drain_queue_i()
00802   int drain_queue_helper (int &iovcnt, iovec iov[]);
00803 
00804   /// These classes need privileged access to:
00805   /// - schedule_output_i()
00806   /// - cancel_output_i()
00807   friend class TAO_Reactive_Flushing_Strategy;
00808   friend class TAO_Leader_Follower_Flushing_Strategy;
00809 
00810   /// Needs privileged access to
00811   /// event_handler_i ()
00812   friend class TAO_Thread_Per_Connection_Handler;
00813 
00814   /// Schedule handle_output() callbacks
00815   int schedule_output_i (void);
00816 
00817   /// Cancel handle_output() callbacks
00818   int cancel_output_i (void);
00819 
00820   /// Cleanup the queue.
00821   /**
00822    * Exactly @a byte_count bytes have been sent, the queue must be
00823    * cleaned up as potentially several messages have been completely
00824    * sent out.
00825    * It leaves on head_ the next message to send out.
00826    */
00827   void cleanup_queue (size_t byte_count);
00828 
00829   /// Cleanup the complete queue
00830   void cleanup_queue_i ();
00831 
00832   /// Check if the buffering constraints have been reached
00833   int check_buffering_constraints_i (TAO_Stub *stub, bool &must_flush);
00834 
00835   /// Send a synchronous message, i.e. block until the message is on
00836   /// the wire
00837   int send_synchronous_message_i (const ACE_Message_Block *message_block,
00838                                   ACE_Time_Value *max_wait_time);
00839 
00840   /// Send a reply message, i.e. do not block until the message is on
00841   /// the wire, but just return after adding them to the queue.
00842   int send_reply_message_i (const ACE_Message_Block *message_block,
00843                             ACE_Time_Value *max_wait_time);
00844 
00845   /// Send an asynchronous message, i.e. do not block until the message is on
00846   /// the wire
00847   int send_asynchronous_message_i (TAO_Stub *stub,
00848                                    const ACE_Message_Block *message_block,
00849                                    ACE_Time_Value *max_wait_time);
00850 
00851   /// A helper method used by send_synchronous_message_i() and
00852   /// send_reply_message_i(). Reusable code that could be used by both
00853   /// the methods.
00854   int send_synch_message_helper_i (TAO_Synch_Queued_Message &s,
00855                                    ACE_Time_Value *max_wait_time);
00856 
00857   /// Check if the flush timer is still pending
00858   int flush_timer_pending (void) const;
00859 
00860   /// The flush timer expired or was explicitly cancelled, mark it as
00861   /// not pending
00862   void reset_flush_timer (void);
00863 
00864   /// Print out error messages if the event handler is not valid
00865   void report_invalid_event_handler (const char *caller);
00866 
00867   /// Is invoked by handle_input operation. It consolidates the message on
00868   /// top of incoming_message_stack_.  The amount of missing data is
00869   /// known and the recv operation copies data directly into the message
00870   /// buffer, as much as a single recv-invocation provides.
00871   int handle_input_missing_data (TAO_Resume_Handle &rh,
00872                                  ACE_Time_Value *max_wait_time,
00873                                  TAO_Queued_Data *q_data);
00874 
00875   /// Is invoked by handle_input operation. It parses new messages from input stream
00876   /// or consolidates messages whose header has been partially read, the message
00877   /// size being unknown so far. It parses as much data as a single recv-invocation provides.
00878   int handle_input_parse_data (TAO_Resume_Handle &rh,
00879                                ACE_Time_Value *max_wait_time);
00880 
00881   /// Is invoked by handle_input_parse_data. Parses all messages remaining
00882   /// in @a message_block.
00883   int handle_input_parse_extra_messages (ACE_Message_Block &message_block);
00884 
00885   /// @return -1 error, otherwise 0
00886   int consolidate_enqueue_message (TAO_Queued_Data *qd);
00887 
00888   /// @return -1 error, otherwise 0
00889   int consolidate_process_message (TAO_Queued_Data *qd, TAO_Resume_Handle &rh);
00890 
00891   /**
00892    * Process the message that is in the head of the incoming queue.
00893    * If there are more messages in the queue, this method calls
00894    * this->notify_reactor () to wake up a thread
00895    * @retval -1 on error
00896    * @retval 0 if successfully processing enqueued messages
00897    * @retval 1 if no message present in queue
00898    */
00899   int process_queue_head (TAO_Resume_Handle &rh);
00900 
00901   /**
00902    * This call prepares a new handler for the notify call and sends a
00903    * notify () call to the reactor.
00904    */
00905   int notify_reactor (void);
00906 
00907   /// Assume the lock is held
00908   void send_connection_closed_notifications_i (void);
00909 
00910   /// Allocate a partial message block and store it in our
00911   /// partial_message_ data member.
00912   void allocate_partial_message_block (void);
00913 
00914   // Disallow copying and assignment.
00915   TAO_Transport (const TAO_Transport&);
00916   void operator= (const TAO_Transport&);
00917 
00918   /*
00919    * Specialization hook to add concrete private methods from
00920    * TAO's protocol implementation onto the base Transport class
00921    */
00922 
00923   //@@ TAO_TRANSPORT_SPL_PRIVATE_METHODS_ADD_HOOK
00924 
00925 protected:
00926 
00927   /// IOP protocol tag.
00928   CORBA::ULong const tag_;
00929 
00930   /// Global orbcore resource.
00931   TAO_ORB_Core * const orb_core_;
00932 
00933   /// Our entry in the cache. We don't own this. It is here for our
00934   /// convenience. We cannot just change things around.
00935   TAO::Transport_Cache_Manager::HASH_MAP_ENTRY *cache_map_entry_;
00936 
00937   /// Strategy to decide whether multiple requests can be sent over the
00938   /// same connection or the connection is exclusive for a request.
00939   TAO_Transport_Mux_Strategy *tms_;
00940 
00941   /// Strategy for waiting for the reply after sending the request.
00942   TAO_Wait_Strategy *ws_;
00943 
00944   /// Used to check if bidirectional info has been synchronized with
00945   /// the peer.
00946   /**
00947    * Have we sent any info on bidirectional information or have we
00948    * received any info regarding making the connection served by this
00949    * transport bidirectional.
00950    * The flag is used as follows:
00951    * + We don't want to send the bidirectional context info more than
00952    *   once on the connection. Why? Waste of marshalling and
00953    *   demarshalling time on the client.
00954    * + On the server side -- once a client that has established the
00955    *   connection asks the server to use the connection both ways, we
00956    *   *don't* want the server to pack service info to the client. That
00957    *   is not allowed. We need a flag to prevent such things from
00958    *   happening.
00959    *
00960    * The value of this flag will be 0 if the client sends info and 1
00961    * if the server receives the info.
00962    */
00963   int bidirectional_flag_;
00964 
00965   TAO::Connection_Role opening_connection_role_;
00966 
00967   /// Implement the outgoing data queue
00968   TAO_Queued_Message *head_;
00969   TAO_Queued_Message *tail_;
00970 
00971   /// Queue of the consolidated, incoming messages.
00972   TAO_Incoming_Message_Queue incoming_message_queue_;
00973 
00974   /// Stack of incoming fragments, consolidated messages
00975   /// are going to be enqueued in "incoming_message_queue_"
00976   TAO::Incoming_Message_Stack incoming_message_stack_;
00977 
00978   /// The queue will start draining no later than current_deadline_
00979   /// *if* the deadline is set. (NOTE: the original sentence was cut off here; confirm the intended condition.)
00980   ACE_Time_Value current_deadline_;
00981 
00982   /// The timer ID
00983   long flush_timer_id_;
00984 
00985   /// The adapter used to receive timeout callbacks from the Reactor
00986   TAO_Transport_Timer transport_timer_;
00987 
00988   /// Lock that ensures that activities that *might* use handler-related
00989   /// resources (such as a connection handler) get serialized.
00990   /**
00991    * This is an <code>ACE_Lock</code> that gets initialized from
00992    * @c TAO_ORB_Core::resource_factory()->create_cached_connection_lock().
00993    * This way, one can use a lock appropriate for the type of system, i.e.,
00994    * a null lock for single-threaded systems, and a real lock for
00995    * multi-threaded systems.
00996    */
00997   mutable ACE_Lock *handler_lock_;
00998 
00999   /// A unique identifier for the transport.
01000   /**
01001    * This *never* changes over the lifespan, so we don't have to worry
01002    * about locking it.
01003    *
01004    * HINT: Protocol-specific transports that use connection handler
01005    * might choose to set this to the handle for their connection.
01006    */
01007   size_t id_;
01008 
01009   /// Used by the LRU, LFU and FIFO Connection Purging Strategies.
01010   unsigned long purging_order_;
01011 
01012   /// Size of the buffer received.
01013   size_t recv_buffer_size_;
01014 
01015   /// Number of bytes sent.
01016   size_t sent_byte_count_;
01017 
01018   /// Is this transport really connected or not. In case of oneways with
01019   /// SYNC_NONE Policy we don't wait until the connection is ready and we
01020   /// buffer the requests in this transport until the connection is ready
01021   bool is_connected_;
01022 
01023 private:
01024 
01025   /// @@Phil, I think it would be nice if we could think of a way to
01026   /// do the following.
01027   /// We have been trying to use the transport for marking about
01028   /// translator factories and such! IMHO this is a wrong encapsulation
01029   /// i.e. trying to populate the transport object with these
01030   /// details. We should probably have a class something like
01031   /// TAO_Message_Property or TAO_Message_Translator or whatever (I am
01032   /// sure you get the idea) and encapsulate all these
01033   /// details. Coupling these seems odd. If I have to be more cynical
01034   /// we can move this to the connection_handler and it may make more sense
01035   /// with the DSCP stuff around there. Do you agree?
01036 
01037   /// Additional member values required to support codeset translation
01038   TAO_Codeset_Translator_Base *char_translator_;
01039   TAO_Codeset_Translator_Base *wchar_translator_;
01040 
01041   /// The tcs_set_ flag indicates that negotiation has occurred and so the
01042   /// translators are correct, since a null translator is valid if both ends
01043   /// are using the same codeset, whatever that codeset might be.
01044   CORBA::Boolean tcs_set_;
01045 
01046   /// First_request_ is true until the first request is sent or received. This
01047   /// is necessary since codeset context information is necessary only on the
01048   /// first request. After that, the translators are fixed for the life of the
01049   /// connection.
01050   CORBA::Boolean first_request_;
01051 
01052   /// Holds the partial GIOP message (if there is one)
01053   ACE_Message_Block* partial_message_;
01054 
01055 #if TAO_HAS_SENDFILE == 1
01056   /// mmap()-based allocator used to allocate output CDR buffers.
01057   /**
01058    * If this pointer is non-zero, sendfile() will be used to send data
01059    * in a TAO_OutputCDR stream instance.
01060    */
01061   TAO_MMAP_Allocator * const mmap_allocator_;
01062 #endif  /* TAO_HAS_SENDFILE==1 */
01063 
01064 #if TAO_HAS_TRANSPORT_CURRENT == 1
01065   /// Statistics
01066   TAO::Transport::Stats* stats_;
01067 #endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */
01068 
01069   /*
01070    * specialization hook to add class members from concrete
01071    * transport class onto the base transport class. Please
01072    * add any private members to this class *before* this hook.
01073    */
01074   //@@ TAO_TRANSPORT_SPL_DATA_MEMBERS_ADD_HOOK
01075 };
01076 
01077 /*
01078  * Hook to add external typedefs and specializations to
01079  * TAO's transport implementation.
01080  */
01081 
01082 //@@ TAO_TRANSPORT_SPL_EXTERN_ADD_HOOK
01083 
01084 #if TAO_HAS_TRANSPORT_CURRENT == 1
01085 namespace TAO
01086 {
01087   namespace Transport
01088   {
01089     /**
01090      * @class Stats
01091      *
01092      * @brief Used to collect stats on a transport.
01093      *
01094      * The base class in a (potentially) extensible hierarchy used to
01095      * specialize the information available for a specific protocol.
01096      *
01097      * This class is necessary for the implementation of the Transport
01098      * Current feature.
01099      *
01100      * <B>See Also:</B>
01101      *
01102      * https://svn.dre.vanderbilt.edu/viewvc/Middleware/trunk/TAO/docs/transport_current/index.html?revision=HEAD
01103      *
01104      */
01105     class TAO_Export Stats
01106     {
01107     public:
01108       Stats ();
01109       virtual ~Stats ();  // virtual because this is the base of a (potentially) extensible hierarchy
01110 
01111       void messages_sent (size_t message_length);  // NOTE(review): presumably records one sent message of message_length bytes -- confirm in Transport.inl
01112       CORBA::LongLong messages_sent (void) const;  // NOTE(review): presumably returns messages_sent_ -- confirm
01113       CORBA::LongLong bytes_sent (void) const;  // NOTE(review): presumably derived from bytes_sent_ -- confirm
01114 
01115       void messages_received (size_t message_length);  // NOTE(review): presumably records one received message of message_length bytes -- confirm in Transport.inl
01116       CORBA::LongLong messages_received (void) const;  // NOTE(review): presumably returns messages_rcvd_ -- confirm
01117       CORBA::LongLong bytes_received (void) const;  // NOTE(review): presumably derived from bytes_rcvd_ -- confirm
01118 
01119       void opened_since (const ACE_Time_Value& tv);  // setter overload; NOTE(review): presumably stores tv in opened_since_ -- confirm
01120       const ACE_Time_Value& opened_since (void) const;  // getter overload for the same property
01121 
01122     private:
01123       // Count of received messages, kept in a 64-bit counter.
01124       // The bytes_rcvd_.samples_count() could have been used instead,
01125       // however there was a suspicion that 32 bits would be insufficient.
01126       CORBA::LongLong messages_rcvd_;
01127 
01128       // Count of sent messages, kept in a 64-bit counter.
01129       // The bytes_sent_.samples_count() could have been used instead,
01130       // however there was a suspicion that 32 bits would be insufficient.
01131       CORBA::LongLong messages_sent_;
01132 
01133       ACE_Basic_Stats bytes_rcvd_;  // NOTE(review): appears to hold one sample per received message (see samples_count() remark above) -- confirm
01134       ACE_Basic_Stats bytes_sent_;  // NOTE(review): appears to hold one sample per sent message (see samples_count() remark above) -- confirm
01135 
01136       ACE_Time_Value  opened_since_;  // NOTE(review): presumably the time at which the transport was opened -- confirm
01137     };
01138   }  // namespace Transport
01139 }  // namespace TAO
01140 #endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */
01141 
01142 TAO_END_VERSIONED_NAMESPACE_DECL
01143 
01144 #if defined (__ACE_INLINE__)
01145 # include "tao/Transport.inl"
01146 #endif /* __ACE_INLINE__ */
01147 
01148 #include /**/ "ace/post.h"
01149 
01150 #endif /* TAO_TRANSPORT_H */

Generated on Sun Jan 27 13:07:37 2008 for TAO by doxygen 1.3.6