Transport.h

Go to the documentation of this file.
00001 // -*- C++ -*-
00002 
00003 //=============================================================================
00004 /**
00005  *  @file Transport.h
00006  *
00007  *  $Id: Transport.h 80603 2008-02-11 22:14:39Z johnc $
00008  *
00009  *  Define the interface for the Transport component in TAO's
00010  *  pluggable protocol framework.
00011  *
00012  *  @author  Fred Kuhns <fredk@cs.wustl.edu>
00013  */
00014 //=============================================================================
00015 
00016 #ifndef TAO_TRANSPORT_H
00017 #define TAO_TRANSPORT_H
00018 
00019 #include /**/ "ace/pre.h"
00020 
00021 #include "tao/Transport_Cache_Manager.h"
00022 
00023 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00024 # pragma once
00025 #endif /* ACE_LACKS_PRAGMA_ONCE */
00026 
00027 #include "tao/Transport_Timer.h"
00028 #include "tao/Incoming_Message_Queue.h"
00029 #include "tao/Incoming_Message_Stack.h"
00030 #include "ace/Time_Value.h"
00031 #include "ace/Basic_Stats.h"
00032 
00033 struct iovec;
00034 
00035 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00036 
00037 class TAO_ORB_Core;
00038 class TAO_Target_Specification;
00039 class TAO_Operation_Details;
00040 class TAO_Transport_Mux_Strategy;
00041 class TAO_Wait_Strategy;
00042 class TAO_Connection_Handler;
00043 class TAO_GIOP_Message_Base;
00044 class TAO_Codeset_Translator_Base;
00045 
00046 class TAO_Queued_Message;
00047 class TAO_Synch_Queued_Message;
00048 class TAO_Resume_Handle;
00049 class TAO_Stub;
00050 class TAO_MMAP_Allocator;
00051 
00052 namespace TAO
00053 {
00054   /**
00055    * Role of a connection; see TAO_Transport::opened_as ().
00056    * @note Should this be in the TAO namespace? It seems to belong in the transport.
00057    */
00058   enum Connection_Role
00059     {
00060       TAO_UNSPECIFIED_ROLE = 0,
00061       TAO_SERVER_ROLE = 1,
00062       TAO_CLIENT_ROLE = 2
00063     };
00064 
00065   namespace Transport
00066   {
00067     /// Transport-level statistics. Initially introduced to support
00068     /// the "Transport Current" functionality.
00069     class Stats;
00070   }
00071 }
00072 
00073 /*
00074  * Specialization hook for the TAO's transport implementation.
00075  */
00076 //@@ TAO_TRANSPORT_SPL_INCLUDE_FORWARD_DECL_ADD_HOOK
00077 
00078 /**
00079  * @class TAO_Transport
00080  *
00081  * @brief Generic definitions for the Transport class.
00082  *
00083  * The transport object is created in the Service handler
00084  * constructor and deleted in the Service Handler's destructor!!
00085  *
00086  * The main responsibility of a Transport object is to encapsulate a
00087  * connection, and provide a transport independent way to send and
00088  * receive data.  Since TAO is heavily based on the Reactor for most if
00089  * not all of its I/O, the Transport class is usually implemented with a
00090  * helper Connection Handler that adapts the generic Transport
00091  * interface to the Reactor types.
00092  *
00093  * <H3>The outgoing data path:</H3>
00094  *
00095  * One of the responsibilities of the TAO_Transport class is to send
00096  * out GIOP messages as efficiently as possible.  In most cases
00097  * messages are put out in FIFO order, the transport object will put
00098  * out the message using a single system call and return control to
00099  * the application.  However, for oneways and AMI requests it may be
00100  * more efficient (or required if the SYNC_NONE policy is in effect)
00101  * to queue the messages until a large enough data set is available.
00102  * Another reason to queue is that some applications cannot block for
00103  * I/O, yet they want to send messages so large that a single write()
00104  * operation would not be able to cope with them.  In such cases we
00105  * need to queue the data and use the Reactor to drain the queue.
00106  *
00107  * Therefore, the Transport class may need to use a queue to
00108  * temporarily hold the messages, and, in some configurations, it may
00109  * need to use the Reactor to concurrently drain such queues.
00110  *
00111  * <H4>Out of order messages:</H4> TAO provides explicit policies to
00112  * send 'urgent' messages.  Such messages may be put at the head of the
00113  * queue. However, they cannot be sent immediately because the
00114  * transport may already be sending another message in a reactive
00115  * fashion.
00116  *
00117  * Consequently, the Transport must also know if the head of the queue
00118  * has been partially sent.  In that case new messages can only follow
00119  * the head. Only once the head is completely sent we can start
00120  * sending new messages.
00121  *
00122  * <H4>Waiting threads:</H4> One or more threads can be blocked
00123  * waiting for the connection to completely send the message.
00124  * The thread should return as soon as its message has been sent, so a
00125  * per-thread condition is required.  This suggests that simply using an
00126  * ACE_Message_Queue would not be enough:  there is a significant
00127  * amount of ancillary information to keep on each message that the
00128  * Message_Block class does not provide room for.
00129  *
00130  * Blocking I/O is still attractive for some applications.  First, by
00131  * eliminating the Reactor overhead performance is improved when
00132  * sending large blocks of data.  Second, using the Reactor to send
00133  * out data opens the door for nested upcalls, yet some applications
00134  * cannot deal with the reentrancy issues in this case.
00135  *
00136  * <H4>Timeouts:</H4> Some or all messages could have a timeout period
00137  * attached to them.  The timeout source could either be some
00138  * high-level policy or maybe some strategy to prevent denial of
00139  * service attacks.  In any case the timeouts are per-message, and
00140  * later messages could have shorter timeouts.
00141  * In fact, some kind of scheduling (such as EDF) could be required in
00142  * a few applications.
00143  *
00144  * <H4>Conclusions:</H4> The outgoing data path consists of several
00145  * components:
00146  *
00147  * - A queue of pending messages
00148  * - A message currently being transmitted
00149  * - A per-transport 'send strategy' to choose between blocking on
00150  *   write, blocking on the reactor or blocking on leader/follower.
00151  * - A per-message 'waiting object'
00152  * - A per-message timeout
00153  *
00154  * The Transport object provides a single method to send request
00155  * messages (send_request_message ()).
00156  *
00157  * <H3>The incoming data path:</H3>
00158  *
00159  * One of the main responsibilities of the transport is to read and
00160  * process the incoming GIOP message as quickly and efficiently as
00161  * possible. There are other forces that need to be given due
00162  * consideration. They are
00163  *  - Multiple threads should be able to traverse along the same data
00164  *    path but should not be able to read from the same handle at the
00165  *    same time ie. the handle should not be shared between threads at
00166  *    any instant.
00167  *  - Reads on the handle could give one or more messages.
00168  *  - Minimise locking and copying overhead when trying to attack the
00169  *    above.
00170  *
00171  * <H3> Parsing messages (GIOP) & processing the message:</H3>
00172  *
00173  * The messages should be checked for validity and the right
00174  * information should be sent to the higher layer for processing. The
00175  * process of doing a sanity check and preparing the messages for the
00176  * higher layers of the ORB are done by the messaging protocol.
00177  *
00178  * <H3> Design forces and Challenges </H3>
00179  *
00180  * To keep things as efficient as possible for medium sized requests,
00181  * it would be good to minimise data copying and locking along the
00182  * incoming path ie. from the time of reading the data from the handle
00183  * to the application. We achieve this by creating a buffer on stack
00184  * and reading the data from the handle into the buffer. We then pass
00185  * the same data block (the buffer is encapsulated into a data block)
00186  * to the higher layers of the ORB. The problems stem from the
00187  * following
00188  *  (a) Data is bigger than the buffer that we have on stack
00189  *  (b) Transports like TCP do not guarantee availability of the whole
00190  *      chunk of data in one shot. Data could trickle in byte by byte.
00191  *  (c) Single read gives multiple messages
00192  *
00193  * We solve the problems as follows
00194  *
00195  *   (a) First do a read with the buffer on stack. Query the underlying
00196  *       messaging object whether the message has any incomplete
00197  *       portion. If so, data will be copied into new buffer being able
00198  *       to hold full message and is queued; succeeding events will read
00199  *       data from socket and write directly into this buffer.
00200  *       Otherwise, if the message in the local buffer is complete, we free
00201  *       the handle and then send the message to the higher layers of the
00202  *       ORB for processing.
00203  *
00204  *   (b) If buffer with incomplete message has been enqueued, while trying
00205  *       to do the above, the reactor will call us back when the handle
00206  *       becomes read ready. The read-operation will copy data directly
00207  *       into the enqueued buffer.  If the message has been read completely
00208  *       the message is sent to the higher layers of the ORB for processing.
00209  *
00210  *   (c) If we get multiple messages (possible if the client connected
00211  *       to the server sends oneways or AMI requests), we parse and
00212  *       split the messages. Every message is put in the queue. Once
00213  *       the messages are queued, the thread picks up one message to
00214  *       send to the higher layers of the ORB. Before doing that, if
00215  *       it finds more messages, it sends a notify to the reactor
00216  *       without resuming the handle. The next thread picks up a
00217  *       message from the queue and processes that. Once the queue
00218  *       is drained the last thread resumes the handle.
00219  *
00220  * <H3> Sending Replies </H3>
00221  *
00222  * We could use the outgoing path of the ORB to send replies. This
00223  * would allow us to reuse most of the code in the outgoing data
00224  * path. We were doing this till TAO-1.2.3. We ran into
00225  * problems. When writing the reply the ORB gets flow controlled, and the
00226  * ORB tries to flush the message by going into the reactor. This
00227  * resulted in unnecessary nesting. The thread that gets into the
00228  * Reactor could potentially handle other messages (incoming or
00229  * outgoing) and the stack starts growing leading to crashes.
00230  *
00231  * <H4> Solution to the nesting problem </H4>
00232  *
00233  * The solution that we (plan to) adopt is pretty straight
00234  * forward. The thread sending replies will not block to send the
00235  * replies but queue the replies and return to the Reactor. (Note the
00236  * careful usage of the terms "blocking in the Reactor" as opposed to
00237  * "return back to the Reactor".)
00238  *
00239  *
00240  * <B>See Also:</B>
00241  *
00242  * https://svn.dre.vanderbilt.edu/viewvc/Middleware/trunk/TAO/docs/pluggable_protocols/index.html?revision=HEAD
00243  *
00244  */
00245 class TAO_Export TAO_Transport
00246 {
00247 public:
00248 
00249   /// Default creator, requires the tag value be supplied.
00250   TAO_Transport (CORBA::ULong tag,
00251                  TAO_ORB_Core *orb_core,
00252                  size_t input_cdr_size = ACE_CDR::DEFAULT_BUFSIZE);
00253 
00254   /// Destructor
00255   virtual ~TAO_Transport (void);
00256 
00257   /// Return the protocol tag.
00258   /**
00259    * The OMG assigns unique tags (a 32-bit unsigned number) to each
00260    * protocol. New protocol tags can be obtained free of charge from
00261    * the OMG, check the documents in corbafwd.h for more details.
00262    */
00263   CORBA::ULong tag (void) const;
00264 
00265   /// Access the ORB that owns this connection.
00266   TAO_ORB_Core *orb_core (void) const;
00267 
00268   /// Get the TAO_Transport_Mux_Strategy used by this object.
00269   /**
00270    * The role of the TAO_Transport_Mux_Strategy is described in more
00271    * detail in that class' documentation.  Suffice it to say that the
00272    * class is used to control how many threads can have pending
00273    * requests over the same connection. Multiplexing multiple threads
00274    * over the same connection conserves resources and is almost
00275    * required for AMI, but having only one pending request per
00276    * connection is more efficient and reduces the possibilities of
00277    * priority inversions.
00278    */
00279   TAO_Transport_Mux_Strategy *tms (void) const;
00280 
00281   /// Return the TAO_Wait_Strategy used by this object.
00282   /**
00283    * The role of the TAO_Wait_Strategy is described in more detail in
00284    * that class' documentation. Suffice it to say that the ORB can wait
00285    * for a reply blocking on read(), using the Reactor to wait for
00286    * multiple events concurrently or using the Leader/Followers
00287    * protocol.
00288    */
00289   TAO_Wait_Strategy *wait_strategy (void) const;
00290 
00291   /// Callback method to reactively drain the outgoing data queue
00292   int handle_output (ACE_Time_Value *max_wait_time);
00293 
00294   /// Get the bidirectional flag
00295   int bidirectional_flag (void) const;
00296 
00297   /// Set the bidirectional flag
00298   void bidirectional_flag (int flag);
00299 
00300   /// Set the Cache Map entry
00301   void cache_map_entry (TAO::Transport_Cache_Manager::HASH_MAP_ENTRY *entry);
00302 
00303   /// Get the Cache Map entry
00304   TAO::Transport_Cache_Manager::HASH_MAP_ENTRY *cache_map_entry (void);
00305 
00306   /// Set and Get the identifier for this transport instance.
00307   /**
00308    * If not set, this will return an integer representation of
00309    * the <code>this</code> pointer for the instance on which
00310    * it's called.
00311    */
00312   size_t id (void) const;
00313   void id (size_t id);
00314 
00315   /**
00316    * Methods dealing with the role of the connection, e.g., CLIENT or SERVER.
00317    * See CORBA 2.6 Specification, Section 15.5.1 for origin of definitions.
00318    */
00319   TAO::Connection_Role opened_as (void) const;
00320   void opened_as (TAO::Connection_Role);
00321 
00322   /// Get and Set the purging order. The purging strategy uses the set
00323   /// version to set the purging order.
00324   unsigned long purging_order (void) const;
00325   void purging_order(unsigned long value);
00326 
00327   /// Check if there are messages pending in the queue
00328   /**
00329    * @return true if the queue is empty
00330    */
00331   bool queue_is_empty (void);
00332 
00333   /// Add event handler to the handlers set.
00334   /**
00335    * Called by the cache when the cache is closing.
00336    *
00337    * @param handlers The TAO_Connection_Handler_Set into which the
00338    *        transport should place its handler
00339    */
00340   void provide_handler (TAO::Connection_Handler_Set &handlers);
00341 
00342   /// Add event handlers corresponding to transports that have RW wait
00343   /// strategy to the handlers set.
00344   /**
00345    * Called by the cache when the ORB is shutting down.
00346    *
00347    * @param handlers The TAO_Connection_Handler_Set into which the
00348    *        transport should place its handler if the transport has RW
00349    *        strategy on.
00350    *
00351    * @return true indicates a handler was added to the handler set.
00352    *         false indicates that the transport did not have a
00353    *         blockable handler that could be added.
00354    */
00355   bool provide_blockable_handler (TAO::Connection_Handler_Set &handlers);
00356 
00357   /// Register the handler with the reactor.
00358   /**
00359    * Register the handler with the reactor. This method is used by the
00360    * Wait_On_Reactor strategy. The transport must register its event
00361    * handler with the ORB's Reactor.
00362    *
00363    * @todo I think this method is pretty much useless, the
00364    * connections are *always* registered with the Reactor, except in
00365    * thread-per-connection mode.  In that case putting the connection
00366    * in the Reactor would produce unpredictable results anyway.
00367    */
00368   virtual int register_handler (void);
00369 
00370   /// Write the complete Message_Block chain to the connection.
00371   /**
00372    * This method serializes on handler_lock_, guaranteeing that only
00373    * one thread can execute it on the same instance at a time.
00374    *
00375    * Often the implementation simply forwards the arguments to the
00376    * underlying ACE_Svc_Handler class. Using the code factored out
00377    * into ACE.
00378    *
00379    * Be careful with protocols that perform non-trivial
00380    * transformations of the data, such as SSLIOP or protocols that
00381    * compress the stream.
00382    *
00383    * @param iov contains the data that must be sent.
00384    *
00385    * @param timeout is the maximum time that the application is
00386    * willing to wait for the data to be sent, useful in platforms that
00387    * implement timed writes.
00388    * The timeout value is obtained from the policies set by the
00389    * application.
00390    *
00391    * @param bytes_transferred should return the total number of bytes
00392    * successfully transferred before the connection blocked.  This is
00393    * required because in some platforms and/or protocols multiple
00394    * system calls may be required to send the chain of message
00395    * blocks.  The first few calls can work successfully, but the final
00396    * one can fail or signal a flow control situation (via EAGAIN).
00397    * In this case the ORB expects the function to return -1, errno to
00398    * be appropriately set and this argument to return the number of
00399    * bytes already on the OS I/O subsystem.
00400    *
00401    * This call can also fail if the transport instance is no longer
00402    * associated with a connection (e.g., the connection handler closed
00403    * down).  In that case, it returns -1 and sets errno to
00404    * <code>ENOENT</code>.
00405    */
00406   virtual ssize_t send (iovec *iov,
00407                         int iovcnt,
00408                         size_t &bytes_transferred,
00409                         const ACE_Time_Value *timeout = 0) = 0;
00410 
00411 #if TAO_HAS_SENDFILE == 1
00412   /// Send data through zero-copy write mechanism, if available.
00413   /**
00414    * This method sends the data in the I/O vector through the platform
00415    * sendfile() function to perform a zero-copy write, if available.
00416    * Otherwise, the default fallback implementation simply delegates
00417    * to the TAO_Transport::send() method.
00418    *
00419    * @note This method is best used when sending very large blocks of
00420    *       data.
00421    */
00422   virtual ssize_t sendfile (TAO_MMAP_Allocator * allocator,
00423                             iovec * iov,
00424                             int iovcnt,
00425                             size_t &bytes_transferred,
00426                             ACE_Time_Value const * timeout = 0);
00427 #endif  /* TAO_HAS_SENDFILE==1 */
00428 
00429 
00430   /// Read len bytes into buffer.
00431   /**
00432    * This method serializes on handler_lock_, guaranteeing that only
00433    * one thread can execute it on the same instance at a time.
00434    *
00435    * @param buffer ORB allocated buffer where the data should be placed.
00436    * @@ The ACE_Time_Value *s is just a place holder for now.  It is
00437    * not clear that this is the best place to specify this.  The actual
00438    * timeout values will be kept in the Policies.
00439    */
00440   virtual ssize_t recv (char *buffer,
00441                         size_t len,
00442                         const ACE_Time_Value *timeout = 0) = 0;
00443 
00444   /**
00445    * @name Control connection lifecycle
00446    *
00447    * These methods are routed through the TMS object. The TMS
00448    * strategies implement them correctly.
00449    */
00450   //@{
00451 
00452   /// Request has been just sent, but the reply is not received. Idle
00453   /// the transport now.
00454   bool idle_after_send (void);
00455 
00456   /// Request is sent and the reply is received. Idle the transport
00457   /// now.
00458   bool idle_after_reply (void);
00459 
00460   /// Call the implementation method after obtaining the lock.
00461   virtual void close_connection (void);
00462 
00463   //@}
00464 
00465   /** @name Template methods
00466    *
00467    * The Transport class uses the Template Method Pattern to implement
00468    * the protocol specific functionality.
00469    * Implementors of a pluggable protocol should override the
00470    * following methods with the semantics documented below.
00471    */
00472   /**
00473    * Initialising the messaging object. This would be used by the
00474    * connector side. On the acceptor side the connection handler
00475    * would take care of the messaging objects.
00476    */
00477   void messaging_init (TAO_GIOP_Message_Version const &version);
00478 
00479   /// Extracts the list of listen points from the @a cdr stream. The
00480   /// list would have the protocol specific details of the
00481   /// ListenPoints
00482   virtual int tear_listen_point_list (TAO_InputCDR &cdr);
00483 
00484   /// Hooks that can be overridden in concrete transports.
00485   /**
00486    * These hooks are invoked just after connection establishment (or
00487    * after a connection is fetched from cache). The
00488    * return value signifies whether the invoker should proceed  with
00489    * post connection establishment activities. Protocols like SSLIOP
00490    * need this to verify whether connections already established have
00491    * valid certificates. There are no pre_connect_hooks () since the
00492    * transport doesn't exist before a connection establishment. :-)
00493    *
00494    * @note The methods are not made const for a reason.
00495    */
00496   virtual bool post_connect_hook (void);
00497 
00498   /// Memory management routines.
00499   /*
00500    * Forwards to event handler.
00501    */
00502   ACE_Event_Handler::Reference_Count add_reference (void);
00503   ACE_Event_Handler::Reference_Count remove_reference (void);
00504 
00505   /// Return the messaging object that is used to format the data that
00506   /// needs to be sent.
00507   TAO_GIOP_Message_Base * messaging_object (void);
00508 
00509   /** @name Template methods
00510    *
00511    * The Transport class uses the Template Method Pattern to implement
00512    * the protocol specific functionality.
00513    * Implementors of a pluggable protocol should override the
00514    * following methods with the semantics documented below.
00515    */
00516   //@{
00517 
00518   /// Return the event handler used to receive notifications from the
00519   /// Reactor.
00520   /**
00521    * Normally a concrete TAO_Transport object has-a ACE_Event_Handler
00522    * member that functions as an adapter between the ACE_Reactor
00523    * framework and the TAO pluggable protocol framework.
00524    * In all the protocols implemented so far this role is fulfilled
00525    * by an instance of ACE_Svc_Handler.
00526    *
00527    * @todo Since we only use a limited functionality of
00528    * ACE_Svc_Handler we could probably implement a generic
00529    * adapter class (TAO_Transport_Event_Handler or something), this
00530    * will reduce footprint and simplify the process of implementing a
00531    * pluggable protocol.
00532    *
00533    * @todo This method has to be renamed to event_handler()
00534    */
00535   virtual ACE_Event_Handler * event_handler_i (void) = 0;
00536 
00537   /// Is this transport really connected
00538   bool is_connected (void) const;
00539 
00540   /// Perform all the actions when this transport gets opened
00541   bool post_open (size_t id);
00542 
00543   /// do what needs to be done when closing the transport
00544   void pre_close (void);
00545 
00546   /// Get the connection handler for this transport
00547   TAO_Connection_Handler * connection_handler (void);
00548 
00549   /// Accessor for the output CDR stream
00550   TAO_OutputCDR &out_stream (void);
00551 
00552   /*
00553    * Specialization hook to add public methods from
00554    * concrete transport implementations to TAO's transport
00555    * class
00556    */
00557   //@@ TAO_TRANSPORT_SPL_PUBLIC_METHODS_ADD_HOOK
00558 
00559 protected:
00560 
00561   virtual TAO_Connection_Handler * connection_handler_i (void) = 0;
00562 
00563 public:
00564 
00565   /// This is a request for the transport object to write a
00566   /// LocateRequest header before it is sent out.
00567   int generate_locate_request (TAO_Target_Specification &spec,
00568                                TAO_Operation_Details &opdetails,
00569                                TAO_OutputCDR &output);
00570 
00571   /// This is a request for the transport object to write a request
00572   /// header before it sends out the request
00573   virtual int generate_request_header (TAO_Operation_Details &opd,
00574                                        TAO_Target_Specification &spec,
00575                                        TAO_OutputCDR &msg);
00576 
00577   /// Recache ourselves in the cache
00578   int recache_transport (TAO_Transport_Descriptor_Interface* desc);
00579 
00580   /// Callback to read incoming data
00581   /**
00582    * The ACE_Event_Handler adapter invokes this method as part of its
00583    * handle_input() operation.
00584    *
00585    * @todo the method name is confusing! Calling it handle_input()
00586    * would probably make things easier to understand and follow!
00587    *
00588    * Once a complete message is read the Transport class delegates on
00589    * the Messaging layer to invoke the right upcall (on the server) or
00590    * the TAO_Reply_Dispatcher (on the client side).
00591    *
00592    * @param max_wait_time In some cases the I/O is synchronous, e.g. a
00593    * thread-per-connection server or when Wait_On_Read is enabled.  In
00594    * those cases a maximum read time can be specified.
00595    */
00596   virtual int handle_input (TAO_Resume_Handle &rh,
00597                             ACE_Time_Value *max_wait_time = 0);
00598 
00599   enum TAO_Message_Semantics  // Message kind passed to send_request (), send_message () and send_message_shared ().
00600     {
00601       TAO_ONEWAY_REQUEST = 0,
00602       TAO_TWOWAY_REQUEST = 1,
00603       TAO_REPLY  // No explicit initializer: takes the next value, 2.
00604     };
00605 
00606   /// Prepare the waiting and demuxing strategy to receive a reply for
00607   /// a new request.
00608   /**
00609    * Preparing the ORB to receive the reply only once the request is
00610    * completely sent opens the system to some subtle race conditions:
00611    * suppose the ORB is running in a multi-threaded configuration,
00612    * thread A makes a request while thread B is using the Reactor to
00613    * process all incoming requests.
00614    * Thread A could be implemented as follows:
00615    * 1) send the request
00616    * 2) setup the ORB to receive the reply
00617    * 3) wait for the reply
00618    *
00619    * but in this case thread B may receive the reply between step (1)
00620    * and (2), and drop it as an invalid or unexpected message.
00621    * Consequently the correct implementation is:
00622    * 1) setup the ORB to receive the reply
00623    * 2) send the request
00624    * 3) wait for the reply
00625    *
00626    * The following method encapsulates this idiom.
00627    *
00628    * @todo This is generic code, it should be factored out into the
00629    * Transport class.
00630    */
00631   // @nolock b/c this calls send_or_buffer
00632   virtual int send_request (TAO_Stub *stub,
00633                             TAO_ORB_Core *orb_core,
00634                             TAO_OutputCDR &stream,
00635                             TAO_Message_Semantics message_semantics,
00636                             ACE_Time_Value *max_time_wait) = 0;
00637 
00638   /// This method formats the stream and then sends the message on the
00639   /// transport.
00640   /**
00641    * Once the ORB is prepared to receive a reply (see send_request()
00642    * above), and all the arguments have been marshaled the CDR stream
00643    * must be 'formatted', i.e. the message_size field in the GIOP
00644    * header can finally be set to the proper value.
00645    *
00646    */
00647   virtual int send_message (TAO_OutputCDR &stream,
00648                             TAO_Stub *stub = 0,
00649                             TAO_Message_Semantics message_semantics = TAO_Transport::TAO_TWOWAY_REQUEST,
00650                             ACE_Time_Value *max_time_wait = 0) = 0;
00651 
00652   /// Send the contents of @a message_block
00653   /**
00654    * @param stub The object reference used for this operation, useful
00655    *             to obtain the current policies.
00656    * @param message_semantics If this is set to TAO_TWOWAY_REQUEST
00657    *        this method will block until the operation is completely
00658    *        written on the wire. If it is set to other values this
00659    *        operation could return.
00660    * @param message_block The CDR encapsulation of the GIOP message
00661    *             that must be sent.  The message may consist of
00662    *             multiple Message Blocks chained through the cont()
00663    *             field.
00664    * @param max_wait_time The maximum time that the operation can
00665    *             block, used in the implementation of timeouts.
00666    */
00667   virtual int send_message_shared (TAO_Stub *stub,
00668                                    TAO_Message_Semantics message_semantics,
00669                                    const ACE_Message_Block *message_block,
00670                                    ACE_Time_Value *max_wait_time);
00671 
00672 protected:
00673 
00674   /// Process the message by sending it to the higher layers of the
00675   /// ORB.
00676   int process_parsed_messages (TAO_Queued_Data *qd,
00677                                TAO_Resume_Handle &rh);
00678 
00679   /// Implement send_message_shared() assuming the handler_lock_ is
00680   /// held.
00681   int send_message_shared_i (TAO_Stub *stub,
00682                              TAO_Message_Semantics message_semantics,
00683                              const ACE_Message_Block *message_block,
00684                              ACE_Time_Value *max_wait_time);
00685 
00686   /// Queue a message for @a message_block
00687   /// @param max_wait_time The maximum time that the operation can
00688   ///            block, used in the implementation of timeouts.
00689   /// @param back If true, the message will be pushed to the back of the queue.
00690   ///        If false, the message will be pushed to the front of the queue.
00691   int queue_message_i (const ACE_Message_Block *message_block,
00692                        ACE_Time_Value *max_wait_time, bool back=true);
00693 
00694 public:
00695   /// Format and queue a message for @a stream
00696   /// @param max_wait_time The maximum time that the operation can
00697   ///            block, used in the implementation of timeouts.
00698   int format_queue_message (TAO_OutputCDR &stream,
00699                             ACE_Time_Value *max_wait_time);
00700 
00701   /// Send a message block chain.
00702   int send_message_block_chain (const ACE_Message_Block *message_block,
00703                                 size_t &bytes_transferred,
00704                                 ACE_Time_Value *max_wait_time = 0);
00705 
00706   /// Send a message block chain, assuming the lock is held
00707   int send_message_block_chain_i (const ACE_Message_Block *message_block,
00708                                   size_t &bytes_transferred,
00709                                   ACE_Time_Value *max_wait_time);
00710   /// Cache management
00711   int purge_entry (void);
00712 
00713   /// Cache management
00714   int make_idle (void);
00715 
00716   /// Cache management
00717   int update_transport (void);
00718 
00719   /// The timeout callback, invoked when any of the timers related to
00720   /// this transport expire.
00721   /**
00722    * @param current_time The current time as reported from the Reactor
00723    * @param act The Asynchronous Completion Token.  Currently it is
00724    *            interpreted as follows:
00725    * - If the ACT is the address of this->current_deadline_ the
00726    *   queueing timeout has expired and the queue should start
00727    *   flushing.
00728    *
00729    * @return Returns 0 if there are no problems, -1 if there is an
00730    *         error
00731    *
00732    * @todo In the future this function could be used to expire
00733    *       messages (oneways) that have been sitting for too long on
00734    *       the queue.
00735    */
00736   int handle_timeout (const ACE_Time_Value &current_time, const void* act);
00737 
00738   /// Accessor to recv_buffer_size_
00739   size_t recv_buffer_size (void) const;
00740 
00741   /// Accessor to sent_byte_count_
00742   size_t sent_byte_count (void) const;
00743 
00744   /// CodeSet Negotiation - Get the char codeset translator factory
00745   TAO_Codeset_Translator_Base *char_translator (void) const;
00746 
00747   /// CodeSet Negotiation - Get the wchar codeset translator factory
00748   TAO_Codeset_Translator_Base *wchar_translator (void) const;
00749 
00750   /// CodeSet negotiation - Set the char codeset translator factory
00751   void char_translator (TAO_Codeset_Translator_Base *);
00752 
00753   /// CodeSet negotiation - Set the wchar codeset translator factory
00754   void wchar_translator (TAO_Codeset_Translator_Base *);
00755 
00756   /// Use the Transport's codeset factories to set the translator for input
00757   /// and output CDRs.
00758   void assign_translators (TAO_InputCDR *, TAO_OutputCDR *);
00759 
00760   /// It is necessary to clear the codeset translator when a CDR stream
00761   /// is used for more than one GIOP message. This is required since the
00762   /// header must not be translated, whereas the body must be.
00763   void clear_translators (TAO_InputCDR *, TAO_OutputCDR *);
00764 
00765   /// Return true if the tcs has been set
00766   CORBA::Boolean is_tcs_set() const;
00767 
00768   /// Set the state of the first_request_ flag to 0
00769   void first_request_sent();
00770 
00771   /// Notify all the components inside a Transport when the underlying
00772   /// connection is closed.
00773   void send_connection_closed_notifications (void);
00774 
00775   /// Transport statistics
00776   TAO::Transport::Stats* stats (void) const;
00777 
00778 private:
00779 
00780   /// Helper method that returns the Transport Cache Manager.
00781   TAO::Transport_Cache_Manager &transport_cache_manager (void);
00782 
00783   /// Send some of the data in the queue.
00784   /**
00785    * As the outgoing data is drained this method is invoked to send as
00786    * much of the current message as possible.
00787    *
00788    * Returns 0 if there is more data to send, -1 if there was an error
00789    * and 1 if the message was completely sent.
00790    */
00791   int drain_queue (ACE_Time_Value *max_wait_time);
00792 
00793   /// Implement drain_queue() assuming the lock is held
00794   int drain_queue_i (ACE_Time_Value *max_wait_time);
00795 
00796   /// Check if there are messages pending in the queue
00797   /**
00798    * This version assumes that the lock is already held.  Use with
00799    * care!
00800    *
00801    * @return true if the queue is empty
00802    */
00803   bool queue_is_empty_i (void);
00804 
00805   /// A helper routine used in drain_queue_i()
00806   int drain_queue_helper (int &iovcnt, iovec iov[], ACE_Time_Value *max_wait_time);
00807 
00808   /// These classes need privileged access to:
00809   /// - schedule_output_i()
00810   /// - cancel_output_i()
00811   friend class TAO_Reactive_Flushing_Strategy;
00812   friend class TAO_Leader_Follower_Flushing_Strategy;
00813 
00814   /// Needs privileged access to
00815   /// event_handler_i ()
00816   friend class TAO_Thread_Per_Connection_Handler;
00817 
00818   /// Schedule handle_output() callbacks
00819   int schedule_output_i (void);
00820 
00821   /// Cancel handle_output() callbacks
00822   int cancel_output_i (void);
00823 
00824   /// Cleanup the queue.
00825   /**
00826    * Exactly @a byte_count bytes have been sent, the queue must be
00827    * cleaned up as potentially several messages have been completely
00828    * sent out.
00829    * It leaves on head_ the next message to send out.
00830    */
00831   void cleanup_queue (size_t byte_count);
00832 
00833   /// Cleanup the complete queue
00834   void cleanup_queue_i ();
00835 
00836   /// Check if the buffering constraints have been reached
00837   int check_buffering_constraints_i (TAO_Stub *stub, bool &must_flush);
00838 
00839   /// Send a synchronous message, i.e. block until the message is on
00840   /// the wire
00841   int send_synchronous_message_i (const ACE_Message_Block *message_block,
00842                                   ACE_Time_Value *max_wait_time);
00843 
00844   /// Send a reply message, i.e. do not block until the message is on
00845   /// the wire, but just return after adding it to the queue.
00846   int send_reply_message_i (const ACE_Message_Block *message_block,
00847                             ACE_Time_Value *max_wait_time);
00848 
00849   /// Send an asynchronous message, i.e. do not block until the message is on
00850   /// the wire
00851   int send_asynchronous_message_i (TAO_Stub *stub,
00852                                    const ACE_Message_Block *message_block,
00853                                    ACE_Time_Value *max_wait_time);
00854 
00855   /// A helper method used by send_synchronous_message_i() and
00856   /// send_reply_message_i(). Reusable code that could be used by both
00857   /// the methods.
00858   int send_synch_message_helper_i (TAO_Synch_Queued_Message &s,
00859                                    ACE_Time_Value *max_wait_time);
00860 
00861   /// Check if the flush timer is still pending
00862   int flush_timer_pending (void) const;
00863 
00864   /// The flush timer expired or was explicitly cancelled, mark it as
00865   /// not pending
00866   void reset_flush_timer (void);
00867 
00868   /// Print out error messages if the event handler is not valid
00869   void report_invalid_event_handler (const char *caller);
00870 
00871   /// Is invoked by handle_input operation. It consolidates the message on
00872   /// top of incoming_message_stack_.  The amount of missing data is
00873   /// known and the recv operation copies data directly into the message
00874   /// buffer, as much as a single recv-invocation provides.
00875   int handle_input_missing_data (TAO_Resume_Handle &rh,
00876                                  ACE_Time_Value *max_wait_time,
00877                                  TAO_Queued_Data *q_data);
00878 
00879   /// Is invoked by handle_input operation. It parses new messages from input stream
00880   /// or consolidates messages whose header has been partially read, the message
00881   /// size being unknown so far. It parses as much data as a single recv-invocation provides.
00882   int handle_input_parse_data (TAO_Resume_Handle &rh,
00883                                ACE_Time_Value *max_wait_time);
00884 
00885   /// Is invoked by handle_input_parse_data. Parses all messages remaining
00886   /// in @a message_block.
00887   int handle_input_parse_extra_messages (ACE_Message_Block &message_block);
00888 
00889   /// @return -1 error, otherwise 0
00890   int consolidate_enqueue_message (TAO_Queued_Data *qd);
00891 
00892   /// @return -1 error, otherwise 0
00893   int consolidate_process_message (TAO_Queued_Data *qd, TAO_Resume_Handle &rh);
00894 
00895   /**
00896    * Process the message that is at the head of the incoming queue.
00897    * If there are more messages in the queue, this method calls
00898    * this->notify_reactor () to wake up a thread.
00899    * @retval -1 on error
00900    * @retval 0 if the enqueued messages were processed successfully
00901    * @retval 1 if no message is present in the queue
00902    */
00903   int process_queue_head (TAO_Resume_Handle &rh);
00904 
00905   /**
00906    * This call prepares a new handler for the notify call and sends a
00907    * notify () call to the reactor.
00908    */
00909   int notify_reactor (void);
00910 
00911   /// Assume the lock is held
00912   void send_connection_closed_notifications_i (void);
00913 
00914   /// Allocate a partial message block and store it in our
00915   /// partial_message_ data member.
00916   void allocate_partial_message_block (void);
00917 
00918   // Disallow copying and assignment.
00919   TAO_Transport (const TAO_Transport&);
00920   void operator= (const TAO_Transport&);
00921 
00922   /*
00923    * Specialization hook to add concrete private methods from
00924    * TAO's protocol implementation onto the base Transport class
00925    */
00926 
00927   //@@ TAO_TRANSPORT_SPL_PRIVATE_METHODS_ADD_HOOK
00928 
00929 protected:
00930 
00931   /// IOP protocol tag.
00932   CORBA::ULong const tag_;
00933 
00934   /// Global orbcore resource.
00935   TAO_ORB_Core * const orb_core_;
00936 
00937   /// Our entry in the cache. We don't own this. It is here for our
00938   /// convenience. We cannot just change things around.
00939   TAO::Transport_Cache_Manager::HASH_MAP_ENTRY *cache_map_entry_;
00940 
00941   /// Strategy to decide whether multiple requests can be sent over the
00942   /// same connection or the connection is exclusive for a request.
00943   TAO_Transport_Mux_Strategy *tms_;
00944 
00945   /// Strategy for waiting for the reply after sending the request.
00946   TAO_Wait_Strategy *ws_;
00947 
00948   /// Used to check if bidirectional info has been synchronized with
00949   /// the peer.
00950   /**
00951    * Have we sent any info on bidirectional information or have we
00952    * received any info regarding making the connection served by this
00953    * transport bidirectional.
00954    * The flag is used as follows:
00955    * + We don't want to send the bidirectional context info more than
00956    *   once on the connection. Why? Waste of marshalling and
00957    *   demarshalling time on the client.
00958    * + On the server side -- once a client that has established the
00959    *   connection asks the server to use the connection both ways, we
00960    *   *don't* want the server to pack service info to the client. That
00961    *   is not allowed. We need a flag to prevent such things from
00962    *   happening.
00963    *
00964    * The value of this flag will be 0 if the client sends info and 1
00965    * if the server receives the info.
00966    */
00967   int bidirectional_flag_;
00968 
00969   TAO::Connection_Role opening_connection_role_;
00970 
00971   /// Implement the outgoing data queue
00972   TAO_Queued_Message *head_;
00973   TAO_Queued_Message *tail_;
00974 
00975   /// Queue of the consolidated, incoming messages.
00976   TAO_Incoming_Message_Queue incoming_message_queue_;
00977 
00978   /// Stack of incoming fragments, consolidated messages
00979   /// are going to be enqueued in "incoming_message_queue_"
00980   TAO::Incoming_Message_Stack incoming_message_stack_;
00981 
00982   /// The queue will start draining no later than current_deadline_
00983   /// *if* a deadline is in effect (NOTE(review): original text was truncated here -- confirm).
00984   ACE_Time_Value current_deadline_;
00985 
00986   /// The timer ID
00987   long flush_timer_id_;
00988 
00989   /// The adapter used to receive timeout callbacks from the Reactor
00990   TAO_Transport_Timer transport_timer_;
00991 
00992   /// Lock that ensures that activities that *might* use handler-related
00993   /// resources (such as a connection handler) get serialized.
00994   /**
00995    * This is an <code>ACE_Lock</code> that gets initialized from
00996    * @c TAO_ORB_Core::resource_factory()->create_cached_connection_lock().
00997    * This way, one can use a lock appropriate for the type of system, i.e.,
00998    * a null lock for single-threaded systems, and a real lock for
00999    * multi-threaded systems.
01000    */
01001   mutable ACE_Lock *handler_lock_;
01002 
01003   /// A unique identifier for the transport.
01004   /**
01005    * This never *never* changes over the lifespan, so we don't have to worry
01006    * about locking it.
01007    *
01008    * HINT: Protocol-specific transports that use connection handler
01009    * might choose to set this to the handle for their connection.
01010    */
01011   size_t id_;
01012 
01013   /// Used by the LRU, LFU and FIFO Connection Purging Strategies.
01014   unsigned long purging_order_;
01015 
01016   /// Size of the buffer received.
01017   size_t recv_buffer_size_;
01018 
01019   /// Number of bytes sent.
01020   size_t sent_byte_count_;
01021 
01022   /// Is this transport really connected or not. In case of oneways with
01023   /// SYNC_NONE Policy we don't wait until the connection is ready and we
01024   /// buffer the requests in this transport until the connection is ready
01025   bool is_connected_;
01026 
01027   /// Our messaging object.
01028   TAO_GIOP_Message_Base *messaging_object_;
01029 
01030 private:
01031 
01032   /// @@Phil, I think it would be nice if we could think of a way to
01033   /// do the following.
01034   /// We have been trying to use the transport for marking about
01035   /// translator factories and such! IMHO this is a wrong encapsulation,
01036   /// i.e. trying to populate the transport object with these
01037   /// details. We should probably have a class something like
01038   /// TAO_Message_Property or TAO_Message_Translator or whatever (I am
01039   /// sure you get the idea) and encapsulate all these
01040   /// details. Coupling these seems odd. If I have to be more cynical
01041   /// we can move this to the connection_handler and it may make more sense
01042   /// with the DSCP stuff around there. Do you agree?
01043 
01044   /// Additional member values required to support codeset translation
01045   TAO_Codeset_Translator_Base *char_translator_;
01046   TAO_Codeset_Translator_Base *wchar_translator_;
01047 
01048   /// The tcs_set_ flag indicates that negotiation has occurred and so the
01049   /// translators are correct, since a null translator is valid if both ends
01050   /// are using the same codeset, whatever that codeset might be.
01051   CORBA::Boolean tcs_set_;
01052 
01053   /// First_request_ is true until the first request is sent or received. This
01054   /// is necessary since codeset context information is necessary only on the
01055   /// first request. After that, the translators are fixed for the life of the
01056   /// connection.
01057   CORBA::Boolean first_request_;
01058 
01059   /// Holds the partial GIOP message (if there is one)
01060   ACE_Message_Block* partial_message_;
01061 
01062 #if TAO_HAS_SENDFILE == 1
01063   /// mmap()-based allocator used to allocate output CDR buffers.
01064   /**
01065    * If this pointer is non-zero, sendfile() will be used to send data
01066    * in a TAO_OutputCDR stream instance.
01067    */
01068   TAO_MMAP_Allocator * const mmap_allocator_;
01069 #endif  /* TAO_HAS_SENDFILE==1 */
01070 
01071 #if TAO_HAS_TRANSPORT_CURRENT == 1
01072   /// Statistics
01073   TAO::Transport::Stats* stats_;
01074 #endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */
01075 
01076   /*
01077    * specialization hook to add class members from concrete
01078    * transport class onto the base transport class. Please
01079    * add any private members to this class *before* this hook.
01080    */
01081   //@@ TAO_TRANSPORT_SPL_DATA_MEMBERS_ADD_HOOK
01082 };
01083 
01084 /*
01085  * Hook to add external typedefs and specializations to
01086  * TAO's transport implementation.
01087  */
01088 
01089 //@@ TAO_TRANSPORT_SPL_EXTERN_ADD_HOOK
01090 
01091 #if TAO_HAS_TRANSPORT_CURRENT == 1
01092 namespace TAO
01093 {
01094   namespace Transport
01095   {
01096     /**
01097      * @class Stats
01098      *
01099      * @brief Used to collect stats on a transport.
01100      *
01101      * The base class in a (potentially) extensible hierarchy used to
01102      * specialize the information available for a specific protocol.
01103      *
01104      * This class is necessary for the implementation of the Transport
01105      * Current feature.
01106      *
01107      * <B>See Also:</B>
01108      *
01109      * https://svn.dre.vanderbilt.edu/viewvc/Middleware/trunk/TAO/docs/transport_current/index.html?revision=HEAD
01110      *
01111      */
01112     class TAO_Export Stats
01113     {
01114     public:
01115       Stats ();
01116       virtual ~Stats ();
01117 
01118       void messages_sent (size_t message_length);
01119       CORBA::LongLong messages_sent (void) const;
01120       CORBA::LongLong bytes_sent (void) const;
01121 
01122       void messages_received (size_t message_length);
01123       CORBA::LongLong messages_received (void) const;
01124       CORBA::LongLong bytes_received (void) const;
01125 
01126       void opened_since (const ACE_Time_Value& tv);
01127       const ACE_Time_Value& opened_since (void) const;
01128 
01129     private:
01130       // The bytes_rcvd_.samples_count() could have been used instead;
01131       // however, there was a suspicion that 32 bits would be
01132       // insufficient.
01133       CORBA::LongLong messages_rcvd_;
01134 
01135       // The bytes_sent_.samples_count() could have been used instead;
01136       // however, there was a suspicion that 32 bits would be
01137       // insufficient.
01138       CORBA::LongLong messages_sent_;
01139 
01140       ACE_Basic_Stats bytes_rcvd_;
01141       ACE_Basic_Stats bytes_sent_;
01142 
01143       ACE_Time_Value  opened_since_;
01144     };
01145   }
01146 }
01147 #endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */
01148 
01149 TAO_END_VERSIONED_NAMESPACE_DECL
01150 
01151 #if defined (__ACE_INLINE__)
01152 # include "tao/Transport.inl"
01153 #endif /* __ACE_INLINE__ */
01154 
01155 #include /**/ "ace/post.h"
01156 
01157 #endif /* TAO_TRANSPORT_H */

Generated on Tue Feb 2 17:37:53 2010 for TAO by  doxygen 1.4.7