Transport.cpp

Go to the documentation of this file.
00001 // $Id: Transport.cpp 79388 2007-08-17 16:05:00Z wilsond $
00002 
00003 #include "tao/Transport.h"
00004 
00005 #include "tao/LF_Follower.h"
00006 #include "tao/Leader_Follower.h"
00007 #include "tao/Client_Strategy_Factory.h"
00008 #include "tao/Wait_Strategy.h"
00009 #include "tao/Transport_Mux_Strategy.h"
00010 #include "tao/Stub.h"
00011 #include "tao/Transport_Queueing_Strategies.h"
00012 #include "tao/Connection_Handler.h"
00013 #include "tao/Pluggable_Messaging.h"
00014 #include "tao/Synch_Queued_Message.h"
00015 #include "tao/Asynch_Queued_Message.h"
00016 #include "tao/Flushing_Strategy.h"
00017 #include "tao/Thread_Lane_Resources.h"
00018 #include "tao/Resume_Handle.h"
00019 #include "tao/Codeset_Manager.h"
00020 #include "tao/Codeset_Translator_Base.h"
00021 #include "tao/debug.h"
00022 #include "tao/CDR.h"
00023 #include "tao/ORB_Core.h"
00024 #include "tao/MMAP_Allocator.h"
00025 #include "tao/SystemException.h"
00026 
00027 #include "ace/OS_NS_sys_time.h"
00028 #include "ace/OS_NS_stdio.h"
00029 #include "ace/Reactor.h"
00030 #include "ace/os_include/sys/os_uio.h"
00031 #include "ace/High_Res_Timer.h"
00032 #include "ace/CORBA_macros.h"
00033 
00034 /*
00035  * Specialization hook to add include files from
00036  * concrete transport implementation.
00037  */
00038 //@@ TAO_TRANSPORT_SPL_INCLUDE_FORWARD_DECL_ADD_HOOK
00039 
00040 #if !defined (__ACE_INLINE__)
00041 # include "tao/Transport.inl"
00042 #endif /* __ACE_INLINE__ */
00043 
00044 
00045 ACE_RCSID (tao,
00046            Transport,
00047            "$Id: Transport.cpp 79388 2007-08-17 16:05:00Z wilsond $")
00048 
00049 /*
00050  * Static function in file scope
00051  */
00052 static void
00053 dump_iov (iovec *iov, int iovcnt, size_t id,
00054           size_t current_transfer,
00055           const char *location)
00056 {
00057   ACE_Guard <ACE_Log_Msg> log_guard (*ACE_Log_Msg::instance ());
00058 
00059   ACE_DEBUG ((LM_DEBUG,
00060               ACE_TEXT ("TAO (%P|%t) - Transport[%d]::%s, ")
00061               ACE_TEXT ("sending %d buffers\n"),
00062               id, ACE_TEXT_CHAR_TO_TCHAR (location), iovcnt));
00063 
00064   for (int i = 0; i != iovcnt && 0 < current_transfer; ++i)
00065     {
00066       size_t iov_len = iov[i].iov_len;
00067 
00068       // Possibly a partially sent iovec entry.
00069       if (current_transfer < iov_len)
00070         {
00071           iov_len = current_transfer;
00072         }
00073 
00074       ACE_DEBUG ((LM_DEBUG,
00075                   ACE_TEXT ("TAO (%P|%t) - Transport[%d]::%s, ")
00076                   ACE_TEXT ("buffer %d/%d has %d bytes\n"),
00077                   id, ACE_TEXT_CHAR_TO_TCHAR(location),
00078                   i, iovcnt,
00079                   iov_len));
00080 
00081       size_t len;
00082 
00083       for (size_t offset = 0; offset < iov_len; offset += len)
00084         {
00085           ACE_TCHAR header[1024];
00086           ACE_OS::sprintf (header,
00087                            ACE_TEXT("TAO - ")
00088                            ACE_TEXT("Transport[")
00089                            ACE_SIZE_T_FORMAT_SPECIFIER
00090                            ACE_TEXT("]::%s")
00091                            ACE_TEXT(" (")
00092                            ACE_SIZE_T_FORMAT_SPECIFIER ACE_TEXT("/")
00093                            ACE_SIZE_T_FORMAT_SPECIFIER ACE_TEXT(")"),
00094                            id, location, offset, iov_len);
00095 
00096           len = iov_len - offset;
00097 
00098           if (len > 512)
00099             {
00100               len = 512;
00101             }
00102 
00103           ACE_HEX_DUMP ((LM_DEBUG,
00104                          static_cast<char*> (iov[i].iov_base) + offset,
00105                          len,
00106                          header));
00107         }
00108       current_transfer -= iov_len;
00109     }
00110 
00111   ACE_DEBUG ((LM_DEBUG,
00112               ACE_TEXT ("TAO (%P|%t) - Transport[%d]::%s, ")
00113               ACE_TEXT ("end of data\n"),
00114               id, ACE_TEXT_CHAR_TO_TCHAR(location)));
00115 }
00116 
00117 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00118 
00119 #if TAO_HAS_TRANSPORT_CURRENT == 1
00120 TAO::Transport::Stats::~Stats ()
00121 {
00122   // no-op
00123 }
00124 #endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */
00125 
00126 TAO_Transport::TAO_Transport (CORBA::ULong tag,
00127                               TAO_ORB_Core *orb_core)
00128   : tag_ (tag)
00129   , orb_core_ (orb_core)
00130   , cache_map_entry_ (0)
00131   , bidirectional_flag_ (-1)
00132   , opening_connection_role_ (TAO::TAO_UNSPECIFIED_ROLE)
00133   , head_ (0)
00134   , tail_ (0)
00135   , incoming_message_queue_ (orb_core)
00136   , current_deadline_ (ACE_Time_Value::zero)
00137   , flush_timer_id_ (-1)
00138   , transport_timer_ (this)
00139   , handler_lock_ (orb_core->resource_factory ()->create_cached_connection_lock ())
00140   , id_ ((size_t) this)
00141   , purging_order_ (0)
00142   , recv_buffer_size_ (0)
00143   , sent_byte_count_ (0)
00144   , is_connected_ (false)
00145   , char_translator_ (0)
00146   , wchar_translator_ (0)
00147   , tcs_set_ (0)
00148   , first_request_ (1)
00149   , partial_message_ (0)
00150 #if TAO_HAS_SENDFILE == 1
00151     // The ORB has been configured to use the MMAP allocator, meaning
00152     // we could/should use sendfile() to send data.  Cast once rather
00153     // here rather than during each send.  This assumes that all
00154     // TAO_OutputCDR instances are using the same TAO_MMAP_Allocator
00155     // instance as the underlying output CDR buffer allocator.
00156   , mmap_allocator_ (
00157       dynamic_cast<TAO_MMAP_Allocator *> (
00158         orb_core->output_cdr_buffer_allocator ()))
00159 #endif  /* TAO_HAS_SENDFILE==1 */
00160 {
00161   TAO_Client_Strategy_Factory *cf =
00162     this->orb_core_->client_factory ();
00163 
00164   // Create WS now.
00165   this->ws_ = cf->create_wait_strategy (this);
00166 
00167   // Create TMS now.
00168   this->tms_ = cf->create_transport_mux_strategy (this);
00169 
00170 #if TAO_HAS_TRANSPORT_CURRENT == 1
00171   // Allocate stats
00172   ACE_NEW_THROW_EX (this->stats_,
00173                     TAO::Transport::Stats,
00174                     CORBA::NO_MEMORY ());
00175 #endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */
00176 
00177   /*
00178    * Hook to add code that initializes components that
00179    * belong to the concrete protocol implementation.
00180    * Further additions to this Transport class will
00181    * need to add code *before* this hook.
00182    */
00183   //@@ TAO_TRANSPORT_SPL_CONSTRUCTOR_ADD_HOOK
00184 }
00185 
00186 TAO_Transport::~TAO_Transport (void)
00187 {
00188   delete this->ws_;
00189 
00190   delete this->tms_;
00191 
00192   delete this->handler_lock_;
00193 
00194   if (!this->is_connected_)
00195     {
00196       // When we have a not connected transport we could have buffered
00197       // messages on this transport which we have to cleanup now.
00198       this->cleanup_queue_i();
00199 
00200       // Cleanup our cache entry
00201       this->purge_entry();
00202     }
00203 
00204   // Release the partial message block, however we may
00205   // have never allocated one.
00206   ACE_Message_Block::release (this->partial_message_);
00207 
00208   // By the time the destructor is reached here all the connection stuff
00209   // *must* have been cleaned up.
00210 
00211   // The following assert is needed for the test "Bug_2494_Regression".
00212   // See the bugzilla bug #2494 for details.
00213   ACE_ASSERT (this->head_ == 0);
00214   ACE_ASSERT (this->cache_map_entry_ == 0);
00215 
00216 #if TAO_HAS_TRANSPORT_CURRENT == 1
00217   delete this->stats_;
00218 #endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */
00219 
00220   /*
00221    * Hook to add code that cleans up components
00222    * belong to the concrete protocol implementation.
00223    * Further additions to this Transport class will
00224    * need to add code *before* this hook.
00225    */
00226   //@@ TAO_TRANSPORT_SPL_DESTRUCTOR_ADD_HOOK
00227 }
00228 
00229 void
00230 TAO_Transport::provide_handler (TAO::Connection_Handler_Set &handlers)
00231 {
00232   (void) this->add_reference ();
00233 
00234   handlers.insert (this->connection_handler_i ());
00235 }
00236 
00237 bool
00238 TAO_Transport::provide_blockable_handler (TAO::Connection_Handler_Set &h)
00239 {
00240   if (this->ws_->non_blocking () ||
00241       this->opening_connection_role_ == TAO::TAO_SERVER_ROLE)
00242     return false;
00243 
00244   (void) this->add_reference ();
00245 
00246   h.insert (this->connection_handler_i ());
00247 
00248   return true;
00249 }
00250 
00251 bool
00252 TAO_Transport::idle_after_send (void)
00253 {
00254   return this->tms ()->idle_after_send ();
00255 }
00256 
00257 bool
00258 TAO_Transport::idle_after_reply (void)
00259 {
00260   return this->tms ()->idle_after_reply ();
00261 }
00262 
00263 /*
00264  * A concrete transport class specializes this
00265  * method. This hook allows commenting this function
00266  * when TAO's transport is specialized. Note: All
00267  * functions that have an implementation that does
00268  * nothing should be added within this hook to
00269  * enable specialization.
00270  */
00271 //@@ TAO_TRANSPORT_SPL_COMMENT_HOOK_START
00272 
00273 int
00274 TAO_Transport::tear_listen_point_list (TAO_InputCDR &)
00275 {
00276   ACE_NOTSUP_RETURN (-1);
00277 }
00278 
00279 int
00280 TAO_Transport::send_message_shared (TAO_Stub *stub,
00281                                     int message_semantics,
00282                                     const ACE_Message_Block *message_block,
00283                                     ACE_Time_Value *max_wait_time)
00284 {
00285   int result = 0;
00286 
00287   {
00288     ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1);
00289 
00290     result =
00291       this->send_message_shared_i (stub, message_semantics,
00292                                    message_block, max_wait_time);
00293   }
00294 
00295   if (result == -1)
00296     {
00297       this->close_connection ();
00298     }
00299 
00300   return result;
00301 }
00302 
00303 //@@ TAO_TRANSPORT_SPL_COMMENT_HOOK_END
00304 
00305 bool
00306 TAO_Transport::post_connect_hook (void)
00307 {
00308   return true;
00309 }
00310 
00311 void
00312 TAO_Transport::close_connection (void)
00313 {
00314   this->connection_handler_i ()->close_connection ();
00315 }
00316 
00317 int
00318 TAO_Transport::register_handler (void)
00319 {
00320   if (TAO_debug_level > 4)
00321     {
00322       ACE_DEBUG ((LM_DEBUG,
00323                   ACE_TEXT ("TAO (%P|%t) - Transport[%d]::register_handler\n"),
00324                   this->id ()));
00325     }
00326 
00327   ACE_Reactor * const r = this->orb_core_->reactor ();
00328 
00329   // @@note: This should be okay since the register handler call will
00330   // not make a nested call into the transport.
00331   ACE_GUARD_RETURN (ACE_Lock,
00332                     ace_mon,
00333                     *this->handler_lock_,
00334                     false);
00335 
00336   if (r == this->event_handler_i ()->reactor ())
00337     {
00338       return 0;
00339     }
00340 
00341   // Set the flag in the Connection Handler and in the Wait Strategy
00342   // @@Maybe we should set these flags after registering with the
00343   // reactor. What if the  registration fails???
00344   this->ws_->is_registered (true);
00345 
00346   // Register the handler with the reactor
00347   return r->register_handler (this->event_handler_i (),
00348                               ACE_Event_Handler::READ_MASK);
00349 }
00350 
00351 #if TAO_HAS_SENDFILE == 1
00352 ssize_t
00353 TAO_Transport::sendfile (TAO_MMAP_Allocator * /* allocator */,
00354                          iovec * iov,
00355                          int iovcnt,
00356                          size_t &bytes_transferred,
00357                          ACE_Time_Value const * timeout)
00358 {
00359   // Concrete pluggable transport doesn't implement sendfile().
00360   // Fallback on TAO_Transport::send().
00361 
00362   // @@ We can probably refactor the TAO_IIOP_Transport::sendfile()
00363   //    implementation to this base class method, and leave any TCP
00364   //    specific configuration out of this base class method.
00365   //      -Ossama
00366   return this->send (iov, iovcnt, bytes_transferred, timeout);
00367 }
00368 #endif  /* TAO_HAS_SENDFILE==1 */
00369 
00370 int
00371 TAO_Transport::generate_locate_request (
00372     TAO_Target_Specification &spec,
00373     TAO_Operation_Details &opdetails,
00374     TAO_OutputCDR &output)
00375 {
00376   if (this->messaging_object ()->generate_locate_request_header (opdetails,
00377                                                                  spec,
00378                                                                  output) == -1)
00379     {
00380       if (TAO_debug_level > 0)
00381         {
00382           ACE_DEBUG ((LM_DEBUG,
00383                       ACE_TEXT ("TAO (%P|%t) - Transport[%d]::generate_locate_request, ")
00384                       ACE_TEXT ("error while marshalling the LocateRequest header\n"),
00385                       this->id ()));
00386         }
00387 
00388       return -1;
00389     }
00390 
00391   return 0;
00392 }
00393 
00394 int
00395 TAO_Transport::generate_request_header (
00396     TAO_Operation_Details &opdetails,
00397     TAO_Target_Specification &spec,
00398     TAO_OutputCDR &output)
00399 {
00400   // codeset service context is only supposed to be sent in the first request
00401   // on a particular connection.
00402   if (this->first_request_)
00403     {
00404       TAO_Codeset_Manager * const csm = this->orb_core ()->codeset_manager ();
00405       if (csm)
00406         csm->generate_service_context (opdetails,*this);
00407     }
00408 
00409   if (this->messaging_object ()->generate_request_header (opdetails,
00410                                                           spec,
00411                                                           output) == -1)
00412     {
00413       if (TAO_debug_level > 0)
00414         {
00415         ACE_DEBUG ((LM_DEBUG,
00416                    ACE_TEXT ("(%P|%t) - Transport[%d]::generate_request_header, ")
00417                    ACE_TEXT ("error while marshalling the Request header\n"),
00418                       this->id()));
00419         }
00420 
00421       return -1;
00422     }
00423 
00424   return 0;
00425 }
00426 
00427 /// @todo Ideally the following should be inline.
00428 /// @todo purge_entry has a return value, use it
00429 int
00430 TAO_Transport::recache_transport (TAO_Transport_Descriptor_Interface *desc)
00431 {
00432   // First purge our entry
00433   this->purge_entry ();
00434 
00435   // Then add ourselves to the cache
00436   return this->transport_cache_manager ().cache_transport (desc,
00437                                                            this);
00438 }
00439 
00440 int
00441 TAO_Transport::purge_entry (void)
00442 {
00443   return this->transport_cache_manager ().purge_entry (this->cache_map_entry_);
00444 }
00445 
00446 int
00447 TAO_Transport::make_idle (void)
00448 {
00449   if (TAO_debug_level > 3)
00450     {
00451       ACE_DEBUG ((LM_DEBUG,
00452                   ACE_TEXT ("TAO (%P|%t) - Transport[%d]::make_idle\n"),
00453                   this->id ()));
00454     }
00455 
00456   return this->transport_cache_manager ().make_idle (this->cache_map_entry_);
00457 }
00458 
00459 int
00460 TAO_Transport::update_transport (void)
00461 {
00462   return this->transport_cache_manager ().update_entry (this->cache_map_entry_);
00463 }
00464 
00465 /*
00466  *
00467  *  Methods called and used in the output path of the ORB.
00468  *
00469  */
00470 int
00471 TAO_Transport::handle_output (void)
00472 {
00473   if (TAO_debug_level > 3)
00474     {
00475       ACE_DEBUG ((LM_DEBUG,
00476                   ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_output\n"),
00477                   this->id ()));
00478     }
00479 
00480   // The flushing strategy (potentially via the Reactor) wants to send
00481   // more data, first check if there is a current message that needs
00482   // more sending...
00483   int const retval = this->drain_queue ();
00484 
00485   if (TAO_debug_level > 3)
00486     {
00487       ACE_DEBUG ((LM_DEBUG,
00488                   ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_output, ")
00489                   ACE_TEXT ("drain_queue returns %d/%d\n"),
00490                   this->id (),
00491                   retval, errno));
00492     }
00493 
00494   // Any errors are returned directly to the Reactor
00495   return retval;
00496 }
00497 
00498 int
00499 TAO_Transport::format_queue_message (TAO_OutputCDR &stream,
00500                                      ACE_Time_Value *max_wait_time)
00501 {
00502   if (this->messaging_object ()->format_message (stream) != 0)
00503     return -1;
00504 
00505   return this->queue_message_i (stream.begin (), max_wait_time);
00506 }
00507 
00508 int
00509 TAO_Transport::send_message_block_chain (const ACE_Message_Block *mb,
00510                                          size_t &bytes_transferred,
00511                                          ACE_Time_Value *max_wait_time)
00512 {
00513   ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1);
00514 
00515   return this->send_message_block_chain_i (mb,
00516                                            bytes_transferred,
00517                                            max_wait_time);
00518 }
00519 
00520 int
00521 TAO_Transport::send_message_block_chain_i (const ACE_Message_Block *mb,
00522                                            size_t &bytes_transferred,
00523                                            ACE_Time_Value *)
00524 {
00525   size_t const total_length = mb->total_length ();
00526 
00527   // We are going to block, so there is no need to clone
00528   // the message block.
00529   TAO_Synch_Queued_Message synch_message (mb, this->orb_core_);
00530 
00531   synch_message.push_back (this->head_, this->tail_);
00532 
00533   int const n = this->drain_queue_i ();
00534 
00535   if (n == -1)
00536     {
00537       synch_message.remove_from_list (this->head_, this->tail_);
00538       return -1; // Error while sending...
00539     }
00540   else if (n == 1)
00541     {
00542       bytes_transferred = total_length;
00543       return 1;  // Empty queue, message was sent..
00544     }
00545 
00546   // Remove the temporary message from the queue...
00547   synch_message.remove_from_list (this->head_, this->tail_);
00548 
00549   bytes_transferred = total_length - synch_message.message_length ();
00550 
00551   return 0;
00552 }
00553 
00554 int
00555 TAO_Transport::send_synchronous_message_i (const ACE_Message_Block *mb,
00556                                            ACE_Time_Value *max_wait_time)
00557 {
00558   // We are going to block, so there is no need to clone
00559   // the message block.
00560   TAO_Synch_Queued_Message synch_message (mb, this->orb_core_);
00561   size_t const message_length = synch_message.message_length ();
00562 
00563   synch_message.push_back (this->head_, this->tail_);
00564 
00565   int const n = this->send_synch_message_helper_i (synch_message,
00566                                                    0 /*ignored*/);
00567   if (n == -1 || n == 1)
00568     {
00569       return n;
00570     }
00571 
00572   // @todo: Check for timeouts!
00573   // if (max_wait_time != 0 && errno == ETIME) return -1;
00574   TAO_Flushing_Strategy *flushing_strategy =
00575     this->orb_core ()->flushing_strategy ();
00576   int result = flushing_strategy->schedule_output (this);
00577   if (result == -1)
00578     {
00579       synch_message.remove_from_list (this->head_, this->tail_);
00580       if (TAO_debug_level > 0)
00581         {
00582           ACE_ERROR ((LM_ERROR,
00583                       ACE_TEXT ("TAO (%P|%t) - Transport[%d]::")
00584                       ACE_TEXT ("send_synchronous_message_i, ")
00585                       ACE_TEXT ("error while scheduling flush - %m\n"),
00586                       this->id ()));
00587         }
00588       return -1;
00589     }
00590 
00591   // No need to check for result == TAO_Flushing_Strategy::MUST_FLUSH,
00592   // because we're always going to flush anyway.
00593 
00594   // Release the mutex, other threads may modify the queue as we
00595   // block for a long time writing out data.
00596   {
00597     typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK;
00598     TAO_REVERSE_LOCK reverse (*this->handler_lock_);
00599     ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1);
00600 
00601     result = flushing_strategy->flush_message (this,
00602                                                &synch_message,
00603                                                max_wait_time);
00604   }
00605 
00606   if (result == -1)
00607     {
00608       synch_message.remove_from_list (this->head_, this->tail_);
00609 
00610       if (errno == ETIME)
00611         {
00612           // If partially sent, then we must queue the remainder.
00613           if (message_length != synch_message.message_length ())
00614             {
00615               // This is a timeout, there is only one nasty case: the
00616               // message has been partially sent!  We simply cannot take
00617               // the message out of the queue, because that would corrupt
00618               // the connection.
00619               //
00620               // What we do is replace the queued message with an
00621               // asynchronous message, that contains only what remains of
00622               // the timed out request.  If you think about sending
00623               // CancelRequests in this case: there is no much point in
00624               // doing that: the receiving ORB would probably ignore it,
00625               // and figuring out the request ID would be a bit of a
00626               // nightmare.
00627               //
00628               TAO_Queued_Message *queued_message = 0;
00629               ACE_NEW_RETURN (queued_message,
00630                               TAO_Asynch_Queued_Message (
00631                                   synch_message.current_block (),
00632                                   this->orb_core_,
00633                                   0, // no timeout
00634                                   0,
00635                                   true),
00636                               -1);
00637               queued_message->push_front (this->head_, this->tail_);
00638             }
00639         }
00640 
00641       if (TAO_debug_level > 0)
00642         {
00643           ACE_ERROR ((LM_ERROR,
00644              ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_synchronous_message_i, ")
00645              ACE_TEXT ("error while flushing message - %m\n"),
00646              this->id ()));
00647         }
00648 
00649       return -1;
00650     }
00651 
00652   return 1;
00653 }
00654 
00655 
00656 int
00657 TAO_Transport::send_reply_message_i (const ACE_Message_Block *mb,
00658                                      ACE_Time_Value *max_wait_time)
00659 {
00660   // Dont clone now.. We could be sent in one shot!
00661   TAO_Synch_Queued_Message synch_message (mb, this->orb_core_);
00662 
00663   synch_message.push_back (this->head_,
00664                            this->tail_);
00665 
00666   int const n =
00667     this->send_synch_message_helper_i (synch_message,
00668                                        max_wait_time);
00669 
00670   if (n == -1 || n == 1)
00671     {
00672       return n;
00673     }
00674 
00675   if (TAO_debug_level > 3)
00676     {
00677       ACE_DEBUG ((LM_DEBUG,
00678          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_reply_message_i, ")
00679          ACE_TEXT ("preparing to add to queue before leaving\n"),
00680          this->id ()));
00681     }
00682 
00683   // Till this point we shouldn't have any copying and that is the
00684   // point anyway. Now, remove the node from the list
00685   synch_message.remove_from_list (this->head_, this->tail_);
00686 
00687   // Clone the node that we have.
00688   TAO_Queued_Message *msg =
00689     synch_message.clone (this->orb_core_->transport_message_buffer_allocator ());
00690 
00691   // Stick it in the queue
00692   msg->push_back (this->head_, this->tail_);
00693 
00694   TAO_Flushing_Strategy *flushing_strategy =
00695     this->orb_core ()->flushing_strategy ();
00696 
00697   int const result = flushing_strategy->schedule_output (this);
00698 
00699   if (result == -1)
00700     {
00701       if (TAO_debug_level > 5)
00702         {
00703           ACE_DEBUG ((LM_DEBUG, "TAO (%P|%t) - Transport[%d]::send_reply_"
00704                       "message_i, dequeuing msg due to schedule_output "
00705                       "failure\n", this->id ()));
00706         }
00707       msg->remove_from_list (this->head_, this->tail_);
00708       msg->destroy ();
00709     }
00710   else if (result == TAO_Flushing_Strategy::MUST_FLUSH)
00711     {
00712       typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK;
00713       TAO_REVERSE_LOCK reverse (*this->handler_lock_);
00714       ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1);
00715       (void) flushing_strategy->flush_message(this, msg, 0);
00716     }
00717 
00718   return 1;
00719 }
00720 
00721 int
00722 TAO_Transport::send_synch_message_helper_i (TAO_Synch_Queued_Message &synch_message,
00723                                             ACE_Time_Value * /*max_wait_time*/)
00724 {
00725   // @todo: Need to send timeouts for writing..
00726   int const n = this->drain_queue_i ();
00727 
00728   if (n == -1)
00729     {
00730       synch_message.remove_from_list (this->head_, this->tail_);
00731       return -1; // Error while sending...
00732     }
00733   else if (n == 1)
00734     {
00735       return 1;  // Empty queue, message was sent..
00736     }
00737 
00738   if (synch_message.all_data_sent ())
00739     {
00740       return 1;
00741     }
00742 
00743   return 0;
00744 }
00745 
00746 bool
00747 TAO_Transport::queue_is_empty_i (void)
00748 {
00749   return (this->head_ == 0);
00750 }
00751 
00752 
00753 int
00754 TAO_Transport::schedule_output_i (void)
00755 {
00756   ACE_Event_Handler * const eh = this->event_handler_i ();
00757   ACE_Reactor * const reactor = eh->reactor ();
00758 
00759   if (reactor == 0)
00760      return -1;
00761 
00762   // Check to see if our event handler is still registered with the
00763   // reactor.  It's possible for another thread to have run close_connection()
00764   // since we last used the event handler.
00765   ACE_Event_Handler * const found = reactor->find_handler (eh->get_handle ());
00766   if (found != eh)
00767     {
00768       if(TAO_debug_level > 3)
00769         {
00770           ACE_DEBUG ((LM_DEBUG,
00771                       "TAO (%P|%t) - Transport[%d]::schedule_output_i "
00772                       "event handler not found in reactor, returning -1\n",
00773                       this->id ()));
00774         }
00775       if (found)
00776         {
00777           found->remove_reference ();
00778         }
00779       return -1;
00780     }
00781   found->remove_reference ();
00782 
00783   if (TAO_debug_level > 3)
00784     {
00785       ACE_DEBUG ((LM_DEBUG,
00786          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::schedule_output_i\n"),
00787          this->id ()));
00788     }
00789 
00790   return reactor->schedule_wakeup (eh, ACE_Event_Handler::WRITE_MASK);
00791 }
00792 
00793 int
00794 TAO_Transport::cancel_output_i (void)
00795 {
00796   ACE_Event_Handler * const eh = this->event_handler_i ();
00797   ACE_Reactor *const reactor = eh->reactor ();
00798 
00799   if (TAO_debug_level > 3)
00800     {
00801       ACE_DEBUG ((LM_DEBUG,
00802          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cancel_output_i\n"),
00803          this->id ()));
00804     }
00805 
00806   return reactor->cancel_wakeup (eh, ACE_Event_Handler::WRITE_MASK);
00807 }
00808 
00809 int
00810 TAO_Transport::handle_timeout (const ACE_Time_Value & /* current_time */,
00811                                const void *act)
00812 {
00813   if (TAO_debug_level > 6)
00814     {
00815       ACE_DEBUG ((LM_DEBUG,
00816          ACE_TEXT ("TAO (%P|%t) - TAO_Transport[%d]::handle_timeout, ")
00817          ACE_TEXT ("timer expired\n"),
00818          this->id ()));
00819     }
00820 
00821   /// This is the only legal ACT in the current configuration....
00822   if (act != &this->current_deadline_)
00823     {
00824       return -1;
00825     }
00826 
00827   if (this->flush_timer_pending ())
00828     {
00829       // The timer is always a oneshot timer, so mark is as not
00830       // pending.
00831       this->reset_flush_timer ();
00832 
00833       TAO_Flushing_Strategy *flushing_strategy =
00834         this->orb_core ()->flushing_strategy ();
00835       int const result = flushing_strategy->schedule_output (this);
00836       if (result == TAO_Flushing_Strategy::MUST_FLUSH)
00837         {
00838           typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK;
00839           TAO_REVERSE_LOCK reverse (*this->handler_lock_);
00840           ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1);
00841           (void) flushing_strategy->flush_transport (this);
00842         }
00843     }
00844 
00845   return 0;
00846 }
00847 
00848 int
00849 TAO_Transport::drain_queue (void)
00850 {
00851   ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1);
00852   int const retval = this->drain_queue_i ();
00853 
00854   if (retval == 1)
00855     {
00856       // ... there is no current message or it was completely
00857       // sent, cancel output...
00858       TAO_Flushing_Strategy *flushing_strategy =
00859         this->orb_core ()->flushing_strategy ();
00860 
00861       flushing_strategy->cancel_output (this);
00862 
00863       return 0;
00864     }
00865 
00866   return retval;
00867 }
00868 
00869 int
00870 TAO_Transport::drain_queue_helper (int &iovcnt, iovec iov[])
00871 {
00872   size_t byte_count = 0;
00873 
00874   // ... send the message ...
00875   ssize_t retval = -1;
00876 
00877 #if TAO_HAS_SENDFILE == 1
00878   if (this->mmap_allocator_)
00879     retval = this->sendfile (this->mmap_allocator_,
00880                              iov,
00881                              iovcnt,
00882                              byte_count);
00883   else
00884 #endif  /* TAO_HAS_SENDFILE==1 */
00885     retval = this->send (iov, iovcnt, byte_count);
00886 
00887   if (TAO_debug_level == 5)
00888     {
00889       dump_iov (iov, iovcnt, this->id (),
00890                 byte_count, "drain_queue_helper");
00891     }
00892 
00893   // ... now we need to update the queue, removing elements
00894   // that have been sent, and updating the last element if it
00895   // was only partially sent ...
00896   this->cleanup_queue (byte_count);
00897   iovcnt = 0;
00898 
00899   if (retval == 0)
00900     {
00901       if (TAO_debug_level > 4)
00902         {
00903           ACE_DEBUG ((LM_DEBUG,
00904              ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_helper, ")
00905              ACE_TEXT ("send() returns 0\n"),
00906              this->id ()));
00907         }
00908       return -1;
00909     }
00910   else if (retval == -1)
00911     {
00912       if (TAO_debug_level > 4)
00913         {
00914           ACE_DEBUG ((LM_DEBUG,
00915              ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_helper, ")
00916              ACE_TEXT ("error during %p\n"),
00917              this->id (), ACE_TEXT ("send()")));
00918         }
00919 
00920       if (errno == EWOULDBLOCK || errno == EAGAIN)
00921         {
00922           return 0;
00923         }
00924 
00925       return -1;
00926     }
00927 
00928   // ... start over, how do we guarantee progress?  Because if
00929   // no bytes are sent send() can only return 0 or -1
00930 
00931   // Total no. of bytes sent for a send call
00932   this->sent_byte_count_ += byte_count;
00933 
00934   if (TAO_debug_level > 4)
00935     {
00936       ACE_DEBUG ((LM_DEBUG,
00937          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_helper, ")
00938          ACE_TEXT ("byte_count = %d, head_is_empty = %d\n"),
00939          this->id(), byte_count, (this->head_ == 0)));
00940     }
00941 
00942   return 1;
00943 }
00944 
00945 int
00946 TAO_Transport::drain_queue_i (void)
00947 {
00948   // This is the vector used to send data, it must be declared outside
00949   // the loop because after the loop there may still be data to be
00950   // sent
00951   int iovcnt = 0;
00952 #if defined (ACE_INITIALIZE_MEMORY_BEFORE_USE)
00953   iovec iov[ACE_IOV_MAX] = { 0 , 0 };
00954 #else
00955   iovec iov[ACE_IOV_MAX];
00956 #endif /* ACE_INITIALIZE_MEMORY_BEFORE_USE */
00957 
00958   // We loop over all the elements in the queue ...
00959   TAO_Queued_Message *i = this->head_;
00960 
00961   // Reset the value so that the counting is done for each new send
00962   // call.
00963   this->sent_byte_count_ = 0;
00964 
00965   // Avoid calling this expensive function each time through the loop. Instead
00966   // we'll assume that the time is unlikely to change much during the loop.
00967   // If we are forced to send in the loop then we'll recompute the time.
00968   ACE_Time_Value now = ACE_High_Res_Timer::gettimeofday_hr ();
00969 
00970   while (i != 0)
00971     {
00972       if (i->is_expired (now))
00973         {
00974           if (TAO_debug_level > 3)
00975           {
00976             ACE_DEBUG ((LM_DEBUG,
00977               ACE_TEXT ("TAO (%P|%t - Transport[%d]::drain_queue_i, ")
00978               ACE_TEXT ("Discarding expired queued message.\n"),
00979               this->id ()));
00980           }
00981           TAO_Queued_Message *next = i->next ();
00982           i->state_changed (TAO_LF_Event::LFS_TIMEOUT,
00983                             this->orb_core_->leader_follower ());
00984           i->remove_from_list (this->head_, this->tail_);
00985           i->destroy ();
00986           i = next;
00987           continue;
00988         }
00989       // ... each element fills the iovector ...
00990       i->fill_iov (ACE_IOV_MAX, iovcnt, iov);
00991 
00992       // ... the vector is full, no choice but to send some data out.
00993       // We need to loop because a single message can span multiple
00994       // IOV_MAX elements ...
00995       if (iovcnt == ACE_IOV_MAX)
00996         {
00997           int const retval =
00998             this->drain_queue_helper (iovcnt, iov);
00999 
01000           now = ACE_High_Res_Timer::gettimeofday_hr ();
01001 
01002           if (TAO_debug_level > 4)
01003             {
01004               ACE_DEBUG ((LM_DEBUG,
01005                  ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_i, ")
01006                  ACE_TEXT ("helper retval = %d\n"),
01007                  this->id (), retval));
01008             }
01009 
01010           if (retval != 1)
01011             {
01012               return retval;
01013             }
01014 
01015           i = this->head_;
01016           continue;
01017         }
01018       // ... notice that this line is only reached if there is still
01019       // room in the iovector ...
01020       i = i->next ();
01021     }
01022 
01023   if (iovcnt != 0)
01024     {
01025       int const retval = this->drain_queue_helper (iovcnt, iov);
01026 
01027       if (TAO_debug_level > 4)
01028         {
01029           ACE_DEBUG ((LM_DEBUG,
01030               ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_i, ")
01031               ACE_TEXT ("helper retval = %d\n"),
01032               this->id (), retval));
01033         }
01034 
01035       if (retval != 1)
01036         {
01037           return retval;
01038         }
01039     }
01040 
01041   if (this->head_ == 0)
01042     {
01043       if (this->flush_timer_pending ())
01044         {
01045           ACE_Event_Handler *eh = this->event_handler_i ();
01046           ACE_Reactor * const reactor = eh->reactor ();
01047           reactor->cancel_timer (this->flush_timer_id_);
01048           this->reset_flush_timer ();
01049         }
01050 
01051       return 1;
01052     }
01053 
01054   return 0;
01055 }
01056 
01057 void
01058 TAO_Transport::cleanup_queue_i ()
01059 {
01060   if (TAO_debug_level > 4)
01061     {
01062       ACE_DEBUG ((LM_DEBUG,
01063          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue_i, ")
01064          ACE_TEXT ("cleaning up complete queue\n"),
01065          this->id ()));
01066     }
01067 
01068   size_t byte_count = 0;
01069   int msg_count = 0;
01070 
01071   // Cleanup all messages
01072   while (this->head_ != 0)
01073     {
01074       TAO_Queued_Message *i = this->head_;
01075 
01076       if (TAO_debug_level > 4)
01077         {
01078           byte_count += i->message_length();
01079           ++msg_count;
01080         }
01081        // @@ This is a good point to insert a flag to indicate that a
01082        //    CloseConnection message was successfully received.
01083       i->state_changed (TAO_LF_Event::LFS_CONNECTION_CLOSED,
01084                         this->orb_core_->leader_follower ());
01085 
01086       i->remove_from_list (this->head_, this->tail_);
01087 
01088       i->destroy ();
01089     }
01090 
01091   if (TAO_debug_level > 4)
01092     {
01093       ACE_DEBUG ((LM_DEBUG,
01094                   ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue_i, ")
01095                   ACE_TEXT ("discarded %d messages, %u bytes.\n"),
01096                   this->id (), msg_count, byte_count));
01097     }
01098 }
01099 
01100 void
01101 TAO_Transport::cleanup_queue (size_t byte_count)
01102 {
01103   while (this->head_ != 0 && byte_count > 0)
01104     {
01105       TAO_Queued_Message *i = this->head_;
01106 
01107       if (TAO_debug_level > 4)
01108         {
01109           ACE_DEBUG ((LM_DEBUG,
01110              ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue, ")
01111              ACE_TEXT ("byte_count = %d\n"),
01112              this->id (), byte_count));
01113         }
01114 
01115       // Update the state of the first message
01116       i->bytes_transferred (byte_count);
01117 
01118       if (TAO_debug_level > 4)
01119         {
01120           ACE_DEBUG ((LM_DEBUG,
01121              ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue, ")
01122              ACE_TEXT ("after transfer, bc = %d, all_sent = %d, ml = %d\n"),
01123              this->id (), byte_count, i->all_data_sent (),
01124              i->message_length ()));
01125         }
01126 
01127       // ... if all the data was sent the message must be removed from
01128       // the queue...
01129       if (i->all_data_sent ())
01130         {
01131           i->remove_from_list (this->head_, this->tail_);
01132           i->destroy ();
01133         }
01134     }
01135 }
01136 
01137 int
01138 TAO_Transport::check_buffering_constraints_i (TAO_Stub *stub, bool &must_flush)
01139 {
01140   // First let's compute the size of the queue:
01141   size_t msg_count = 0;
01142   size_t total_bytes = 0;
01143 
01144   for (TAO_Queued_Message *i = this->head_; i != 0; i = i->next ())
01145     {
01146       ++msg_count;
01147       total_bytes += i->message_length ();
01148     }
01149 
01150   bool set_timer = false;
01151   ACE_Time_Value new_deadline;
01152 
01153   TAO::Transport_Queueing_Strategy *queue_strategy =
01154     stub->transport_queueing_strategy ();
01155 
01156   bool constraints_reached = true;
01157 
01158   if (queue_strategy)
01159     {
01160       constraints_reached =
01161         queue_strategy->buffering_constraints_reached (stub,
01162                                                        msg_count,
01163                                                        total_bytes,
01164                                                        must_flush,
01165                                                        this->current_deadline_,
01166                                                        set_timer,
01167                                                        new_deadline);
01168     }
01169   else
01170     {
01171       must_flush = false;
01172     }
01173 
01174   // ... set the new timer, also cancel any previous timers ...
01175   if (set_timer)
01176     {
01177       ACE_Event_Handler *eh = this->event_handler_i ();
01178       ACE_Reactor * const reactor = eh->reactor ();
01179       this->current_deadline_ = new_deadline;
01180       ACE_Time_Value delay = new_deadline - ACE_OS::gettimeofday ();
01181 
01182       if (this->flush_timer_pending ())
01183         {
01184           reactor->cancel_timer (this->flush_timer_id_);
01185         }
01186 
01187       this->flush_timer_id_ =
01188         reactor->schedule_timer (&this->transport_timer_,
01189                                  &this->current_deadline_,
01190                                  delay);
01191     }
01192 
01193   return constraints_reached;
01194 }
01195 
01196 void
01197 TAO_Transport::report_invalid_event_handler (const char *caller)
01198 {
01199   if (TAO_debug_level > 0)
01200     {
01201       ACE_DEBUG ((LM_DEBUG,
01202          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::report_invalid_event_handler")
01203          ACE_TEXT ("(%s) no longer associated with handler [tag=%d]\n"),
01204          this->id (), ACE_TEXT_CHAR_TO_TCHAR (caller), this->tag_));
01205     }
01206 }
01207 
01208 void
01209 TAO_Transport::send_connection_closed_notifications (void)
01210 {
01211   {
01212     ACE_MT (ACE_GUARD (ACE_Lock, guard, *this->handler_lock_));
01213 
01214     this->send_connection_closed_notifications_i ();
01215   }
01216 
01217   this->tms ()->connection_closed ();
01218 }
01219 
01220 void
01221 TAO_Transport::send_connection_closed_notifications_i (void)
01222 {
01223   this->cleanup_queue_i ();
01224 
01225   this->messaging_object ()->reset ();
01226 }
01227 
01228 int
01229 TAO_Transport::send_message_shared_i (TAO_Stub *stub,
01230                                       int message_semantics,
01231                                       const ACE_Message_Block *message_block,
01232                                       ACE_Time_Value *max_wait_time)
01233 {
01234   int ret = 0;
01235 
01236 #if TAO_HAS_TRANSPORT_CURRENT == 1
01237   size_t const message_length = message_block->length ();
01238 #endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */
01239 
01240   switch (message_semantics)
01241     {
01242       case TAO_Transport::TAO_TWOWAY_REQUEST:
01243         ret = this->send_synchronous_message_i (message_block,
01244                                                 max_wait_time);
01245         break;
01246 
01247       case TAO_Transport::TAO_REPLY:
01248         ret = this->send_reply_message_i (message_block,
01249                                           max_wait_time);
01250         break;
01251 
01252       case TAO_Transport::TAO_ONEWAY_REQUEST:
01253         ret = this->send_asynchronous_message_i (stub,
01254                                                  message_block,
01255                                                  max_wait_time);
01256         break;
01257     }
01258 
01259 #if TAO_HAS_TRANSPORT_CURRENT == 1
01260   // "Count" the message, only if no error was encountered.
01261   if (ret != -1 && this->stats_ != 0)
01262     this->stats_->messages_sent (message_length);
01263 #endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */
01264 
01265   return ret;
01266 }
01267 
01268 int
01269 TAO_Transport::send_asynchronous_message_i (TAO_Stub *stub,
01270                                             const ACE_Message_Block *message_block,
01271                                             ACE_Time_Value *max_wait_time)
01272 {
01273   // Let's figure out if the message should be queued without trying
01274   // to send first:
01275   bool try_sending_first = true;
01276 
01277   bool const queue_empty = (this->head_ == 0);
01278 
01279   TAO::Transport_Queueing_Strategy *queue_strategy =
01280     stub->transport_queueing_strategy ();
01281 
01282   if (!queue_empty)
01283     {
01284       try_sending_first = false;
01285     }
01286   else if (queue_strategy)
01287     {
01288       if (queue_strategy->must_queue (queue_empty))
01289         {
01290           try_sending_first = false;
01291         }
01292     }
01293 
01294   if (try_sending_first)
01295     {
01296       ssize_t n = 0;
01297       size_t byte_count = 0;
01298       // ... in this case we must try to send the message first ...
01299 
01300       size_t const total_length = message_block->total_length ();
01301 
01302       if (TAO_debug_level > 6)
01303         {
01304           ACE_DEBUG ((LM_DEBUG,
01305              ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ")
01306              ACE_TEXT ("trying to send the message (ml = %d)\n"),
01307              this->id (), total_length));
01308         }
01309 
01310       // @@ I don't think we want to hold the mutex here, however if
01311       // we release it we need to recheck the status of the transport
01312       // after we return... once I understand the final form for this
01313       // code I will re-visit this decision
01314       n = this->send_message_block_chain_i (message_block,
01315                                             byte_count,
01316                                             max_wait_time);
01317       if (n == -1)
01318         {
01319           // ... if this is just an EWOULDBLOCK we must schedule the
01320           // message for later, if it is ETIME we still have to send
01321           // the complete message, because cutting off the message at
01322           // this point will destroy the synchronization with the
01323           // server ...
01324           if (errno != EWOULDBLOCK && errno != ETIME)
01325             {
01326               if (TAO_debug_level > 0)
01327                 {
01328                   ACE_ERROR ((LM_ERROR,
01329                      ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ")
01330                      ACE_TEXT ("fatal error in ")
01331                      ACE_TEXT ("send_message_block_chain_i - %m\n"),
01332                      this->id ()));
01333                 }
01334               return -1;
01335             }
01336         }
01337 
01338       // ... let's figure out if the complete message was sent ...
01339       if (total_length == byte_count)
01340         {
01341           // Done, just return.  Notice that there are no allocations
01342           // or copies up to this point (though some fancy calling
01343           // back and forth).
01344           // This is the common case for the critical path, it should
01345           // be fast.
01346           return 0;
01347         }
01348 
01349       // If it was partially sent, then we can't allow a timeout
01350       if (byte_count > 0)
01351         max_wait_time = 0;
01352 
01353       if (TAO_debug_level > 6)
01354         {
01355           ACE_DEBUG ((LM_DEBUG,
01356              ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ")
01357              ACE_TEXT ("partial send %d / %d bytes\n"),
01358              this->id (), byte_count, total_length));
01359         }
01360 
01361       // ... part of the data was sent, need to figure out what piece
01362       // of the message block chain must be queued ...
01363       while (message_block != 0 && message_block->length () == 0)
01364         {
01365           message_block = message_block->cont ();
01366         }
01367 
01368       // ... at least some portion of the message block chain should
01369       // remain ...
01370     }
01371 
01372   // ... either the message must be queued or we need to queue it
01373   // because it was not completely sent out ...
01374 
01375   if (TAO_debug_level > 6)
01376     {
01377       ACE_DEBUG ((LM_DEBUG,
01378          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ")
01379          ACE_TEXT ("message is queued\n"),
01380          this->id ()));
01381     }
01382 
01383   if (this->queue_message_i (message_block, max_wait_time) == -1)
01384   {
01385     if (TAO_debug_level > 0)
01386       {
01387         ACE_DEBUG ((LM_DEBUG,
01388                     ACE_TEXT ("TAO (%P|%t) - Transport[%d]::")
01389                     ACE_TEXT ("send_asynchronous_message_i, ")
01390                     ACE_TEXT ("cannot queue message for  - %m\n"),
01391                     this->id ()));
01392       }
01393     return -1;
01394   }
01395 
01396   // ... if the queue is full we need to activate the output on the
01397   // queue ...
01398   bool must_flush = false;
01399   const bool constraints_reached =
01400     this->check_buffering_constraints_i (stub,
01401                                          must_flush);
01402 
01403   // ... but we also want to activate it if the message was partially
01404   // sent.... Plus, when we use the blocking flushing strategy the
01405   // queue is flushed as a side-effect of 'schedule_output()'
01406 
01407   TAO_Flushing_Strategy *flushing_strategy =
01408     this->orb_core ()->flushing_strategy ();
01409 
01410   if (constraints_reached || try_sending_first)
01411     {
01412       int const result = flushing_strategy->schedule_output (this);
01413       if (result == TAO_Flushing_Strategy::MUST_FLUSH)
01414         {
01415           must_flush = true;
01416         }
01417     }
01418 
01419   if (must_flush)
01420     {
01421       typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK;
01422       TAO_REVERSE_LOCK reverse (*this->handler_lock_);
01423       ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1);
01424 
01425       (void) flushing_strategy->flush_transport (this);
01426     }
01427 
01428   return 0;
01429 }
01430 
01431 int
01432 TAO_Transport::queue_message_i (const ACE_Message_Block *message_block,
01433                                 ACE_Time_Value *max_wait_time)
01434 {
01435   TAO_Queued_Message *queued_message = 0;
01436   ACE_NEW_RETURN (queued_message,
01437                   TAO_Asynch_Queued_Message (message_block,
01438                                              this->orb_core_,
01439                                              max_wait_time,
01440                                              0,
01441                                              true),
01442                   -1);
01443   queued_message->push_back (this->head_, this->tail_);
01444 
01445   return 0;
01446 }
01447 
01448 /*
01449  *
01450  * All the methods relevant to the incoming data path of the ORB are
01451  * defined below
01452  *
01453  */
01454 int
01455 TAO_Transport::handle_input (TAO_Resume_Handle &rh,
01456                              ACE_Time_Value * max_wait_time)
01457 {
01458   if (TAO_debug_level > 3)
01459     {
01460       ACE_DEBUG ((LM_DEBUG,
01461          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input\n"),
01462          this->id ()));
01463     }
01464 
01465   // First try to process messages of the head of the incoming queue.
01466   int const retval = this->process_queue_head (rh);
01467 
01468   if (retval <= 0)
01469     {
01470       if (retval == -1)
01471         {
01472           if (TAO_debug_level > 2)
01473             {
01474               ACE_DEBUG ((LM_DEBUG,
01475                  ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input, ")
01476                  ACE_TEXT ("error while parsing the head of the queue\n"),
01477                  this->id()));
01478 
01479             }
01480           return -1;
01481         }
01482       else
01483         {
01484           // retval == 0
01485 
01486           // Processed a message in queue successfully. This
01487           // thread must return to thread-pool now.
01488           return 0;
01489         }
01490     }
01491 
01492   TAO_Queued_Data *q_data = 0;
01493 
01494   if (this->incoming_message_stack_.top (q_data) != -1
01495       && q_data->missing_data () != TAO_MISSING_DATA_UNDEFINED)
01496     {
01497       /* PRE: q_data->missing_data_ > 0 as all QD on stack must be incomplete  */
01498       if (this->handle_input_missing_data (rh, max_wait_time, q_data) == -1)
01499         {
01500           if (TAO_debug_level > 0)
01501             {
01502               ACE_ERROR ((LM_ERROR,
01503                  ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input, ")
01504                  ACE_TEXT ("error consolidating incoming message\n"),
01505                  this->id ()));
01506             }
01507           return -1;
01508         }
01509     }
01510   else
01511     {
01512       if (this->handle_input_parse_data (rh, max_wait_time) == -1)
01513         {
01514           if (TAO_debug_level > 0)
01515             {
01516               ACE_ERROR ((LM_ERROR,
01517                  ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input, ")
01518                  ACE_TEXT ("error parsing incoming message\n"),
01519                  this->id ()));
01520             }
01521           return -1;
01522         }
01523     }
01524 
01525   return 0;
01526 }
01527 
01528 int
01529 TAO_Transport::consolidate_process_message (TAO_Queued_Data *q_data,
01530                                             TAO_Resume_Handle &rh)
01531 {
01532   // paranoid check
01533   if (q_data->missing_data () != 0)
01534     {
01535       if (TAO_debug_level > 0)
01536         {
01537            ACE_ERROR ((LM_ERROR,
01538               ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ")
01539               ACE_TEXT ("missing data\n"),
01540               this->id ()));
01541         }
01542        return -1;
01543     }
01544 
01545   if (q_data->more_fragments () ||
01546       q_data->msg_type () == TAO_PLUGGABLE_MESSAGE_FRAGMENT)
01547     {
01548       // consolidate message on top of stack, only for fragmented messages
01549       TAO_Queued_Data *new_q_data = 0;
01550 
01551       switch (this->messaging_object()->consolidate_fragmented_message (q_data, new_q_data))
01552         {
01553         case -1: // error
01554           return -1;
01555 
01556         case 0:  // returning consolidated message in q_data
01557           if (!new_q_data)
01558             {
01559               if (TAO_debug_level > 0)
01560                 {
01561                   ACE_ERROR ((LM_ERROR,
01562                      ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ")
01563                      ACE_TEXT ("error, consolidated message is NULL\n"),
01564                      this->id ()));
01565                 }
01566               return -1;
01567             }
01568 
01569 
01570           if (this->process_parsed_messages (new_q_data, rh) == -1)
01571             {
01572               TAO_Queued_Data::release (new_q_data);
01573 
01574               if (TAO_debug_level > 0)
01575                 {
01576                   ACE_ERROR ((LM_ERROR,
01577                      ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ")
01578                      ACE_TEXT ("error processing consolidated message\n"),
01579                      this->id ()));
01580                 }
01581               return -1;
01582             }
01583 
01584           TAO_Queued_Data::release (new_q_data);
01585 
01586           break;
01587 
01588         case 1:  // fragment has been stored in messaging_oject()
01589           break;
01590         }
01591     }
01592   else
01593     {
01594       if (this->process_parsed_messages (q_data, rh) == -1)
01595         {
01596           TAO_Queued_Data::release (q_data);
01597 
01598           if (TAO_debug_level > 0)
01599             {
01600               ACE_ERROR ((LM_ERROR,
01601                  ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ")
01602                  ACE_TEXT ("error processing message\n"),
01603                  this->id ()));
01604             }
01605           return -1;
01606         }
01607 
01608       TAO_Queued_Data::release (q_data);
01609 
01610     }
01611 
01612   return 0;
01613 }
01614 
01615 int
01616 TAO_Transport::consolidate_enqueue_message (TAO_Queued_Data *q_data)
01617 {
01618   // consolidate message on top of stack, only for fragmented messages
01619 
01620   // paranoid check
01621   if (q_data->missing_data () != 0)
01622     {
01623        return -1;
01624     }
01625 
01626   if (q_data->more_fragments () ||
01627       q_data->msg_type () == TAO_PLUGGABLE_MESSAGE_FRAGMENT)
01628     {
01629       TAO_Queued_Data *new_q_data = 0;
01630 
01631       switch (this->messaging_object()->consolidate_fragmented_message (q_data, new_q_data))
01632         {
01633         case -1: // error
01634           return -1;
01635 
01636         case 0:  // returning consolidated message in new_q_data
01637           if (!new_q_data)
01638             {
01639               if (TAO_debug_level > 0)
01640                 {
01641                   ACE_ERROR ((LM_ERROR,
01642                      ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_enqueue_message, ")
01643                      ACE_TEXT ("error, consolidated message is NULL\n"),
01644                      this->id ()));
01645                 }
01646               return -1;
01647             }
01648 
01649           if (this->incoming_message_queue_.enqueue_tail (new_q_data) != 0)
01650             {
01651               TAO_Queued_Data::release (new_q_data);
01652               return -1;
01653             }
01654           break;
01655 
01656         case 1:  // fragment has been stored in messaging_oject()
01657           break;
01658         }
01659     }
01660   else
01661     {
01662       if (this->incoming_message_queue_.enqueue_tail (q_data) != 0)
01663         {
01664           TAO_Queued_Data::release (q_data);
01665           return -1;
01666         }
01667     }
01668 
01669   return 0; // success
01670 }
01671 
01672 int
01673 TAO_Transport::handle_input_missing_data (TAO_Resume_Handle &rh,
01674                                           ACE_Time_Value * max_wait_time,
01675                                           TAO_Queued_Data *q_data)
01676 {
01677   // paranoid check
01678   if (q_data == 0)
01679     {
01680       return -1;
01681     }
01682 
01683   if (TAO_debug_level > 3)
01684     {
01685       ACE_DEBUG ((LM_DEBUG,
01686          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_missing_data_message, ")
01687          ACE_TEXT ("enter (missing data == %d)\n"),
01688          this->id (), q_data->missing_data ()));
01689     }
01690 
01691   size_t const recv_size = q_data->missing_data ();
01692 
01693   if (q_data->msg_block ()->space() < recv_size)
01694     {
01695       // make sure the message_block has enough space
01696       size_t const message_size = recv_size + q_data->msg_block ()->length();
01697 
01698       if (ACE_CDR::grow (q_data->msg_block (), message_size) == -1)
01699         {
01700           return -1;
01701         }
01702     }
01703 
01704   // Saving the size of the received buffer in case any one needs to
01705   // get the size of the message thats received in the
01706   // context. Obviously the value will be changed for each recv call
01707   // and the user is supposed to invoke the accessor only in the
01708   // invocation context to get meaningful information.
01709   this->recv_buffer_size_ = recv_size;
01710 
01711   // Read the message into the existing message block on heap
01712   ssize_t const n = this->recv (q_data->msg_block ()->wr_ptr(),
01713                                 recv_size,
01714                                 max_wait_time);
01715 
01716 
01717   if (n <= 0)
01718     {
01719       return n;
01720     }
01721 
01722   if (TAO_debug_level > 3)
01723     {
01724       ACE_DEBUG ((LM_DEBUG,
01725          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_missing_data_message, ")
01726          ACE_TEXT ("read bytes %d\n"),
01727          this->id (), n));
01728     }
01729 
01730   q_data->msg_block ()->wr_ptr(n);
01731   q_data->missing_data (q_data->missing_data () - n);
01732 
01733   if (q_data->missing_data () == 0)
01734     {
01735       // paranoid check
01736       if (this->incoming_message_stack_.pop (q_data) == -1)
01737         {
01738           return -1;
01739         }
01740 
01741       if (this->consolidate_process_message (q_data, rh) == -1)
01742         {
01743           return -1;
01744         }
01745     }
01746 
01747   return 0;
01748 }
01749 
01750 
01751 int
01752 TAO_Transport::handle_input_parse_extra_messages (ACE_Message_Block &message_block)
01753 {
01754 
01755   // store buffer status of last extraction: -1 parse error, 0
01756   // incomplete message header in buffer, 1 complete messages header
01757   // parsed
01758   int buf_status = 0;
01759 
01760   TAO_Queued_Data *q_data = 0;     // init
01761 
01762   // parse buffer until all messages have been extracted, consolidate
01763   // and enqueue complete messages, if the last message being parsed
01764   // has missin data, it is stays on top of incoming_message_stack.
01765   while (message_block.length () > 0 &&
01766          (buf_status = this->messaging_object ()->extract_next_message
01767           (message_block, q_data)) != -1 &&
01768          q_data != 0) // paranoid check
01769     {
01770       if (q_data->missing_data () == 0)
01771         {
01772           if (this->consolidate_enqueue_message (q_data) == -1)
01773             {
01774               return -1;
01775             }
01776         }
01777       else  // incomplete message read, probably the last message in buffer
01778         {
01779           // can not fail
01780           this->incoming_message_stack_.push (q_data);
01781         }
01782 
01783       q_data = 0; // reset
01784     } // while
01785 
01786   if (buf_status == -1)
01787     {
01788       return -1;
01789     }
01790 
01791   return 0;
01792 }
01793 
01794 int
01795 TAO_Transport::handle_input_parse_data  (TAO_Resume_Handle &rh,
01796                                          ACE_Time_Value * max_wait_time)
01797 {
01798 
01799   if (TAO_debug_level > 3)
01800     {
01801       ACE_DEBUG ((LM_DEBUG,
01802          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ")
01803          ACE_TEXT ("enter\n"),
01804          this->id ()));
01805     }
01806 
01807 
01808   // The buffer on the stack which will be used to hold the input
01809   // messages, ACE_CDR::MAX_ALIGNMENT compensates the
01810   // memory-alignment. This improves performance with SUN-Java-ORB-1.4
01811   // and higher that sends fragmented requests of size 1024 bytes.
01812   char buf [TAO_MAXBUFSIZE + ACE_CDR::MAX_ALIGNMENT];
01813 
01814 #if defined (ACE_INITIALIZE_MEMORY_BEFORE_USE)
01815   (void) ACE_OS::memset (buf,
01816                          '\0',
01817                          sizeof buf);
01818 #endif /* ACE_INITIALIZE_MEMORY_BEFORE_USE */
01819 
01820   // Create a data block
01821   ACE_Data_Block db (sizeof (buf),
01822                      ACE_Message_Block::MB_DATA,
01823                      buf,
01824                      this->orb_core_->input_cdr_buffer_allocator (),
01825                      this->orb_core_->locking_strategy (),
01826                      ACE_Message_Block::DONT_DELETE,
01827                      this->orb_core_->input_cdr_dblock_allocator ());
01828 
01829   // Create a message block
01830   ACE_Message_Block message_block (&db,
01831                                    ACE_Message_Block::DONT_DELETE,
01832                                    this->orb_core_->input_cdr_msgblock_allocator ());
01833 
01834 
01835   // Align the message block
01836   ACE_CDR::mb_align (&message_block);
01837 
01838   size_t recv_size = 0; // Note: unsigned integer
01839 
01840   // Pointer to newly parsed message
01841   TAO_Queued_Data *q_data = 0;
01842 
01843   // optimizing access of constants
01844   size_t const header_length = this->messaging_object ()->header_length ();
01845 
01846   // paranoid check
01847   if (header_length > message_block.space ())
01848     {
01849       return -1;
01850     }
01851 
01852   if (this->orb_core_->orb_params ()->single_read_optimization ())
01853     {
01854       recv_size = message_block.space ();
01855     }
01856   else
01857     {
01858       // Single read optimization has been de-activated. That means
01859       // that we need to read from transport the GIOP header first
01860       // before the payload. This codes first checks the incoming
01861       // stack for partial messages which needs to be
01862       // consolidated. Otherwise we are in new cycle, reading complete
01863       // GIOP header of new incoming message.
01864       if (this->incoming_message_stack_.top (q_data) != -1
01865            && q_data->missing_data () == TAO_MISSING_DATA_UNDEFINED)
01866         {
01867           // There is a partial message on incoming_message_stack_
01868           // whose length is unknown so far. We need to consolidate
01869           // the GIOP header to get to know the payload size,
01870           recv_size = header_length - q_data->msg_block ()->length ();
01871         }
01872       else
01873         {
01874           // Read amount of data forming GIOP header of new incoming
01875           // message.
01876           recv_size = header_length;
01877         }
01878       // POST: 0 <= recv_size <= header_length
01879     }
01880   // POST: 0 <= recv_size <= message_block->space ()
01881 
01882   // If we have a partial message, copy it into our message block and
01883   // clear out the partial message.
01884   if (this->partial_message_ != 0 && this->partial_message_->length () > 0)
01885     {
01886       // (*) Copy back the partial message into current read-buffer,
01887       // verify that the read-strategy of "recv_size" bytes is not
01888       // exceeded. The latter check guarantees that recv_size does not
01889       // roll-over and keeps in range
01890       // 0<=recv_size<=message_block->space()
01891       if (this->partial_message_->length () <= recv_size &&
01892           message_block.copy (this->partial_message_->rd_ptr (),
01893                               this->partial_message_->length ()) == 0)
01894         {
01895 
01896           recv_size -= this->partial_message_->length ();
01897           this->partial_message_->reset ();
01898         }
01899       else
01900         {
01901           return -1;
01902         }
01903     }
01904   // POST: 0 <= recv_size <= buffer_space
01905 
01906   if (0 >= recv_size) // paranoid: the check above (*) guarantees recv_size>=0
01907     {
01908       // This event would cause endless looping, trying frequently to
01909       // read zero bytes from stream.  This might happen, if TAOs
01910       // protocol implementation is not correct and tries to read data
01911       // beyond header without "single_read_optimazation" being
01912       // activated.
01913       if (TAO_debug_level > 0)
01914         {
01915           ACE_ERROR ((LM_ERROR,
01916              ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ")
01917              ACE_TEXT ("Error - endless loop detection, closing connection"),
01918              this->id ()));
01919         }
01920       return -1;
01921     }
01922 
01923   // Saving the size of the received buffer in case any one needs to
01924   // get the size of the message thats received in the
01925   // context. Obviously the value will be changed for each recv call
01926   // and the user is supposed to invoke the accessor only in the
01927   // invocation context to get meaningful information.
01928   this->recv_buffer_size_ = recv_size;
01929 
01930   // Read the message into the  message block that we have created on
01931   // the stack.
01932   ssize_t const n = this->recv (message_block.wr_ptr (),
01933                                 recv_size,
01934                                 max_wait_time);
01935 
01936   // If there is an error return to the reactor..
01937   if (n <= 0)
01938     {
01939       return n;
01940     }
01941 
01942   if (TAO_debug_level > 3)
01943     {
01944       ACE_DEBUG ((LM_DEBUG,
01945          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ")
01946          ACE_TEXT ("read %d bytes\n"),
01947          this->id (), n));
01948     }
01949 
01950   // Set the write pointer in the stack buffer
01951   message_block.wr_ptr (n);
01952 
01953   //
01954   // STACK PROCESSING OR MESSAGE CONSOLIDATION
01955   //
01956 
01957   // PRE: data in buffer is aligned && message_block.length() > 0
01958 
01959   if (this->incoming_message_stack_.top (q_data) != -1
01960       && q_data->missing_data () == TAO_MISSING_DATA_UNDEFINED)
01961     {
01962       //
01963       // MESSAGE CONSOLIDATION
01964       //
01965 
01966       // Partial message on incoming_message_stack_ needs to be
01967       // consolidated.  The message header could not be parsed so far
01968       // and therefor the message size is unknown yet. Consolidating
01969       // the message destroys the memory alignment of succeeding
01970       // messages sharing the buffer, for that reason consolidation
01971       // and stack based processing are mutial exclusive.
01972       if (this->messaging_object ()->consolidate_node (q_data,
01973                                                        message_block) == -1)
01974         {
01975            if (TAO_debug_level > 0)
01976             {
01977                 ACE_ERROR ((LM_ERROR,
01978                    ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ")
01979                    ACE_TEXT ("error consolidating message from input buffer\n"),
01980                    this->id () ));
01981              }
01982            return -1;
01983         }
01984 
01985       // Complete message are to be enqueued and later processed
01986       if (q_data->missing_data () == 0)
01987         {
01988           if (this->incoming_message_stack_.pop (q_data) == -1)
01989             {
01990               return -1;
01991             }
01992 
01993           if (this->consolidate_enqueue_message (q_data) == -1)
01994             {
01995               return -1;
01996             }
01997         }
01998 
01999       if (message_block.length () > 0
02000           && this->handle_input_parse_extra_messages (message_block) == -1)
02001         {
02002           return -1;
02003         }
02004 
02005       // In any case try to process the enqueued messages
02006       if (this->process_queue_head (rh) == -1)
02007         {
02008           return -1;
02009         }
02010     }
02011   else
02012     {
02013       //
02014       // STACK PROCESSING (critical path)
02015       //
02016 
02017       // Process the first message in buffer on stack
02018 
02019       // (PRE: first message resides in aligned memory) Make a node of
02020       // the message-block..
02021 
02022       TAO_Queued_Data qd (&message_block,
02023                           this->orb_core_->transport_message_buffer_allocator ());
02024 
02025       size_t mesg_length  = 0;
02026 
02027       if (this->messaging_object ()->parse_next_message (qd,
02028                                                          mesg_length) == -1
02029           || (qd.missing_data () == 0
02030               && mesg_length > message_block.length ()) )
02031         {
02032           // extracting message failed
02033           return -1;
02034         }
02035       // POST: qd.missing_data_ == 0 --> mesg_length <= message_block.length()
02036       // This prevents seeking rd_ptr behind the wr_ptr
02037 
02038       if (qd.missing_data () != 0 ||
02039           qd.more_fragments ()   ||
02040           qd.msg_type () == TAO_PLUGGABLE_MESSAGE_FRAGMENT)
02041         {
02042           if (qd.missing_data () == 0)
02043             {
02044               // Dealing with a fragment
02045               TAO_Queued_Data *nqd =
02046                 TAO_Queued_Data::duplicate (qd);
02047 
02048               if (nqd == 0)
02049                 {
02050                   return -1;
02051                 }
02052 
02053               // mark the end of message in new buffer
02054               char* end_mark = nqd->msg_block ()->rd_ptr ()
02055                              + mesg_length;
02056               nqd->msg_block ()->wr_ptr (end_mark);
02057 
02058               // move the read pointer forward in old buffer
02059               message_block.rd_ptr (mesg_length);
02060 
02061               // enqueue the message
02062               if (this->consolidate_enqueue_message (nqd) == -1)
02063                 {
02064                   return -1;
02065                 }
02066 
02067               if (message_block.length () > 0
02068                   && this->handle_input_parse_extra_messages (message_block) == -1)
02069                 {
02070                   return -1;
02071                 }
02072 
02073               // In any case try to process the enqueued messages
02074               if (this->process_queue_head (rh) == -1)
02075                 {
02076                   return -1;
02077                 }
02078             }
02079           else if (qd.missing_data () != TAO_MISSING_DATA_UNDEFINED)
02080             {
02081               // Incomplete message, must be the last one in buffer
02082 
02083               if (qd.missing_data () != TAO_MISSING_DATA_UNDEFINED &&
02084                   qd.missing_data () > message_block.space ())
02085                 {
02086                   // Re-Allocate correct size on heap
02087                   if (ACE_CDR::grow (qd.msg_block (),
02088                                      message_block.length ()
02089                                      + qd.missing_data ()) == -1)
02090                     {
02091                       return -1;
02092                     }
02093                 }
02094 
02095               TAO_Queued_Data *nqd =
02096                 TAO_Queued_Data::duplicate (qd);
02097 
02098               if (nqd == 0)
02099                 {
02100                   return -1;
02101                 }
02102 
02103               // move read-pointer to end of buffer
02104               message_block.rd_ptr (message_block.length());
02105 
02106               this->incoming_message_stack_.push (nqd);
02107             }
02108         }
02109       else
02110         {
02111           //
02112           // critical path
02113           //
02114 
02115           // We cant process the message on stack right now. First we
02116           // have got to parse extra messages from message_block,
02117           // putting them into queue.  When this is done we can return
02118           // to process this message, and notifying other threads to
02119           // process the messages in queue.
02120 
02121           char * end_marker = message_block.rd_ptr ()
02122                             + mesg_length;
02123 
02124           if (message_block.length () > mesg_length)
02125             {
02126               // There are more message in data stream to be parsed.
02127               // Safe the rd_ptr to restore later.
02128               char *rd_ptr_stack_mesg = message_block.rd_ptr ();
02129 
02130               // Skip parsed message, jump to next message in buffer
02131               // PRE: mesg_length <= message_block.length ()
02132               message_block.rd_ptr (mesg_length);
02133 
02134               // Extract remaining messages and enqueue them for later
02135               // heap processing
02136               if (this->handle_input_parse_extra_messages (message_block) == -1)
02137                 {
02138                   return -1;
02139                 }
02140 
02141               // correct the end_marker
02142               end_marker = message_block.rd_ptr ();
02143 
02144               // Restore rd_ptr
02145               message_block.rd_ptr (rd_ptr_stack_mesg);
02146             }
02147 
02148           // The following if-else has been copied from
02149           // process_queue_head().  While process_queue_head()
02150           // processes message on heap, here we will process a message
02151           // on stack.
02152 
02153           // Now that we have one message on stack to be processed,
02154           // check whether we have one more message in the queue...
02155           if (this->incoming_message_queue_.queue_length () > 0)
02156             {
02157               if (TAO_debug_level > 0)
02158                 {
02159                   ACE_DEBUG ((LM_DEBUG,
02160                      ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ")
02161                      ACE_TEXT ("notify reactor\n"),
02162                      this->id ()));
02163 
02164                 }
02165 
02166               const int retval = this->notify_reactor ();
02167 
02168               if (retval == 1)
02169                 {
02170                   // Let the class know that it doesn't need to resume  the
02171                   // handle..
02172                   rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_LEAVE_SUSPENDED);
02173                 }
02174               else if (retval < 0)
02175                 return -1;
02176             }
02177           else
02178             {
02179               // As there are no further messages in queue just resume
02180               // the handle. Set the flag incase someone had reset the flag..
02181               rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_RESUMABLE);
02182             }
02183 
02184           // PRE: incoming_message_queue is empty
02185           if (this->process_parsed_messages (&qd,
02186                                              rh) == -1)
02187             {
02188               return -1;
02189             }
02190 
02191           // move the rd_ptr tp position of end_marker
02192           message_block.rd_ptr (end_marker);
02193         }
02194     }
02195 
02196   // Now that all cases have been processed, there might be kept some data
02197   // in buffer that needs to be safed for next "handle_input" invocations.
02198    if (message_block.length () > 0)
02199      {
02200        if (this->partial_message_ == 0)
02201          {
02202            this->allocate_partial_message_block ();
02203          }
02204 
02205        if (this->partial_message_ != 0 &&
02206            this->partial_message_->copy (message_block.rd_ptr (),
02207                                          message_block.length ()) == 0)
02208          {
02209            message_block.rd_ptr (message_block.length ());
02210          }
02211        else
02212          {
02213            return -1;
02214          }
02215      }
02216 
02217    return 0;
02218 }
02219 
02220 
02221 int
02222 TAO_Transport::process_parsed_messages (TAO_Queued_Data *qd,
02223                                         TAO_Resume_Handle &rh)
02224 {
02225   if (TAO_debug_level > 7)
02226     {
02227       ACE_DEBUG ((LM_DEBUG,
02228          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ")
02229          ACE_TEXT ("entering (missing data == %d)\n"),
02230          this->id(), qd->missing_data ()));
02231     }
02232 
02233 #if TAO_HAS_TRANSPORT_CURRENT == 1
02234   // Update stats, if any
02235   if (this->stats_ != 0)
02236     this->stats_->messages_received (qd->msg_block ()->length ());
02237 #endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */
02238 
02239   switch (qd->msg_type ())
02240   {
02241     case TAO_PLUGGABLE_MESSAGE_CLOSECONNECTION:
02242     {
02243       if (TAO_debug_level > 0)
02244         ACE_DEBUG ((LM_DEBUG,
02245            ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ")
02246            ACE_TEXT ("received CloseConnection message - %m\n"),
02247            this->id()));
02248 
02249       // Return a "-1" so that the next stage can take care of
02250       // closing connection and the necessary memory management.
02251       return -1;
02252     }
02253     break;
02254     case TAO_PLUGGABLE_MESSAGE_REQUEST:
02255     case TAO_PLUGGABLE_MESSAGE_LOCATEREQUEST:
02256     {
02257       // Let us resume the handle before we go ahead to process the
02258       // request. This will open up the handle for other threads.
02259       rh.resume_handle ();
02260 
02261       if (this->messaging_object ()->process_request_message (
02262             this,
02263             qd) == -1)
02264         {
02265           // Return a "-1" so that the next stage can take care of
02266           // closing connection and the necessary memory management.
02267           return -1;
02268         }
02269     }
02270     break;
02271     case TAO_PLUGGABLE_MESSAGE_REPLY:
02272     case TAO_PLUGGABLE_MESSAGE_LOCATEREPLY:
02273     {
02274       rh.resume_handle ();
02275 
02276       TAO_Pluggable_Reply_Params params (this);
02277 
02278       if (this->messaging_object ()->process_reply_message (params,
02279                                                             qd) == -1)
02280         {
02281           if (TAO_debug_level > 0)
02282             ACE_DEBUG ((LM_DEBUG,
02283                ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ")
02284                ACE_TEXT ("error in process_reply_message - %m\n"),
02285                this->id ()));
02286 
02287           return -1;
02288         }
02289 
02290     }
02291     break;
02292     case TAO_PLUGGABLE_MESSAGE_CANCELREQUEST:
02293     {
02294       // The associated request might be incomplete residing
02295       // fragmented in messaging object. We must make sure the
02296       // resources allocated by fragments are released.
02297       if (this->messaging_object ()->discard_fragmented_message (qd) == -1)
02298         {
02299           if (TAO_debug_level > 0)
02300             {
02301               ACE_ERROR ((LM_ERROR,
02302                  ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ")
02303                  ACE_TEXT ("error processing CancelRequest\n"),
02304                  this->id ()));
02305             }
02306         }
02307 
02308       // We are not able to cancel requests being processed already;
02309       // this is declared as optional feature by CORBA, and TAO does
02310       // not support this currently.
02311 
02312       // Just continue processing, CancelRequest does not mean to cut
02313       // off the connection.
02314     }
02315     break;
02316     case TAO_PLUGGABLE_MESSAGE_MESSAGERROR:
02317     {
02318       if (TAO_debug_level > 0)
02319         {
02320           ACE_ERROR ((LM_ERROR,
02321              ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ")
02322              ACE_TEXT ("received MessageError, closing connection\n"),
02323              this->id ()));
02324         }
02325       return -1;
02326     }
02327     break;
02328     case TAO_PLUGGABLE_MESSAGE_FRAGMENT:
02329     {
02330       // Nothing to be done.
02331     }
02332     break;
02333   }
02334 
02335   // If not, just return back..
02336   return 0;
02337 }
02338 
02339 int
02340 TAO_Transport::process_queue_head (TAO_Resume_Handle &rh)
02341 {
02342   if (TAO_debug_level > 3)
02343     {
02344       ACE_DEBUG ((LM_DEBUG,
02345          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_queue_head, %d enqueued\n"),
02346          this->id (), this->incoming_message_queue_.queue_length () ));
02347     }
02348 
02349   // See if  message in queue ...
02350   if (this->incoming_message_queue_.queue_length () > 0)
02351     {
02352       // Get the message on the head of the queue..
02353       TAO_Queued_Data *qd =
02354         this->incoming_message_queue_.dequeue_head ();
02355 
02356       if (TAO_debug_level > 3)
02357         {
02358           ACE_DEBUG ((LM_DEBUG,
02359              ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_queue_head, ")
02360              ACE_TEXT ("the size of the queue is [%d]\n"),
02361              this->id (),
02362              this->incoming_message_queue_.queue_length()));
02363         }
02364       // Now that we have pulled out out one message out of the queue,
02365       // check whether we have one more message in the queue...
02366       if (this->incoming_message_queue_.queue_length () > 0)
02367         {
02368           if (TAO_debug_level > 0)
02369             {
02370               ACE_DEBUG ((LM_DEBUG,
02371                  ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_queue_head, ")
02372                  ACE_TEXT ("notify reactor\n"),
02373                  this->id ()));
02374             }
02375 
02376           int const retval = this->notify_reactor ();
02377 
02378           if (retval == 1)
02379             {
02380               // Let the class know that it doesn't need to resume  the
02381               // handle..
02382               rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_LEAVE_SUSPENDED);
02383             }
02384           else if (retval < 0)
02385             return -1;
02386         }
02387       else
02388         {
02389           // As we are ready to process the last message just resume
02390           // the handle. Set the flag incase someone had reset the flag..
02391           rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_RESUMABLE);
02392         }
02393 
02394       // Process the message...
02395       if (this->process_parsed_messages (qd, rh) == -1)
02396         {
02397           return -1;
02398         }
02399 
02400       // Delete the Queued_Data..
02401       TAO_Queued_Data::release (qd);
02402 
02403       return 0;
02404     }
02405 
02406   return 1;
02407 }
02408 
02409 int
02410 TAO_Transport::notify_reactor (void)
02411 {
02412   if (!this->ws_->is_registered ())
02413     {
02414       return 0;
02415     }
02416 
02417   ACE_Event_Handler *eh = this->event_handler_i ();
02418 
02419   // Get the reactor associated with the event handler
02420   ACE_Reactor *reactor = this->orb_core ()->reactor ();
02421 
02422   if (TAO_debug_level > 0)
02423     {
02424       ACE_DEBUG ((LM_DEBUG,
02425          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::notify_reactor, ")
02426          ACE_TEXT ("notify to Reactor\n"),
02427          this->id ()));
02428     }
02429 
02430 
02431   // Send a notification to the reactor...
02432   int const retval = reactor->notify (eh, ACE_Event_Handler::READ_MASK);
02433 
02434   if (retval < 0 && TAO_debug_level > 2)
02435     {
02436       // @todo: need to think about what is the action that
02437       // we can take when we get here.
02438       ACE_DEBUG ((LM_DEBUG,
02439          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::notify_reactor, ")
02440          ACE_TEXT ("notify to the reactor failed..\n"),
02441          this->id ()));
02442     }
02443 
02444   return 1;
02445 }
02446 
02447 TAO::Transport_Cache_Manager &
02448 TAO_Transport::transport_cache_manager (void)
02449 {
02450   return this->orb_core_->lane_resources ().transport_cache ();
02451 }
02452 
02453 void
02454 TAO_Transport::assign_translators (TAO_InputCDR *inp, TAO_OutputCDR *outp)
02455 {
02456   if (this->char_translator_)
02457     {
02458       this->char_translator_->assign (inp);
02459       this->char_translator_->assign (outp);
02460     }
02461   if (this->wchar_translator_)
02462     {
02463       this->wchar_translator_->assign (inp);
02464       this->wchar_translator_->assign (outp);
02465     }
02466 }
02467 
02468 void
02469 TAO_Transport::clear_translators (TAO_InputCDR *inp, TAO_OutputCDR *outp)
02470 {
02471   if (inp)
02472     {
02473       inp->char_translator (0);
02474       inp->wchar_translator (0);
02475     }
02476   if (outp)
02477     {
02478       outp->char_translator (0);
02479       outp->wchar_translator (0);
02480     }
02481 }
02482 
02483 ACE_Event_Handler::Reference_Count
02484 TAO_Transport::add_reference (void)
02485 {
02486   return this->event_handler_i ()->add_reference ();
02487 }
02488 
02489 ACE_Event_Handler::Reference_Count
02490 TAO_Transport::remove_reference (void)
02491 {
02492   return this->event_handler_i ()->remove_reference ();
02493 }
02494 
02495 TAO_OutputCDR &
02496 TAO_Transport::out_stream (void)
02497 {
02498   return this->messaging_object ()->out_stream ();
02499 }
02500 
02501 void
02502 TAO_Transport::pre_close (void)
02503 {
02504   this->is_connected_ = false;
02505   this->purge_entry ();
02506   {
02507     ACE_MT (ACE_GUARD (ACE_Lock, guard, *this->handler_lock_));
02508     this->cleanup_queue_i ();
02509   }
02510 }
02511 
02512 bool
02513 TAO_Transport::post_open (size_t id)
02514 {
02515   this->id_ = id;
02516 
02517   {
02518     ACE_GUARD_RETURN (ACE_Lock,
02519                       ace_mon,
02520                       *this->handler_lock_,
02521                       false);
02522     this->is_connected_ = true;
02523   }
02524 
02525   // When we have data in our outgoing queue schedule ourselves
02526   // for output
02527   if (this->queue_is_empty_i ())
02528     return true;
02529 
02530   // If the wait strategy wants us to be registered with the reactor
02531   // then we do so. If registeration is required and it succeeds,
02532   // #REFCOUNT# becomes two.
02533   if (this->wait_strategy ()->register_handler () != 0)
02534     {
02535       // Registration failures.
02536 
02537       // Purge from the connection cache, if we are not in the cache, this
02538       // just does nothing.
02539       (void) this->purge_entry ();
02540 
02541       // Close the handler.
02542       (void) this->close_connection ();
02543 
02544       if (TAO_debug_level > 0)
02545         ACE_ERROR ((LM_ERROR,
02546            ACE_TEXT ("TAO (%P|%t) - Transport[%d]::post_connect , ")
02547            ACE_TEXT ("could not register the transport ")
02548            ACE_TEXT ("in the reactor.\n"),
02549            this->id ()));
02550 
02551       return false;
02552     }
02553 
02554   return true;
02555 }
02556 
02557 void
02558 TAO_Transport::allocate_partial_message_block (void)
02559 {
02560   if (this->partial_message_ == 0)
02561     {
02562       // This value must be at least large enough to hold a GIOP message
02563       // header plus a GIOP fragment header
02564       size_t const partial_message_size =
02565         this->messaging_object ()->header_length ();
02566        // + this->messaging_object ()->fragment_header_length ();
02567        // deprecated, conflicts with not-single_read_opt.
02568 
02569       ACE_NEW (this->partial_message_,
02570                ACE_Message_Block (partial_message_size));
02571     }
02572 }
02573 
02574 /*
02575  * Hook to add concrete implementations from the derived class onto
02576  * TAO's transport.
02577  */
02578 
02579 //@@ TAO_TRANSPORT_SPL_METHODS_ADD_HOOK
02580 
02581 TAO_END_VERSIONED_NAMESPACE_DECL

Generated on Sun Jan 27 13:07:37 2008 for TAO by doxygen 1.3.6