00001 // -*- C++ -*- 00002 00003 //============================================================================= 00004 /** 00005 * @file Transport.h 00006 * 00007 * Transport.h,v 1.89 2006/06/20 06:33:40 jwillemsen Exp 00008 * 00009 * Define the interface for the Transport component in TAO's 00010 * pluggable protocol framework. 00011 * 00012 * @author Fred Kuhns <fredk@cs.wustl.edu> 00013 */ 00014 //============================================================================= 00015 00016 #ifndef TAO_TRANSPORT_H 00017 #define TAO_TRANSPORT_H 00018 00019 #include /**/ "ace/pre.h" 00020 00021 #include "tao/Transport_Cache_Manager.h" 00022 00023 #if !defined (ACE_LACKS_PRAGMA_ONCE) 00024 # pragma once 00025 #endif /* ACE_LACKS_PRAGMA_ONCE */ 00026 00027 #include "tao/Transport_Timer.h" 00028 #include "tao/Incoming_Message_Queue.h" 00029 #include "tao/Incoming_Message_Stack.h" 00030 #include "ace/Time_Value.h" 00031 00032 struct iovec; 00033 00034 TAO_BEGIN_VERSIONED_NAMESPACE_DECL 00035 00036 class TAO_ORB_Core; 00037 class TAO_Target_Specification; 00038 class TAO_Operation_Details; 00039 class TAO_Transport_Mux_Strategy; 00040 class TAO_Wait_Strategy; 00041 class TAO_Connection_Handler; 00042 class TAO_Pluggable_Messaging; 00043 class TAO_Codeset_Translator_Base; 00044 00045 class TAO_Queued_Message; 00046 class TAO_Synch_Queued_Message; 00047 class TAO_Resume_Handle; 00048 class TAO_Stub; 00049 class TAO_MMAP_Allocator; 00050 00051 namespace TAO 00052 { 00053 /** 00054 * Role of a connection: unspecified, server, or client. @note Should this be in the TAO namespace? It seems like a candidate 00055 * that should be in the transport. 00056 */ 00057 enum Connection_Role 00058 { 00059 TAO_UNSPECIFIED_ROLE = 0, 00060 TAO_SERVER_ROLE = 1, 00061 TAO_CLIENT_ROLE = 2 00062 }; 00063 } 00064 00065 /* 00066 * Specialization hook for the TAO's transport implementation. 
00067 */ 00068 //@@ TAO_TRANSPORT_SPL_INCLUDE_FORWARD_DECL_ADD_HOOK 00069 00070 /** 00071 * @class TAO_Transport 00072 * 00073 * @brief Generic definitions for the Transport class. 00074 * 00075 * The transport object is created in the Service handler 00076 * constructor and deleted in the Service Handler's destructor!! 00077 * 00078 * The main responsibility of a Transport object is to encapsulate a 00079 * connection, and provide a transport-independent way to send and 00080 * receive data. Since TAO is heavily based on the Reactor for most if 00081 * not all of its I/O, the Transport class is usually implemented with a 00082 * helper Connection Handler that adapts the generic Transport 00083 * interface to the Reactor types. 00084 * 00085 * <H3>The outgoing data path:</H3> 00086 * 00087 * One of the responsibilities of the TAO_Transport class is to send 00088 * out GIOP messages as efficiently as possible. In most cases 00089 * messages are put out in FIFO order; the transport object will put 00090 * out the message using a single system call and return control to 00091 * the application. However, for oneways and AMI requests it may be 00092 * more efficient (or required if the SYNC_NONE policy is in effect) 00093 * to queue the messages until a large enough data set is available. 00094 * Another reason to queue is that some applications cannot block for 00095 * I/O, yet they want to send messages so large that a single write() 00096 * operation would not be able to cope with them. In such cases we 00097 * need to queue the data and use the Reactor to drain the queue. 00098 * 00099 * Therefore, the Transport class may need to use a queue to 00100 * temporarily hold the messages, and, in some configurations, it may 00101 * need to use the Reactor to concurrently drain such queues. 00102 * 00103 * <H4>Out of order messages:</H4> TAO provides explicit policies to 00104 * send 'urgent' messages. Such messages may be put at the head of the 00105 * queue. 
However, they cannot be sent immediately because the 00106 * transport may already be sending another message in a reactive 00107 * fashion. 00108 * 00109 * Consequently, the Transport must also know if the head of the queue 00110 * has been partially sent. In that case new messages can only follow 00111 * the head. Only once the head is completely sent can we start 00112 * sending new messages. 00113 * 00114 * <H4>Waiting threads:</H4> One or more threads can be blocked 00115 * waiting for the connection to completely send the message. 00116 * The thread should return as soon as its message has been sent, so a 00117 * per-thread condition is required. This suggests that simply using an 00118 * ACE_Message_Queue would not be enough: there is a significant 00119 * amount of ancillary information to keep on each message that the 00120 * Message_Block class does not provide room for. 00121 * 00122 * Blocking I/O is still attractive for some applications. First, by 00123 * eliminating the Reactor overhead, performance is improved when 00124 * sending large blocks of data. Second, using the Reactor to send 00125 * out data opens the door for nested upcalls, yet some applications 00126 * cannot deal with the reentrancy issues in this case. 00127 * 00128 * <H4>Timeouts:</H4> Some or all messages could have a timeout period 00129 * attached to them. The timeout source could either be some 00130 * high-level policy or maybe some strategy to prevent denial of 00131 * service attacks. In any case the timeouts are per-message, and 00132 * later messages could have shorter timeouts. 00133 * In fact, some kind of scheduling (such as EDF) could be required in 00134 * a few applications. 
00135 * 00136 * <H4>Conclusions:</H4> The outgoing data path consists of several 00137 * components: 00138 * 00139 * - A queue of pending messages 00140 * - A message currently being transmitted 00141 * - A per-transport 'send strategy' to choose between blocking on 00142 * write, blocking on the reactor or blocking on leader/follower. 00143 * - A per-message 'waiting object' 00144 * - A per-message timeout 00145 * 00146 * The Transport object provides a single method to send request 00147 * messages (send_request_message ()). 00148 * 00149 * <H3>The incoming data path:</H3> 00150 * 00151 * One of the main responsibilities of the transport is to read and 00152 * process the incoming GIOP message as quickly and efficiently as 00153 * possible. There are other forces that need to be given due 00154 * consideration. They are: 00155 * - Multiple threads should be able to traverse along the same data 00156 * path but should not be able to read from the same handle at the 00157 * same time, i.e. the handle should not be shared between threads at 00158 * any instant. 00159 * - Reads on the handle could give one or more messages. 00160 * - Minimise locking and copying overhead when trying to attack the 00161 * above. 00162 * 00163 * <H3> Parsing messages (GIOP) & processing the message:</H3> 00164 * 00165 * The messages should be checked for validity and the right 00166 * information should be sent to the higher layer for processing. The 00167 * process of doing a sanity check and preparing the messages for the 00168 * higher layers of the ORB is done by the messaging protocol. 00169 * 00170 * <H3> Design forces and Challenges </H3> 00171 * 00172 * To keep things as efficient as possible for medium sized requests, 00173 * it would be good to minimise data copying and locking along the 00174 * incoming path, i.e. from the time of reading the data from the handle 00175 * to the application. 
We achieve this by creating a buffer on the stack 00176 * and reading the data from the handle into the buffer. We then pass 00177 * the same data block (the buffer is encapsulated into a data block) 00178 * to the higher layers of the ORB. The problems stem from the 00179 * following: 00180 * (a) Data is bigger than the buffer that we have on the stack 00181 * (b) Transports like TCP do not guarantee availability of the whole 00182 * chunk of data in one shot. Data could trickle in byte by byte. 00183 * (c) Single read gives multiple messages 00184 * 00185 * We solve the problems as follows: 00186 * 00187 * (a) First do a read with the buffer on the stack. Query the underlying 00188 * messaging object whether the message has any incomplete 00189 * portion. If so, data will be copied into a new buffer able 00190 * to hold the full message and is queued; succeeding events will read 00191 * data from the socket and write directly into this buffer. 00192 * Otherwise, if the message in the local buffer is complete, we free 00193 * the handle and then send the message to the higher layers of the 00194 * ORB for processing. 00195 * 00196 * (b) If a buffer with an incomplete message has been enqueued while trying 00197 * to do the above, the reactor will call us back when the handle 00198 * becomes read ready. The read-operation will copy data directly 00199 * into the enqueued buffer. If the message has been read completely 00200 * the message is sent to the higher layers of the ORB for processing. 00201 * 00202 * (c) If we get multiple messages (possible if the client connected 00203 * to the server sends oneways or AMI requests), we parse and 00204 * split the messages. Every message is put in the queue. Once 00205 * the messages are queued, the thread picks up one message to 00206 * send to the higher layers of the ORB. Before doing that, if 00207 * it finds more messages, it sends a notify to the reactor 00208 * without resuming the handle. 
The next thread picks up a 00209 * message from the queue and processes that. Once the queue 00210 * is drained the last thread resumes the handle. 00211 * 00212 * <H3> Sending Replies </H3> 00213 * 00214 * We could use the outgoing path of the ORB to send replies. This 00215 * would allow us to reuse most of the code in the outgoing data 00216 * path. We were doing this till TAO-1.2.3. We run in to 00217 * problems. When writing the reply the ORB gets flow controlled, and the 00218 * ORB tries to flush the message by going into the reactor. This 00219 * resulted in unnecessary nesting. The thread that gets into the 00220 * Reactor could potentially handle other messages (incoming or 00221 * outgoing) and the stack starts growing leading to crashes. 00222 * 00223 * <H4> Solution to the nesting problem </H4> 00224 * 00225 * The solution that we (plan to) adopt is pretty straight 00226 * forward. The thread sending replies will not block to send the 00227 * replies but queue the replies and return to the Reactor. (Note the 00228 * careful usages of the terms "blocking in the Reactor" as opposed to 00229 * "return back to the Reactor". 00230 * 00231 * 00232 * <B>See Also:</B> 00233 * 00234 * http://cvs.doc.wustl.edu/ace-latest.cgi/ACE_wrappers/TAO/docs/pluggable_protocols/index.html 00235 * 00236 */ 00237 class TAO_Export TAO_Transport 00238 { 00239 public: 00240 00241 /// Default creator, requires the tag value be supplied. 00242 TAO_Transport (CORBA::ULong tag, 00243 TAO_ORB_Core *orb_core); 00244 00245 /// Destructor 00246 virtual ~TAO_Transport (void); 00247 00248 /// Return the protocol tag. 00249 /** 00250 * The OMG assigns unique tags (a 32-bit unsigned number) to each 00251 * protocol. New protocol tags can be obtained free of charge from 00252 * the OMG, check the documents in corbafwd.h for more details. 00253 */ 00254 CORBA::ULong tag (void) const; 00255 00256 /// Access the ORB that owns this connection. 
00257 TAO_ORB_Core *orb_core (void) const; 00258 00259 /// Get the TAO_Tranport_Mux_Strategy used by this object. 00260 /** 00261 * The role of the TAO_Transport_Mux_Strategy is described in more 00262 * detail in that class' documentation. Enough is to say that the 00263 * class is used to control how many threads can have pending 00264 * requests over the same connection. Multiplexing multiple threads 00265 * over the same connection conserves resources and is almost 00266 * required for AMI, but having only one pending request per 00267 * connection is more efficient and reduces the possibilities of 00268 * priority inversions. 00269 */ 00270 TAO_Transport_Mux_Strategy *tms (void) const; 00271 00272 /// Return the TAO_Wait_Strategy used by this object. 00273 /** 00274 * The role of the TAO_Wait_Strategy is described in more detail in 00275 * that class' documentation. Enough is to say that the ORB can wait 00276 * for a reply blocking on read(), using the Reactor to wait for 00277 * multiple events concurrently or using the Leader/Followers 00278 * protocol. 00279 */ 00280 TAO_Wait_Strategy *wait_strategy (void) const; 00281 00282 /// Callback method to reactively drain the outgoing data queue 00283 int handle_output (void); 00284 00285 /// Get the bidirectional flag 00286 int bidirectional_flag (void) const; 00287 00288 /// Set the bidirectional flag 00289 void bidirectional_flag (int flag); 00290 00291 /// Set the Cache Map entry 00292 void cache_map_entry (TAO::Transport_Cache_Manager::HASH_MAP_ENTRY *entry); 00293 00294 /// Get the Cache Map entry 00295 TAO::Transport_Cache_Manager::HASH_MAP_ENTRY *cache_map_entry (void); 00296 00297 /// Set and Get the identifier for this transport instance. 00298 /** 00299 * If not set, this will return an integer representation of 00300 * the <code>this</code> pointer for the instance on which 00301 * it's called. 
00302 */ 00303 size_t id (void) const; 00304 void id (size_t id); 00305 00306 /** 00307 * Methods dealing with the role of the connection, e.g., CLIENT or SERVER. 00308 * See CORBA 2.6 Specification, Section 15.5.1 for origin of definitions. 00309 */ 00310 TAO::Connection_Role opened_as (void) const; 00311 void opened_as (TAO::Connection_Role); 00312 00313 /// Get and Set the purging order. The purging strategy uses the set 00314 /// version to set the purging order. 00315 unsigned long purging_order (void) const; 00316 void purging_order(unsigned long value); 00317 00318 /// Check if there are messages pending in the queue 00319 /** 00320 * @return 1 if the queue is empty 00321 */ 00322 int queue_is_empty (void); 00323 00324 /// Added event handler to the handlers set. 00325 /** 00326 * Called by the cache when the cache is closing. 00327 * 00328 * @param handlers The TAO_Connection_Handler_Set into which the 00329 * transport should place its handler 00330 */ 00331 void provide_handler (TAO::Connection_Handler_Set &handlers); 00332 00333 /// Add event handlers corresponding to transports that have RW wait 00334 /// strategy to the handlers set. 00335 /** 00336 * Called by the cache when the ORB is shuting down. 00337 * 00338 * @param handlers The TAO_Connection_Handler_Set into which the 00339 * transport should place its handler if the transport has RW 00340 * strategy on. 00341 * 00342 * @return true indicates a handler was added to the handler set. 00343 * false indocates that the transport did not have a 00344 * blockable handler that could be added. 00345 */ 00346 bool provide_blockable_handler (TAO::Connection_Handler_Set &handlers); 00347 00348 /// Register the handler with the reactor. 00349 /** 00350 * Register the handler with the reactor. This method is used by the 00351 * Wait_On_Reactor strategy. The transport must register its event 00352 * handler with the ORB's Reactor. 
00353 * 00354 * @todo I think this method is pretty much useless, the 00355 * connections are *always* registered with the Reactor, except in 00356 * thread-per-connection mode. In that case putting the connection 00357 * in the Reactor would produce unpredictable results anyway. 00358 */ 00359 virtual int register_handler (void); 00360 00361 /// Write the complete Message_Block chain to the connection. 00362 /** 00363 * This method serializes on handler_lock_, guaranteeing that only 00364 * thread can execute it on the same instance concurrently. 00365 * 00366 * Often the implementation simply forwards the arguments to the 00367 * underlying ACE_Svc_Handler class. Using the code factored out 00368 * into ACE. 00369 * 00370 * Be careful with protocols that perform non-trivial 00371 * transformations of the data, such as SSLIOP or protocols that 00372 * compress the stream. 00373 * 00374 * @param iov contains the data that must be sent. 00375 * 00376 * @param timeout is the maximum time that the application is 00377 * willing to wait for the data to be sent, useful in platforms that 00378 * implement timed writes. 00379 * The timeout value is obtained from the policies set by the 00380 * application. 00381 * 00382 * @param bytes_transferred should return the total number of bytes 00383 * successfully transferred before the connection blocked. This is 00384 * required because in some platforms and/or protocols multiple 00385 * system calls may be required to send the chain of message 00386 * blocks. The first few calls can work successfully, but the final 00387 * one can fail or signal a flow control situation (via EAGAIN). 00388 * In this case the ORB expects the function to return -1, errno to 00389 * be appropriately set and this argument to return the number of 00390 * bytes already on the OS I/O subsystem. 
00391 * 00392 * This call can also fail if the transport instance is no longer 00393 * associated with a connection (e.g., the connection handler closed 00394 * down). In that case, it returns -1 and sets errno to 00395 * <code>ENOENT</code>. 00396 */ 00397 virtual ssize_t send (iovec *iov, 00398 int iovcnt, 00399 size_t &bytes_transferred, 00400 const ACE_Time_Value *timeout = 0) = 0; 00401 00402 #ifdef ACE_HAS_SENDFILE 00403 /// Send data through zero-copy write mechanism, if available. 00404 /** 00405 * This method sends the data in the I/O vector through the platform 00406 * sendfile() function to perform a zero-copy write, if available. 00407 * Otherwise, the default fallback implementation simply delegates 00408 * to the TAO_Transport::send() method. 00409 * 00410 * @note This method is best used when sending very large blocks of 00411 * data. 00412 */ 00413 virtual ssize_t sendfile (TAO_MMAP_Allocator * allocator, 00414 iovec * iov, 00415 int iovcnt, 00416 size_t &bytes_transferred, 00417 ACE_Time_Value const * timeout = 0); 00418 #endif /* ACE_HAS_SENDFILE */ 00419 00420 00421 /// Read len bytes from into buf. 00422 /** 00423 * This method serializes on handler_lock_, guaranteeing that only 00424 * thread can execute it on the same instance concurrently. 00425 * 00426 * @param buffer ORB allocated buffer where the data should be 00427 * @@ The ACE_Time_Value *s is just a place holder for now. It is 00428 * not clear this this is the best place to specify this. The actual 00429 * timeout values will be kept in the Policies. 00430 */ 00431 virtual ssize_t recv (char *buffer, 00432 size_t len, 00433 const ACE_Time_Value *timeout = 0) = 0; 00434 00435 /** 00436 * @name Control connection lifecycle 00437 * 00438 * These methods are routed through the TMS object. The TMS 00439 * strategies implement them correctly. 00440 */ 00441 //@{ 00442 00443 /// Request has been just sent, but the reply is not received. Idle 00444 /// the transport now. 
00445 bool idle_after_send (void); 00446 00447 /// Request is sent and the reply is received. Idle the transport 00448 /// now. 00449 bool idle_after_reply (void); 00450 00451 /// Call the implementation method after obtaining the lock. 00452 virtual void close_connection (void); 00453 00454 //@} 00455 00456 /** @name Template methods 00457 * 00458 * The Transport class uses the Template Method Pattern to implement 00459 * the protocol specific functionality. 00460 * Implementors of a pluggable protocol should override the 00461 * following methods with the semantics documented below. 00462 */ 00463 /** 00464 * Initialising the messaging object. This would be used by the 00465 * connector side. On the acceptor side the connection handler 00466 * would take care of the messaging objects. 00467 */ 00468 virtual int messaging_init (CORBA::Octet major, 00469 CORBA::Octet minor) = 0; 00470 00471 /// Extracts the list of listen points from the @a cdr stream. The 00472 /// list would have the protocol specific details of the 00473 /// ListenPoints 00474 virtual int tear_listen_point_list (TAO_InputCDR &cdr); 00475 00476 /// Hooks that can be overridden in concrete transports. 00477 /** 00478 * These hooks are invoked just after connection establishment (or 00479 * after a connection is fetched from cache). The 00480 * return value signifies whether the invoker should proceed with 00481 * post connection establishment activities. Protocols like SSLIOP 00482 * need this to verify whether connections already established have 00483 * valid certificates. There are no pre_connect_hooks () since the 00484 * transport doesn't exist before a connection establishment. :-) 00485 * 00486 * @note The methods are not made const with a reason. 00487 */ 00488 virtual bool post_connect_hook (void); 00489 00490 /// Memory management routines. 00491 /* 00492 * Forwards to event handler. 
00493 */ 00494 ACE_Event_Handler::Reference_Count add_reference (void); 00495 ACE_Event_Handler::Reference_Count remove_reference (void); 00496 00497 /// Return the messaging object that is used to format the data that 00498 /// needs to be sent. 00499 virtual TAO_Pluggable_Messaging * messaging_object (void) = 0; 00500 00501 /** @name Template methods 00502 * 00503 * The Transport class uses the Template Method Pattern to implement 00504 * the protocol specific functionality. 00505 * Implementors of a pluggable protocol should override the 00506 * following methods with the semantics documented below. 00507 */ 00508 //@{ 00509 00510 /// Return the event handler used to receive notifications from the 00511 /// Reactor. 00512 /** 00513 * Normally a concrete TAO_Transport object has-a ACE_Event_Handler 00514 * member that functions as an adapter between the ACE_Reactor 00515 * framework and the TAO pluggable protocol framework. 00516 * In all the protocols implemented so far this role is fullfilled 00517 * by an instance of ACE_Svc_Handler. 00518 * 00519 * @todo Since we only use a limited functionality of 00520 * ACE_Svc_Handler we could probably implement a generic 00521 * adapter class (TAO_Transport_Event_Handler or something), this 00522 * will reduce footprint and simplify the process of implementing a 00523 * pluggable protocol. 
00524 * 00525 * @todo This method has to be renamed to event_handler() 00526 */ 00527 virtual ACE_Event_Handler * event_handler_i (void) = 0; 00528 00529 /// Is this transport really connected 00530 bool is_connected (void) const; 00531 00532 /// Perform all the actions when this transport get opened 00533 bool post_open (size_t id); 00534 00535 /// Get the connection handler for this transport 00536 TAO_Connection_Handler * connection_handler (void); 00537 00538 /// Accessor for the output CDR stream 00539 TAO_OutputCDR &out_stream (void); 00540 00541 /* 00542 * Specialization hook to add public methods from 00543 * concrete transport implementations to TAO's transport 00544 * class 00545 */ 00546 //@@ TAO_TRANSPORT_SPL_PUBLIC_METHODS_ADD_HOOK 00547 00548 protected: 00549 00550 virtual TAO_Connection_Handler * connection_handler_i (void) = 0; 00551 00552 public: 00553 00554 /// This is a request for the transport object to write a 00555 /// LocateRequest header before it is sent out. 00556 int generate_locate_request (TAO_Target_Specification &spec, 00557 TAO_Operation_Details &opdetails, 00558 TAO_OutputCDR &output); 00559 00560 /// This is a request for the transport object to write a request 00561 /// header before it sends out the request 00562 virtual int generate_request_header (TAO_Operation_Details &opd, 00563 TAO_Target_Specification &spec, 00564 TAO_OutputCDR &msg); 00565 00566 /// Recache ourselves in the cache 00567 int recache_transport (TAO_Transport_Descriptor_Interface* desc); 00568 00569 /// Callback to read incoming data 00570 /** 00571 * The ACE_Event_Handler adapter invokes this method as part of its 00572 * handle_input() operation. 00573 * 00574 * @todo the method name is confusing! Calling it handle_input() 00575 * would probably make things easier to understand and follow! 
00576 * 00577 * Once a complete message is read the Transport class delegates on 00578 * the Messaging layer to invoke the right upcall (on the server) or 00579 * the TAO_Reply_Dispatcher (on the client side). 00580 * 00581 * @param max_wait_time In some cases the I/O is synchronous, e.g. a 00582 * thread-per-connection server or when Wait_On_Read is enabled. In 00583 * those cases a maximum read time can be specified. 00584 * 00585 * @param block Is deprecated and ignored. 00586 * 00587 */ 00588 virtual int handle_input (TAO_Resume_Handle &rh, 00589 ACE_Time_Value *max_wait_time = 0, 00590 int block = 0); 00591 00592 enum 00593 { 00594 TAO_ONEWAY_REQUEST = 0, 00595 TAO_TWOWAY_REQUEST = 1, 00596 TAO_REPLY 00597 }; 00598 00599 /// Prepare the waiting and demuxing strategy to receive a reply for 00600 /// a new request. 00601 /** 00602 * Preparing the ORB to receive the reply only once the request is 00603 * completely sent opens the system to some subtle race conditions: 00604 * suppose the ORB is running in a multi-threaded configuration, 00605 * thread A makes a request while thread B is using the Reactor to 00606 * process all incoming requests. 00607 * Thread A could be implemented as follows: 00608 * 1) send the request 00609 * 2) setup the ORB to receive the reply 00610 * 3) wait for the request 00611 * 00612 * but in this case thread B may receive the reply between step (1) 00613 * and (2), and drop it as an invalid or unexpected message. 00614 * Consequently the correct implementation is: 00615 * 1) setup the ORB to receive the reply 00616 * 2) send the request 00617 * 3) wait for the reply 00618 * 00619 * The following method encapsulates this idiom. 00620 * 00621 * @todo This is generic code, it should be factored out into the 00622 * Transport class. 
00623 */ 00624 // @nolock b/c this calls send_or_buffer 00625 virtual int send_request (TAO_Stub *stub, 00626 TAO_ORB_Core *orb_core, 00627 TAO_OutputCDR &stream, 00628 int message_semantics, 00629 ACE_Time_Value *max_time_wait) = 0; 00630 00631 00632 00633 /// This method formats the stream and then sends the message on the 00634 /// transport. 00635 /** 00636 * Once the ORB is prepared to receive a reply (see send_request() 00637 * above), and all the arguments have been marshaled the CDR stream 00638 * must be 'formatted', i.e. the message_size field in the GIOP 00639 * header can finally be set to the proper value. 00640 * 00641 */ 00642 virtual int send_message (TAO_OutputCDR &stream, 00643 TAO_Stub *stub = 0, 00644 int message_semantics = TAO_Transport::TAO_TWOWAY_REQUEST, 00645 ACE_Time_Value *max_time_wait = 0) = 0; 00646 00647 00648 /// Sent the contents of @a message_block 00649 /** 00650 * @param stub The object reference used for this operation, useful 00651 * to obtain the current policies. 00652 * @param message_semantics If this is set to TAO_TWO_REQUEST 00653 * this method will block until the operation is completely 00654 * written on the wire. If it is set to other values this 00655 * operation could return. 00656 * @param message_block The CDR encapsulation of the GIOP message 00657 * that must be sent. The message may consist of 00658 * multiple Message Blocks chained through the cont() 00659 * field. 00660 * @param max_wait_time The maximum time that the operation can 00661 * block, used in the implementation of timeouts. 00662 */ 00663 virtual int send_message_shared (TAO_Stub *stub, 00664 int message_semantics, 00665 const ACE_Message_Block *message_block, 00666 ACE_Time_Value *max_wait_time); 00667 00668 protected: 00669 00670 /// Process the message by sending it to the higher layers of the 00671 /// ORB. 
00672 int process_parsed_messages (TAO_Queued_Data *qd, 00673 TAO_Resume_Handle &rh); 00674 00675 /// Implement send_message_shared() assuming the handler_lock_ is 00676 /// held. 00677 int send_message_shared_i (TAO_Stub *stub, 00678 int message_semantics, 00679 const ACE_Message_Block *message_block, 00680 ACE_Time_Value *max_wait_time); 00681 00682 /// Queue a message for @a message_block 00683 int queue_message_i (const ACE_Message_Block *message_block); 00684 00685 public: 00686 /// Format and queue a message for @a stream 00687 int format_queue_message (TAO_OutputCDR &stream); 00688 00689 /// Send a message block chain, 00690 int send_message_block_chain (const ACE_Message_Block *message_block, 00691 size_t &bytes_transferred, 00692 ACE_Time_Value *max_wait_time = 0); 00693 00694 /// Send a message block chain, assuming the lock is held 00695 int send_message_block_chain_i (const ACE_Message_Block *message_block, 00696 size_t &bytes_transferred, 00697 ACE_Time_Value *max_wait_time); 00698 /// Cache management 00699 int purge_entry (void); 00700 00701 /// Cache management 00702 int make_idle (void); 00703 00704 /// Cache management 00705 int update_transport (void); 00706 00707 /// The timeout callback, invoked when any of the timers related to 00708 /// this transport expire. 00709 /** 00710 * @param current_time The current time as reported from the Reactor 00711 * @param act The Asynchronous Completion Token. Currently it is 00712 * interpreted as follows: 00713 * - If the ACT is the address of this->current_deadline_ the 00714 * queueing timeout has expired and the queue should start 00715 * flushing. 00716 * 00717 * @return Returns 0 if there are no problems, -1 if there is an 00718 * error 00719 * 00720 * @todo In the future this function could be used to expire 00721 * messages (oneways) that have been sitting for too long on 00722 * the queue. 
00723 */ 00724 int handle_timeout (const ACE_Time_Value &current_time, 00725 const void* act); 00726 00727 /// Accessor to recv_buffer_size_ 00728 size_t recv_buffer_size (void) const; 00729 00730 /// Accessor to sent_byte_count_ 00731 size_t sent_byte_count (void) const; 00732 00733 /// CodeSet Negotiation - Get the char codeset translator factory 00734 TAO_Codeset_Translator_Base *char_translator (void) const; 00735 00736 /// CodeSet Negotiation - Get the wchar codeset translator factory 00737 TAO_Codeset_Translator_Base *wchar_translator (void) const; 00738 00739 /// CodeSet negotiation - Set the char codeset translator factory 00740 void char_translator (TAO_Codeset_Translator_Base *); 00741 00742 /// CodeSet negotiation - Set the wchar codeset translator factory 00743 void wchar_translator (TAO_Codeset_Translator_Base *); 00744 00745 /// Use the Transport's codeset factories to set the translator for input 00746 /// and output CDRs. 00747 void assign_translators (TAO_InputCDR *, TAO_OutputCDR *); 00748 00749 /// It is necessary to clear the codeset translator when a CDR stream 00750 /// is used for more than one GIOP message. This is required since the 00751 /// header must not be translated, whereas the body must be. 00752 void clear_translators (TAO_InputCDR *, TAO_OutputCDR *); 00753 00754 /// Return true if the tcs has been set 00755 CORBA::Boolean is_tcs_set() const; 00756 00757 /// Set the state of the first_request_ flag to 0 00758 void first_request_sent(); 00759 00760 /// Notify all the components inside a Transport when the underlying 00761 /// connection is closed. 00762 void send_connection_closed_notifications (void); 00763 00764 private: 00765 00766 /// Helper method that returns the Transport Cache Manager. 00767 TAO::Transport_Cache_Manager &transport_cache_manager (void); 00768 00769 /// Send some of the data in the queue. 
00770 /** 00771 * As the outgoing data is drained this method is invoked to send as 00772 * much of the current message as possible. 00773 * 00774 * Returns 0 if there is more data to send, -1 if there was an error 00775 * and 1 if the message was completely sent. 00776 */ 00777 int drain_queue (void); 00778 00779 /// Implement drain_queue() assuming the lock is held 00780 int drain_queue_i (void); 00781 00782 /// This class needs priviledged access to 00783 /// - queue_is_empty_i() 00784 /// - drain_queue_i() 00785 friend class TAO_Block_Flushing_Strategy; 00786 00787 /// Check if there are messages pending in the queue 00788 /** 00789 * This version assumes that the lock is already held. Use with 00790 * care! 00791 * 00792 * @return 1 if the queue is empty 00793 */ 00794 int queue_is_empty_i (void); 00795 00796 /// A helper routine used in drain_queue_i() 00797 int drain_queue_helper (int &iovcnt, iovec iov[]); 00798 00799 /// These classes need privileged access to: 00800 /// - schedule_output_i() 00801 /// - cancel_output_i() 00802 friend class TAO_Reactive_Flushing_Strategy; 00803 friend class TAO_Leader_Follower_Flushing_Strategy; 00804 00805 /// Needs priveleged access to 00806 /// event_handler_i () 00807 friend class TAO_Thread_Per_Connection_Handler; 00808 00809 /// Schedule handle_output() callbacks 00810 int schedule_output_i (void); 00811 00812 /// Cancel handle_output() callbacks 00813 int cancel_output_i (void); 00814 00815 /// Cleanup the queue. 00816 /** 00817 * Exactly @a byte_count bytes have been sent, the queue must be 00818 * cleaned up as potentially several messages have been completely 00819 * sent out. 00820 * It leaves on head_ the next message to send out. 
00821 */ 00822 void cleanup_queue (size_t byte_count); 00823 00824 /// Cleanup the complete queue 00825 void cleanup_queue_i (); 00826 00827 /// Check if the buffering constraints have been reached 00828 int check_buffering_constraints_i (TAO_Stub *stub, bool &must_flush); 00829 00830 /// Send a synchronous message, i.e. block until the message is on 00831 /// the wire 00832 int send_synchronous_message_i (const ACE_Message_Block *message_block, 00833 ACE_Time_Value *max_wait_time); 00834 00835 /// Send a reply message, i.e. do not block until the message is on 00836 /// the wire, but just return after adding them to the queue. 00837 int send_reply_message_i (const ACE_Message_Block *message_block, 00838 ACE_Time_Value *max_wait_time); 00839 00840 /// Send an asynchronous message, i.e. do not block until the message is on 00841 /// the wire 00842 int send_asynchronous_message_i (TAO_Stub *stub, 00843 const ACE_Message_Block *message_block, 00844 ACE_Time_Value *max_wait_time); 00845 00846 /// A helper method used by send_synchronous_message_i() and 00847 /// send_reply_message_i(). Reusable code that could be used by both 00848 /// the methods. 00849 int send_synch_message_helper_i (TAO_Synch_Queued_Message &s, 00850 ACE_Time_Value *max_wait_time); 00851 00852 /// Check if the flush timer is still pending 00853 int flush_timer_pending (void) const; 00854 00855 /// The flush timer expired or was explicitly cancelled, mark it as 00856 /// not pending 00857 void reset_flush_timer (void); 00858 00859 /// Print out error messages if the event handler is not valid 00860 void report_invalid_event_handler (const char *caller); 00861 00862 /// Is invoked by handle_input operation. It consolidate message on 00863 /// top of incoming_message_stack. The amount of missing data is 00864 /// known and recv operation copies data directly into message buffer, 00865 /// as much as a single recv-invocation provides. 
00866 int handle_input_missing_data (TAO_Resume_Handle &rh, 00867 ACE_Time_Value *max_wait_time, 00868 TAO_Queued_Data *q_data); 00869 00870 /// Is invoked by handle_input operation. It parses new messages from input stream 00871 /// or consolidates messages whose header has been partially read, the message 00872 /// size being unknown so far. It parses as much data as a single recv-invocation provides. 00873 int handle_input_parse_data (TAO_Resume_Handle &rh, 00874 ACE_Time_Value *max_wait_time); 00875 00876 /// Is invoked by handle_input_parse_data. Parses all messages remaining 00877 /// in @a message_block. 00878 int handle_input_parse_extra_messages (ACE_Message_Block &message_block); 00879 00880 /// @return -1 error, otherwise 0 00881 int consolidate_enqueue_message (TAO_Queued_Data *qd); 00882 00883 /// @return -1 error, otherwise 0 00884 int consolidate_process_message (TAO_Queued_Data *qd, TAO_Resume_Handle &rh); 00885 00886 /* 00887 * Process the message that is in the head of the incoming queue. 00888 * If there are more messages in the queue, this method calls 00889 * this->notify_reactor () to wake up a thread 00890 * @retval -1 on error 00891 * @retval 0 if successfully processing enqueued messages 00892 * @retval 1 if no message present in queue 00893 */ 00894 int process_queue_head (TAO_Resume_Handle &rh); 00895 00896 /* 00897 * This call prepares a new handler for the notify call and sends a 00898 * notify () call to the reactor. 00899 */ 00900 int notify_reactor (void); 00901 00902 /// Assume the lock is held 00903 void send_connection_closed_notifications_i (void); 00904 00905 /// Allocate a partial message block and store it in our 00906 /// partial_message_ data member. 00907 void allocate_partial_message_block (void); 00908 00909 // Disallow copying and assignment. 
00910 TAO_Transport (const TAO_Transport&); 00911 void operator= (const TAO_Transport&); 00912 00913 /* 00914 * Specialization hook to add concrete private methods from 00915 * TAO's protocol implementation onto the base Transport class 00916 */ 00917 00918 //@@ TAO_TRANSPORT_SPL_PRIVATE_METHODS_ADD_HOOK 00919 00920 protected: 00921 00922 /// IOP protocol tag. 00923 CORBA::ULong const tag_; 00924 00925 /// Global orbcore resource. 00926 TAO_ORB_Core * const orb_core_; 00927 00928 /// Our entry in the cache. We don't own this. It is here for our 00929 /// convenience. We cannot just change things around. 00930 TAO::Transport_Cache_Manager::HASH_MAP_ENTRY *cache_map_entry_; 00931 00932 /// Strategy to decide whether multiple requests can be sent over the 00933 /// same connection or the connection is exclusive for a request. 00934 TAO_Transport_Mux_Strategy *tms_; 00935 00936 /// Strategy for waiting for the reply after sending the request. 00937 TAO_Wait_Strategy *ws_; 00938 00939 /// Use to check if bidirectional info has been synchronized with 00940 /// the peer. 00941 /** 00942 * Have we sent any info on bidirectional information or have we 00943 * received any info regarding making the connection served by this 00944 * transport bidirectional. 00945 * The flag is used as follows: 00946 * + We dont want to send the bidirectional context info more than 00947 * once on the connection. Why? Waste of marshalling and 00948 * demarshalling time on the client. 00949 * + On the server side -- once a client that has established the 00950 * connection asks the server to use the connection both ways, we 00951 * *dont* want the server to pack service info to the client. That 00952 * is not allowed. We need a flag to prevent such a things from 00953 * happening. 00954 * 00955 * The value of this flag will be 0 if the client sends info and 1 00956 * if the server receives the info. 
00957 */ 00958 int bidirectional_flag_; 00959 00960 TAO::Connection_Role opening_connection_role_; 00961 00962 /// Implement the outgoing data queue 00963 TAO_Queued_Message *head_; 00964 TAO_Queued_Message *tail_; 00965 00966 /// Queue of the consolidated, incoming messages.. 00967 TAO_Incoming_Message_Queue incoming_message_queue_; 00968 00969 /// Stack of incoming fragments, consolidated messages 00970 /// are going to be enqueued in "incoming_message_queue_" 00971 TAO::Incoming_Message_Stack incoming_message_stack_; 00972 00973 /// The queue will start draining no later than <queeing_deadline_> 00974 /// *if* the deadline is 00975 ACE_Time_Value current_deadline_; 00976 00977 /// The timer ID 00978 long flush_timer_id_; 00979 00980 /// The adapter used to receive timeout callbacks from the Reactor 00981 TAO_Transport_Timer transport_timer_; 00982 00983 /// Lock that insures that activities that *might* use handler-related 00984 /// resources (such as a connection handler) get serialized. 00985 /** 00986 * This is an <code>ACE_Lock</code> that gets initialized from 00987 * @c TAO_ORB_Core::resource_factory()->create_cached_connection_lock(). 00988 * This way, one can use a lock appropriate for the type of system, i.e., 00989 * a null lock for single-threaded systems, and a real lock for 00990 * multi-threaded systems. 00991 */ 00992 mutable ACE_Lock *handler_lock_; 00993 00994 /// A unique identifier for the transport. 00995 /** 00996 * This never *never* changes over the lifespan, so we don't have to worry 00997 * about locking it. 00998 * 00999 * HINT: Protocol-specific transports that use connection handler 01000 * might choose to set this to the handle for their connection. 01001 */ 01002 size_t id_; 01003 01004 /// Used by the LRU, LFU and FIFO Connection Purging Strategies. 01005 unsigned long purging_order_; 01006 01007 /// Size of the buffer received. 01008 size_t recv_buffer_size_; 01009 01010 /// Number of bytes sent. 
01011 size_t sent_byte_count_; 01012 01013 /// Is this transport really connected or not. In case of oneways with 01014 /// SYNC_NONE Policy we don't wait until the connection is ready and we 01015 /// buffer the requests in this transport until the connection is ready 01016 bool is_connected_; 01017 01018 private: 01019 01020 /// @@Phil, I think it would be nice if we could think of a way to 01021 /// do the following. 01022 /// We have been trying to use the transport for marking about 01023 /// translator factories and such! IMHO this is a wrong encapulation 01024 /// ie. trying to populate the transport object with these 01025 /// details. We should probably have a class something like 01026 /// TAO_Message_Property or TAO_Message_Translator or whatever (I am 01027 /// sure you get the idea) and encapsulate all these 01028 /// details. Coupling these seems odd. if I have to be more cynical 01029 /// we can move this to the connection_handler and it may more sense 01030 /// with the DSCP stuff around there. Do you agree? 01031 01032 /// Additional member values required to support codeset translation 01033 TAO_Codeset_Translator_Base *char_translator_; 01034 TAO_Codeset_Translator_Base *wchar_translator_; 01035 01036 /// The tcs_set_ flag indicates that negotiation has occured and so the 01037 /// translators are correct, since a null translator is valid if both ends 01038 /// are using the same codeset, whatever that codeset might be. 01039 CORBA::Boolean tcs_set_; 01040 01041 /// First_request_ is true until the first request is sent or received. This 01042 /// is necessary since codeset context information is necessary only on the 01043 /// first request. After that, the translators are fixed for the life of the 01044 /// connection. 
01045 CORBA::Boolean first_request_; 01046 01047 /// Holds the partial GIOP message (if there is one) 01048 ACE_Message_Block* partial_message_; 01049 01050 #ifdef ACE_HAS_SENDFILE 01051 /// mmap()-based allocator used to allocator output CDR buffers. 01052 /** 01053 * If this pointer is non-zero, sendfile() will be used to send data 01054 * in a TAO_OutputCDR stream instance. 01055 */ 01056 TAO_MMAP_Allocator * const mmap_allocator_; 01057 #endif /* ACE_HAS_SENDFILE */ 01058 01059 /* 01060 * specialization hook to add class members from concrete 01061 * transport class onto the base transport class. Please 01062 * add any private members to this class *before* this hook. 01063 */ 01064 //@@ TAO_TRANSPORT_SPL_DATA_MEMBERS_ADD_HOOK 01065 }; 01066 01067 /* 01068 * Hook to add external typedefs and specializations to 01069 * TAO's transport implementation. 01070 */ 01071 01072 //@@ TAO_TRANSPORT_SPL_EXTERN_ADD_HOOK 01073 01074 TAO_END_VERSIONED_NAMESPACE_DECL 01075 01076 #if defined (__ACE_INLINE__) 01077 # include "tao/Transport.inl" 01078 #endif /* __ACE_INLINE__ */ 01079 01080 #include /**/ "ace/post.h" 01081 01082 #endif /* TAO_TRANSPORT_H */