00001 // -*- C++ -*- 00002 00003 //============================================================================= 00004 /** 00005 * @file Transport.h 00006 * 00007 * $Id: Transport.h 80603 2008-02-11 22:14:39Z johnc $ 00008 * 00009 * Define the interface for the Transport component in TAO's 00010 * pluggable protocol framework. 00011 * 00012 * @author Fred Kuhns <fredk@cs.wustl.edu> 00013 */ 00014 //============================================================================= 00015 00016 #ifndef TAO_TRANSPORT_H 00017 #define TAO_TRANSPORT_H 00018 00019 #include /**/ "ace/pre.h" 00020 00021 #include "tao/Transport_Cache_Manager.h" 00022 00023 #if !defined (ACE_LACKS_PRAGMA_ONCE) 00024 # pragma once 00025 #endif /* ACE_LACKS_PRAGMA_ONCE */ 00026 00027 #include "tao/Transport_Timer.h" 00028 #include "tao/Incoming_Message_Queue.h" 00029 #include "tao/Incoming_Message_Stack.h" 00030 #include "ace/Time_Value.h" 00031 #include "ace/Basic_Stats.h" 00032 00033 struct iovec; 00034 00035 TAO_BEGIN_VERSIONED_NAMESPACE_DECL 00036 00037 class TAO_ORB_Core; 00038 class TAO_Target_Specification; 00039 class TAO_Operation_Details; 00040 class TAO_Transport_Mux_Strategy; 00041 class TAO_Wait_Strategy; 00042 class TAO_Connection_Handler; 00043 class TAO_GIOP_Message_Base; 00044 class TAO_Codeset_Translator_Base; 00045 00046 class TAO_Queued_Message; 00047 class TAO_Synch_Queued_Message; 00048 class TAO_Resume_Handle; 00049 class TAO_Stub; 00050 class TAO_MMAP_Allocator; 00051 00052 namespace TAO 00053 { 00054 /** 00055 * Role of a connection (unspecified, server or client); this is the type taken and returned by TAO_Transport::opened_as(). 00056 * @note Should this be in the TAO namespace? It seems like a candidate that should be in the transport. 00057 */ 00058 enum Connection_Role 00059 { 00060 TAO_UNSPECIFIED_ROLE = 0, 00061 TAO_SERVER_ROLE = 1, 00062 TAO_CLIENT_ROLE = 2 00063 }; 00064 00065 namespace Transport 00066 { 00067 /// Transport-level statistics. Initially introduced to support 00068 /// the "Transport Current" functionality. Only forward-declared here. 
00069 class Stats; 00070 } 00071 } 00072 00073 /* 00074 * Specialization hook for TAO's transport implementation. 00075 */ 00076 //@@ TAO_TRANSPORT_SPL_INCLUDE_FORWARD_DECL_ADD_HOOK 00077 00078 /** 00079 * @class TAO_Transport 00080 * 00081 * @brief Generic definitions for the Transport class. 00082 * 00083 * The transport object is created in the Service handler 00084 * constructor and deleted in the Service Handler's destructor!! 00085 * 00086 * The main responsibility of a Transport object is to encapsulate a 00087 * connection, and provide a transport independent way to send and 00088 * receive data. Since TAO is heavily based on the Reactor for most if 00089 * not all of its I/O, the Transport class is usually implemented with a 00090 * helper Connection Handler that adapts the generic Transport 00091 * interface to the Reactor types. 00092 * 00093 * <H3>The outgoing data path:</H3> 00094 * 00095 * One of the responsibilities of the TAO_Transport class is to send 00096 * out GIOP messages as efficiently as possible. In most cases 00097 * messages are put out in FIFO order, the transport object will put 00098 * out the message using a single system call and return control to 00099 * the application. However, for oneways and AMI requests it may be 00100 * more efficient (or required if the SYNC_NONE policy is in effect) 00101 * to queue the messages until a large enough data set is available. 00102 * Another reason to queue is that some applications cannot block for 00103 * I/O, yet they want to send messages so large that a single write() 00104 * operation would not be able to cope with them. In such cases we 00105 * need to queue the data and use the Reactor to drain the queue. 00106 * 00107 * Therefore, the Transport class may need to use a queue to 00108 * temporarily hold the messages, and, in some configurations, it may 00109 * need to use the Reactor to concurrently drain such queues. 
00110 * 00111 * <H4>Out of order messages:</H4> TAO provides explicit policies to 00112 * send 'urgent' messages. Such messages may be put at the head of the 00113 * queue. However, they cannot be sent immediately because the 00114 * transport may already be sending another message in a reactive 00115 * fashion. 00116 * 00117 * Consequently, the Transport must also know if the head of the queue 00118 * has been partially sent. In that case new messages can only follow 00119 * the head. Only once the head is completely sent can we start 00120 * sending new messages. 00121 * 00122 * <H4>Waiting threads:</H4> One or more threads can be blocked 00123 * waiting for the connection to completely send the message. 00124 * The thread should return as soon as its message has been sent, so a 00125 * per-thread condition is required. This suggests that simply using an 00126 * ACE_Message_Queue would not be enough: there is a significant 00127 * amount of ancillary information to keep on each message that the 00128 * Message_Block class does not provide room for. 00129 * 00130 * Blocking I/O is still attractive for some applications. First, by 00131 * eliminating the Reactor overhead performance is improved when 00132 * sending large blocks of data. Second, using the Reactor to send 00133 * out data opens the door for nested upcalls, yet some applications 00134 * cannot deal with the reentrancy issues in this case. 00135 * 00136 * <H4>Timeouts:</H4> Some or all messages could have a timeout period 00137 * attached to them. The timeout source could either be some 00138 * high-level policy or maybe some strategy to prevent denial of 00139 * service attacks. In any case the timeouts are per-message, and 00140 * later messages could have shorter timeouts. 00141 * In fact, some kind of scheduling (such as EDF) could be required in 00142 * a few applications. 
00143 * 00144 * <H4>Conclusions:</H4> The outgoing data path consists of several 00145 * components: 00146 * 00147 * - A queue of pending messages 00148 * - A message currently being transmitted 00149 * - A per-transport 'send strategy' to choose between blocking on 00150 * write, blocking on the reactor or blocking on leader/follower. 00151 * - A per-message 'waiting object' 00152 * - A per-message timeout 00153 * 00154 * The Transport object provides a single method to send request 00155 * messages (send_request_message ()). 00156 * 00157 * <H3>The incoming data path:</H3> 00158 * 00159 * One of the main responsibilities of the transport is to read and 00160 * process the incoming GIOP message as quickly and efficiently as 00161 * possible. There are other forces that need to be given due 00162 * consideration. They are: 00163 * - Multiple threads should be able to traverse along the same data 00164 * path but should not be able to read from the same handle at the 00165 * same time, i.e. the handle should not be shared between threads at 00166 * any instant. 00167 * - Reads on the handle could give one or more messages. 00168 * - Minimise locking and copying overhead when trying to attack the 00169 * above. 00170 * 00171 * <H3> Parsing messages (GIOP) & processing the message:</H3> 00172 * 00173 * The messages should be checked for validity and the right 00174 * information should be sent to the higher layer for processing. The 00175 * process of doing a sanity check and preparing the messages for the 00176 * higher layers of the ORB is done by the messaging protocol. 00177 * 00178 * <H3> Design forces and Challenges </H3> 00179 * 00180 * To keep things as efficient as possible for medium sized requests, 00181 * it would be good to minimise data copying and locking along the 00182 * incoming path, i.e. from the time of reading the data from the handle 00183 * to the application. 
We achieve this by creating a buffer on the stack 00184 * and reading the data from the handle into the buffer. We then pass 00185 * the same data block (the buffer is encapsulated into a data block) 00186 * to the higher layers of the ORB. The problems stem from the 00187 * following: 00188 * (a) Data is bigger than the buffer that we have on the stack 00189 * (b) Transports like TCP do not guarantee availability of the whole 00190 * chunk of data in one shot. Data could trickle in byte by byte. 00191 * (c) Single read gives multiple messages 00192 * 00193 * We solve the problems as follows: 00194 * 00195 * (a) First do a read with the buffer on the stack. Query the underlying 00196 * messaging object whether the message has any incomplete 00197 * portion. If so, data will be copied into a new buffer able 00198 * to hold the full message and is queued; succeeding events will read 00199 * data from the socket and write directly into this buffer. 00200 * Otherwise, if the message in the local buffer is complete, we free 00201 * the handle and then send the message to the higher layers of the 00202 * ORB for processing. 00203 * 00204 * (b) If a buffer with an incomplete message has been enqueued while trying 00205 * to do the above, the reactor will call us back when the handle 00206 * becomes read ready. The read-operation will copy data directly 00207 * into the enqueued buffer. If the message has been read completely 00208 * the message is sent to the higher layers of the ORB for processing. 00209 * 00210 * (c) If we get multiple messages (possible if the client connected 00211 * to the server sends oneways or AMI requests), we parse and 00212 * split the messages. Every message is put in the queue. Once 00213 * the messages are queued, the thread picks up one message to 00214 * send to the higher layers of the ORB. Before doing that, if 00215 * it finds more messages, it sends a notify to the reactor 00216 * without resuming the handle. 
The next thread picks up a 00217 * message from the queue and processes that. Once the queue 00218 * is drained the last thread resumes the handle. 00219 * 00220 * <H3> Sending Replies </H3> 00221 * 00222 * We could use the outgoing path of the ORB to send replies. This 00223 * would allow us to reuse most of the code in the outgoing data 00224 * path. We were doing this till TAO-1.2.3. We ran into 00225 * problems. When writing the reply the ORB gets flow controlled, and the 00226 * ORB tries to flush the message by going into the reactor. This 00227 * resulted in unnecessary nesting. The thread that gets into the 00228 * Reactor could potentially handle other messages (incoming or 00229 * outgoing) and the stack starts growing leading to crashes. 00230 * 00231 * <H4> Solution to the nesting problem </H4> 00232 * 00233 * The solution that we (plan to) adopt is pretty 00234 * straightforward. The thread sending replies will not block to send the 00235 * replies but queue the replies and return to the Reactor. (Note the 00236 * careful usage of the terms "blocking in the Reactor" as opposed to 00237 * "return back to the Reactor".) 00238 * 00239 * 00240 * <B>See Also:</B> 00241 * 00242 * https://svn.dre.vanderbilt.edu/viewvc/Middleware/trunk/TAO/docs/pluggable_protocols/index.html?revision=HEAD 00243 * 00244 */ 00245 class TAO_Export TAO_Transport 00246 { 00247 public: 00248 00249 /// Default creator, requires the tag value be supplied. 00250 TAO_Transport (CORBA::ULong tag, 00251 TAO_ORB_Core *orb_core, 00252 size_t input_cdr_size = ACE_CDR::DEFAULT_BUFSIZE); 00253 00254 /// Destructor 00255 virtual ~TAO_Transport (void); 00256 00257 /// Return the protocol tag. 00258 /** 00259 * The OMG assigns unique tags (a 32-bit unsigned number) to each 00260 * protocol. New protocol tags can be obtained free of charge from 00261 * the OMG, check the documents in corbafwd.h for more details. 
00262 */ 00263 CORBA::ULong tag (void) const; 00264 00265 /// Access the ORB that owns this connection. 00266 TAO_ORB_Core *orb_core (void) const; 00267 00268 /// Get the TAO_Transport_Mux_Strategy used by this object. 00269 /** 00270 * The role of the TAO_Transport_Mux_Strategy is described in more 00271 * detail in that class' documentation. Suffice it to say that the 00272 * class is used to control how many threads can have pending 00273 * requests over the same connection. Multiplexing multiple threads 00274 * over the same connection conserves resources and is almost 00275 * required for AMI, but having only one pending request per 00276 * connection is more efficient and reduces the possibilities of 00277 * priority inversions. 00278 */ 00279 TAO_Transport_Mux_Strategy *tms (void) const; 00280 00281 /// Return the TAO_Wait_Strategy used by this object. 00282 /** 00283 * The role of the TAO_Wait_Strategy is described in more detail in 00284 * that class' documentation. Suffice it to say that the ORB can wait 00285 * for a reply blocking on read(), using the Reactor to wait for 00286 * multiple events concurrently or using the Leader/Followers 00287 * protocol. 00288 */ 00289 TAO_Wait_Strategy *wait_strategy (void) const; 00290 00291 /// Callback method to reactively drain the outgoing data queue 00292 int handle_output (ACE_Time_Value *max_wait_time); 00293 00294 /// Get the bidirectional flag 00295 int bidirectional_flag (void) const; 00296 00297 /// Set the bidirectional flag 00298 void bidirectional_flag (int flag); 00299 00300 /// Set the Cache Map entry 00301 void cache_map_entry (TAO::Transport_Cache_Manager::HASH_MAP_ENTRY *entry); 00302 00303 /// Get the Cache Map entry 00304 TAO::Transport_Cache_Manager::HASH_MAP_ENTRY *cache_map_entry (void); 00305 00306 /// Set and Get the identifier for this transport instance. 
00307 /** 00308 * If not set, this will return an integer representation of 00309 * the <code>this</code> pointer for the instance on which 00310 * it's called. 00311 */ 00312 size_t id (void) const; 00313 void id (size_t id); 00314 00315 /** 00316 * Methods dealing with the role of the connection, e.g., CLIENT or SERVER. 00317 * See CORBA 2.6 Specification, Section 15.5.1 for origin of definitions. 00318 */ 00319 TAO::Connection_Role opened_as (void) const; 00320 void opened_as (TAO::Connection_Role); 00321 00322 /// Get and Set the purging order. The purging strategy uses the set 00323 /// version to set the purging order. 00324 unsigned long purging_order (void) const; 00325 void purging_order(unsigned long value); 00326 00327 /// Check if there are messages pending in the queue 00328 /** 00329 * @return true if the queue is empty 00330 */ 00331 bool queue_is_empty (void); 00332 00333 /// Add event handler to the handlers set. 00334 /** 00335 * Called by the cache when the cache is closing. 00336 * 00337 * @param handlers The TAO::Connection_Handler_Set into which the 00338 * transport should place its handler 00339 */ 00340 void provide_handler (TAO::Connection_Handler_Set &handlers); 00341 00342 /// Add event handlers corresponding to transports that have RW wait 00343 /// strategy to the handlers set. 00344 /** 00345 * Called by the cache when the ORB is shutting down. 00346 * 00347 * @param handlers The TAO::Connection_Handler_Set into which the 00348 * transport should place its handler if the transport has RW 00349 * strategy on. 00350 * 00351 * @return true indicates a handler was added to the handler set. 00352 * false indicates that the transport did not have a 00353 * blockable handler that could be added. 00354 */ 00355 bool provide_blockable_handler (TAO::Connection_Handler_Set &handlers); 00356 00357 /// Register the handler with the reactor. 00358 /** 00359 * Register the handler with the reactor. 
This method is used by the 00360 * Wait_On_Reactor strategy. The transport must register its event 00361 * handler with the ORB's Reactor. 00362 * 00363 * @todo I think this method is pretty much useless, the 00364 * connections are *always* registered with the Reactor, except in 00365 * thread-per-connection mode. In that case putting the connection 00366 * in the Reactor would produce unpredictable results anyway. 00367 */ 00368 virtual int register_handler (void); 00369 00370 /// Write the complete Message_Block chain to the connection. 00371 /** 00372 * This method serializes on handler_lock_, guaranteeing that only 00373 * one thread can execute it on the same instance at a time. 00374 * 00375 * Often the implementation simply forwards the arguments to the 00376 * underlying ACE_Svc_Handler class, using the code factored out 00377 * into ACE. 00378 * 00379 * Be careful with protocols that perform non-trivial 00380 * transformations of the data, such as SSLIOP or protocols that 00381 * compress the stream. 00382 * 00383 * @param iov contains the data that must be sent. 00384 * 00385 * @param timeout is the maximum time that the application is 00386 * willing to wait for the data to be sent, useful in platforms that 00387 * implement timed writes. 00388 * The timeout value is obtained from the policies set by the 00389 * application. 00390 * 00391 * @param bytes_transferred should return the total number of bytes 00392 * successfully transferred before the connection blocked. This is 00393 * required because in some platforms and/or protocols multiple 00394 * system calls may be required to send the chain of message 00395 * blocks. The first few calls can work successfully, but the final 00396 * one can fail or signal a flow control situation (via EAGAIN). 00397 * In this case the ORB expects the function to return -1, errno to 00398 * be appropriately set and this argument to return the number of 00399 * bytes already on the OS I/O subsystem. 
00400 * 00401 * This call can also fail if the transport instance is no longer 00402 * associated with a connection (e.g., the connection handler closed 00403 * down). In that case, it returns -1 and sets errno to 00404 * <code>ENOENT</code>. 00405 */ 00406 virtual ssize_t send (iovec *iov, 00407 int iovcnt, 00408 size_t &bytes_transferred, 00409 const ACE_Time_Value *timeout = 0) = 0; 00410 00411 #if TAO_HAS_SENDFILE == 1 00412 /// Send data through zero-copy write mechanism, if available. 00413 /** 00414 * This method sends the data in the I/O vector through the platform 00415 * sendfile() function to perform a zero-copy write, if available. 00416 * Otherwise, the default fallback implementation simply delegates 00417 * to the TAO_Transport::send() method. 00418 * 00419 * @note This method is best used when sending very large blocks of 00420 * data. 00421 */ 00422 virtual ssize_t sendfile (TAO_MMAP_Allocator * allocator, 00423 iovec * iov, 00424 int iovcnt, 00425 size_t &bytes_transferred, 00426 ACE_Time_Value const * timeout = 0); 00427 #endif /* TAO_HAS_SENDFILE==1 */ 00428 00429 00430 /// Read @a len bytes into @a buffer. 00431 /** 00432 * This method serializes on handler_lock_, guaranteeing that only 00433 * one thread can execute it on the same instance at a time. 00434 * 00435 * @param buffer ORB allocated buffer where the data should be stored. 00436 * @@ The ACE_Time_Value *s is just a place holder for now. It is 00437 * not clear that this is the best place to specify this. The actual 00438 * timeout values will be kept in the Policies. 00439 */ 00440 virtual ssize_t recv (char *buffer, 00441 size_t len, 00442 const ACE_Time_Value *timeout = 0) = 0; 00443 00444 /** 00445 * @name Control connection lifecycle 00446 * 00447 * These methods are routed through the TMS object. The TMS 00448 * strategies implement them correctly. 00449 */ 00450 //@{ 00451 00452 /// Request has been just sent, but the reply is not received. Idle 00453 /// the transport now. 
00454 bool idle_after_send (void); 00455 00456 /// Request is sent and the reply is received. Idle the transport 00457 /// now. 00458 bool idle_after_reply (void); 00459 00460 /// Call the implementation method after obtaining the lock. 00461 virtual void close_connection (void); 00462 00463 //@} 00464 00465 /** @name Template methods 00466 * 00467 * The Transport class uses the Template Method Pattern to implement 00468 * the protocol specific functionality. 00469 * Implementors of a pluggable protocol should override the 00470 * following methods with the semantics documented below. 00471 */ 00472 /** 00473 * Initialising the messaging object. This would be used by the 00474 * connector side. On the acceptor side the connection handler 00475 * would take care of the messaging objects. 00476 */ 00477 void messaging_init (TAO_GIOP_Message_Version const &version); 00478 00479 /// Extracts the list of listen points from the @a cdr stream. The 00480 /// list would have the protocol specific details of the 00481 /// ListenPoints 00482 virtual int tear_listen_point_list (TAO_InputCDR &cdr); 00483 00484 /// Hooks that can be overridden in concrete transports. 00485 /** 00486 * These hooks are invoked just after connection establishment (or 00487 * after a connection is fetched from cache). The 00488 * return value signifies whether the invoker should proceed with 00489 * post connection establishment activities. Protocols like SSLIOP 00490 * need this to verify whether connections already established have 00491 * valid certificates. There are no pre_connect_hooks () since the 00492 * transport doesn't exist before a connection establishment. :-) 00493 * 00494 * @note The methods are not made const with a reason. 00495 */ 00496 virtual bool post_connect_hook (void); 00497 00498 /// Memory management routines. 00499 /* 00500 * Forwards to event handler. 
00501 */ 00502 ACE_Event_Handler::Reference_Count add_reference (void); 00503 ACE_Event_Handler::Reference_Count remove_reference (void); 00504 00505 /// Return the messaging object that is used to format the data that 00506 /// needs to be sent. 00507 TAO_GIOP_Message_Base * messaging_object (void); 00508 00509 /** @name Template methods 00510 * 00511 * The Transport class uses the Template Method Pattern to implement 00512 * the protocol specific functionality. 00513 * Implementors of a pluggable protocol should override the 00514 * following methods with the semantics documented below. 00515 */ 00516 //@{ 00517 00518 /// Return the event handler used to receive notifications from the 00519 /// Reactor. 00520 /** 00521 * Normally a concrete TAO_Transport object has-a ACE_Event_Handler 00522 * member that functions as an adapter between the ACE_Reactor 00523 * framework and the TAO pluggable protocol framework. 00524 * In all the protocols implemented so far this role is fulfilled 00525 * by an instance of ACE_Svc_Handler. 00526 * 00527 * @todo Since we only use a limited functionality of 00528 * ACE_Svc_Handler we could probably implement a generic 00529 * adapter class (TAO_Transport_Event_Handler or something), this 00530 * will reduce footprint and simplify the process of implementing a 00531 * pluggable protocol. 
00532 * 00533 * @todo This method has to be renamed to event_handler() 00534 */ 00535 virtual ACE_Event_Handler * event_handler_i (void) = 0; 00536 00537 /// Is this transport really connected 00538 bool is_connected (void) const; 00539 00540 /// Perform all the actions when this transport get opened 00541 bool post_open (size_t id); 00542 00543 /// do what needs to be done when closing the transport 00544 void pre_close (void); 00545 00546 /// Get the connection handler for this transport 00547 TAO_Connection_Handler * connection_handler (void); 00548 00549 /// Accessor for the output CDR stream 00550 TAO_OutputCDR &out_stream (void); 00551 00552 /* 00553 * Specialization hook to add public methods from 00554 * concrete transport implementations to TAO's transport 00555 * class 00556 */ 00557 //@@ TAO_TRANSPORT_SPL_PUBLIC_METHODS_ADD_HOOK 00558 00559 protected: 00560 00561 virtual TAO_Connection_Handler * connection_handler_i (void) = 0; 00562 00563 public: 00564 00565 /// This is a request for the transport object to write a 00566 /// LocateRequest header before it is sent out. 00567 int generate_locate_request (TAO_Target_Specification &spec, 00568 TAO_Operation_Details &opdetails, 00569 TAO_OutputCDR &output); 00570 00571 /// This is a request for the transport object to write a request 00572 /// header before it sends out the request 00573 virtual int generate_request_header (TAO_Operation_Details &opd, 00574 TAO_Target_Specification &spec, 00575 TAO_OutputCDR &msg); 00576 00577 /// Recache ourselves in the cache 00578 int recache_transport (TAO_Transport_Descriptor_Interface* desc); 00579 00580 /// Callback to read incoming data 00581 /** 00582 * The ACE_Event_Handler adapter invokes this method as part of its 00583 * handle_input() operation. 00584 * 00585 * @todo the method name is confusing! Calling it handle_input() 00586 * would probably make things easier to understand and follow! 
00587 * 00588 * Once a complete message is read the Transport class delegates on 00589 * the Messaging layer to invoke the right upcall (on the server) or 00590 * the TAO_Reply_Dispatcher (on the client side). 00591 * 00592 * @param max_wait_time In some cases the I/O is synchronous, e.g. a 00593 * thread-per-connection server or when Wait_On_Read is enabled. In 00594 * those cases a maximum read time can be specified. 00595 */ 00596 virtual int handle_input (TAO_Resume_Handle &rh, 00597 ACE_Time_Value *max_wait_time = 0); 00598 00599 enum TAO_Message_Semantics 00600 { 00601 TAO_ONEWAY_REQUEST = 0, 00602 TAO_TWOWAY_REQUEST = 1, 00603 TAO_REPLY 00604 }; 00605 00606 /// Prepare the waiting and demuxing strategy to receive a reply for 00607 /// a new request. 00608 /** 00609 * Preparing the ORB to receive the reply only once the request is 00610 * completely sent opens the system to some subtle race conditions: 00611 * suppose the ORB is running in a multi-threaded configuration, 00612 * thread A makes a request while thread B is using the Reactor to 00613 * process all incoming requests. 00614 * Thread A could be implemented as follows: 00615 * 1) send the request 00616 * 2) setup the ORB to receive the reply 00617 * 3) wait for the request 00618 * 00619 * but in this case thread B may receive the reply between step (1) 00620 * and (2), and drop it as an invalid or unexpected message. 00621 * Consequently the correct implementation is: 00622 * 1) setup the ORB to receive the reply 00623 * 2) send the request 00624 * 3) wait for the reply 00625 * 00626 * The following method encapsulates this idiom. 00627 * 00628 * @todo This is generic code, it should be factored out into the 00629 * Transport class. 
00630 */ 00631 // @nolock b/c this calls send_or_buffer 00632 virtual int send_request (TAO_Stub *stub, 00633 TAO_ORB_Core *orb_core, 00634 TAO_OutputCDR &stream, 00635 TAO_Message_Semantics message_semantics, 00636 ACE_Time_Value *max_time_wait) = 0; 00637 00638 /// This method formats the stream and then sends the message on the 00639 /// transport. 00640 /** 00641 * Once the ORB is prepared to receive a reply (see send_request() 00642 * above), and all the arguments have been marshaled the CDR stream 00643 * must be 'formatted', i.e. the message_size field in the GIOP 00644 * header can finally be set to the proper value. 00645 * 00646 */ 00647 virtual int send_message (TAO_OutputCDR &stream, 00648 TAO_Stub *stub = 0, 00649 TAO_Message_Semantics message_semantics = TAO_Transport::TAO_TWOWAY_REQUEST, 00650 ACE_Time_Value *max_time_wait = 0) = 0; 00651 00652 /// Send the contents of @a message_block 00653 /** 00654 * @param stub The object reference used for this operation, useful 00655 * to obtain the current policies. 00656 * @param message_semantics If this is set to TAO_TWOWAY_REQUEST 00657 * this method will block until the operation is completely 00658 * written on the wire. If it is set to other values this 00659 * operation could return. 00660 * @param message_block The CDR encapsulation of the GIOP message 00661 * that must be sent. The message may consist of 00662 * multiple Message Blocks chained through the cont() 00663 * field. 00664 * @param max_wait_time The maximum time that the operation can 00665 * block, used in the implementation of timeouts. 00666 */ 00667 virtual int send_message_shared (TAO_Stub *stub, 00668 TAO_Message_Semantics message_semantics, 00669 const ACE_Message_Block *message_block, 00670 ACE_Time_Value *max_wait_time); 00671 00672 protected: 00673 00674 /// Process the message by sending it to the higher layers of the 00675 /// ORB. 
00676 int process_parsed_messages (TAO_Queued_Data *qd, 00677 TAO_Resume_Handle &rh); 00678 00679 /// Implement send_message_shared() assuming the handler_lock_ is 00680 /// held. 00681 int send_message_shared_i (TAO_Stub *stub, 00682 TAO_Message_Semantics message_semantics, 00683 const ACE_Message_Block *message_block, 00684 ACE_Time_Value *max_wait_time); 00685 00686 /// Queue a message for @a message_block 00687 /// @param max_wait_time The maximum time that the operation can 00688 /// block, used in the implementation of timeouts. 00689 /// @param back If true, the message will be pushed to the back of the queue. 00690 /// If false, the message will be pushed to the front of the queue. 00691 int queue_message_i (const ACE_Message_Block *message_block, 00692 ACE_Time_Value *max_wait_time, bool back=true); 00693 00694 public: 00695 /// Format and queue a message for @a stream 00696 /// @param max_wait_time The maximum time that the operation can 00697 /// block, used in the implementation of timeouts. 00698 int format_queue_message (TAO_OutputCDR &stream, 00699 ACE_Time_Value *max_wait_time); 00700 00701 /// Send a message block chain, 00702 int send_message_block_chain (const ACE_Message_Block *message_block, 00703 size_t &bytes_transferred, 00704 ACE_Time_Value *max_wait_time = 0); 00705 00706 /// Send a message block chain, assuming the lock is held 00707 int send_message_block_chain_i (const ACE_Message_Block *message_block, 00708 size_t &bytes_transferred, 00709 ACE_Time_Value *max_wait_time); 00710 /// Cache management 00711 int purge_entry (void); 00712 00713 /// Cache management 00714 int make_idle (void); 00715 00716 /// Cache management 00717 int update_transport (void); 00718 00719 /// The timeout callback, invoked when any of the timers related to 00720 /// this transport expire. 00721 /** 00722 * @param current_time The current time as reported from the Reactor 00723 * @param act The Asynchronous Completion Token. 
Currently it is 00724 * interpreted as follows: 00725 * - If the ACT is the address of this->current_deadline_ the 00726 * queueing timeout has expired and the queue should start 00727 * flushing. 00728 * 00729 * @return Returns 0 if there are no problems, -1 if there is an 00730 * error 00731 * 00732 * @todo In the future this function could be used to expire 00733 * messages (oneways) that have been sitting for too long on 00734 * the queue. 00735 */ 00736 int handle_timeout (const ACE_Time_Value &current_time, const void* act); 00737 00738 /// Accessor to recv_buffer_size_ 00739 size_t recv_buffer_size (void) const; 00740 00741 /// Accessor to sent_byte_count_ 00742 size_t sent_byte_count (void) const; 00743 00744 /// CodeSet Negotiation - Get the char codeset translator factory 00745 TAO_Codeset_Translator_Base *char_translator (void) const; 00746 00747 /// CodeSet Negotiation - Get the wchar codeset translator factory 00748 TAO_Codeset_Translator_Base *wchar_translator (void) const; 00749 00750 /// CodeSet negotiation - Set the char codeset translator factory 00751 void char_translator (TAO_Codeset_Translator_Base *); 00752 00753 /// CodeSet negotiation - Set the wchar codeset translator factory 00754 void wchar_translator (TAO_Codeset_Translator_Base *); 00755 00756 /// Use the Transport's codeset factories to set the translator for input 00757 /// and output CDRs. 00758 void assign_translators (TAO_InputCDR *, TAO_OutputCDR *); 00759 00760 /// It is necessary to clear the codeset translator when a CDR stream 00761 /// is used for more than one GIOP message. This is required since the 00762 /// header must not be translated, whereas the body must be. 
00763 void clear_translators (TAO_InputCDR *, TAO_OutputCDR *); 00764 00765 /// Return true if the tcs has been set 00766 CORBA::Boolean is_tcs_set() const; 00767 00768 /// Set the state of the first_request_ flag to 0 00769 void first_request_sent(); 00770 00771 /// Notify all the components inside a Transport when the underlying 00772 /// connection is closed. 00773 void send_connection_closed_notifications (void); 00774 00775 /// Transport statistics 00776 TAO::Transport::Stats* stats (void) const; 00777 00778 private: 00779 00780 /// Helper method that returns the Transport Cache Manager. 00781 TAO::Transport_Cache_Manager &transport_cache_manager (void); 00782 00783 /// Send some of the data in the queue. 00784 /** 00785 * As the outgoing data is drained this method is invoked to send as 00786 * much of the current message as possible. 00787 * 00788 * Returns 0 if there is more data to send, -1 if there was an error 00789 * and 1 if the message was completely sent. 00790 */ 00791 int drain_queue (ACE_Time_Value *max_wait_time); 00792 00793 /// Implement drain_queue() assuming the lock is held 00794 int drain_queue_i (ACE_Time_Value *max_wait_time); 00795 00796 /// Check if there are messages pending in the queue 00797 /** 00798 * This version assumes that the lock is already held. Use with 00799 * care! 
00800 * 00801 * @return true if the queue is empty 00802 */ 00803 bool queue_is_empty_i (void); 00804 00805 /// A helper routine used in drain_queue_i() 00806 int drain_queue_helper (int &iovcnt, iovec iov[], ACE_Time_Value *max_wait_time); 00807 00808 /// These classes need privileged access to: 00809 /// - schedule_output_i() 00810 /// - cancel_output_i() 00811 friend class TAO_Reactive_Flushing_Strategy; 00812 friend class TAO_Leader_Follower_Flushing_Strategy; 00813 00814 /// Needs priveleged access to 00815 /// event_handler_i () 00816 friend class TAO_Thread_Per_Connection_Handler; 00817 00818 /// Schedule handle_output() callbacks 00819 int schedule_output_i (void); 00820 00821 /// Cancel handle_output() callbacks 00822 int cancel_output_i (void); 00823 00824 /// Cleanup the queue. 00825 /** 00826 * Exactly @a byte_count bytes have been sent, the queue must be 00827 * cleaned up as potentially several messages have been completely 00828 * sent out. 00829 * It leaves on head_ the next message to send out. 00830 */ 00831 void cleanup_queue (size_t byte_count); 00832 00833 /// Cleanup the complete queue 00834 void cleanup_queue_i (); 00835 00836 /// Check if the buffering constraints have been reached 00837 int check_buffering_constraints_i (TAO_Stub *stub, bool &must_flush); 00838 00839 /// Send a synchronous message, i.e. block until the message is on 00840 /// the wire 00841 int send_synchronous_message_i (const ACE_Message_Block *message_block, 00842 ACE_Time_Value *max_wait_time); 00843 00844 /// Send a reply message, i.e. do not block until the message is on 00845 /// the wire, but just return after adding them to the queue. 00846 int send_reply_message_i (const ACE_Message_Block *message_block, 00847 ACE_Time_Value *max_wait_time); 00848 00849 /// Send an asynchronous message, i.e. 
do not block until the message is on 00850 /// the wire 00851 int send_asynchronous_message_i (TAO_Stub *stub, 00852 const ACE_Message_Block *message_block, 00853 ACE_Time_Value *max_wait_time); 00854 00855 /// A helper method used by send_synchronous_message_i() and 00856 /// send_reply_message_i(). Reusable code that could be used by both 00857 /// the methods. 00858 int send_synch_message_helper_i (TAO_Synch_Queued_Message &s, 00859 ACE_Time_Value *max_wait_time); 00860 00861 /// Check if the flush timer is still pending 00862 int flush_timer_pending (void) const; 00863 00864 /// The flush timer expired or was explicitly cancelled, mark it as 00865 /// not pending 00866 void reset_flush_timer (void); 00867 00868 /// Print out error messages if the event handler is not valid 00869 void report_invalid_event_handler (const char *caller); 00870 00871 /// Is invoked by handle_input operation. It consolidate message on 00872 /// top of incoming_message_stack. The amount of missing data is 00873 /// known and recv operation copies data directly into message buffer, 00874 /// as much as a single recv-invocation provides. 00875 int handle_input_missing_data (TAO_Resume_Handle &rh, 00876 ACE_Time_Value *max_wait_time, 00877 TAO_Queued_Data *q_data); 00878 00879 /// Is invoked by handle_input operation. It parses new messages from input stream 00880 /// or consolidates messages whose header has been partially read, the message 00881 /// size being unknown so far. It parses as much data as a single recv-invocation provides. 00882 int handle_input_parse_data (TAO_Resume_Handle &rh, 00883 ACE_Time_Value *max_wait_time); 00884 00885 /// Is invoked by handle_input_parse_data. Parses all messages remaining 00886 /// in @a message_block. 
00887 int handle_input_parse_extra_messages (ACE_Message_Block &message_block); 00888 00889 /// @return -1 error, otherwise 0 00890 int consolidate_enqueue_message (TAO_Queued_Data *qd); 00891 00892 /// @return -1 error, otherwise 0 00893 int consolidate_process_message (TAO_Queued_Data *qd, TAO_Resume_Handle &rh); 00894 00895 /* 00896 * Process the message that is in the head of the incoming queue. 00897 * If there are more messages in the queue, this method calls 00898 * this->notify_reactor () to wake up a thread 00899 * @retval -1 on error 00900 * @retval 0 if successfully processing enqueued messages 00901 * @retval 1 if no message present in queue 00902 */ 00903 int process_queue_head (TAO_Resume_Handle &rh); 00904 00905 /* 00906 * This call prepares a new handler for the notify call and sends a 00907 * notify () call to the reactor. 00908 */ 00909 int notify_reactor (void); 00910 00911 /// Assume the lock is held 00912 void send_connection_closed_notifications_i (void); 00913 00914 /// Allocate a partial message block and store it in our 00915 /// partial_message_ data member. 00916 void allocate_partial_message_block (void); 00917 00918 // Disallow copying and assignment. 00919 TAO_Transport (const TAO_Transport&); 00920 void operator= (const TAO_Transport&); 00921 00922 /* 00923 * Specialization hook to add concrete private methods from 00924 * TAO's protocol implementation onto the base Transport class 00925 */ 00926 00927 //@@ TAO_TRANSPORT_SPL_PRIVATE_METHODS_ADD_HOOK 00928 00929 protected: 00930 00931 /// IOP protocol tag. 00932 CORBA::ULong const tag_; 00933 00934 /// Global orbcore resource. 00935 TAO_ORB_Core * const orb_core_; 00936 00937 /// Our entry in the cache. We don't own this. It is here for our 00938 /// convenience. We cannot just change things around. 
00939 TAO::Transport_Cache_Manager::HASH_MAP_ENTRY *cache_map_entry_; 00940 00941 /// Strategy to decide whether multiple requests can be sent over the 00942 /// same connection or the connection is exclusive for a request. 00943 TAO_Transport_Mux_Strategy *tms_; 00944 00945 /// Strategy for waiting for the reply after sending the request. 00946 TAO_Wait_Strategy *ws_; 00947 00948 /// Use to check if bidirectional info has been synchronized with 00949 /// the peer. 00950 /** 00951 * Have we sent any info on bidirectional information or have we 00952 * received any info regarding making the connection served by this 00953 * transport bidirectional. 00954 * The flag is used as follows: 00955 * + We dont want to send the bidirectional context info more than 00956 * once on the connection. Why? Waste of marshalling and 00957 * demarshalling time on the client. 00958 * + On the server side -- once a client that has established the 00959 * connection asks the server to use the connection both ways, we 00960 * *dont* want the server to pack service info to the client. That 00961 * is not allowed. We need a flag to prevent such a things from 00962 * happening. 00963 * 00964 * The value of this flag will be 0 if the client sends info and 1 00965 * if the server receives the info. 00966 */ 00967 int bidirectional_flag_; 00968 00969 TAO::Connection_Role opening_connection_role_; 00970 00971 /// Implement the outgoing data queue 00972 TAO_Queued_Message *head_; 00973 TAO_Queued_Message *tail_; 00974 00975 /// Queue of the consolidated, incoming messages.. 
00976 TAO_Incoming_Message_Queue incoming_message_queue_; 00977 00978 /// Stack of incoming fragments, consolidated messages 00979 /// are going to be enqueued in "incoming_message_queue_" 00980 TAO::Incoming_Message_Stack incoming_message_stack_; 00981 00982 /// The queue will start draining no later than <queeing_deadline_> 00983 /// *if* the deadline is 00984 ACE_Time_Value current_deadline_; 00985 00986 /// The timer ID 00987 long flush_timer_id_; 00988 00989 /// The adapter used to receive timeout callbacks from the Reactor 00990 TAO_Transport_Timer transport_timer_; 00991 00992 /// Lock that insures that activities that *might* use handler-related 00993 /// resources (such as a connection handler) get serialized. 00994 /** 00995 * This is an <code>ACE_Lock</code> that gets initialized from 00996 * @c TAO_ORB_Core::resource_factory()->create_cached_connection_lock(). 00997 * This way, one can use a lock appropriate for the type of system, i.e., 00998 * a null lock for single-threaded systems, and a real lock for 00999 * multi-threaded systems. 01000 */ 01001 mutable ACE_Lock *handler_lock_; 01002 01003 /// A unique identifier for the transport. 01004 /** 01005 * This never *never* changes over the lifespan, so we don't have to worry 01006 * about locking it. 01007 * 01008 * HINT: Protocol-specific transports that use connection handler 01009 * might choose to set this to the handle for their connection. 01010 */ 01011 size_t id_; 01012 01013 /// Used by the LRU, LFU and FIFO Connection Purging Strategies. 01014 unsigned long purging_order_; 01015 01016 /// Size of the buffer received. 01017 size_t recv_buffer_size_; 01018 01019 /// Number of bytes sent. 01020 size_t sent_byte_count_; 01021 01022 /// Is this transport really connected or not. 
In case of oneways with 01023 /// SYNC_NONE Policy we don't wait until the connection is ready and we 01024 /// buffer the requests in this transport until the connection is ready 01025 bool is_connected_; 01026 01027 /// Our messaging object. 01028 TAO_GIOP_Message_Base *messaging_object_; 01029 01030 private: 01031 01032 /// @@Phil, I think it would be nice if we could think of a way to 01033 /// do the following. 01034 /// We have been trying to use the transport for marking about 01035 /// translator factories and such! IMHO this is a wrong encapulation 01036 /// ie. trying to populate the transport object with these 01037 /// details. We should probably have a class something like 01038 /// TAO_Message_Property or TAO_Message_Translator or whatever (I am 01039 /// sure you get the idea) and encapsulate all these 01040 /// details. Coupling these seems odd. if I have to be more cynical 01041 /// we can move this to the connection_handler and it may more sense 01042 /// with the DSCP stuff around there. Do you agree? 01043 01044 /// Additional member values required to support codeset translation 01045 TAO_Codeset_Translator_Base *char_translator_; 01046 TAO_Codeset_Translator_Base *wchar_translator_; 01047 01048 /// The tcs_set_ flag indicates that negotiation has occured and so the 01049 /// translators are correct, since a null translator is valid if both ends 01050 /// are using the same codeset, whatever that codeset might be. 01051 CORBA::Boolean tcs_set_; 01052 01053 /// First_request_ is true until the first request is sent or received. This 01054 /// is necessary since codeset context information is necessary only on the 01055 /// first request. After that, the translators are fixed for the life of the 01056 /// connection. 
01057 CORBA::Boolean first_request_; 01058 01059 /// Holds the partial GIOP message (if there is one) 01060 ACE_Message_Block* partial_message_; 01061 01062 #if TAO_HAS_SENDFILE == 1 01063 /// mmap()-based allocator used to allocator output CDR buffers. 01064 /** 01065 * If this pointer is non-zero, sendfile() will be used to send data 01066 * in a TAO_OutputCDR stream instance. 01067 */ 01068 TAO_MMAP_Allocator * const mmap_allocator_; 01069 #endif /* TAO_HAS_SENDFILE==1 */ 01070 01071 #if TAO_HAS_TRANSPORT_CURRENT == 1 01072 /// Statistics 01073 TAO::Transport::Stats* stats_; 01074 #endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */ 01075 01076 /* 01077 * specialization hook to add class members from concrete 01078 * transport class onto the base transport class. Please 01079 * add any private members to this class *before* this hook. 01080 */ 01081 //@@ TAO_TRANSPORT_SPL_DATA_MEMBERS_ADD_HOOK 01082 }; 01083 01084 /* 01085 * Hook to add external typedefs and specializations to 01086 * TAO's transport implementation. 01087 */ 01088 01089 //@@ TAO_TRANSPORT_SPL_EXTERN_ADD_HOOK 01090 01091 #if TAO_HAS_TRANSPORT_CURRENT == 1 01092 namespace TAO 01093 { 01094 namespace Transport 01095 { 01096 /* 01097 * @class Stats 01098 * 01099 * @brief Used to collect stats on a transport. 01100 * 01101 * The base class in (potentialy) extensible hierarchy used to 01102 * specialize the information available for a specific protocol. 01103 * 01104 * This class is necessary for the implementation of the Transport 01105 * Current feature. 
01106 * 01107 * <B>See Also:</B> 01108 * 01109 * https://svn.dre.vanderbilt.edu/viewvc/Middleware/trunk/TAO/docs/transport_current/index.html?revision=HEAD 01110 * 01111 */ 01112 class TAO_Export Stats 01113 { 01114 public: 01115 Stats (); 01116 virtual ~Stats (); 01117 01118 void messages_sent (size_t message_length); 01119 CORBA::LongLong messages_sent (void) const; 01120 CORBA::LongLong bytes_sent (void) const; 01121 01122 void messages_received (size_t message_length); 01123 CORBA::LongLong messages_received (void) const; 01124 CORBA::LongLong bytes_received (void) const; 01125 01126 void opened_since (const ACE_Time_Value& tv); 01127 const ACE_Time_Value& opened_since (void) const; 01128 01129 private: 01130 // The bytes_rcvd_.samples_count() could have been used instead, 01131 // however there was a suspicion that 32 bits would be 01132 // insufficient. 01133 CORBA::LongLong messages_rcvd_; 01134 01135 // The bytes_sent_.samples_count() could have been used instead, 01136 // however there was a suspicion that 32 bits would be 01137 // insufficient. 01138 CORBA::LongLong messages_sent_; 01139 01140 ACE_Basic_Stats bytes_rcvd_; 01141 ACE_Basic_Stats bytes_sent_; 01142 01143 ACE_Time_Value opened_since_; 01144 }; 01145 } 01146 } 01147 #endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */ 01148 01149 TAO_END_VERSIONED_NAMESPACE_DECL 01150 01151 #if defined (__ACE_INLINE__) 01152 # include "tao/Transport.inl" 01153 #endif /* __ACE_INLINE__ */ 01154 01155 #include /**/ "ace/post.h" 01156 01157 #endif /* TAO_TRANSPORT_H */