Transport.h

Go to the documentation of this file.
00001 // -*- C++ -*-
00002 
00003 //=============================================================================
00004 /**
00005  *  @file Transport.h
00006  *
00007  *  Transport.h,v 1.89 2006/06/20 06:33:40 jwillemsen Exp
00008  *
00009  *  Define the interface for the Transport component in TAO's
00010  *  pluggable protocol framework.
00011  *
00012  *  @author  Fred Kuhns <fredk@cs.wustl.edu>
00013  */
00014 //=============================================================================
00015 
00016 #ifndef TAO_TRANSPORT_H
00017 #define TAO_TRANSPORT_H
00018 
00019 #include /**/ "ace/pre.h"
00020 
00021 #include "tao/Transport_Cache_Manager.h"
00022 
00023 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00024 # pragma once
00025 #endif /* ACE_LACKS_PRAGMA_ONCE */
00026 
00027 #include "tao/Transport_Timer.h"
00028 #include "tao/Incoming_Message_Queue.h"
00029 #include "tao/Incoming_Message_Stack.h"
00030 #include "ace/Time_Value.h"
00031 
00032 struct iovec;
00033 
00034 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00035 
00036 class TAO_ORB_Core;
00037 class TAO_Target_Specification;
00038 class TAO_Operation_Details;
00039 class TAO_Transport_Mux_Strategy;
00040 class TAO_Wait_Strategy;
00041 class TAO_Connection_Handler;
00042 class TAO_Pluggable_Messaging;
00043 class TAO_Codeset_Translator_Base;
00044 
00045 class TAO_Queued_Message;
00046 class TAO_Synch_Queued_Message;
00047 class TAO_Resume_Handle;
00048 class TAO_Stub;
00049 class TAO_MMAP_Allocator;
00050 
00051 namespace TAO
00052 {
00053   /**
00054    * Role of a connection, as used by TAO_Transport::opened_as ().  @note
00055    * Should this be in the TAO namespace?  It may belong in the transport.
00056    */
00057   enum Connection_Role
00058     {
00059       TAO_UNSPECIFIED_ROLE = 0,
00060       TAO_SERVER_ROLE = 1,
00061       TAO_CLIENT_ROLE = 2
00062     };
00063 }
00064 
00065 /*
00066  * Specialization hook for the TAO's transport implementation.
00067  */
00068 //@@ TAO_TRANSPORT_SPL_INCLUDE_FORWARD_DECL_ADD_HOOK
00069 
00070 /**
00071  * @class TAO_Transport
00072  *
00073  * @brief Generic definitions for the Transport class.
00074  *
00075  * The transport object is created in the Service handler
00076  * constructor and deleted in the Service Handler's destructor!!
00077  *
00078  * The main responsibility of a Transport object is to encapsulate a
00079  * connection, and provide a transport independent way to send and
00080  * receive data.  Since TAO is heavily based on the Reactor for all if
00081  * not all its I/O the Transport class is usually implemented with a
00082  * helper Connection Handler that adapts the generic Transport
00083  * interface to the Reactor types.
00084  *
00085  * <H3>The outgoing data path:</H3>
00086  *
00087  * One of the responsibilities of the TAO_Transport class is to send
00088  * out GIOP messages as efficiently as possible.  In most cases
00089  * messages are put out in FIFO order, the transport object will put
00090  * out the message using a single system call and return control to
00091  * the application.  However, for oneways and AMI requests it may be
00092  * more efficient (or required if the SYNC_NONE policy is in effect)
00093  * to queue the messages until a large enough data set is available.
00094  * Another reason to queue is that some applications cannot block for
00095  * I/O, yet they want to send messages so large that a single write()
00096  * operation would not be able to cope with them.  In such cases we
00097  * need to queue the data and use the Reactor to drain the queue.
00098  *
00099  * Therefore, the Transport class may need to use a queue to
00100  * temporarily hold the messages, and, in some configurations, it may
00101  * need to use the Reactor to concurrently drain such queues.
00102  *
00103  * <H4>Out of order messages:</H4> TAO provides explicit policies to
00104  * send 'urgent' messages.  Such messages may be put at the head of the
00105  * queue. However, they cannot be sent immediately because the
00106  * transport may already be sending another message in a reactive
00107  * fashion.
00108  *
00109  * Consequently, the Transport must also know if the head of the queue
00110  * has been partially sent.  In that case new messages can only follow
00111  * the head. Only once the head is completely sent we can start
00112  * sending new messages.
00113  *
00114  * <H4>Waiting threads:</H4> One or more threads can be blocked
00115  * waiting for the connection to completely send the message.
00116  * The thread should return as soon as its message has been sent, so a
00117  * per-thread condition is required.  This suggests that simply using an
00118  * ACE_Message_Queue would not be enough:  there is a significant
00119  * amount of ancillary information to keep on each message that the
00120  * Message_Block class does not provide room for.
00121  *
00122  * Blocking I/O is still attractive for some applications.  First, by
00123  * eliminating the Reactor overhead performance is improved when
00124  * sending large blocks of data.  Second, using the Reactor to send
00125  * out data opens the door for nested upcalls, yet some applications
00126  * cannot deal with the reentrancy issues in this case.
00127  *
00128  * <H4>Timeouts:</H4> Some or all messages could have a timeout period
00129  * attached to them.  The timeout source could either be some
00130  * high-level policy or maybe some strategy to prevent denial of
00131  * service attacks.  In any case the timeouts are per-message, and
00132  * later messages could have shorter timeouts.
00133  * In fact, some kind of scheduling (such as EDF) could be required in
00134  * a few applications.
00135  *
00136  * <H4>Conclusions:</H4> The outgoing data path consists of several
00137  * components:
00138  *
00139  * - A queue of pending messages
00140  * - A message currently being transmitted
00141  * - A per-transport 'send strategy' to choose between blocking on
00142  *   write, blocking on the reactor or blocking on leader/follower.
00143  * - A per-message 'waiting object'
00144  * - A per-message timeout
00145  *
00146  * The Transport object provides a single method to send request
00147  * messages (send_request_message ()).
00148  *
00149  * <H3>The incoming data path:</H3>
00150  *
00151  * One of the main responsibilities of the transport is to read and
00152  * process the incoming GIOP message as quickly and efficiently as
00153  * possible. There are other forces that need to be given due
00154  * consideration. They are
00155  *  - Multiple threads should be able to traverse along the same data
00156  *    path but should not be able to read from the same handle at the
00157  *    same time ie. the handle should not be shared between threads at
00158  *    any instant.
00159  *  - Reads on the handle could give one or more messages.
00160  *  - Minimise locking and copying overhead when trying to attack the
00161  *    above.
00162  *
00163  * <H3> Parsing messages (GIOP) & processing the message:</H3>
00164  *
00165  * The messages should be checked for validity and the right
00166  * information should be sent to the higher layer for processing. The
00167  * process of doing a sanity check and preparing the messages for the
00168  * higher layers of the ORB are done by the messaging protocol.
00169  *
00170  * <H3> Design forces and Challenges </H3>
00171  *
00172  * To keep things as efficient as possible for medium sized requests,
00173  * it would be good to minimise data copying and locking along the
00174  * incoming path ie. from the time of reading the data from the handle
00175  * to the application. We achieve this by creating a buffer on stack
00176  * and reading the data from the handle into the buffer. We then pass
00177  * the same data block (the buffer is encapsulated into a data block)
00178  * to the higher layers of the ORB. The problems stem from the
00179  * following
00180  *  (a) Data is bigger than the buffer that we have on stack
00181  *  (b) Transports like TCP do not guarantee availability of the whole
00182  *      chunk of data in one shot. Data could trickle in byte by byte.
00183  *  (c) Single read gives multiple messages
00184  *
00185  * We solve the problems as follows
00186  *
00187  *   (a) First do a read with the buffer on stack. Query the underlying
00188  *       messaging object whether the message has any incomplete
00189  *       portion. If so, data will be copied into new buffer being able
00190  *       to hold full message and is queued; succeeding events will read
00191  *       data from socket and write directly into this buffer.
00192  *       Otherwise, if the message in local buffer is complete, we free
00193  *       the handle and then send the message to the higher layers of the
00194  *       ORB for processing.
00195  *
00196  *   (b) If buffer with incomplete message has been enqueued, while trying
00197  *       to do the above, the reactor will call us back when the handle
00198  *       becomes read ready. The read-operation will copy data directly
00199  *       into the enqueued buffer.  If the message has been read completely
00200  *       the message is sent to the higher layers of the ORB for processing.
00201  *
00202  *   (c) If we get multiple messages (possible if the client connected
00203  *       to the server sends oneways or AMI requests), we parse and
00204  *       split the messages. Every message is put in the queue. Once
00205  *       the messages are queued, the thread picks up one message to
00206  *       send to the higher layers of the ORB. Before doing that, if
00207  *       it finds more messages, it sends a notify to the reactor
00208  *       without resuming the handle. The next thread picks up a
00209  *       message from the queue and processes that. Once the queue
00210  *       is drained the last thread resumes the handle.
00211  *
00212  * <H3> Sending Replies </H3>
00213  *
00214  * We could use the outgoing path of the ORB to send replies. This
00215  * would allow us to reuse most of the code in the outgoing data
00216  * path. We were doing this till TAO-1.2.3. We ran into
00217  * problems. When writing the reply the ORB gets flow controlled, and the
00218  * ORB tries to flush the message by going into the reactor. This
00219  * resulted in unnecessary nesting. The thread that gets into the
00220  * Reactor could potentially handle other messages (incoming or
00221  * outgoing) and the stack starts growing leading to crashes.
00222  *
00223  * <H4> Solution to the nesting problem </H4>
00224  *
00225  * The solution that we (plan to) adopt is pretty straight
00226  * forward. The thread sending replies will not block to send the
00227  * replies but queue the replies and return to the Reactor. (Note the
00228  * careful usage of the terms "blocking in the Reactor" as opposed to
00229  * "return back to the Reactor".)
00230  *
00231  *
00232  * <B>See Also:</B>
00233  *
00234  * http://cvs.doc.wustl.edu/ace-latest.cgi/ACE_wrappers/TAO/docs/pluggable_protocols/index.html
00235  *
00236  */
00237 class TAO_Export TAO_Transport
00238 {
00239 public:
00240 
00241   /// Default creator, requires the tag value be supplied.
00242   TAO_Transport (CORBA::ULong tag,
00243                  TAO_ORB_Core *orb_core);
00244 
00245   /// Destructor
00246   virtual ~TAO_Transport (void);
00247 
00248   /// Return the protocol tag.
00249   /**
00250    * The OMG assigns unique tags (a 32-bit unsigned number) to each
00251    * protocol. New protocol tags can be obtained free of charge from
00252    * the OMG, check the documents in corbafwd.h for more details.
00253    */
00254   CORBA::ULong tag (void) const;
00255 
00256   /// Access the ORB that owns this connection.
00257   TAO_ORB_Core *orb_core (void) const;
00258 
00259   /// Get the TAO_Transport_Mux_Strategy used by this object.
00260   /**
00261    * The role of the TAO_Transport_Mux_Strategy is described in more
00262    * detail in that class' documentation.  Suffice it to say that the
00263    * class is used to control how many threads can have pending
00264    * requests over the same connection. Multiplexing multiple threads
00265    * over the same connection conserves resources and is almost
00266    * required for AMI, but having only one pending request per
00267    * connection is more efficient and reduces the possibilities of
00268    * priority inversions.
00269    */
00270   TAO_Transport_Mux_Strategy *tms (void) const;
00271 
00272   /// Return the TAO_Wait_Strategy used by this object.
00273   /**
00274    * The role of the TAO_Wait_Strategy is described in more detail in
00275    * that class' documentation. Suffice it to say that the ORB can wait
00276    * for a reply blocking on read(), using the Reactor to wait for
00277    * multiple events concurrently or using the Leader/Followers
00278    * protocol.
00279    */
00280   TAO_Wait_Strategy *wait_strategy (void) const;
00281 
00282   /// Callback method to reactively drain the outgoing data queue
00283   int handle_output (void);
00284 
00285   /// Get the bidirectional flag
00286   int bidirectional_flag (void) const;
00287 
00288   /// Set the bidirectional flag
00289   void bidirectional_flag (int flag);
00290 
00291   /// Set the Cache Map entry
00292   void cache_map_entry (TAO::Transport_Cache_Manager::HASH_MAP_ENTRY *entry);
00293 
00294   /// Get the Cache Map entry
00295   TAO::Transport_Cache_Manager::HASH_MAP_ENTRY *cache_map_entry (void);
00296 
00297   /// Set and Get the identifier for this transport instance.
00298   /**
00299    * If not set, this will return an integer representation of
00300    * the <code>this</code> pointer for the instance on which
00301    * it's called.
00302    */
00303   size_t id (void) const;
00304   void id (size_t id);
00305 
00306   /**
00307    * Methods dealing with the role of the connection, e.g., CLIENT or SERVER.
00308    * See CORBA 2.6 Specification, Section 15.5.1 for origin of definitions.
00309    */
00310   TAO::Connection_Role opened_as (void) const;
00311   void opened_as (TAO::Connection_Role);
00312 
00313   /// Get and Set the purging order. The purging strategy uses the set
00314   /// version to set the purging order.
00315   unsigned long purging_order (void) const;
00316   void purging_order(unsigned long value);
00317 
00318   /// Check if there are messages pending in the queue
00319   /**
00320    * @return 1 if the queue is empty
00321    */
00322   int queue_is_empty (void);
00323 
00324   /// Add the event handler to the handlers set.
00325   /**
00326    * Called by the cache when the cache is closing.
00327    *
00328    * @param handlers The TAO_Connection_Handler_Set into which the
00329    *        transport should place its handler
00330    */
00331   void provide_handler (TAO::Connection_Handler_Set &handlers);
00332 
00333   /// Add event handlers corresponding to transports that have RW wait
00334   /// strategy to the handlers set.
00335   /**
00336    * Called by the cache when the ORB is shutting down.
00337    *
00338    * @param handlers The TAO_Connection_Handler_Set into which the
00339    *        transport should place its handler if the transport has RW
00340    *        strategy on.
00341    *
00342    * @return true indicates a handler was added to the handler set.
00343    *         false indicates that the transport did not have a
00344    *         blockable handler that could be added.
00345    */
00346   bool provide_blockable_handler (TAO::Connection_Handler_Set &handlers);
00347 
00348   /// Register the handler with the reactor.
00349   /**
00350    * Register the handler with the reactor. This method is used by the
00351    * Wait_On_Reactor strategy. The transport must register its event
00352    * handler with the ORB's Reactor.
00353    *
00354    * @todo I think this method is pretty much useless, the
00355    * connections are *always* registered with the Reactor, except in
00356    * thread-per-connection mode.  In that case putting the connection
00357    * in the Reactor would produce unpredictable results anyway.
00358    */
00359   virtual int register_handler (void);
00360 
00361   /// Write the complete Message_Block chain to the connection.
00362   /**
00363    * This method serializes on handler_lock_, guaranteeing that only
00364    * one thread can execute it on the same instance at a time.
00365    *
00366    * Often the implementation simply forwards the arguments to the
00367    * underlying ACE_Svc_Handler class, using the code factored out
00368    * into ACE.
00369    *
00370    * Be careful with protocols that perform non-trivial
00371    * transformations of the data, such as SSLIOP or protocols that
00372    * compress the stream.
00373    *
00374    * @param iov contains the data that must be sent.
00375    *
00376    * @param timeout is the maximum time that the application is
00377    * willing to wait for the data to be sent, useful in platforms that
00378    * implement timed writes.
00379    * The timeout value is obtained from the policies set by the
00380    * application.
00381    *
00382    * @param bytes_transferred should return the total number of bytes
00383    * successfully transferred before the connection blocked.  This is
00384    * required because in some platforms and/or protocols multiple
00385    * system calls may be required to send the chain of message
00386    * blocks.  The first few calls can work successfully, but the final
00387    * one can fail or signal a flow control situation (via EAGAIN).
00388    * In this case the ORB expects the function to return -1, errno to
00389    * be appropriately set and this argument to return the number of
00390    * bytes already on the OS I/O subsystem.
00391    *
00392    * This call can also fail if the transport instance is no longer
00393    * associated with a connection (e.g., the connection handler closed
00394    * down).  In that case, it returns -1 and sets errno to
00395    * <code>ENOENT</code>.
00396    */
00397   virtual ssize_t send (iovec *iov,
00398                         int iovcnt,
00399                         size_t &bytes_transferred,
00400                         const ACE_Time_Value *timeout = 0) = 0;
00401 
00402 #ifdef ACE_HAS_SENDFILE
00403   /// Send data through zero-copy write mechanism, if available.
00404   /**
00405    * This method sends the data in the I/O vector through the platform
00406    * sendfile() function to perform a zero-copy write, if available.
00407    * Otherwise, the default fallback implementation simply delegates
00408    * to the TAO_Transport::send() method.
00409    *
00410    * @note This method is best used when sending very large blocks of
00411    *       data.
00412    */
00413   virtual ssize_t sendfile (TAO_MMAP_Allocator * allocator,
00414                             iovec * iov,
00415                             int iovcnt,
00416                             size_t &bytes_transferred,
00417                             ACE_Time_Value const * timeout = 0);
00418 #endif  /* ACE_HAS_SENDFILE */
00419 
00420 
00421   /// Read len bytes into buffer.
00422   /**
00423    * This method serializes on handler_lock_, guaranteeing that only
00424    * one thread can execute it on the same instance at a time.
00425    *
00426    * @param buffer ORB allocated buffer where the data should be placed.
00427    * @@ The ACE_Time_Value *timeout is just a place holder for now.  It is
00428    * not clear that this is the best place to specify this.  The actual
00429    * timeout values will be kept in the Policies.
00430    */
00431   virtual ssize_t recv (char *buffer,
00432                         size_t len,
00433                         const ACE_Time_Value *timeout = 0) = 0;
00434 
00435   /**
00436    * @name Control connection lifecycle
00437    *
00438    * These methods are routed through the TMS object. The TMS
00439    * strategies implement them correctly.
00440    */
00441   //@{
00442 
00443   /// Request has been just sent, but the reply is not received. Idle
00444   /// the transport now.
00445   bool idle_after_send (void);
00446 
00447   /// Request is sent and the reply is received. Idle the transport
00448   /// now.
00449   bool idle_after_reply (void);
00450 
00451   /// Call the implementation method after obtaining the lock.
00452   virtual void close_connection (void);
00453 
00454   //@}
00455 
00456   /** @name Template methods
00457    *
00458    * The Transport class uses the Template Method Pattern to implement
00459    * the protocol specific functionality.
00460    * Implementors of a pluggable protocol should override the
00461    * following methods with the semantics documented below.
00462    */
00463   /**
00464    * Initialising the messaging object. This would be used by the
00465    * connector side. On the acceptor side the connection handler
00466    * would take care of the messaging objects.
00467    */
00468   virtual int messaging_init (CORBA::Octet major,
00469                               CORBA::Octet minor) = 0;
00470 
00471   /// Extracts the list of listen points from the @a cdr stream. The
00472   /// list would have the protocol specific details of the
00473   /// ListenPoints
00474   virtual int tear_listen_point_list (TAO_InputCDR &cdr);
00475 
00476   /// Hooks that can be overridden in concrete transports.
00477   /**
00478    * These hooks are invoked just after connection establishment (or
00479    * after a connection is fetched from cache). The
00480    * return value signifies whether the invoker should proceed with
00481    * post connection establishment activities. Protocols like SSLIOP
00482    * need this to verify whether connections already established have
00483    * valid certificates. There are no pre_connect_hooks () since the
00484    * transport doesn't exist before a connection establishment. :-)
00485    *
00486    * @note The methods are deliberately not made const.
00487    */
00488   virtual bool post_connect_hook (void);
00489 
00490   /// Memory management routines.
00491   /*
00492    * Forwards to event handler.
00493    */
00494   ACE_Event_Handler::Reference_Count add_reference (void);
00495   ACE_Event_Handler::Reference_Count remove_reference (void);
00496 
00497   /// Return the messaging object that is used to format the data that
00498   /// needs to be sent.
00499   virtual TAO_Pluggable_Messaging * messaging_object (void) = 0;
00500 
00501   /** @name Template methods
00502    *
00503    * The Transport class uses the Template Method Pattern to implement
00504    * the protocol specific functionality.
00505    * Implementors of a pluggable protocol should override the
00506    * following methods with the semantics documented below.
00507    */
00508   //@{
00509 
00510   /// Return the event handler used to receive notifications from the
00511   /// Reactor.
00512   /**
00513    * Normally a concrete TAO_Transport object has-a ACE_Event_Handler
00514    * member that functions as an adapter between the ACE_Reactor
00515    * framework and the TAO pluggable protocol framework.
00516    * In all the protocols implemented so far this role is fulfilled
00517    * by an instance of ACE_Svc_Handler.
00518    *
00519    * @todo Since we only use a limited functionality of
00520    * ACE_Svc_Handler we could probably implement a generic
00521    * adapter class (TAO_Transport_Event_Handler or something), this
00522    * will reduce footprint and simplify the process of implementing a
00523    * pluggable protocol.
00524    *
00525    * @todo This method has to be renamed to event_handler()
00526    */
00527   virtual ACE_Event_Handler * event_handler_i (void) = 0;
00528 
00529   /// Is this transport really connected
00530   bool is_connected (void) const;
00531 
00532   /// Perform all the actions needed when this transport gets opened
00533   bool post_open (size_t id);
00534 
00535   /// Get the connection handler for this transport
00536   TAO_Connection_Handler * connection_handler (void);
00537 
00538   /// Accessor for the output CDR stream
00539   TAO_OutputCDR &out_stream (void);
00540 
00541   /*
00542    * Specialization hook to add public methods from
00543    * concrete transport implementations to TAO's transport
00544    * class
00545    */
00546   //@@ TAO_TRANSPORT_SPL_PUBLIC_METHODS_ADD_HOOK
00547 
00548 protected:
00549 
00550   virtual TAO_Connection_Handler * connection_handler_i (void) = 0;
00551 
00552 public:
00553 
00554   /// This is a request for the transport object to write a
00555   /// LocateRequest header before it is sent out.
00556   int generate_locate_request (TAO_Target_Specification &spec,
00557                                TAO_Operation_Details &opdetails,
00558                                TAO_OutputCDR &output);
00559 
00560   /// This is a request for the transport object to write a request
00561   /// header before it sends out the request
00562   virtual int generate_request_header (TAO_Operation_Details &opd,
00563                                        TAO_Target_Specification &spec,
00564                                        TAO_OutputCDR &msg);
00565 
00566   /// Recache ourselves in the cache
00567   int recache_transport (TAO_Transport_Descriptor_Interface* desc);
00568 
00569   /// Callback to read incoming data
00570   /**
00571    * The ACE_Event_Handler adapter invokes this method as part of its
00572    * handle_input() operation.
00573    *
00574    * @todo the method name is confusing! Calling it handle_input()
00575    * would probably make things easier to understand and follow!
00576    *
00577    * Once a complete message is read the Transport class delegates on
00578    * the Messaging layer to invoke the right upcall (on the server) or
00579    * the TAO_Reply_Dispatcher (on the client side).
00580    *
00581    * @param max_wait_time In some cases the I/O is synchronous, e.g. a
00582    * thread-per-connection server or when Wait_On_Read is enabled.  In
00583    * those cases a maximum read time can be specified.
00584    *
00585    * @param block Is deprecated and ignored.
00586    *
00587    */
00588   virtual int handle_input (TAO_Resume_Handle &rh,
00589                             ACE_Time_Value *max_wait_time = 0,
00590                             int block = 0);
00591 
00592   enum // Values for the message_semantics argument of the send methods.
00593     {
00594       TAO_ONEWAY_REQUEST = 0,
00595       TAO_TWOWAY_REQUEST = 1,
00596       TAO_REPLY // No explicit value: takes the next one, i.e. 2.
00597     };
00598 
00599   /// Prepare the waiting and demuxing strategy to receive a reply for
00600   /// a new request.
00601   /**
00602    * Preparing the ORB to receive the reply only once the request is
00603    * completely sent opens the system to some subtle race conditions:
00604    * suppose the ORB is running in a multi-threaded configuration,
00605    * thread A makes a request while thread B is using the Reactor to
00606    * process all incoming requests.
00607    * Thread A could be implemented as follows:
00608    * 1) send the request
00609    * 2) setup the ORB to receive the reply
00610    * 3) wait for the reply
00611    *
00612    * but in this case thread B may receive the reply between step (1)
00613    * and (2), and drop it as an invalid or unexpected message.
00614    * Consequently the correct implementation is:
00615    * 1) setup the ORB to receive the reply
00616    * 2) send the request
00617    * 3) wait for the reply
00618    *
00619    * The following method encapsulates this idiom.
00620    *
00621    * @todo This is generic code, it should be factored out into the
00622    * Transport class.
00623    */
00624   // @nolock b/c this calls send_or_buffer
00625   virtual int send_request (TAO_Stub *stub,
00626                             TAO_ORB_Core *orb_core,
00627                             TAO_OutputCDR &stream,
00628                             int message_semantics,
00629                             ACE_Time_Value *max_time_wait) = 0;
00630 
00631 
00632 
00633   /// This method formats the stream and then sends the message on the
00634   /// transport.
00635   /**
00636    * Once the ORB is prepared to receive a reply (see send_request()
00637    * above), and all the arguments have been marshaled the CDR stream
00638    * must be 'formatted', i.e. the message_size field in the GIOP
00639    * header can finally be set to the proper value.
00640    *
00641    */
00642   virtual int send_message (TAO_OutputCDR &stream,
00643                             TAO_Stub *stub = 0,
00644                             int message_semantics = TAO_Transport::TAO_TWOWAY_REQUEST,
00645                             ACE_Time_Value *max_time_wait = 0) = 0;
00646 
00647 
00648   /// Send the contents of @a message_block
00649   /**
00650    * @param stub The object reference used for this operation, useful
00651    *             to obtain the current policies.
00652    * @param message_semantics If this is set to TAO_TWOWAY_REQUEST
00653    *        this method will block until the operation is completely
00654    *        written on the wire. If it is set to other values this
00655    *        operation could return.
00656    * @param message_block The CDR encapsulation of the GIOP message
00657    *             that must be sent.  The message may consist of
00658    *             multiple Message Blocks chained through the cont()
00659    *             field.
00660    * @param max_wait_time The maximum time that the operation can
00661    *             block, used in the implementation of timeouts.
00662    */
00663   virtual int send_message_shared (TAO_Stub *stub,
00664                                    int message_semantics,
00665                                    const ACE_Message_Block *message_block,
00666                                    ACE_Time_Value *max_wait_time);
00667 
00668 protected:
00669 
00670   /// Process the message by sending it to the higher layers of the
00671   /// ORB.
00672   int process_parsed_messages (TAO_Queued_Data *qd,
00673                                TAO_Resume_Handle &rh);
00674 
00675   /// Implement send_message_shared() assuming the handler_lock_ is
00676   /// held.
00677   int send_message_shared_i (TAO_Stub *stub,
00678                              int message_semantics,
00679                              const ACE_Message_Block *message_block,
00680                              ACE_Time_Value *max_wait_time);
00681 
00682   /// Queue a message for @a message_block
00683   int queue_message_i (const ACE_Message_Block *message_block);
00684 
00685 public:
00686   /// Format and queue a message for @a stream
00687   int format_queue_message (TAO_OutputCDR &stream);
00688 
00689   /// Send a message block chain,
00690   int send_message_block_chain (const ACE_Message_Block *message_block,
00691                                 size_t &bytes_transferred,
00692                                 ACE_Time_Value *max_wait_time = 0);
00693 
00694   /// Send a message block chain, assuming the lock is held
00695   int send_message_block_chain_i (const ACE_Message_Block *message_block,
00696                                   size_t &bytes_transferred,
00697                                   ACE_Time_Value *max_wait_time);
00698   /// Cache management
00699   int purge_entry (void);
00700 
00701   /// Cache management
00702   int make_idle (void);
00703 
00704   /// Cache management
00705   int update_transport (void);
00706 
00707   /// The timeout callback, invoked when any of the timers related to
00708   /// this transport expire.
00709   /**
00710    * @param current_time The current time as reported from the Reactor
00711    * @param act The Asynchronous Completion Token.  Currently it is
00712    *            interpreted as follows:
00713    * - If the ACT is the address of this->current_deadline_ the
00714    *   queueing timeout has expired and the queue should start
00715    *   flushing.
00716    *
00717    * @return Returns 0 if there are no problems, -1 if there is an
00718    *         error
00719    *
00720    * @todo In the future this function could be used to expire
00721    *       messages (oneways) that have been sitting for too long on
00722    *       the queue.
00723    */
00724   int handle_timeout (const ACE_Time_Value &current_time,
00725                       const void* act);
00726 
00727   /// Accessor to recv_buffer_size_
00728   size_t recv_buffer_size (void) const;
00729 
00730   /// Accessor to sent_byte_count_
00731   size_t sent_byte_count (void) const;
00732 
00733   /// CodeSet Negotiation - Get the char codeset translator factory
00734   TAO_Codeset_Translator_Base *char_translator (void) const;
00735 
00736   /// CodeSet Negotiation - Get the wchar codeset translator factory
00737   TAO_Codeset_Translator_Base *wchar_translator (void) const;
00738 
00739   /// CodeSet negotiation - Set the char codeset translator factory
00740   void char_translator (TAO_Codeset_Translator_Base *);
00741 
00742   /// CodeSet negotiation - Set the wchar codeset translator factory
00743   void wchar_translator (TAO_Codeset_Translator_Base *);
00744 
00745   /// Use the Transport's codeset factories to set the translator for input
00746   /// and output CDRs.
00747   void assign_translators (TAO_InputCDR *, TAO_OutputCDR *);
00748 
00749   /// It is necessary to clear the codeset translator when a CDR stream
00750   /// is used for more than one GIOP message. This is required since the
00751   /// header must not be translated, whereas the body must be.
00752   void clear_translators (TAO_InputCDR *, TAO_OutputCDR *);
00753 
00754   /// Return true if the tcs has been set
00755   CORBA::Boolean is_tcs_set() const;
00756 
00757   /// Set the state of the first_request_ flag to 0
00758   void first_request_sent();
00759 
00760   /// Notify all the components inside a Transport when the underlying
00761   /// connection is closed.
00762   void send_connection_closed_notifications (void);
00763 
00764 private:
00765 
00766   /// Helper method that returns the Transport Cache Manager.
00767   TAO::Transport_Cache_Manager &transport_cache_manager (void);
00768 
00769   /// Send some of the data in the queue.
00770   /**
00771    * As the outgoing data is drained this method is invoked to send as
00772    * much of the current message as possible.
00773    *
00774    * Returns 0 if there is more data to send, -1 if there was an error
00775    * and 1 if the message was completely sent.
00776    */
00777   int drain_queue (void);
00778 
00779   /// Implement drain_queue() assuming the lock is held
00780   int drain_queue_i (void);
00781 
00782   /// This class needs privileged access to
00783   /// - queue_is_empty_i()
00784   /// - drain_queue_i()
00785   friend class TAO_Block_Flushing_Strategy;
00786 
00787   /// Check if there are messages pending in the queue
00788   /**
00789    * This version assumes that the lock is already held.  Use with
00790    * care!
00791    *
00792    * @return 1 if the queue is empty
00793    */
00794   int queue_is_empty_i (void);
00795 
00796   /// A helper routine used in drain_queue_i()
00797   int drain_queue_helper (int &iovcnt, iovec iov[]);
00798 
00799   /// These classes need privileged access to:
00800   /// - schedule_output_i()
00801   /// - cancel_output_i()
00802   friend class TAO_Reactive_Flushing_Strategy;
00803   friend class TAO_Leader_Follower_Flushing_Strategy;
00804 
00805   /// Needs privileged access to
00806   /// event_handler_i ()
00807   friend class TAO_Thread_Per_Connection_Handler;
00808 
00809   /// Schedule handle_output() callbacks
00810   int schedule_output_i (void);
00811 
00812   /// Cancel handle_output() callbacks
00813   int cancel_output_i (void);
00814 
00815   /// Cleanup the queue.
00816   /**
00817    * Exactly @a byte_count bytes have been sent, the queue must be
00818    * cleaned up as potentially several messages have been completely
00819    * sent out.
00820    * It leaves on head_ the next message to send out.
00821    */
00822   void cleanup_queue (size_t byte_count);
00823 
00824   /// Cleanup the complete queue
00825   void cleanup_queue_i ();
00826 
00827   /// Check if the buffering constraints have been reached
00828   int check_buffering_constraints_i (TAO_Stub *stub, bool &must_flush);
00829 
00830   /// Send a synchronous message, i.e. block until the message is on
00831   /// the wire
00832   int send_synchronous_message_i (const ACE_Message_Block *message_block,
00833                                   ACE_Time_Value *max_wait_time);
00834 
00835   /// Send a reply message, i.e. do not block until the message is on
00836   /// the wire, but just return after adding them to the queue.
00837   int send_reply_message_i (const ACE_Message_Block *message_block,
00838                             ACE_Time_Value *max_wait_time);
00839 
00840   /// Send an asynchronous message, i.e. do not block until the message is on
00841   /// the wire
00842   int send_asynchronous_message_i (TAO_Stub *stub,
00843                                    const ACE_Message_Block *message_block,
00844                                    ACE_Time_Value *max_wait_time);
00845 
00846   /// A helper method used by send_synchronous_message_i() and
00847   /// send_reply_message_i(). Reusable code that could be used by both
00848   /// the methods.
00849   int send_synch_message_helper_i (TAO_Synch_Queued_Message &s,
00850                                    ACE_Time_Value *max_wait_time);
00851 
00852   /// Check if the flush timer is still pending
00853   int flush_timer_pending (void) const;
00854 
00855   /// The flush timer expired or was explicitly cancelled, mark it as
00856   /// not pending
00857   void reset_flush_timer (void);
00858 
00859   /// Print out error messages if the event handler is not valid
00860   void report_invalid_event_handler (const char *caller);
00861 
00862   /// Invoked by the handle_input operation. It consolidates the message
00863   /// on top of incoming_message_stack_.  The amount of missing data is
00864   /// known, and the recv operation copies data directly into the message
00865   /// buffer, as much as a single recv-invocation provides.
00866   int handle_input_missing_data (TAO_Resume_Handle &rh,
00867                                  ACE_Time_Value *max_wait_time,
00868                                  TAO_Queued_Data *q_data);
00869 
00870   /// Is invoked by handle_input operation. It parses new messages from input stream
00871   /// or consolidates messages whose header has been partially read, the message
00872   /// size being unknown so far. It parses as much data as a single recv-invocation provides.
00873   int handle_input_parse_data (TAO_Resume_Handle &rh,
00874                                ACE_Time_Value *max_wait_time);
00875 
00876   /// Is invoked by handle_input_parse_data. Parses all messages remaining
00877   /// in @a message_block.
00878   int handle_input_parse_extra_messages (ACE_Message_Block &message_block);
00879 
00880   /// @return -1 error, otherwise 0
00881   int consolidate_enqueue_message (TAO_Queued_Data *qd);
00882 
00883   /// @return -1 error, otherwise 0
00884   int consolidate_process_message (TAO_Queued_Data *qd, TAO_Resume_Handle &rh);
00885 
00886   /**
00887    * Process the message that is at the head of the incoming queue.
00888    * If there are more messages in the queue, this method calls
00889    * this->notify_reactor () to wake up a thread.
00890    * @retval -1 on error
00891    * @retval 0 if enqueued messages were processed successfully
00892    * @retval 1 if no message is present in the queue
00893    */
00894   int process_queue_head (TAO_Resume_Handle &rh);
00895 
00896   /**
00897    * This call prepares a new handler for the notify call and sends a
00898    * notify () call to the reactor.
00899    */
00900   int notify_reactor (void);
00901 
00902   /// Assume the lock is held
00903   void send_connection_closed_notifications_i (void);
00904 
00905   /// Allocate a partial message block and store it in our
00906   /// partial_message_ data member.
00907   void allocate_partial_message_block (void);
00908 
00909   // Disallow copying and assignment.
00910   TAO_Transport (const TAO_Transport&);
00911   void operator= (const TAO_Transport&);
00912 
00913   /*
00914    * Specialization hook to add concrete private methods from
00915    * TAO's protocol implementation onto the base Transport class
00916    */
00917 
00918   //@@ TAO_TRANSPORT_SPL_PRIVATE_METHODS_ADD_HOOK
00919 
00920 protected:
00921 
00922   /// IOP protocol tag.
00923   CORBA::ULong const tag_;
00924 
00925   /// Global orbcore resource.
00926   TAO_ORB_Core * const orb_core_;
00927 
00928   /// Our entry in the cache. We don't own this. It is here for our
00929   /// convenience. We cannot just change things around.
00930   TAO::Transport_Cache_Manager::HASH_MAP_ENTRY *cache_map_entry_;
00931 
00932   /// Strategy to decide whether multiple requests can be sent over the
00933   /// same connection or the connection is exclusive for a request.
00934   TAO_Transport_Mux_Strategy *tms_;
00935 
00936   /// Strategy for waiting for the reply after sending the request.
00937   TAO_Wait_Strategy *ws_;
00938 
00939   /// Used to check if bidirectional info has been synchronized with
00940   /// the peer.
00941   /**
00942    * Have we sent any info on bidirectional information or have we
00943    * received any info regarding making the connection served by this
00944    * transport bidirectional.
00945    * The flag is used as follows:
00946    * + We don't want to send the bidirectional context info more than
00947    *   once on the connection. Why? Waste of marshalling and
00948    *   demarshalling time on the client.
00949    * + On the server side -- once a client that has established the
00950    *   connection asks the server to use the connection both ways, we
00951    *   *don't* want the server to pack service info to the client. That
00952    *   is not allowed. We need a flag to prevent such a thing from
00953    *   happening.
00954    *
00955    * The value of this flag will be 0 if the client sends info and 1
00956    * if the server receives the info.
00957    */
00958   int bidirectional_flag_;
00959 
00960   TAO::Connection_Role opening_connection_role_;
00961 
00962   /// Implement the outgoing data queue
00963   TAO_Queued_Message *head_;
00964   TAO_Queued_Message *tail_;
00965 
00966   /// Queue of the consolidated, incoming messages.
00967   TAO_Incoming_Message_Queue incoming_message_queue_;
00968 
00969   /// Stack of incoming fragments, consolidated messages
00970   /// are going to be enqueued in "incoming_message_queue_"
00971   TAO::Incoming_Message_Stack incoming_message_stack_;
00972 
00973   /// The queue will start draining no later than current_deadline_
00974   /// *if* the deadline is set (TODO confirm: original sentence was truncated).
00975   ACE_Time_Value current_deadline_;
00976 
00977   /// The timer ID
00978   long flush_timer_id_;
00979 
00980   /// The adapter used to receive timeout callbacks from the Reactor
00981   TAO_Transport_Timer transport_timer_;
00982 
00983   /// Lock that ensures that activities that *might* use handler-related
00984   /// resources (such as a connection handler) get serialized.
00985   /**
00986    * This is an <code>ACE_Lock</code> that gets initialized from
00987    * @c TAO_ORB_Core::resource_factory()->create_cached_connection_lock().
00988    * This way, one can use a lock appropriate for the type of system, i.e.,
00989    * a null lock for single-threaded systems, and a real lock for
00990    * multi-threaded systems.
00991    */
00992   mutable ACE_Lock *handler_lock_;
00993 
00994   /// A unique identifier for the transport.
00995   /**
00996    * This never *never* changes over the lifespan, so we don't have to worry
00997    * about locking it.
00998    *
00999    * HINT: Protocol-specific transports that use connection handler
01000    * might choose to set this to the handle for their connection.
01001    */
01002   size_t id_;
01003 
01004   /// Used by the LRU, LFU and FIFO Connection Purging Strategies.
01005   unsigned long purging_order_;
01006 
01007   /// Size of the buffer received.
01008   size_t recv_buffer_size_;
01009 
01010   /// Number of bytes sent.
01011   size_t sent_byte_count_;
01012 
01013   /// Is this transport really connected or not. In case of oneways with
01014   /// SYNC_NONE Policy we don't wait until the connection is ready and we
01015   /// buffer the requests in this transport until the connection is ready
01016   bool is_connected_;
01017 
01018 private:
01019 
01020   /// @@Phil, I think it would be nice if we could think of a way to
01021   /// do the following.
01022   /// We have been trying to use the transport for marking about
01023   /// translator factories and such! IMHO this is a wrong encapsulation
01024   /// i.e. trying to populate the transport object with these
01025   /// details. We should probably have a class something like
01026   /// TAO_Message_Property or TAO_Message_Translator or whatever (I am
01027   /// sure you get the idea) and encapsulate all these
01028   /// details. Coupling these seems odd. If I have to be more cynical
01029   /// we can move this to the connection_handler and it may make more sense
01030   /// with the DSCP stuff around there. Do you agree?
01031 
01032   /// Additional member values required to support codeset translation
01033   TAO_Codeset_Translator_Base *char_translator_;
01034   TAO_Codeset_Translator_Base *wchar_translator_;
01035 
01036   /// The tcs_set_ flag indicates that negotiation has occurred and so the
01037   /// translators are correct, since a null translator is valid if both ends
01038   /// are using the same codeset, whatever that codeset might be.
01039   CORBA::Boolean tcs_set_;
01040 
01041   /// first_request_ is true until the first request is sent or received. This
01042   /// is necessary since codeset context information is necessary only on the
01043   /// first request. After that, the translators are fixed for the life of the
01044   /// connection.
01045   CORBA::Boolean first_request_;
01046 
01047   /// Holds the partial GIOP message (if there is one)
01048   ACE_Message_Block* partial_message_;
01049 
01050 #ifdef ACE_HAS_SENDFILE
01051   /// mmap()-based allocator used to allocate output CDR buffers.
01052   /**
01053    * If this pointer is non-zero, sendfile() will be used to send data
01054    * in a TAO_OutputCDR stream instance.
01055    */
01056   TAO_MMAP_Allocator * const mmap_allocator_;
01057 #endif  /* ACE_HAS_SENDFILE */
01058 
01059   /*
01060    * specialization hook to add class members from concrete
01061    * transport class onto the base transport class. Please
01062    * add any private members to this class *before* this hook.
01063    */
01064   //@@ TAO_TRANSPORT_SPL_DATA_MEMBERS_ADD_HOOK
01065 };
01066 
01067 /*
01068  * Hook to add external typedefs and specializations to
01069  * TAO's transport implementation.
01070  */
01071 
01072 //@@ TAO_TRANSPORT_SPL_EXTERN_ADD_HOOK
01073 
01074 TAO_END_VERSIONED_NAMESPACE_DECL
01075 
01076 #if defined (__ACE_INLINE__)
01077 # include "tao/Transport.inl"
01078 #endif /* __ACE_INLINE__ */
01079 
01080 #include /**/ "ace/post.h"
01081 
01082 #endif /* TAO_TRANSPORT_H */

Generated on Thu Nov 9 11:54:26 2006 for TAO by doxygen 1.3.6