00001 // -*- C++ -*- 00002 00003 //============================================================================= 00004 /** 00005 * @file Transport.h 00006 * 00007 * $Id: Transport.h 79388 2007-08-17 16:05:00Z wilsond $ 00008 * 00009 * Define the interface for the Transport component in TAO's 00010 * pluggable protocol framework. 00011 * 00012 * @author Fred Kuhns <fredk@cs.wustl.edu> 00013 */ 00014 //============================================================================= 00015 00016 #ifndef TAO_TRANSPORT_H 00017 #define TAO_TRANSPORT_H 00018 00019 #include /**/ "ace/pre.h" 00020 00021 #include "tao/Transport_Cache_Manager.h" 00022 00023 #if !defined (ACE_LACKS_PRAGMA_ONCE) 00024 # pragma once 00025 #endif /* ACE_LACKS_PRAGMA_ONCE */ 00026 00027 #include "tao/Transport_Timer.h" 00028 #include "tao/Incoming_Message_Queue.h" 00029 #include "tao/Incoming_Message_Stack.h" 00030 #include "ace/Time_Value.h" 00031 #include "ace/Basic_Stats.h" 00032 00033 struct iovec; 00034 00035 TAO_BEGIN_VERSIONED_NAMESPACE_DECL 00036 00037 class TAO_ORB_Core; 00038 class TAO_Target_Specification; 00039 class TAO_Operation_Details; 00040 class TAO_Transport_Mux_Strategy; 00041 class TAO_Wait_Strategy; 00042 class TAO_Connection_Handler; 00043 class TAO_Pluggable_Messaging; 00044 class TAO_Codeset_Translator_Base; 00045 00046 class TAO_Queued_Message; 00047 class TAO_Synch_Queued_Message; 00048 class TAO_Resume_Handle; 00049 class TAO_Stub; 00050 class TAO_MMAP_Allocator; 00051 00052 namespace TAO 00053 { 00054 /** 00055 * @note Should this be in TAO namespace. Seems like a candidate 00056 * that should be in the transport 00057 */ 00058 enum Connection_Role 00059 { 00060 TAO_UNSPECIFIED_ROLE = 0, 00061 TAO_SERVER_ROLE = 1, 00062 TAO_CLIENT_ROLE = 2 00063 }; 00064 00065 namespace Transport 00066 { 00067 /// Transport-level statistics. Initially introduced to support 00068 /// the "Transport Current" functionality. 
00069 class Stats; 00070 } 00071 } 00072 00073 /* 00074 * Specialization hook for the TAO's transport implementation. 00075 */ 00076 //@@ TAO_TRANSPORT_SPL_INCLUDE_FORWARD_DECL_ADD_HOOK 00077 00078 /** 00079 * @class TAO_Transport 00080 * 00081 * @brief Generic definitions for the Transport class. 00082 * 00083 * The transport object is created in the Service handler 00084 * constructor and deleted in the Service Handler's destructor!! 00085 * 00086 * The main responsability of a Transport object is to encapsulate a 00087 * connection, and provide a transport independent way to send and 00088 * receive data. Since TAO is heavily based on the Reactor for all if 00089 * not all its I/O the Transport class is usually implemented with a 00090 * helper Connection Handler that adapts the generic Transport 00091 * interface to the Reactor types. 00092 * 00093 * <H3>The outgoing data path:</H3> 00094 * 00095 * One of the responsibilities of the TAO_Transport class is to send 00096 * out GIOP messages as efficiently as possible. In most cases 00097 * messages are put out in FIFO order, the transport object will put 00098 * out the message using a single system call and return control to 00099 * the application. However, for oneways and AMI requests it may be 00100 * more efficient (or required if the SYNC_NONE policy is in effect) 00101 * to queue the messages until a large enough data set is available. 00102 * Another reason to queue is that some applications cannot block for 00103 * I/O, yet they want to send messages so large that a single write() 00104 * operation would not be able to cope with them. In such cases we 00105 * need to queue the data and use the Reactor to drain the queue. 00106 * 00107 * Therefore, the Transport class may need to use a queue to 00108 * temporarily hold the messages, and, in some configurations, it may 00109 * need to use the Reactor to concurrently drain such queues. 
00110 * 00111 * <H4>Out of order messages:</H4> TAO provides explicit policies to 00112 * send 'urgent' messages. Such messages may be put at the head of the 00113 * queue. However, they cannot be sent immediately because the 00114 * transport may already be sending another message in a reactive 00115 * fashion. 00116 * 00117 * Consequently, the Transport must also know if the head of the queue 00118 * has been partially sent. In that case new messages can only follow 00119 * the head. Only once the head is completely sent can we start 00120 * sending new messages. 00121 * 00122 * <H4>Waiting threads:</H4> One or more threads can be blocked 00123 * waiting for the connection to completely send the message. 00124 * The thread should return as soon as its message has been sent, so a 00125 * per-thread condition is required. This suggests that simply using an 00126 * ACE_Message_Queue would not be enough: there is a significant 00127 * amount of ancillary information to keep on each message that the 00128 * Message_Block class does not provide room for. 00129 * 00130 * Blocking I/O is still attractive for some applications. First, by 00131 * eliminating the Reactor overhead performance is improved when 00132 * sending large blocks of data. Second, using the Reactor to send 00133 * out data opens the door for nested upcalls, yet some applications 00134 * cannot deal with the reentrancy issues in this case. 00135 * 00136 * <H4>Timeouts:</H4> Some or all messages could have a timeout period 00137 * attached to them. The timeout source could either be some 00138 * high-level policy or maybe some strategy to prevent denial of 00139 * service attacks. In any case the timeouts are per-message, and 00140 * later messages could have shorter timeouts. 00141 * In fact, some kind of scheduling (such as EDF) could be required in 00142 * a few applications. 
00143 * 00144 * <H4>Conclusions:</H4> The outgoing data path consists of several 00145 * components: 00146 * 00147 * - A queue of pending messages 00148 * - A message currently being transmitted 00149 * - A per-transport 'send strategy' to choose between blocking on 00150 * write, blocking on the reactor or blocking on leader/follower. 00151 * - A per-message 'waiting object' 00152 * - A per-message timeout 00153 * 00154 * The Transport object provides a single method to send request 00155 * messages (send_request_message ()). 00156 * 00157 * <H3>The incoming data path:</H3> 00158 * 00159 * One of the main responsibilities of the transport is to read and 00160 * process the incoming GIOP message as quickly and efficiently as 00161 * possible. There are other forces that need to be given due 00162 * consideration. They are: 00163 * - Multiple threads should be able to traverse along the same data 00164 * path but should not be able to read from the same handle at the 00165 * same time, i.e. the handle should not be shared between threads at 00166 * any instant. 00167 * - Reads on the handle could give one or more messages. 00168 * - Minimise locking and copying overhead when trying to attack the 00169 * above. 00170 * 00171 * <H3> Parsing messages (GIOP) & processing the message:</H3> 00172 * 00173 * The messages should be checked for validity and the right 00174 * information should be sent to the higher layer for processing. The 00175 * process of doing a sanity check and preparing the messages for the 00176 * higher layers of the ORB is done by the messaging protocol. 00177 * 00178 * <H3> Design forces and Challenges </H3> 00179 * 00180 * To keep things as efficient as possible for medium sized requests, 00181 * it would be good to minimise data copying and locking along the 00182 * incoming path, i.e. from the time of reading the data from the handle 00183 * to the application. 
We achieve this by creating a buffer on the stack 00184 * and reading the data from the handle into the buffer. We then pass 00185 * the same data block (the buffer is encapsulated into a data block) 00186 * to the higher layers of the ORB. The problems stem from the 00187 * following: 00188 * (a) Data is bigger than the buffer that we have on the stack 00189 * (b) Transports like TCP do not guarantee availability of the whole 00190 * chunk of data in one shot. Data could trickle in byte by byte. 00191 * (c) A single read gives multiple messages 00192 * 00193 * We solve the problems as follows: 00194 * 00195 * (a) First do a read with the buffer on the stack. Query the underlying 00196 * messaging object whether the message has any incomplete 00197 * portion. If so, the data is copied into a new buffer able 00198 * to hold the full message, and that buffer is queued; succeeding events will read 00199 * data from the socket and write directly into this buffer. 00200 * Otherwise, if the message in the local buffer is complete, we free 00201 * the handle and then send the message to the higher layers of the 00202 * ORB for processing. 00203 * 00204 * (b) If a buffer with an incomplete message has been enqueued while trying 00205 * to do the above, the reactor will call us back when the handle 00206 * becomes read ready. The read operation will copy data directly 00207 * into the enqueued buffer. If the message has been read completely, 00208 * the message is sent to the higher layers of the ORB for processing. 00209 * 00210 * (c) If we get multiple messages (possible if the client connected 00211 * to the server sends oneways or AMI requests), we parse and 00212 * split the messages. Every message is put in the queue. Once 00213 * the messages are queued, the thread picks up one message to 00214 * send to the higher layers of the ORB. Before doing that, if 00215 * it finds more messages, it sends a notify to the reactor 00216 * without resuming the handle. 
The next thread picks up a 00217 * message from the queue and processes that. Once the queue 00218 * is drained the last thread resumes the handle. 00219 * 00220 * <H3> Sending Replies </H3> 00221 * 00222 * We could use the outgoing path of the ORB to send replies. This 00223 * would allow us to reuse most of the code in the outgoing data 00224 * path. We were doing this till TAO-1.2.3. We run in to 00225 * problems. When writing the reply the ORB gets flow controlled, and the 00226 * ORB tries to flush the message by going into the reactor. This 00227 * resulted in unnecessary nesting. The thread that gets into the 00228 * Reactor could potentially handle other messages (incoming or 00229 * outgoing) and the stack starts growing leading to crashes. 00230 * 00231 * <H4> Solution to the nesting problem </H4> 00232 * 00233 * The solution that we (plan to) adopt is pretty straight 00234 * forward. The thread sending replies will not block to send the 00235 * replies but queue the replies and return to the Reactor. (Note the 00236 * careful usages of the terms "blocking in the Reactor" as opposed to 00237 * "return back to the Reactor". 00238 * 00239 * 00240 * <B>See Also:</B> 00241 * 00242 * https://svn.dre.vanderbilt.edu/viewvc/Middleware/trunk/TAO/docs/pluggable_protocols/index.html?revision=HEAD 00243 * 00244 */ 00245 class TAO_Export TAO_Transport 00246 { 00247 public: 00248 00249 /// Default creator, requires the tag value be supplied. 00250 TAO_Transport (CORBA::ULong tag, TAO_ORB_Core *orb_core); 00251 00252 /// Destructor 00253 virtual ~TAO_Transport (void); 00254 00255 /// Return the protocol tag. 00256 /** 00257 * The OMG assigns unique tags (a 32-bit unsigned number) to each 00258 * protocol. New protocol tags can be obtained free of charge from 00259 * the OMG, check the documents in corbafwd.h for more details. 00260 */ 00261 CORBA::ULong tag (void) const; 00262 00263 /// Access the ORB that owns this connection. 
00264 TAO_ORB_Core *orb_core (void) const; 00265 00266 /// Get the TAO_Transport_Mux_Strategy used by this object. 00267 /** 00268 * The role of the TAO_Transport_Mux_Strategy is described in more 00269 * detail in that class' documentation. Suffice it to say that the 00270 * class is used to control how many threads can have pending 00271 * requests over the same connection. Multiplexing multiple threads 00272 * over the same connection conserves resources and is almost 00273 * required for AMI, but having only one pending request per 00274 * connection is more efficient and reduces the possibilities of 00275 * priority inversions. 00276 */ 00277 TAO_Transport_Mux_Strategy *tms (void) const; 00278 00279 /// Return the TAO_Wait_Strategy used by this object. 00280 /** 00281 * The role of the TAO_Wait_Strategy is described in more detail in 00282 * that class' documentation. Suffice it to say that the ORB can wait 00283 * for a reply blocking on read(), using the Reactor to wait for 00284 * multiple events concurrently or using the Leader/Followers 00285 * protocol. 00286 */ 00287 TAO_Wait_Strategy *wait_strategy (void) const; 00288 00289 /// Callback method to reactively drain the outgoing data queue 00290 int handle_output (void); 00291 00292 /// Get the bidirectional flag 00293 int bidirectional_flag (void) const; 00294 00295 /// Set the bidirectional flag 00296 void bidirectional_flag (int flag); 00297 00298 /// Set the Cache Map entry 00299 void cache_map_entry (TAO::Transport_Cache_Manager::HASH_MAP_ENTRY *entry); 00300 00301 /// Get the Cache Map entry 00302 TAO::Transport_Cache_Manager::HASH_MAP_ENTRY *cache_map_entry (void); 00303 00304 /// Set and Get the identifier for this transport instance. 00305 /** 00306 * If not set, this will return an integer representation of 00307 * the <code>this</code> pointer for the instance on which 00308 * it's called. 
00309 */ 00310 size_t id (void) const; 00311 void id (size_t id); 00312 00313 /** 00314 * Methods dealing with the role of the connection, e.g., CLIENT or SERVER. 00315 * See CORBA 2.6 Specification, Section 15.5.1 for origin of definitions. 00316 */ 00317 TAO::Connection_Role opened_as (void) const; 00318 void opened_as (TAO::Connection_Role); 00319 00320 /// Get and Set the purging order. The purging strategy uses the set 00321 /// version to set the purging order. 00322 unsigned long purging_order (void) const; 00323 void purging_order(unsigned long value); 00324 00325 /// Check if there are messages pending in the queue 00326 /** 00327 * @return true if the queue is empty 00328 */ 00329 bool queue_is_empty (void); 00330 00331 /// Add the event handler to the handlers set. 00332 /** 00333 * Called by the cache when the cache is closing. 00334 * 00335 * @param handlers The TAO::Connection_Handler_Set into which the 00336 * transport should place its handler 00337 */ 00338 void provide_handler (TAO::Connection_Handler_Set &handlers); 00339 00340 /// Add event handlers corresponding to transports that have RW wait 00341 /// strategy to the handlers set. 00342 /** 00343 * Called by the cache when the ORB is shutting down. 00344 * 00345 * @param handlers The TAO::Connection_Handler_Set into which the 00346 * transport should place its handler if the transport has RW 00347 * strategy on. 00348 * 00349 * @return true indicates a handler was added to the handler set. 00350 * false indicates that the transport did not have a 00351 * blockable handler that could be added. 00352 */ 00353 bool provide_blockable_handler (TAO::Connection_Handler_Set &handlers); 00354 00355 /// Register the handler with the reactor. 00356 /** 00357 * Register the handler with the reactor. This method is used by the 00358 * Wait_On_Reactor strategy. The transport must register its event 00359 * handler with the ORB's Reactor. 
00360 * 00361 * @todo I think this method is pretty much useless, the 00362 * connections are *always* registered with the Reactor, except in 00363 * thread-per-connection mode. In that case putting the connection 00364 * in the Reactor would produce unpredictable results anyway. 00365 */ 00366 virtual int register_handler (void); 00367 00368 /// Write the complete Message_Block chain to the connection. 00369 /** 00370 * This method serializes on handler_lock_, guaranteeing that only 00371 * thread can execute it on the same instance concurrently. 00372 * 00373 * Often the implementation simply forwards the arguments to the 00374 * underlying ACE_Svc_Handler class. Using the code factored out 00375 * into ACE. 00376 * 00377 * Be careful with protocols that perform non-trivial 00378 * transformations of the data, such as SSLIOP or protocols that 00379 * compress the stream. 00380 * 00381 * @param iov contains the data that must be sent. 00382 * 00383 * @param timeout is the maximum time that the application is 00384 * willing to wait for the data to be sent, useful in platforms that 00385 * implement timed writes. 00386 * The timeout value is obtained from the policies set by the 00387 * application. 00388 * 00389 * @param bytes_transferred should return the total number of bytes 00390 * successfully transferred before the connection blocked. This is 00391 * required because in some platforms and/or protocols multiple 00392 * system calls may be required to send the chain of message 00393 * blocks. The first few calls can work successfully, but the final 00394 * one can fail or signal a flow control situation (via EAGAIN). 00395 * In this case the ORB expects the function to return -1, errno to 00396 * be appropriately set and this argument to return the number of 00397 * bytes already on the OS I/O subsystem. 
00398 * 00399 * This call can also fail if the transport instance is no longer 00400 * associated with a connection (e.g., the connection handler closed 00401 * down). In that case, it returns -1 and sets errno to 00402 * <code>ENOENT</code>. 00403 */ 00404 virtual ssize_t send (iovec *iov, 00405 int iovcnt, 00406 size_t &bytes_transferred, 00407 const ACE_Time_Value *timeout = 0) = 0; 00408 00409 #if TAO_HAS_SENDFILE == 1 00410 /// Send data through zero-copy write mechanism, if available. 00411 /** 00412 * This method sends the data in the I/O vector through the platform 00413 * sendfile() function to perform a zero-copy write, if available. 00414 * Otherwise, the default fallback implementation simply delegates 00415 * to the TAO_Transport::send() method. 00416 * 00417 * @note This method is best used when sending very large blocks of 00418 * data. 00419 */ 00420 virtual ssize_t sendfile (TAO_MMAP_Allocator * allocator, 00421 iovec * iov, 00422 int iovcnt, 00423 size_t &bytes_transferred, 00424 ACE_Time_Value const * timeout = 0); 00425 #endif /* TAO_HAS_SENDFILE==1 */ 00426 00427 00428 /// Read len bytes from into buf. 00429 /** 00430 * This method serializes on handler_lock_, guaranteeing that only 00431 * thread can execute it on the same instance concurrently. 00432 * 00433 * @param buffer ORB allocated buffer where the data should be 00434 * @@ The ACE_Time_Value *s is just a place holder for now. It is 00435 * not clear this this is the best place to specify this. The actual 00436 * timeout values will be kept in the Policies. 00437 */ 00438 virtual ssize_t recv (char *buffer, 00439 size_t len, 00440 const ACE_Time_Value *timeout = 0) = 0; 00441 00442 /** 00443 * @name Control connection lifecycle 00444 * 00445 * These methods are routed through the TMS object. The TMS 00446 * strategies implement them correctly. 00447 */ 00448 //@{ 00449 00450 /// Request has been just sent, but the reply is not received. Idle 00451 /// the transport now. 
00452 bool idle_after_send (void); 00453 00454 /// Request is sent and the reply is received. Idle the transport 00455 /// now. 00456 bool idle_after_reply (void); 00457 00458 /// Call the implementation method after obtaining the lock. 00459 virtual void close_connection (void); 00460 00461 //@} 00462 00463 /** @name Template methods 00464 * 00465 * The Transport class uses the Template Method Pattern to implement 00466 * the protocol specific functionality. 00467 * Implementors of a pluggable protocol should override the 00468 * following methods with the semantics documented below. 00469 */ 00470 /** 00471 * Initialising the messaging object. This would be used by the 00472 * connector side. On the acceptor side the connection handler 00473 * would take care of the messaging objects. 00474 */ 00475 virtual int messaging_init (CORBA::Octet major, CORBA::Octet minor) = 0; 00476 00477 /// Extracts the list of listen points from the @a cdr stream. The 00478 /// list would have the protocol specific details of the 00479 /// ListenPoints 00480 virtual int tear_listen_point_list (TAO_InputCDR &cdr); 00481 00482 /// Hooks that can be overridden in concrete transports. 00483 /** 00484 * These hooks are invoked just after connection establishment (or 00485 * after a connection is fetched from cache). The 00486 * return value signifies whether the invoker should proceed with 00487 * post connection establishment activities. Protocols like SSLIOP 00488 * need this to verify whether connections already established have 00489 * valid certificates. There are no pre_connect_hooks () since the 00490 * transport doesn't exist before a connection establishment. :-) 00491 * 00492 * @note The methods are not made const with a reason. 00493 */ 00494 virtual bool post_connect_hook (void); 00495 00496 /// Memory management routines. 00497 /* 00498 * Forwards to event handler. 
00499 */ 00500 ACE_Event_Handler::Reference_Count add_reference (void); 00501 ACE_Event_Handler::Reference_Count remove_reference (void); 00502 00503 /// Return the messaging object that is used to format the data that 00504 /// needs to be sent. 00505 virtual TAO_Pluggable_Messaging * messaging_object (void) = 0; 00506 00507 /** @name Template methods 00508 * 00509 * The Transport class uses the Template Method Pattern to implement 00510 * the protocol specific functionality. 00511 * Implementors of a pluggable protocol should override the 00512 * following methods with the semantics documented below. 00513 */ 00514 //@{ 00515 00516 /// Return the event handler used to receive notifications from the 00517 /// Reactor. 00518 /** 00519 * Normally a concrete TAO_Transport object has-a ACE_Event_Handler 00520 * member that functions as an adapter between the ACE_Reactor 00521 * framework and the TAO pluggable protocol framework. 00522 * In all the protocols implemented so far this role is fullfilled 00523 * by an instance of ACE_Svc_Handler. 00524 * 00525 * @todo Since we only use a limited functionality of 00526 * ACE_Svc_Handler we could probably implement a generic 00527 * adapter class (TAO_Transport_Event_Handler or something), this 00528 * will reduce footprint and simplify the process of implementing a 00529 * pluggable protocol. 
00530 * 00531 * @todo This method has to be renamed to event_handler() 00532 */ 00533 virtual ACE_Event_Handler * event_handler_i (void) = 0; 00534 00535 /// Is this transport really connected 00536 bool is_connected (void) const; 00537 00538 /// Perform all the actions when this transport get opened 00539 bool post_open (size_t id); 00540 00541 /// do what needs to be done when closing the transport 00542 void pre_close (void); 00543 00544 /// Get the connection handler for this transport 00545 TAO_Connection_Handler * connection_handler (void); 00546 00547 /// Accessor for the output CDR stream 00548 TAO_OutputCDR &out_stream (void); 00549 00550 /* 00551 * Specialization hook to add public methods from 00552 * concrete transport implementations to TAO's transport 00553 * class 00554 */ 00555 //@@ TAO_TRANSPORT_SPL_PUBLIC_METHODS_ADD_HOOK 00556 00557 protected: 00558 00559 virtual TAO_Connection_Handler * connection_handler_i (void) = 0; 00560 00561 public: 00562 00563 /// This is a request for the transport object to write a 00564 /// LocateRequest header before it is sent out. 00565 int generate_locate_request (TAO_Target_Specification &spec, 00566 TAO_Operation_Details &opdetails, 00567 TAO_OutputCDR &output); 00568 00569 /// This is a request for the transport object to write a request 00570 /// header before it sends out the request 00571 virtual int generate_request_header (TAO_Operation_Details &opd, 00572 TAO_Target_Specification &spec, 00573 TAO_OutputCDR &msg); 00574 00575 /// Recache ourselves in the cache 00576 int recache_transport (TAO_Transport_Descriptor_Interface* desc); 00577 00578 /// Callback to read incoming data 00579 /** 00580 * The ACE_Event_Handler adapter invokes this method as part of its 00581 * handle_input() operation. 00582 * 00583 * @todo the method name is confusing! Calling it handle_input() 00584 * would probably make things easier to understand and follow! 
00585 * 00586 * Once a complete message is read the Transport class delegates on 00587 * the Messaging layer to invoke the right upcall (on the server) or 00588 * the TAO_Reply_Dispatcher (on the client side). 00589 * 00590 * @param max_wait_time In some cases the I/O is synchronous, e.g. a 00591 * thread-per-connection server or when Wait_On_Read is enabled. In 00592 * those cases a maximum read time can be specified. 00593 */ 00594 virtual int handle_input (TAO_Resume_Handle &rh, 00595 ACE_Time_Value *max_wait_time = 0); 00596 00597 enum 00598 { 00599 TAO_ONEWAY_REQUEST = 0, 00600 TAO_TWOWAY_REQUEST = 1, 00601 TAO_REPLY 00602 }; 00603 00604 /// Prepare the waiting and demuxing strategy to receive a reply for 00605 /// a new request. 00606 /** 00607 * Preparing the ORB to receive the reply only once the request is 00608 * completely sent opens the system to some subtle race conditions: 00609 * suppose the ORB is running in a multi-threaded configuration, 00610 * thread A makes a request while thread B is using the Reactor to 00611 * process all incoming requests. 00612 * Thread A could be implemented as follows: 00613 * 1) send the request 00614 * 2) setup the ORB to receive the reply 00615 * 3) wait for the request 00616 * 00617 * but in this case thread B may receive the reply between step (1) 00618 * and (2), and drop it as an invalid or unexpected message. 00619 * Consequently the correct implementation is: 00620 * 1) setup the ORB to receive the reply 00621 * 2) send the request 00622 * 3) wait for the reply 00623 * 00624 * The following method encapsulates this idiom. 00625 * 00626 * @todo This is generic code, it should be factored out into the 00627 * Transport class. 
00628 */ 00629 // @nolock b/c this calls send_or_buffer 00630 virtual int send_request (TAO_Stub *stub, 00631 TAO_ORB_Core *orb_core, 00632 TAO_OutputCDR &stream, 00633 int message_semantics, 00634 ACE_Time_Value *max_time_wait) = 0; 00635 00636 /// This method formats the stream and then sends the message on the 00637 /// transport. 00638 /** 00639 * Once the ORB is prepared to receive a reply (see send_request() 00640 * above), and all the arguments have been marshaled the CDR stream 00641 * must be 'formatted', i.e. the message_size field in the GIOP 00642 * header can finally be set to the proper value. 00643 * 00644 */ 00645 virtual int send_message (TAO_OutputCDR &stream, 00646 TAO_Stub *stub = 0, 00647 int message_semantics = TAO_Transport::TAO_TWOWAY_REQUEST, 00648 ACE_Time_Value *max_time_wait = 0) = 0; 00649 00650 /// Send the contents of @a message_block 00651 /** 00652 * @param stub The object reference used for this operation, useful 00653 * to obtain the current policies. 00654 * @param message_semantics If this is set to TAO_TWOWAY_REQUEST 00655 * this method will block until the operation is completely 00656 * written on the wire. If it is set to other values this 00657 * operation could return. 00658 * @param message_block The CDR encapsulation of the GIOP message 00659 * that must be sent. The message may consist of 00660 * multiple Message Blocks chained through the cont() 00661 * field. 00662 * @param max_wait_time The maximum time that the operation can 00663 * block, used in the implementation of timeouts. 00664 */ 00665 virtual int send_message_shared (TAO_Stub *stub, 00666 int message_semantics, 00667 const ACE_Message_Block *message_block, 00668 ACE_Time_Value *max_wait_time); 00669 00670 protected: 00671 00672 /// Process the message by sending it to the higher layers of the 00673 /// ORB. 
00674 int process_parsed_messages (TAO_Queued_Data *qd, 00675 TAO_Resume_Handle &rh); 00676 00677 /// Implement send_message_shared() assuming the handler_lock_ is 00678 /// held. 00679 int send_message_shared_i (TAO_Stub *stub, 00680 int message_semantics, 00681 const ACE_Message_Block *message_block, 00682 ACE_Time_Value *max_wait_time); 00683 00684 /// Queue a message for @a message_block 00685 /// @param max_wait_time The maximum time that the operation can 00686 /// block, used in the implementation of timeouts. 00687 int queue_message_i (const ACE_Message_Block *message_block, 00688 ACE_Time_Value *max_wait_time); 00689 00690 public: 00691 /// Format and queue a message for @a stream 00692 /// @param max_wait_time The maximum time that the operation can 00693 /// block, used in the implementation of timeouts. 00694 int format_queue_message (TAO_OutputCDR &stream, 00695 ACE_Time_Value *max_wait_time); 00696 00697 /// Send a message block chain, 00698 int send_message_block_chain (const ACE_Message_Block *message_block, 00699 size_t &bytes_transferred, 00700 ACE_Time_Value *max_wait_time = 0); 00701 00702 /// Send a message block chain, assuming the lock is held 00703 int send_message_block_chain_i (const ACE_Message_Block *message_block, 00704 size_t &bytes_transferred, 00705 ACE_Time_Value *max_wait_time); 00706 /// Cache management 00707 int purge_entry (void); 00708 00709 /// Cache management 00710 int make_idle (void); 00711 00712 /// Cache management 00713 int update_transport (void); 00714 00715 /// The timeout callback, invoked when any of the timers related to 00716 /// this transport expire. 00717 /** 00718 * @param current_time The current time as reported from the Reactor 00719 * @param act The Asynchronous Completion Token. Currently it is 00720 * interpreted as follows: 00721 * - If the ACT is the address of this->current_deadline_ the 00722 * queueing timeout has expired and the queue should start 00723 * flushing. 
00724 * 00725 * @return Returns 0 if there are no problems, -1 if there is an 00726 * error 00727 * 00728 * @todo In the future this function could be used to expire 00729 * messages (oneways) that have been sitting for too long on 00730 * the queue. 00731 */ 00732 int handle_timeout (const ACE_Time_Value ¤t_time, const void* act); 00733 00734 /// Accessor to recv_buffer_size_ 00735 size_t recv_buffer_size (void) const; 00736 00737 /// Accessor to sent_byte_count_ 00738 size_t sent_byte_count (void) const; 00739 00740 /// CodeSet Negotiation - Get the char codeset translator factory 00741 TAO_Codeset_Translator_Base *char_translator (void) const; 00742 00743 /// CodeSet Negotiation - Get the wchar codeset translator factory 00744 TAO_Codeset_Translator_Base *wchar_translator (void) const; 00745 00746 /// CodeSet negotiation - Set the char codeset translator factory 00747 void char_translator (TAO_Codeset_Translator_Base *); 00748 00749 /// CodeSet negotiation - Set the wchar codeset translator factory 00750 void wchar_translator (TAO_Codeset_Translator_Base *); 00751 00752 /// Use the Transport's codeset factories to set the translator for input 00753 /// and output CDRs. 00754 void assign_translators (TAO_InputCDR *, TAO_OutputCDR *); 00755 00756 /// It is necessary to clear the codeset translator when a CDR stream 00757 /// is used for more than one GIOP message. This is required since the 00758 /// header must not be translated, whereas the body must be. 00759 void clear_translators (TAO_InputCDR *, TAO_OutputCDR *); 00760 00761 /// Return true if the tcs has been set 00762 CORBA::Boolean is_tcs_set() const; 00763 00764 /// Set the state of the first_request_ flag to 0 00765 void first_request_sent(); 00766 00767 /// Notify all the components inside a Transport when the underlying 00768 /// connection is closed. 
00769 void send_connection_closed_notifications (void); 00770 00771 /// Transport statistics 00772 TAO::Transport::Stats* stats (void) const; 00773 00774 private: 00775 00776 /// Helper method that returns the Transport Cache Manager. 00777 TAO::Transport_Cache_Manager &transport_cache_manager (void); 00778 00779 /// Send some of the data in the queue. 00780 /** 00781 * As the outgoing data is drained this method is invoked to send as 00782 * much of the current message as possible. 00783 * 00784 * Returns 0 if there is more data to send, -1 if there was an error 00785 * and 1 if the message was completely sent. 00786 */ 00787 int drain_queue (void); 00788 00789 /// Implement drain_queue() assuming the lock is held 00790 int drain_queue_i (void); 00791 00792 /// Check if there are messages pending in the queue 00793 /** 00794 * This version assumes that the lock is already held. Use with 00795 * care! 00796 * 00797 * @return true if the queue is empty 00798 */ 00799 bool queue_is_empty_i (void); 00800 00801 /// A helper routine used in drain_queue_i() 00802 int drain_queue_helper (int &iovcnt, iovec iov[]); 00803 00804 /// These classes need privileged access to: 00805 /// - schedule_output_i() 00806 /// - cancel_output_i() 00807 friend class TAO_Reactive_Flushing_Strategy; 00808 friend class TAO_Leader_Follower_Flushing_Strategy; 00809 00810 /// Needs priveleged access to 00811 /// event_handler_i () 00812 friend class TAO_Thread_Per_Connection_Handler; 00813 00814 /// Schedule handle_output() callbacks 00815 int schedule_output_i (void); 00816 00817 /// Cancel handle_output() callbacks 00818 int cancel_output_i (void); 00819 00820 /// Cleanup the queue. 00821 /** 00822 * Exactly @a byte_count bytes have been sent, the queue must be 00823 * cleaned up as potentially several messages have been completely 00824 * sent out. 00825 * It leaves on head_ the next message to send out. 
00826 */ 00827 void cleanup_queue (size_t byte_count); 00828 00829 /// Cleanup the complete queue 00830 void cleanup_queue_i (); 00831 00832 /// Check if the buffering constraints have been reached 00833 int check_buffering_constraints_i (TAO_Stub *stub, bool &must_flush); 00834 00835 /// Send a synchronous message, i.e. block until the message is on 00836 /// the wire 00837 int send_synchronous_message_i (const ACE_Message_Block *message_block, 00838 ACE_Time_Value *max_wait_time); 00839 00840 /// Send a reply message, i.e. do not block until the message is on 00841 /// the wire, but just return after adding them to the queue. 00842 int send_reply_message_i (const ACE_Message_Block *message_block, 00843 ACE_Time_Value *max_wait_time); 00844 00845 /// Send an asynchronous message, i.e. do not block until the message is on 00846 /// the wire 00847 int send_asynchronous_message_i (TAO_Stub *stub, 00848 const ACE_Message_Block *message_block, 00849 ACE_Time_Value *max_wait_time); 00850 00851 /// A helper method used by send_synchronous_message_i() and 00852 /// send_reply_message_i(). Reusable code that could be used by both 00853 /// the methods. 00854 int send_synch_message_helper_i (TAO_Synch_Queued_Message &s, 00855 ACE_Time_Value *max_wait_time); 00856 00857 /// Check if the flush timer is still pending 00858 int flush_timer_pending (void) const; 00859 00860 /// The flush timer expired or was explicitly cancelled, mark it as 00861 /// not pending 00862 void reset_flush_timer (void); 00863 00864 /// Print out error messages if the event handler is not valid 00865 void report_invalid_event_handler (const char *caller); 00866 00867 /// Is invoked by handle_input operation. It consolidate message on 00868 /// top of incoming_message_stack. The amount of missing data is 00869 /// known and recv operation copies data directly into message buffer, 00870 /// as much as a single recv-invocation provides. 
00871 int handle_input_missing_data (TAO_Resume_Handle &rh, 00872 ACE_Time_Value *max_wait_time, 00873 TAO_Queued_Data *q_data); 00874 00875 /// Is invoked by handle_input operation. It parses new messages from input stream 00876 /// or consolidates messages whose header has been partially read, the message 00877 /// size being unknown so far. It parses as much data as a single recv-invocation provides. 00878 int handle_input_parse_data (TAO_Resume_Handle &rh, 00879 ACE_Time_Value *max_wait_time); 00880 00881 /// Is invoked by handle_input_parse_data. Parses all messages remaining 00882 /// in @a message_block. 00883 int handle_input_parse_extra_messages (ACE_Message_Block &message_block); 00884 00885 /// @return -1 error, otherwise 0 00886 int consolidate_enqueue_message (TAO_Queued_Data *qd); 00887 00888 /// @return -1 error, otherwise 0 00889 int consolidate_process_message (TAO_Queued_Data *qd, TAO_Resume_Handle &rh); 00890 00891 /* 00892 * Process the message that is in the head of the incoming queue. 00893 * If there are more messages in the queue, this method calls 00894 * this->notify_reactor () to wake up a thread 00895 * @retval -1 on error 00896 * @retval 0 if successfully processing enqueued messages 00897 * @retval 1 if no message present in queue 00898 */ 00899 int process_queue_head (TAO_Resume_Handle &rh); 00900 00901 /* 00902 * This call prepares a new handler for the notify call and sends a 00903 * notify () call to the reactor. 00904 */ 00905 int notify_reactor (void); 00906 00907 /// Assume the lock is held 00908 void send_connection_closed_notifications_i (void); 00909 00910 /// Allocate a partial message block and store it in our 00911 /// partial_message_ data member. 00912 void allocate_partial_message_block (void); 00913 00914 // Disallow copying and assignment. 
00915 TAO_Transport (const TAO_Transport&); 00916 void operator= (const TAO_Transport&); 00917 00918 /* 00919 * Specialization hook to add concrete private methods from 00920 * TAO's protocol implementation onto the base Transport class 00921 */ 00922 00923 //@@ TAO_TRANSPORT_SPL_PRIVATE_METHODS_ADD_HOOK 00924 00925 protected: 00926 00927 /// IOP protocol tag. 00928 CORBA::ULong const tag_; 00929 00930 /// Global orbcore resource. 00931 TAO_ORB_Core * const orb_core_; 00932 00933 /// Our entry in the cache. We don't own this. It is here for our 00934 /// convenience. We cannot just change things around. 00935 TAO::Transport_Cache_Manager::HASH_MAP_ENTRY *cache_map_entry_; 00936 00937 /// Strategy to decide whether multiple requests can be sent over the 00938 /// same connection or the connection is exclusive for a request. 00939 TAO_Transport_Mux_Strategy *tms_; 00940 00941 /// Strategy for waiting for the reply after sending the request. 00942 TAO_Wait_Strategy *ws_; 00943 00944 /// Use to check if bidirectional info has been synchronized with 00945 /// the peer. 00946 /** 00947 * Have we sent any info on bidirectional information or have we 00948 * received any info regarding making the connection served by this 00949 * transport bidirectional. 00950 * The flag is used as follows: 00951 * + We dont want to send the bidirectional context info more than 00952 * once on the connection. Why? Waste of marshalling and 00953 * demarshalling time on the client. 00954 * + On the server side -- once a client that has established the 00955 * connection asks the server to use the connection both ways, we 00956 * *dont* want the server to pack service info to the client. That 00957 * is not allowed. We need a flag to prevent such a things from 00958 * happening. 00959 * 00960 * The value of this flag will be 0 if the client sends info and 1 00961 * if the server receives the info. 
00962 */ 00963 int bidirectional_flag_; 00964 00965 TAO::Connection_Role opening_connection_role_; 00966 00967 /// Implement the outgoing data queue 00968 TAO_Queued_Message *head_; 00969 TAO_Queued_Message *tail_; 00970 00971 /// Queue of the consolidated, incoming messages.. 00972 TAO_Incoming_Message_Queue incoming_message_queue_; 00973 00974 /// Stack of incoming fragments, consolidated messages 00975 /// are going to be enqueued in "incoming_message_queue_" 00976 TAO::Incoming_Message_Stack incoming_message_stack_; 00977 00978 /// The queue will start draining no later than <queeing_deadline_> 00979 /// *if* the deadline is 00980 ACE_Time_Value current_deadline_; 00981 00982 /// The timer ID 00983 long flush_timer_id_; 00984 00985 /// The adapter used to receive timeout callbacks from the Reactor 00986 TAO_Transport_Timer transport_timer_; 00987 00988 /// Lock that insures that activities that *might* use handler-related 00989 /// resources (such as a connection handler) get serialized. 00990 /** 00991 * This is an <code>ACE_Lock</code> that gets initialized from 00992 * @c TAO_ORB_Core::resource_factory()->create_cached_connection_lock(). 00993 * This way, one can use a lock appropriate for the type of system, i.e., 00994 * a null lock for single-threaded systems, and a real lock for 00995 * multi-threaded systems. 00996 */ 00997 mutable ACE_Lock *handler_lock_; 00998 00999 /// A unique identifier for the transport. 01000 /** 01001 * This never *never* changes over the lifespan, so we don't have to worry 01002 * about locking it. 01003 * 01004 * HINT: Protocol-specific transports that use connection handler 01005 * might choose to set this to the handle for their connection. 01006 */ 01007 size_t id_; 01008 01009 /// Used by the LRU, LFU and FIFO Connection Purging Strategies. 01010 unsigned long purging_order_; 01011 01012 /// Size of the buffer received. 01013 size_t recv_buffer_size_; 01014 01015 /// Number of bytes sent. 
01016 size_t sent_byte_count_; 01017 01018 /// Is this transport really connected or not. In case of oneways with 01019 /// SYNC_NONE Policy we don't wait until the connection is ready and we 01020 /// buffer the requests in this transport until the connection is ready 01021 bool is_connected_; 01022 01023 private: 01024 01025 /// @@Phil, I think it would be nice if we could think of a way to 01026 /// do the following. 01027 /// We have been trying to use the transport for marking about 01028 /// translator factories and such! IMHO this is a wrong encapulation 01029 /// ie. trying to populate the transport object with these 01030 /// details. We should probably have a class something like 01031 /// TAO_Message_Property or TAO_Message_Translator or whatever (I am 01032 /// sure you get the idea) and encapsulate all these 01033 /// details. Coupling these seems odd. if I have to be more cynical 01034 /// we can move this to the connection_handler and it may more sense 01035 /// with the DSCP stuff around there. Do you agree? 01036 01037 /// Additional member values required to support codeset translation 01038 TAO_Codeset_Translator_Base *char_translator_; 01039 TAO_Codeset_Translator_Base *wchar_translator_; 01040 01041 /// The tcs_set_ flag indicates that negotiation has occured and so the 01042 /// translators are correct, since a null translator is valid if both ends 01043 /// are using the same codeset, whatever that codeset might be. 01044 CORBA::Boolean tcs_set_; 01045 01046 /// First_request_ is true until the first request is sent or received. This 01047 /// is necessary since codeset context information is necessary only on the 01048 /// first request. After that, the translators are fixed for the life of the 01049 /// connection. 
01050 CORBA::Boolean first_request_; 01051 01052 /// Holds the partial GIOP message (if there is one) 01053 ACE_Message_Block* partial_message_; 01054 01055 #if TAO_HAS_SENDFILE == 1 01056 /// mmap()-based allocator used to allocator output CDR buffers. 01057 /** 01058 * If this pointer is non-zero, sendfile() will be used to send data 01059 * in a TAO_OutputCDR stream instance. 01060 */ 01061 TAO_MMAP_Allocator * const mmap_allocator_; 01062 #endif /* TAO_HAS_SENDFILE==1 */ 01063 01064 #if TAO_HAS_TRANSPORT_CURRENT == 1 01065 /// Statistics 01066 TAO::Transport::Stats* stats_; 01067 #endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */ 01068 01069 /* 01070 * specialization hook to add class members from concrete 01071 * transport class onto the base transport class. Please 01072 * add any private members to this class *before* this hook. 01073 */ 01074 //@@ TAO_TRANSPORT_SPL_DATA_MEMBERS_ADD_HOOK 01075 }; 01076 01077 /* 01078 * Hook to add external typedefs and specializations to 01079 * TAO's transport implementation. 01080 */ 01081 01082 //@@ TAO_TRANSPORT_SPL_EXTERN_ADD_HOOK 01083 01084 #if TAO_HAS_TRANSPORT_CURRENT == 1 01085 namespace TAO 01086 { 01087 namespace Transport 01088 { 01089 /* 01090 * @class Stats 01091 * 01092 * @brief Used to collect stats on a transport. 01093 * 01094 * The base class in (potentialy) extensible hierarchy used to 01095 * specialize the information available for a specific protocol. 01096 * 01097 * This class is necessary for the implementation of the Transport 01098 * Current feature. 
01099 * 01100 * <B>See Also:</B> 01101 * 01102 * https://svn.dre.vanderbilt.edu/viewvc/Middleware/trunk/TAO/docs/transport_current/index.html?revision=HEAD 01103 * 01104 */ 01105 class TAO_Export Stats 01106 { 01107 public: 01108 Stats (); 01109 virtual ~Stats (); 01110 01111 void messages_sent (size_t message_length); 01112 CORBA::LongLong messages_sent (void) const; 01113 CORBA::LongLong bytes_sent (void) const; 01114 01115 void messages_received (size_t message_length); 01116 CORBA::LongLong messages_received (void) const; 01117 CORBA::LongLong bytes_received (void) const; 01118 01119 void opened_since (const ACE_Time_Value& tv); 01120 const ACE_Time_Value& opened_since (void) const; 01121 01122 private: 01123 // The bytes_rcvd_.samples_count() could have been used instead, 01124 // however there was a suspicion that 32 bits would be 01125 // insufficient. 01126 CORBA::LongLong messages_rcvd_; 01127 01128 // The bytes_sent_.samples_count() could have been used instead, 01129 // however there was a suspicion that 32 bits would be 01130 // insufficient. 01131 CORBA::LongLong messages_sent_; 01132 01133 ACE_Basic_Stats bytes_rcvd_; 01134 ACE_Basic_Stats bytes_sent_; 01135 01136 ACE_Time_Value opened_since_; 01137 }; 01138 } 01139 } 01140 #endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */ 01141 01142 TAO_END_VERSIONED_NAMESPACE_DECL 01143 01144 #if defined (__ACE_INLINE__) 01145 # include "tao/Transport.inl" 01146 #endif /* __ACE_INLINE__ */ 01147 01148 #include /**/ "ace/post.h" 01149 01150 #endif /* TAO_TRANSPORT_H */