Transport.cpp

Go to the documentation of this file.
00001 // $Id: Transport.cpp 81319 2008-04-10 11:31:40Z johnnyw $
00002 
00003 #include "tao/Transport.h"
00004 
00005 #include "tao/LF_Follower.h"
00006 #include "tao/Leader_Follower.h"
00007 #include "tao/Client_Strategy_Factory.h"
00008 #include "tao/Wait_Strategy.h"
00009 #include "tao/Transport_Mux_Strategy.h"
00010 #include "tao/Stub.h"
00011 #include "tao/Transport_Queueing_Strategies.h"
00012 #include "tao/Connection_Handler.h"
00013 #include "tao/GIOP_Message_Base.h"
00014 #include "tao/Synch_Queued_Message.h"
00015 #include "tao/Asynch_Queued_Message.h"
00016 #include "tao/Flushing_Strategy.h"
00017 #include "tao/Thread_Lane_Resources.h"
00018 #include "tao/Resume_Handle.h"
00019 #include "tao/Codeset_Manager.h"
00020 #include "tao/Codeset_Translator_Base.h"
00021 #include "tao/debug.h"
00022 #include "tao/CDR.h"
00023 #include "tao/ORB_Core.h"
00024 #include "tao/MMAP_Allocator.h"
00025 #include "tao/SystemException.h"
00026 
00027 #include "ace/OS_NS_sys_time.h"
00028 #include "ace/OS_NS_stdio.h"
00029 #include "ace/Reactor.h"
00030 #include "ace/os_include/sys/os_uio.h"
00031 #include "ace/High_Res_Timer.h"
00032 #include "ace/Countdown_Time.h"
00033 #include "ace/CORBA_macros.h"
00034 
00035 /*
00036  * Specialization hook to add include files from
00037  * concrete transport implementation.
00038  */
00039 //@@ TAO_TRANSPORT_SPL_INCLUDE_FORWARD_DECL_ADD_HOOK
00040 
00041 #if !defined (__ACE_INLINE__)
00042 # include "tao/Transport.inl"
00043 #endif /* __ACE_INLINE__ */
00044 
00045 
00046 ACE_RCSID (tao,
00047            Transport,
00048            "$Id: Transport.cpp 81319 2008-04-10 11:31:40Z johnnyw $")
00049 
00050 /*
00051  * Static function in file scope
00052  */
00053 static void
00054 dump_iov (iovec *iov, int iovcnt, size_t id,
00055           size_t current_transfer,
00056           const char *location)
00057 {
00058   ACE_Guard <ACE_Log_Msg> log_guard (*ACE_Log_Msg::instance ());
00059 
00060   ACE_DEBUG ((LM_DEBUG,
00061               ACE_TEXT ("TAO (%P|%t) - Transport[%d]::%s, ")
00062               ACE_TEXT ("sending %d buffers\n"),
00063               id, ACE_TEXT_CHAR_TO_TCHAR (location), iovcnt));
00064 
00065   for (int i = 0; i != iovcnt && 0 < current_transfer; ++i)
00066     {
00067       size_t iov_len = iov[i].iov_len;
00068 
00069       // Possibly a partially sent iovec entry.
00070       if (current_transfer < iov_len)
00071         {
00072           iov_len = current_transfer;
00073         }
00074 
00075       ACE_DEBUG ((LM_DEBUG,
00076                   ACE_TEXT ("TAO (%P|%t) - Transport[%d]::%s, ")
00077                   ACE_TEXT ("buffer %d/%d has %d bytes\n"),
00078                   id, ACE_TEXT_CHAR_TO_TCHAR(location),
00079                   i, iovcnt,
00080                   iov_len));
00081 
00082       size_t len;
00083 
00084       for (size_t offset = 0; offset < iov_len; offset += len)
00085         {
00086           ACE_TCHAR header[1024];
00087           ACE_OS::sprintf (header,
00088                            ACE_TEXT("TAO - ")
00089                            ACE_TEXT("Transport[")
00090                            ACE_SIZE_T_FORMAT_SPECIFIER
00091                            ACE_TEXT("]::%s")
00092                            ACE_TEXT(" (")
00093                            ACE_SIZE_T_FORMAT_SPECIFIER ACE_TEXT("/")
00094                            ACE_SIZE_T_FORMAT_SPECIFIER ACE_TEXT(")"),
00095                            id, location, offset, iov_len);
00096 
00097           len = iov_len - offset;
00098 
00099           if (len > 512)
00100             {
00101               len = 512;
00102             }
00103 
00104           ACE_HEX_DUMP ((LM_DEBUG,
00105                          static_cast<char*> (iov[i].iov_base) + offset,
00106                          len,
00107                          header));
00108         }
00109       current_transfer -= iov_len;
00110     }
00111 
00112   ACE_DEBUG ((LM_DEBUG,
00113               ACE_TEXT ("TAO (%P|%t) - Transport[%d]::%s, ")
00114               ACE_TEXT ("end of data\n"),
00115               id, ACE_TEXT_CHAR_TO_TCHAR(location)));
00116 }
00117 
00118 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00119 
#if TAO_HAS_TRANSPORT_CURRENT == 1
/// Destructor.  Intentionally empty.
TAO::Transport::Stats::~Stats ()
{
}
#endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */
00126 
00127 TAO_Transport::TAO_Transport (CORBA::ULong tag,
00128                               TAO_ORB_Core *orb_core,
00129                               size_t input_cdr_size)
00130   : tag_ (tag)
00131   , orb_core_ (orb_core)
00132   , cache_map_entry_ (0)
00133   , bidirectional_flag_ (-1)
00134   , opening_connection_role_ (TAO::TAO_UNSPECIFIED_ROLE)
00135   , head_ (0)
00136   , tail_ (0)
00137   , incoming_message_queue_ (orb_core)
00138   , current_deadline_ (ACE_Time_Value::zero)
00139   , flush_timer_id_ (-1)
00140   , transport_timer_ (this)
00141   , handler_lock_ (orb_core->resource_factory ()->create_cached_connection_lock ())
00142   , id_ ((size_t) this)
00143   , purging_order_ (0)
00144   , recv_buffer_size_ (0)
00145   , sent_byte_count_ (0)
00146   , is_connected_ (false)
00147   , messaging_object_ (0)
00148   , char_translator_ (0)
00149   , wchar_translator_ (0)
00150   , tcs_set_ (0)
00151   , first_request_ (1)
00152   , partial_message_ (0)
00153 #if TAO_HAS_SENDFILE == 1
00154     // The ORB has been configured to use the MMAP allocator, meaning
00155     // we could/should use sendfile() to send data.  Cast once rather
00156     // here rather than during each send.  This assumes that all
00157     // TAO_OutputCDR instances are using the same TAO_MMAP_Allocator
00158     // instance as the underlying output CDR buffer allocator.
00159   , mmap_allocator_ (
00160       dynamic_cast<TAO_MMAP_Allocator *> (
00161         orb_core->output_cdr_buffer_allocator ()))
00162 #endif  /* TAO_HAS_SENDFILE==1 */
00163 #if TAO_HAS_TRANSPORT_CURRENT == 1
00164   , stats_ (0)
00165 #endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */
00166 {
00167   ACE_NEW (this->messaging_object_,
00168             TAO_GIOP_Message_Base (orb_core,
00169                                    this,
00170                                    input_cdr_size));
00171 
00172   TAO_Client_Strategy_Factory *cf =
00173     this->orb_core_->client_factory ();
00174 
00175   // Create WS now.
00176   this->ws_ = cf->create_wait_strategy (this);
00177 
00178   // Create TMS now.
00179   this->tms_ = cf->create_transport_mux_strategy (this);
00180 
00181 #if TAO_HAS_TRANSPORT_CURRENT == 1
00182   // Allocate stats
00183   ACE_NEW_THROW_EX (this->stats_,
00184                     TAO::Transport::Stats,
00185                     CORBA::NO_MEMORY ());
00186 #endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */
00187 
00188   /*
00189    * Hook to add code that initializes components that
00190    * belong to the concrete protocol implementation.
00191    * Further additions to this Transport class will
00192    * need to add code *before* this hook.
00193    */
00194   //@@ TAO_TRANSPORT_SPL_CONSTRUCTOR_ADD_HOOK
00195 }
00196 
00197 TAO_Transport::~TAO_Transport (void)
00198 {
00199   delete this->messaging_object_;
00200 
00201   delete this->ws_;
00202 
00203   delete this->tms_;
00204 
00205   delete this->handler_lock_;
00206 
00207   if (!this->is_connected_)
00208     {
00209       // When we have a not connected transport we could have buffered
00210       // messages on this transport which we have to cleanup now.
00211       this->cleanup_queue_i();
00212 
00213       // Cleanup our cache entry
00214       this->purge_entry();
00215     }
00216 
00217   // Release the partial message block, however we may
00218   // have never allocated one.
00219   ACE_Message_Block::release (this->partial_message_);
00220 
00221   // By the time the destructor is reached here all the connection stuff
00222   // *must* have been cleaned up.
00223 
00224   // The following assert is needed for the test "Bug_2494_Regression".
00225   // See the bugzilla bug #2494 for details.
00226   ACE_ASSERT (this->head_ == 0);
00227   ACE_ASSERT (this->cache_map_entry_ == 0);
00228 
00229 #if TAO_HAS_TRANSPORT_CURRENT == 1
00230   delete this->stats_;
00231 #endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */
00232 
00233   /*
00234    * Hook to add code that cleans up components
00235    * belong to the concrete protocol implementation.
00236    * Further additions to this Transport class will
00237    * need to add code *before* this hook.
00238    */
00239   //@@ TAO_TRANSPORT_SPL_DESTRUCTOR_ADD_HOOK
00240 }
00241 
00242 void
00243 TAO_Transport::provide_handler (TAO::Connection_Handler_Set &handlers)
00244 {
00245   (void) this->add_reference ();
00246 
00247   handlers.insert (this->connection_handler_i ());
00248 }
00249 
00250 bool
00251 TAO_Transport::provide_blockable_handler (TAO::Connection_Handler_Set &h)
00252 {
00253   if (this->ws_->non_blocking () ||
00254       this->opening_connection_role_ == TAO::TAO_SERVER_ROLE)
00255     return false;
00256 
00257   (void) this->add_reference ();
00258 
00259   h.insert (this->connection_handler_i ());
00260 
00261   return true;
00262 }
00263 
00264 bool
00265 TAO_Transport::idle_after_send (void)
00266 {
00267   return this->tms ()->idle_after_send ();
00268 }
00269 
00270 bool
00271 TAO_Transport::idle_after_reply (void)
00272 {
00273   return this->tms ()->idle_after_reply ();
00274 }
00275 
00276 /*
00277  * A concrete transport class specializes this
00278  * method. This hook allows commenting this function
00279  * when TAO's transport is specialized. Note: All
00280  * functions that have an implementation that does
00281  * nothing should be added within this hook to
00282  * enable specialization.
00283  */
00284 //@@ TAO_TRANSPORT_SPL_COMMENT_HOOK_START
00285 
00286 int
00287 TAO_Transport::tear_listen_point_list (TAO_InputCDR &)
00288 {
00289   ACE_NOTSUP_RETURN (-1);
00290 }
00291 
00292 int
00293 TAO_Transport::send_message_shared (TAO_Stub *stub,
00294                                     TAO_Message_Semantics message_semantics,
00295                                     const ACE_Message_Block *message_block,
00296                                     ACE_Time_Value *max_wait_time)
00297 {
00298   int result = 0;
00299 
00300   {
00301     ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1);
00302 
00303     result =
00304       this->send_message_shared_i (stub, message_semantics,
00305                                    message_block, max_wait_time);
00306   }
00307 
00308   if (result == -1)
00309     {
00310       // The connection needs to be closed here.
00311       // In the case of a partially written message this is the only way to cleanup
00312       //  the physical connection as well as the Transport. An EOF on the remote end
00313       //  will cancel the partially received message.
00314       this->close_connection ();
00315     }
00316 
00317   return result;
00318 }
00319 
00320 //@@ TAO_TRANSPORT_SPL_COMMENT_HOOK_END
00321 
00322 bool
00323 TAO_Transport::post_connect_hook (void)
00324 {
00325   return true;
00326 }
00327 
00328 void
00329 TAO_Transport::close_connection (void)
00330 {
00331   this->connection_handler_i ()->close_connection ();
00332 }
00333 
00334 int
00335 TAO_Transport::register_handler (void)
00336 {
00337   if (TAO_debug_level > 4)
00338     {
00339       ACE_DEBUG ((LM_DEBUG,
00340                   ACE_TEXT ("TAO (%P|%t) - Transport[%d]::register_handler\n"),
00341                   this->id ()));
00342     }
00343 
00344   ACE_Reactor * const r = this->orb_core_->reactor ();
00345 
00346   // @@note: This should be okay since the register handler call will
00347   // not make a nested call into the transport.
00348   ACE_GUARD_RETURN (ACE_Lock,
00349                     ace_mon,
00350                     *this->handler_lock_,
00351                     false);
00352 
00353   if (r == this->event_handler_i ()->reactor ())
00354     {
00355       return 0;
00356     }
00357 
00358   // Set the flag in the Connection Handler and in the Wait Strategy
00359   // @@Maybe we should set these flags after registering with the
00360   // reactor. What if the  registration fails???
00361   this->ws_->is_registered (true);
00362 
00363   // Register the handler with the reactor
00364   return r->register_handler (this->event_handler_i (),
00365                               ACE_Event_Handler::READ_MASK);
00366 }
00367 
#if TAO_HAS_SENDFILE == 1
/// Base class implementation for pluggable transports that do not
/// provide their own sendfile(): the allocator argument is ignored and
/// the data is sent through send().
///
/// @return The value returned by send().
ssize_t
TAO_Transport::sendfile (TAO_MMAP_Allocator * /* allocator */,
                         iovec * iov,
                         int iovcnt,
                         size_t &bytes_transferred,
                         ACE_Time_Value const * timeout)
{
  // @@ We can probably refactor the TAO_IIOP_Transport::sendfile()
  //    implementation to this base class method, and leave any TCP
  //    specific configuration out of this base class method.
  //      -Ossama
  ssize_t const sent =
    this->send (iov, iovcnt, bytes_transferred, timeout);

  return sent;
}
#endif  /* TAO_HAS_SENDFILE==1 */
00386 
00387 int
00388 TAO_Transport::generate_locate_request (
00389     TAO_Target_Specification &spec,
00390     TAO_Operation_Details &opdetails,
00391     TAO_OutputCDR &output)
00392 {
00393   if (this->messaging_object ()->generate_locate_request_header (opdetails,
00394                                                                  spec,
00395                                                                  output) == -1)
00396     {
00397       if (TAO_debug_level > 0)
00398         {
00399           ACE_DEBUG ((LM_DEBUG,
00400                       ACE_TEXT ("TAO (%P|%t) - Transport[%d]::generate_locate_request, ")
00401                       ACE_TEXT ("error while marshalling the LocateRequest header\n"),
00402                       this->id ()));
00403         }
00404 
00405       return -1;
00406     }
00407 
00408   return 0;
00409 }
00410 
00411 int
00412 TAO_Transport::generate_request_header (
00413     TAO_Operation_Details &opdetails,
00414     TAO_Target_Specification &spec,
00415     TAO_OutputCDR &output)
00416 {
00417   // codeset service context is only supposed to be sent in the first request
00418   // on a particular connection.
00419   if (this->first_request_)
00420     {
00421       TAO_Codeset_Manager * const csm = this->orb_core ()->codeset_manager ();
00422       if (csm)
00423         csm->generate_service_context (opdetails,*this);
00424     }
00425 
00426   if (this->messaging_object ()->generate_request_header (opdetails,
00427                                                           spec,
00428                                                           output) == -1)
00429     {
00430       if (TAO_debug_level > 0)
00431         {
00432         ACE_DEBUG ((LM_DEBUG,
00433                    ACE_TEXT ("(%P|%t) - Transport[%d]::generate_request_header, ")
00434                    ACE_TEXT ("error while marshalling the Request header\n"),
00435                       this->id()));
00436         }
00437 
00438       return -1;
00439     }
00440 
00441   return 0;
00442 }
00443 
00444 /// @todo Ideally the following should be inline.
00445 /// @todo purge_entry has a return value, use it
00446 int
00447 TAO_Transport::recache_transport (TAO_Transport_Descriptor_Interface *desc)
00448 {
00449   // First purge our entry
00450   this->purge_entry ();
00451 
00452   // Then add ourselves to the cache
00453   return this->transport_cache_manager ().cache_transport (desc, this);
00454 }
00455 
00456 int
00457 TAO_Transport::purge_entry (void)
00458 {
00459   return this->transport_cache_manager ().purge_entry (this->cache_map_entry_);
00460 }
00461 
00462 int
00463 TAO_Transport::make_idle (void)
00464 {
00465   if (TAO_debug_level > 3)
00466     {
00467       ACE_DEBUG ((LM_DEBUG,
00468                   ACE_TEXT ("TAO (%P|%t) - Transport[%d]::make_idle\n"),
00469                   this->id ()));
00470     }
00471 
00472   return this->transport_cache_manager ().make_idle (this->cache_map_entry_);
00473 }
00474 
00475 int
00476 TAO_Transport::update_transport (void)
00477 {
00478   return this->transport_cache_manager ().update_entry (this->cache_map_entry_);
00479 }
00480 
00481 /*
00482  *
00483  *  Methods called and used in the output path of the ORB.
00484  *
00485  */
00486 int
00487 TAO_Transport::handle_output (ACE_Time_Value *max_wait_time)
00488 {
00489   if (TAO_debug_level > 3)
00490     {
00491       ACE_DEBUG ((LM_DEBUG,
00492                   ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_output\n"),
00493                   this->id ()));
00494     }
00495 
00496   // The flushing strategy (potentially via the Reactor) wants to send
00497   // more data, first check if there is a current message that needs
00498   // more sending...
00499   int const retval = this->drain_queue (max_wait_time);
00500 
00501   if (TAO_debug_level > 3)
00502     {
00503       ACE_DEBUG ((LM_DEBUG,
00504                   ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_output, ")
00505                   ACE_TEXT ("drain_queue returns %d/%d\n"),
00506                   this->id (),
00507                   retval, errno));
00508     }
00509 
00510   // Any errors are returned directly to the Reactor
00511   return retval;
00512 }
00513 
00514 int
00515 TAO_Transport::format_queue_message (TAO_OutputCDR &stream,
00516                                      ACE_Time_Value *max_wait_time)
00517 {
00518   if (this->messaging_object ()->format_message (stream) != 0)
00519     return -1;
00520 
00521   return this->queue_message_i (stream.begin (), max_wait_time);
00522 }
00523 
00524 int
00525 TAO_Transport::send_message_block_chain (const ACE_Message_Block *mb,
00526                                          size_t &bytes_transferred,
00527                                          ACE_Time_Value *max_wait_time)
00528 {
00529   ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1);
00530 
00531   return this->send_message_block_chain_i (mb,
00532                                            bytes_transferred,
00533                                            max_wait_time);
00534 }
00535 
00536 int
00537 TAO_Transport::send_message_block_chain_i (const ACE_Message_Block *mb,
00538                                            size_t &bytes_transferred,
00539                                            ACE_Time_Value *max_wait_time)
00540 {
00541   size_t const total_length = mb->total_length ();
00542 
00543   // We are going to block, so there is no need to clone
00544   // the message block.
00545   TAO_Synch_Queued_Message synch_message (mb, this->orb_core_);
00546 
00547   synch_message.push_back (this->head_, this->tail_);
00548 
00549   int const n = this->drain_queue_i (max_wait_time);
00550 
00551   if (n == -1)
00552     {
00553       synch_message.remove_from_list (this->head_, this->tail_);
00554       return -1; // Error while sending...
00555     }
00556   else if (n == 1)
00557     {
00558       bytes_transferred = total_length;
00559       return 1;  // Empty queue, message was sent..
00560     }
00561 
00562   // Remove the temporary message from the queue...
00563   synch_message.remove_from_list (this->head_, this->tail_);
00564 
00565   bytes_transferred = total_length - synch_message.message_length ();
00566 
00567   return 0;
00568 }
00569 
00570 int
00571 TAO_Transport::send_synchronous_message_i (const ACE_Message_Block *mb,
00572                                            ACE_Time_Value *max_wait_time)
00573 {
00574   // We are going to block, so there is no need to clone
00575   // the message block.
00576   size_t const total_length = mb->total_length ();
00577   TAO_Synch_Queued_Message synch_message (mb, this->orb_core_);
00578 
00579   synch_message.push_back (this->head_, this->tail_);
00580 
00581   int const result = this->send_synch_message_helper_i (synch_message,
00582                                                         max_wait_time);
00583   if (result == -1 && errno == ETIME)
00584     {
00585       if (total_length == synch_message.message_length ()) //none was sent
00586         {
00587           if (TAO_debug_level > 2)
00588             {
00589               ACE_DEBUG ((LM_DEBUG,
00590                           ACE_TEXT ("TAO (%P|%t) - ")
00591                           ACE_TEXT ("Transport[%d]::send_synchronous_message_i, ")
00592                           ACE_TEXT ("timeout encountered before any bytes sent\n"),
00593                           this->id ()));
00594             }
00595           throw ::CORBA::TIMEOUT (
00596             CORBA::SystemException::_tao_minor_code (
00597               TAO_TIMEOUT_SEND_MINOR_CODE,
00598               ETIME),
00599             CORBA::COMPLETED_NO);
00600         }
00601       else
00602         {
00603           return -1;
00604         }
00605     }
00606   else if(result == -1 || result == 1)
00607     {
00608       return result;
00609     }
00610 
00611   TAO_Flushing_Strategy *flushing_strategy =
00612     this->orb_core ()->flushing_strategy ();
00613   if (flushing_strategy->schedule_output (this) == -1)
00614     {
00615       synch_message.remove_from_list (this->head_, this->tail_);
00616       if (TAO_debug_level > 0)
00617         {
00618           ACE_ERROR ((LM_ERROR,
00619                       ACE_TEXT ("TAO (%P|%t) - Transport[%d]::")
00620                       ACE_TEXT ("send_synchronous_message_i, ")
00621                       ACE_TEXT ("error while scheduling flush - %m\n"),
00622                       this->id ()));
00623         }
00624       return -1;
00625     }
00626 
00627   // No need to check for result == TAO_Flushing_Strategy::MUST_FLUSH,
00628   // because we're always going to flush anyway.
00629 
00630   // Release the mutex, other threads may modify the queue as we
00631   // block for a long time writing out data.
00632   int flush_result;
00633   {
00634     typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK;
00635     TAO_REVERSE_LOCK reverse (*this->handler_lock_);
00636     ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1);
00637 
00638     flush_result = flushing_strategy->flush_message (this,
00639                                                      &synch_message,
00640                                                      max_wait_time);
00641   }
00642 
00643   if (flush_result == -1)
00644     {
00645       synch_message.remove_from_list (this->head_, this->tail_);
00646 
00647       // We don't need to do anything special for the timeout case.
00648       // The connection is going to get closed and the Transport destroyed.
00649       // The only thing to do maybe is to empty the queue.
00650 
00651       if (TAO_debug_level > 0)
00652         {
00653           ACE_ERROR ((LM_ERROR,
00654              ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_synchronous_message_i, ")
00655              ACE_TEXT ("error while sending message - %m\n"),
00656              this->id ()));
00657         }
00658 
00659       return -1;
00660     }
00661 
00662   return 1;
00663 }
00664 
00665 
00666 int
00667 TAO_Transport::send_reply_message_i (const ACE_Message_Block *mb,
00668                                      ACE_Time_Value *max_wait_time)
00669 {
00670   // Dont clone now.. We could be sent in one shot!
00671   TAO_Synch_Queued_Message synch_message (mb, this->orb_core_);
00672 
00673   synch_message.push_back (this->head_, this->tail_);
00674 
00675   int const n =
00676     this->send_synch_message_helper_i (synch_message, max_wait_time);
00677 
00678   // What about partially sent messages.
00679   if (n == -1 || n == 1)
00680     {
00681       return n;
00682     }
00683 
00684   if (TAO_debug_level > 3)
00685     {
00686       ACE_DEBUG ((LM_DEBUG,
00687          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_reply_message_i, ")
00688          ACE_TEXT ("preparing to add to queue before leaving\n"),
00689          this->id ()));
00690     }
00691 
00692   // Till this point we shouldn't have any copying and that is the
00693   // point anyway. Now, remove the node from the list
00694   synch_message.remove_from_list (this->head_, this->tail_);
00695 
00696   // Clone the node that we have.
00697   TAO_Queued_Message *msg =
00698     synch_message.clone (this->orb_core_->transport_message_buffer_allocator ());
00699 
00700   // Stick it in the queue
00701   msg->push_back (this->head_, this->tail_);
00702 
00703   TAO_Flushing_Strategy *flushing_strategy =
00704     this->orb_core ()->flushing_strategy ();
00705 
00706   int const result = flushing_strategy->schedule_output (this);
00707 
00708   if (result == -1)
00709     {
00710       if (TAO_debug_level > 5)
00711         {
00712           ACE_DEBUG ((LM_DEBUG, "TAO (%P|%t) - Transport[%d]::send_reply_"
00713                       "message_i, dequeuing msg due to schedule_output "
00714                       "failure\n", this->id ()));
00715         }
00716       msg->remove_from_list (this->head_, this->tail_);
00717       msg->destroy ();
00718     }
00719   else if (result == TAO_Flushing_Strategy::MUST_FLUSH)
00720     {
00721       typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK;
00722       TAO_REVERSE_LOCK reverse (*this->handler_lock_);
00723       ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1);
00724       (void) flushing_strategy->flush_message (this, msg, 0);
00725     }
00726 
00727   return 1;
00728 }
00729 
00730 int
00731 TAO_Transport::send_synch_message_helper_i (TAO_Synch_Queued_Message &synch_message,
00732                                             ACE_Time_Value * max_wait_time)
00733 {
00734   int const n = this->drain_queue_i (max_wait_time);
00735 
00736   if (n == -1)
00737     {
00738       synch_message.remove_from_list (this->head_, this->tail_);
00739       return -1; // Error while sending...
00740     }
00741   else if (n == 1)
00742     {
00743       return 1;  // Empty queue, message was sent..
00744     }
00745 
00746   if (synch_message.all_data_sent ())
00747     {
00748       return 1;
00749     }
00750 
00751   return 0;
00752 }
00753 
00754 bool
00755 TAO_Transport::queue_is_empty_i (void)
00756 {
00757   return (this->head_ == 0);
00758 }
00759 
00760 
00761 int
00762 TAO_Transport::schedule_output_i (void)
00763 {
00764   ACE_Event_Handler * const eh = this->event_handler_i ();
00765   ACE_Reactor * const reactor = eh->reactor ();
00766 
00767   if (reactor == 0)
00768      return -1;
00769 
00770   // Check to see if our event handler is still registered with the
00771   // reactor.  It's possible for another thread to have run close_connection()
00772   // since we last used the event handler.
00773   ACE_Event_Handler * const found = reactor->find_handler (eh->get_handle ());
00774   if (found)
00775     {
00776       found->remove_reference ();
00777 
00778       if (found != eh)
00779         {
00780           if (TAO_debug_level > 3)
00781             {
00782               ACE_DEBUG ((LM_DEBUG,
00783                           ACE_TEXT ("TAO (%P|%t) - ")
00784                           ACE_TEXT ("Transport[%d]::schedule_output_i ")
00785                           ACE_TEXT ("event handler not found in reactor,")
00786                           ACE_TEXT ("returning -1\n"),
00787                           this->id ()));
00788             }
00789 
00790           return -1;
00791         }
00792     }
00793 
00794   if (TAO_debug_level > 3)
00795     {
00796       ACE_DEBUG ((LM_DEBUG,
00797          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::schedule_output_i\n"),
00798          this->id ()));
00799     }
00800 
00801   return reactor->schedule_wakeup (eh, ACE_Event_Handler::WRITE_MASK);
00802 }
00803 
00804 int
00805 TAO_Transport::cancel_output_i (void)
00806 {
00807   ACE_Event_Handler * const eh = this->event_handler_i ();
00808   ACE_Reactor *const reactor = eh->reactor ();
00809 
00810   if (TAO_debug_level > 3)
00811     {
00812       ACE_DEBUG ((LM_DEBUG,
00813          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cancel_output_i\n"),
00814          this->id ()));
00815     }
00816 
00817   return reactor->cancel_wakeup (eh, ACE_Event_Handler::WRITE_MASK);
00818 }
00819 
00820 int
00821 TAO_Transport::handle_timeout (const ACE_Time_Value & /* current_time */,
00822                                const void *act)
00823 {
00824   if (TAO_debug_level > 6)
00825     {
00826       ACE_DEBUG ((LM_DEBUG,
00827          ACE_TEXT ("TAO (%P|%t) - TAO_Transport[%d]::handle_timeout, ")
00828          ACE_TEXT ("timer expired\n"),
00829          this->id ()));
00830     }
00831 
00832   /// This is the only legal ACT in the current configuration....
00833   if (act != &this->current_deadline_)
00834     {
00835       return -1;
00836     }
00837 
00838   if (this->flush_timer_pending ())
00839     {
00840       // The timer is always a oneshot timer, so mark is as not
00841       // pending.
00842       this->reset_flush_timer ();
00843 
00844       TAO_Flushing_Strategy *flushing_strategy =
00845         this->orb_core ()->flushing_strategy ();
00846       int const result = flushing_strategy->schedule_output (this);
00847       if (result == TAO_Flushing_Strategy::MUST_FLUSH)
00848         {
00849           typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK;
00850           TAO_REVERSE_LOCK reverse (*this->handler_lock_);
00851           ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1);
00852           if (flushing_strategy->flush_transport (this, 0) == -1) {
00853             return -1;
00854           }
00855         }
00856     }
00857 
00858   return 0;
00859 }
00860 
00861 int
00862 TAO_Transport::drain_queue (ACE_Time_Value *max_wait_time)
00863 {
00864   ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1);
00865   int const retval = this->drain_queue_i (max_wait_time);
00866 
00867   if (retval == 1)
00868     {
00869       // ... there is no current message or it was completely
00870       // sent, cancel output...
00871       TAO_Flushing_Strategy *flushing_strategy =
00872         this->orb_core ()->flushing_strategy ();
00873 
00874       flushing_strategy->cancel_output (this);
00875 
00876       return 0;
00877     }
00878 
00879   return retval;
00880 }
00881 
00882 int
00883 TAO_Transport::drain_queue_helper (int &iovcnt, iovec iov[], ACE_Time_Value *max_wait_time)
00884 {
00885   size_t byte_count = 0;
00886   ACE_Countdown_Time countdown (max_wait_time);
00887 
00888   // ... send the message ...
00889   ssize_t retval = -1;
00890 
00891 #if TAO_HAS_SENDFILE == 1
00892   if (this->mmap_allocator_)
00893     retval = this->sendfile (this->mmap_allocator_,
00894                              iov,
00895                              iovcnt,
00896                              byte_count);
00897   else
00898 #endif  /* TAO_HAS_SENDFILE==1 */
00899     retval = this->send (iov, iovcnt, byte_count, max_wait_time);
00900 
00901   if (TAO_debug_level == 5)
00902     {
00903       dump_iov (iov, iovcnt, this->id (),
00904                 byte_count, "drain_queue_helper");
00905     }
00906 
00907   // ... now we need to update the queue, removing elements
00908   // that have been sent, and updating the last element if it
00909   // was only partially sent ...
00910   this->cleanup_queue (byte_count);
00911   iovcnt = 0;
00912 
00913   if (retval == 0)
00914     {
00915       if (TAO_debug_level > 4)
00916         {
00917           ACE_DEBUG ((LM_DEBUG,
00918              ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_helper, ")
00919              ACE_TEXT ("send() returns 0\n"),
00920              this->id ()));
00921         }
00922       return -1;
00923     }
00924   else if (retval == -1)
00925     {
00926       if (TAO_debug_level > 4)
00927         {
00928           ACE_DEBUG ((LM_DEBUG,
00929              ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_helper, ")
00930              ACE_TEXT ("error during send (errno: %d) - %m\n"),
00931              this->id (), errno));
00932         }
00933 
00934       if (errno == EWOULDBLOCK || errno == EAGAIN)
00935         {
00936           return 0;
00937         }
00938 
00939       return -1;
00940     }
00941 
00942   // ... start over, how do we guarantee progress?  Because if
00943   // no bytes are sent send() can only return 0 or -1
00944 
00945   // Total no. of bytes sent for a send call
00946   this->sent_byte_count_ += byte_count;
00947 
00948   if (TAO_debug_level > 4)
00949     {
00950       ACE_DEBUG ((LM_DEBUG,
00951          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_helper, ")
00952          ACE_TEXT ("byte_count = %d, head_is_empty = %d\n"),
00953          this->id(), byte_count, (this->head_ == 0)));
00954     }
00955 
00956   return 1;
00957 }
00958 
00959 int
00960 TAO_Transport::drain_queue_i (ACE_Time_Value *max_wait_time)
00961 {
00962   // This is the vector used to send data, it must be declared outside
00963   // the loop because after the loop there may still be data to be
00964   // sent
00965   int iovcnt = 0;
00966 #if defined (ACE_INITIALIZE_MEMORY_BEFORE_USE)
00967   iovec iov[ACE_IOV_MAX] = { { 0 , 0 } };
00968 #else
00969   iovec iov[ACE_IOV_MAX];
00970 #endif /* ACE_INITIALIZE_MEMORY_BEFORE_USE */
00971 
00972   // We loop over all the elements in the queue ...
00973   TAO_Queued_Message *i = this->head_;
00974 
00975   // Reset the value so that the counting is done for each new send
00976   // call.
00977   this->sent_byte_count_ = 0;
00978 
00979   // Avoid calling this expensive function each time through the loop. Instead
00980   // we'll assume that the time is unlikely to change much during the loop.
00981   // If we are forced to send in the loop then we'll recompute the time.
00982   ACE_Time_Value now = ACE_High_Res_Timer::gettimeofday_hr ();
00983 
00984   while (i != 0)
00985     {
00986       if (i->is_expired (now))
00987         {
00988           if (TAO_debug_level > 3)
00989           {
00990             ACE_DEBUG ((LM_DEBUG,
00991               ACE_TEXT ("TAO (%P|%t - Transport[%d]::drain_queue_i, ")
00992               ACE_TEXT ("Discarding expired queued message.\n"),
00993               this->id ()));
00994           }
00995           TAO_Queued_Message *next = i->next ();
00996           i->state_changed (TAO_LF_Event::LFS_TIMEOUT,
00997                             this->orb_core_->leader_follower ());
00998           i->remove_from_list (this->head_, this->tail_);
00999           i->destroy ();
01000           i = next;
01001           continue;
01002         }
01003       // ... each element fills the iovector ...
01004       i->fill_iov (ACE_IOV_MAX, iovcnt, iov);
01005 
01006       // ... the vector is full, no choice but to send some data out.
01007       // We need to loop because a single message can span multiple
01008       // IOV_MAX elements ...
01009       if (iovcnt == ACE_IOV_MAX)
01010         {
01011           int const retval = this->drain_queue_helper (iovcnt, iov,
01012                                                        max_wait_time);
01013 
01014           now = ACE_High_Res_Timer::gettimeofday_hr ();
01015 
01016           if (TAO_debug_level > 4)
01017             {
01018               ACE_DEBUG ((LM_DEBUG,
01019                  ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_i, ")
01020                  ACE_TEXT ("helper retval = %d\n"),
01021                  this->id (), retval));
01022             }
01023 
01024           if (retval != 1)
01025             {
01026               return retval;
01027             }
01028 
01029           i = this->head_;
01030           continue;
01031         }
01032       // ... notice that this line is only reached if there is still
01033       // room in the iovector ...
01034       i = i->next ();
01035     }
01036 
01037   if (iovcnt != 0)
01038     {
01039       int const retval = this->drain_queue_helper (iovcnt, iov, max_wait_time);
01040 
01041       if (TAO_debug_level > 4)
01042         {
01043           ACE_DEBUG ((LM_DEBUG,
01044               ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_i, ")
01045               ACE_TEXT ("helper retval = %d\n"),
01046               this->id (), retval));
01047         }
01048 
01049       if (retval != 1)
01050         {
01051           return retval;
01052         }
01053     }
01054 
01055   if (this->head_ == 0)
01056     {
01057       if (this->flush_timer_pending ())
01058         {
01059           ACE_Event_Handler *eh = this->event_handler_i ();
01060           ACE_Reactor * const reactor = eh->reactor ();
01061           reactor->cancel_timer (this->flush_timer_id_);
01062           this->reset_flush_timer ();
01063         }
01064 
01065       return 1;
01066     }
01067 
01068   return 0;
01069 }
01070 
01071 void
01072 TAO_Transport::cleanup_queue_i ()
01073 {
01074   if (TAO_debug_level > 4)
01075     {
01076       ACE_DEBUG ((LM_DEBUG,
01077          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue_i, ")
01078          ACE_TEXT ("cleaning up complete queue\n"),
01079          this->id ()));
01080     }
01081 
01082   size_t byte_count = 0;
01083   int msg_count = 0;
01084 
01085   // Cleanup all messages
01086   while (this->head_ != 0)
01087     {
01088       TAO_Queued_Message *i = this->head_;
01089 
01090       if (TAO_debug_level > 4)
01091         {
01092           byte_count += i->message_length();
01093           ++msg_count;
01094         }
01095        // @@ This is a good point to insert a flag to indicate that a
01096        //    CloseConnection message was successfully received.
01097       i->state_changed (TAO_LF_Event::LFS_CONNECTION_CLOSED,
01098                         this->orb_core_->leader_follower ());
01099 
01100       i->remove_from_list (this->head_, this->tail_);
01101 
01102       i->destroy ();
01103     }
01104 
01105   if (TAO_debug_level > 4)
01106     {
01107       ACE_DEBUG ((LM_DEBUG,
01108                   ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue_i, ")
01109                   ACE_TEXT ("discarded %d messages, %u bytes.\n"),
01110                   this->id (), msg_count, byte_count));
01111     }
01112 }
01113 
01114 void
01115 TAO_Transport::cleanup_queue (size_t byte_count)
01116 {
01117   while (this->head_ != 0 && byte_count > 0)
01118     {
01119       TAO_Queued_Message *i = this->head_;
01120 
01121       if (TAO_debug_level > 4)
01122         {
01123           ACE_DEBUG ((LM_DEBUG,
01124              ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue, ")
01125              ACE_TEXT ("byte_count = %d\n"),
01126              this->id (), byte_count));
01127         }
01128 
01129       // Update the state of the first message
01130       i->bytes_transferred (byte_count);
01131 
01132       if (TAO_debug_level > 4)
01133         {
01134           ACE_DEBUG ((LM_DEBUG,
01135              ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue, ")
01136              ACE_TEXT ("after transfer, bc = %d, all_sent = %d, ml = %d\n"),
01137              this->id (), byte_count, i->all_data_sent (),
01138              i->message_length ()));
01139         }
01140 
01141       // ... if all the data was sent the message must be removed from
01142       // the queue...
01143       if (i->all_data_sent ())
01144         {
01145           i->remove_from_list (this->head_, this->tail_);
01146           i->destroy ();
01147         }
01148     }
01149 }
01150 
01151 int
01152 TAO_Transport::check_buffering_constraints_i (TAO_Stub *stub, bool &must_flush)
01153 {
01154   // First let's compute the size of the queue:
01155   size_t msg_count = 0;
01156   size_t total_bytes = 0;
01157 
01158   for (TAO_Queued_Message *i = this->head_; i != 0; i = i->next ())
01159     {
01160       ++msg_count;
01161       total_bytes += i->message_length ();
01162     }
01163 
01164   bool set_timer = false;
01165   ACE_Time_Value new_deadline;
01166 
01167   TAO::Transport_Queueing_Strategy *queue_strategy =
01168     stub->transport_queueing_strategy ();
01169 
01170   bool constraints_reached = true;
01171 
01172   if (queue_strategy)
01173     {
01174       constraints_reached =
01175         queue_strategy->buffering_constraints_reached (stub,
01176                                                        msg_count,
01177                                                        total_bytes,
01178                                                        must_flush,
01179                                                        this->current_deadline_,
01180                                                        set_timer,
01181                                                        new_deadline);
01182     }
01183   else
01184     {
01185       must_flush = false;
01186     }
01187 
01188   // ... set the new timer, also cancel any previous timers ...
01189   if (set_timer)
01190     {
01191       ACE_Event_Handler *eh = this->event_handler_i ();
01192       ACE_Reactor * const reactor = eh->reactor ();
01193       this->current_deadline_ = new_deadline;
01194       ACE_Time_Value delay = new_deadline - ACE_OS::gettimeofday ();
01195 
01196       if (this->flush_timer_pending ())
01197         {
01198           reactor->cancel_timer (this->flush_timer_id_);
01199         }
01200 
01201       this->flush_timer_id_ =
01202         reactor->schedule_timer (&this->transport_timer_,
01203                                  &this->current_deadline_,
01204                                  delay);
01205     }
01206 
01207   return constraints_reached;
01208 }
01209 
01210 void
01211 TAO_Transport::report_invalid_event_handler (const char *caller)
01212 {
01213   if (TAO_debug_level > 0)
01214     {
01215       ACE_DEBUG ((LM_DEBUG,
01216          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::report_invalid_event_handler")
01217          ACE_TEXT ("(%s) no longer associated with handler [tag=%d]\n"),
01218          this->id (), ACE_TEXT_CHAR_TO_TCHAR (caller), this->tag_));
01219     }
01220 }
01221 
01222 void
01223 TAO_Transport::send_connection_closed_notifications (void)
01224 {
01225   {
01226     ACE_MT (ACE_GUARD (ACE_Lock, guard, *this->handler_lock_));
01227 
01228     this->send_connection_closed_notifications_i ();
01229   }
01230 
01231   this->tms ()->connection_closed ();
01232 }
01233 
01234 void
01235 TAO_Transport::send_connection_closed_notifications_i (void)
01236 {
01237   this->cleanup_queue_i ();
01238 }
01239 
01240 int
01241 TAO_Transport::send_message_shared_i (TAO_Stub *stub,
01242                                       TAO_Message_Semantics message_semantics,
01243                                       const ACE_Message_Block *message_block,
01244                                       ACE_Time_Value *max_wait_time)
01245 {
01246   int ret = 0;
01247 
01248 #if TAO_HAS_TRANSPORT_CURRENT == 1
01249   size_t const message_length = message_block->length ();
01250 #endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */
01251 
01252   switch (message_semantics)
01253     {
01254       case TAO_Transport::TAO_TWOWAY_REQUEST:
01255         ret = this->send_synchronous_message_i (message_block, max_wait_time);
01256         break;
01257 
01258       case TAO_Transport::TAO_REPLY:
01259         ret = this->send_reply_message_i (message_block, max_wait_time);
01260         break;
01261 
01262       case TAO_Transport::TAO_ONEWAY_REQUEST:
01263         ret = this->send_asynchronous_message_i (stub,
01264                                                  message_block,
01265                                                  max_wait_time);
01266         break;
01267     }
01268 
01269 #if TAO_HAS_TRANSPORT_CURRENT == 1
01270   // "Count" the message, only if no error was encountered.
01271   if (ret != -1 && this->stats_ != 0)
01272     this->stats_->messages_sent (message_length);
01273 #endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */
01274 
01275   return ret;
01276 }
01277 
01278 int
01279 TAO_Transport::send_asynchronous_message_i (TAO_Stub *stub,
01280                                             const ACE_Message_Block *message_block,
01281                                             ACE_Time_Value *max_wait_time)
01282 {
01283   // Let's figure out if the message should be queued without trying
01284   // to send first:
01285   bool try_sending_first = true;
01286 
01287   bool const queue_empty = (this->head_ == 0);
01288 
01289   TAO::Transport_Queueing_Strategy *queue_strategy =
01290     stub->transport_queueing_strategy ();
01291 
01292   if (!queue_empty)
01293     {
01294       try_sending_first = false;
01295     }
01296   else if (queue_strategy)
01297     {
01298       if (queue_strategy->must_queue (queue_empty))
01299         {
01300           try_sending_first = false;
01301         }
01302     }
01303 
01304   bool partially_sent = false;
01305   bool timeout_encountered = false;
01306 
01307   if (try_sending_first)
01308     {
01309       ssize_t n = 0;
01310       size_t byte_count = 0;
01311       // ... in this case we must try to send the message first ...
01312 
01313       size_t const total_length = message_block->total_length ();
01314 
01315       if (TAO_debug_level > 6)
01316         {
01317           ACE_DEBUG ((LM_DEBUG,
01318              ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ")
01319              ACE_TEXT ("trying to send the message (ml = %d)\n"),
01320              this->id (), total_length));
01321         }
01322 
01323       // @@ I don't think we want to hold the mutex here, however if
01324       // we release it we need to recheck the status of the transport
01325       // after we return... once I understand the final form for this
01326       // code I will re-visit this decision
01327       n = this->send_message_block_chain_i (message_block,
01328                                             byte_count,
01329                                             max_wait_time);
01330 
01331       if (n == -1)
01332         {
01333           // ... if this is just an EWOULDBLOCK we must schedule the
01334           // message for later, if it is ETIME we still have to send
01335           // the complete message, because cutting off the message at
01336           // this point will destroy the synchronization with the
01337           // server ...
01338           if (errno != EWOULDBLOCK && errno != ETIME)
01339             {
01340               if (TAO_debug_level > 0)
01341                 {
01342                   ACE_ERROR ((LM_ERROR,
01343                      ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ")
01344                      ACE_TEXT ("fatal error in ")
01345                      ACE_TEXT ("send_message_block_chain_i - %m\n"),
01346                      this->id ()));
01347                 }
01348               return -1;
01349             }
01350         }
01351 
01352       // ... let's figure out if the complete message was sent ...
01353       if (total_length == byte_count)
01354         {
01355           // Done, just return.  Notice that there are no allocations
01356           // or copies up to this point (though some fancy calling
01357           // back and forth).
01358           // This is the common case for the critical path, it should
01359           // be fast.
01360           return 0;
01361         }
01362 
01363       if (byte_count > 0)
01364       {
01365         partially_sent = true;
01366       }
01367 
01368       // If it was partially sent, then push to front of queue and don't flush
01369       if (errno == ETIME)
01370       {
01371         timeout_encountered = true;
01372         if (byte_count == 0)
01373         {
01374           //This request has timed out and none of it was sent to the transport
01375           //We can't return -1 here, since that would end up closing the tranpsort
01376           if (TAO_debug_level > 2)
01377             {
01378               ACE_DEBUG ((LM_DEBUG,
01379                           ACE_TEXT ("TAO (%P|%t) - ")
01380                           ACE_TEXT ("Transport[%d]::send_asynchronous_message_i, ")
01381                           ACE_TEXT ("timeout encountered before any bytes sent\n"),
01382                           this->id ()));
01383             }
01384           throw ::CORBA::TIMEOUT (
01385             CORBA::SystemException::_tao_minor_code (
01386               TAO_TIMEOUT_SEND_MINOR_CODE,
01387               ETIME),
01388             CORBA::COMPLETED_NO);
01389         }
01390       }
01391 
01392       if (TAO_debug_level > 6)
01393         {
01394           ACE_DEBUG ((LM_DEBUG,
01395              ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ")
01396              ACE_TEXT ("partial send %d / %d bytes\n"),
01397              this->id (), byte_count, total_length));
01398         }
01399 
01400       // ... part of the data was sent, need to figure out what piece
01401       // of the message block chain must be queued ...
01402       while (message_block != 0 && message_block->length () == 0)
01403         {
01404           message_block = message_block->cont ();
01405         }
01406 
01407       // ... at least some portion of the message block chain should
01408       // remain ...
01409     }
01410 
01411   // ... either the message must be queued or we need to queue it
01412   // because it was not completely sent out ...
01413 
01414   ACE_Time_Value *wait_time = (partially_sent ? 0: max_wait_time);
01415   if (this->queue_message_i (message_block, wait_time, !partially_sent)
01416       == -1)
01417     {
01418       if (TAO_debug_level > 0)
01419         {
01420           ACE_DEBUG ((LM_DEBUG,
01421                       ACE_TEXT ("TAO (%P|%t) - Transport[%d]::")
01422                       ACE_TEXT ("send_asynchronous_message_i, ")
01423                       ACE_TEXT ("cannot queue message for  - %m\n"),
01424                       this->id ()));
01425         }
01426       return -1;
01427     }
01428 
01429   if (TAO_debug_level > 6)
01430     {
01431       ACE_DEBUG ((LM_DEBUG,
01432          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ")
01433          ACE_TEXT ("message is queued\n"),
01434          this->id ()));
01435     }
01436 
01437   if (timeout_encountered && partially_sent)
01438     {
01439       //Must close down the transport here since we can't guarantee the
01440       //integrity of the GIOP stream (the next send may try to write to
01441       //the socket before looking at the queue).
01442       if (TAO_debug_level > 0)
01443         {
01444           ACE_DEBUG ((LM_DEBUG,
01445                       ACE_TEXT ("TAO (%P|%t) - Transport[%d]::")
01446                       ACE_TEXT ("send_asynchronous_message_i, ")
01447                       ACE_TEXT ("timeout after partial send, closing.\n"),
01448                       this->id ()));
01449         }
01450       return -1;
01451     }
01452   else if (!timeout_encountered)
01453     {
01454       // We can't flush if we have already encountered a timeout
01455       // ... if the queue is full we need to activate the output on the
01456       // queue ...
01457       bool must_flush = false;
01458       const bool constraints_reached =
01459         this->check_buffering_constraints_i (stub,
01460                                              must_flush);
01461 
01462       // ... but we also want to activate it if the message was partially
01463       // sent.... Plus, when we use the blocking flushing strategy the
01464       // queue is flushed as a side-effect of 'schedule_output()'
01465 
01466       TAO_Flushing_Strategy *flushing_strategy =
01467         this->orb_core ()->flushing_strategy ();
01468 
01469       if (constraints_reached || try_sending_first)
01470         {
01471           int const result = flushing_strategy->schedule_output (this);
01472           if (result == TAO_Flushing_Strategy::MUST_FLUSH)
01473             {
01474               must_flush = true;
01475             }
01476         }
01477 
01478       if (must_flush)
01479         {
01480           if (TAO_debug_level > 0)
01481             {
01482               ACE_DEBUG ((LM_DEBUG,
01483                           ACE_TEXT ("TAO (%P|%t) - Transport[%d]::")
01484                           ACE_TEXT ("send_asynchronous_message_i, ")
01485                           ACE_TEXT ("flushing transport.\n"),
01486                           this->id ()));
01487             }
01488 
01489           size_t sent_byte = sent_byte_count_;
01490           int ret = 0;
01491           {
01492             typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK;
01493             TAO_REVERSE_LOCK reverse (*this->handler_lock_);
01494             ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1);
01495             ret = flushing_strategy->flush_transport (this, max_wait_time);
01496           }
01497 
01498           if (ret == -1)
01499             {
01500               if (errno == ETIME)
01501                 {
01502                   if (sent_byte == sent_byte_count_) // if nothing was actually flushed
01503                     {
01504                       //This request has timed out and none of it was sent to the transport
01505                       //We can't return -1 here, since that would end up closing the tranpsort
01506                       if (TAO_debug_level > 2)
01507                         {
01508                           ACE_DEBUG ((LM_DEBUG,
01509                                       ACE_TEXT ("TAO (%P|%t) - ")
01510                                       ACE_TEXT ("Transport[%d]::send_asynchronous_message_i, ")
01511                                       ACE_TEXT ("2 timeout encountered before any bytes sent\n"),
01512                                       this->id ()));
01513                         }
01514                       throw ::CORBA::TIMEOUT (CORBA::SystemException::_tao_minor_code
01515                                               (TAO_TIMEOUT_SEND_MINOR_CODE, ETIME),
01516                                               CORBA::COMPLETED_NO);
01517                     }
01518                 }
01519               return -1;
01520             }
01521         }
01522     }
01523   return 0;
01524 }
01525 
01526 int
01527 TAO_Transport::queue_message_i (const ACE_Message_Block *message_block,
01528                                 ACE_Time_Value *max_wait_time, bool back)
01529 {
01530   TAO_Queued_Message *queued_message = 0;
01531   ACE_NEW_RETURN (queued_message,
01532                   TAO_Asynch_Queued_Message (message_block,
01533                                              this->orb_core_,
01534                                              max_wait_time,
01535                                              0,
01536                                              true),
01537                   -1);
01538   if (back) {
01539     queued_message->push_back (this->head_, this->tail_);
01540   }
01541   else {
01542     queued_message->push_front (this->head_, this->tail_);
01543   }
01544 
01545   return 0;
01546 }
01547 
01548 /*
01549  *
01550  * All the methods relevant to the incoming data path of the ORB are
01551  * defined below
01552  *
01553  */
01554 int
01555 TAO_Transport::handle_input (TAO_Resume_Handle &rh,
01556                              ACE_Time_Value * max_wait_time)
01557 {
01558   if (TAO_debug_level > 3)
01559     {
01560       ACE_DEBUG ((LM_DEBUG,
01561          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input\n"),
01562          this->id ()));
01563     }
01564 
01565   // First try to process messages of the head of the incoming queue.
01566   int const retval = this->process_queue_head (rh);
01567 
01568   if (retval <= 0)
01569     {
01570       if (retval == -1)
01571         {
01572           if (TAO_debug_level > 2)
01573             {
01574               ACE_DEBUG ((LM_DEBUG,
01575                  ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input, ")
01576                  ACE_TEXT ("error while parsing the head of the queue\n"),
01577                  this->id()));
01578 
01579             }
01580           return -1;
01581         }
01582       else
01583         {
01584           // retval == 0
01585 
01586           // Processed a message in queue successfully. This
01587           // thread must return to thread-pool now.
01588           return 0;
01589         }
01590     }
01591 
01592   TAO_Queued_Data *q_data = 0;
01593 
01594   if (this->incoming_message_stack_.top (q_data) != -1
01595       && q_data->missing_data () != TAO_MISSING_DATA_UNDEFINED)
01596     {
01597       /* PRE: q_data->missing_data_ > 0 as all QD on stack must be incomplete  */
01598       if (this->handle_input_missing_data (rh, max_wait_time, q_data) == -1)
01599         {
01600           if (TAO_debug_level > 0)
01601             {
01602               ACE_ERROR ((LM_ERROR,
01603                  ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input, ")
01604                  ACE_TEXT ("error consolidating incoming message\n"),
01605                  this->id ()));
01606             }
01607           return -1;
01608         }
01609     }
01610   else
01611     {
01612       if (this->handle_input_parse_data (rh, max_wait_time) == -1)
01613         {
01614           if (TAO_debug_level > 0)
01615             {
01616               ACE_ERROR ((LM_ERROR,
01617                  ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input, ")
01618                  ACE_TEXT ("error parsing incoming message\n"),
01619                  this->id ()));
01620             }
01621           return -1;
01622         }
01623     }
01624 
01625   return 0;
01626 }
01627 
01628 int
01629 TAO_Transport::consolidate_process_message (TAO_Queued_Data *q_data,
01630                                             TAO_Resume_Handle &rh)
01631 {
01632   // paranoid check
01633   if (q_data->missing_data () != 0)
01634     {
01635       if (TAO_debug_level > 0)
01636         {
01637            ACE_ERROR ((LM_ERROR,
01638               ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ")
01639               ACE_TEXT ("missing data\n"),
01640               this->id ()));
01641         }
01642        return -1;
01643     }
01644 
01645   if (q_data->more_fragments () ||
01646       q_data->msg_type () == GIOP::Fragment)
01647     {
01648       // consolidate message on top of stack, only for fragmented messages
01649       TAO_Queued_Data *new_q_data = 0;
01650 
01651       switch (this->messaging_object()->consolidate_fragmented_message (q_data, new_q_data))
01652         {
01653         case -1: // error
01654           return -1;
01655 
01656         case 0:  // returning consolidated message in q_data
01657           if (!new_q_data)
01658             {
01659               if (TAO_debug_level > 0)
01660                 {
01661                   ACE_ERROR ((LM_ERROR,
01662                      ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ")
01663                      ACE_TEXT ("error, consolidated message is NULL\n"),
01664                      this->id ()));
01665                 }
01666               return -1;
01667             }
01668 
01669 
01670           if (this->process_parsed_messages (new_q_data, rh) == -1)
01671             {
01672               TAO_Queued_Data::release (new_q_data);
01673 
01674               if (TAO_debug_level > 0)
01675                 {
01676                   ACE_ERROR ((LM_ERROR,
01677                      ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ")
01678                      ACE_TEXT ("error processing consolidated message\n"),
01679                      this->id ()));
01680                 }
01681               return -1;
01682             }
01683 
01684           TAO_Queued_Data::release (new_q_data);
01685 
01686           break;
01687 
01688         case 1:  // fragment has been stored in messaging_oject()
01689           break;
01690         }
01691     }
01692   else
01693     {
01694       if (this->process_parsed_messages (q_data, rh) == -1)
01695         {
01696           TAO_Queued_Data::release (q_data);
01697 
01698           if (TAO_debug_level > 0)
01699             {
01700               ACE_ERROR ((LM_ERROR,
01701                  ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ")
01702                  ACE_TEXT ("error processing message\n"),
01703                  this->id ()));
01704             }
01705           return -1;
01706         }
01707 
01708       TAO_Queued_Data::release (q_data);
01709 
01710     }
01711 
01712   return 0;
01713 }
01714 
01715 int
01716 TAO_Transport::consolidate_enqueue_message (TAO_Queued_Data *q_data)
01717 {
01718   // consolidate message on top of stack, only for fragmented messages
01719 
01720   // paranoid check
01721   if (q_data->missing_data () != 0)
01722     {
01723        return -1;
01724     }
01725 
01726   if (q_data->more_fragments () ||
01727       q_data->msg_type () == GIOP::Fragment)
01728     {
01729       TAO_Queued_Data *new_q_data = 0;
01730 
01731       switch (this->messaging_object()->consolidate_fragmented_message (q_data, new_q_data))
01732         {
01733         case -1: // error
01734           return -1;
01735 
01736         case 0:  // returning consolidated message in new_q_data
01737           if (!new_q_data)
01738             {
01739               if (TAO_debug_level > 0)
01740                 {
01741                   ACE_ERROR ((LM_ERROR,
01742                      ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_enqueue_message, ")
01743                      ACE_TEXT ("error, consolidated message is NULL\n"),
01744                      this->id ()));
01745                 }
01746               return -1;
01747             }
01748 
01749           if (this->incoming_message_queue_.enqueue_tail (new_q_data) != 0)
01750             {
01751               TAO_Queued_Data::release (new_q_data);
01752               return -1;
01753             }
01754           break;
01755 
01756         case 1:  // fragment has been stored in messaging_oject()
01757           break;
01758         }
01759     }
01760   else
01761     {
01762       if (this->incoming_message_queue_.enqueue_tail (q_data) != 0)
01763         {
01764           TAO_Queued_Data::release (q_data);
01765           return -1;
01766         }
01767     }
01768 
01769   return 0; // success
01770 }
01771 
01772 int
01773 TAO_Transport::handle_input_missing_data (TAO_Resume_Handle &rh,
01774                                           ACE_Time_Value * max_wait_time,
01775                                           TAO_Queued_Data *q_data)
01776 {
01777   // paranoid check
01778   if (q_data == 0)
01779     {
01780       return -1;
01781     }
01782 
01783   if (TAO_debug_level > 3)
01784     {
01785       ACE_DEBUG ((LM_DEBUG,
01786          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_missing_data_message, ")
01787          ACE_TEXT ("enter (missing data == %d)\n"),
01788          this->id (), q_data->missing_data ()));
01789     }
01790 
01791   size_t const recv_size = q_data->missing_data ();
01792 
01793   if (q_data->msg_block ()->space() < recv_size)
01794     {
01795       // make sure the message_block has enough space
01796       size_t const message_size = recv_size + q_data->msg_block ()->length();
01797 
01798       if (ACE_CDR::grow (q_data->msg_block (), message_size) == -1)
01799         {
01800           return -1;
01801         }
01802     }
01803 
01804   // Saving the size of the received buffer in case any one needs to
01805   // get the size of the message thats received in the
01806   // context. Obviously the value will be changed for each recv call
01807   // and the user is supposed to invoke the accessor only in the
01808   // invocation context to get meaningful information.
01809   this->recv_buffer_size_ = recv_size;
01810 
01811   // Read the message into the existing message block on heap
01812   ssize_t const n = this->recv (q_data->msg_block ()->wr_ptr(),
01813                                 recv_size,
01814                                 max_wait_time);
01815 
01816   if (n <= 0)
01817     {
01818       return n;
01819     }
01820 
01821   if (TAO_debug_level > 3)
01822     {
01823       ACE_DEBUG ((LM_DEBUG,
01824          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_missing_data_message, ")
01825          ACE_TEXT ("read bytes %d\n"),
01826          this->id (), n));
01827     }
01828 
01829   q_data->msg_block ()->wr_ptr(n);
01830   q_data->missing_data (q_data->missing_data () - n);
01831 
01832   if (q_data->missing_data () == 0)
01833     {
01834       // paranoid check
01835       if (this->incoming_message_stack_.pop (q_data) == -1)
01836         {
01837           return -1;
01838         }
01839 
01840       if (this->consolidate_process_message (q_data, rh) == -1)
01841         {
01842           return -1;
01843         }
01844     }
01845 
01846   return 0;
01847 }
01848 
01849 
01850 int
01851 TAO_Transport::handle_input_parse_extra_messages (ACE_Message_Block &message_block)
01852 {
01853 
01854   // store buffer status of last extraction: -1 parse error, 0
01855   // incomplete message header in buffer, 1 complete messages header
01856   // parsed
01857   int buf_status = 0;
01858 
01859   TAO_Queued_Data *q_data = 0;     // init
01860 
01861   // parse buffer until all messages have been extracted, consolidate
01862   // and enqueue complete messages, if the last message being parsed
01863   // has missin data, it is stays on top of incoming_message_stack.
01864   while (message_block.length () > 0 &&
01865          (buf_status = this->messaging_object ()->extract_next_message
01866           (message_block, q_data)) != -1 &&
01867          q_data != 0) // paranoid check
01868     {
01869       if (q_data->missing_data () == 0)
01870         {
01871           if (this->consolidate_enqueue_message (q_data) == -1)
01872             {
01873               return -1;
01874             }
01875         }
01876       else  // incomplete message read, probably the last message in buffer
01877         {
01878           // can not fail
01879           this->incoming_message_stack_.push (q_data);
01880         }
01881 
01882       q_data = 0; // reset
01883     } // while
01884 
01885   if (buf_status == -1)
01886     {
01887       return -1;
01888     }
01889 
01890   return 0;
01891 }
01892 
01893 int
01894 TAO_Transport::handle_input_parse_data  (TAO_Resume_Handle &rh,
01895                                          ACE_Time_Value * max_wait_time)
01896 {
01897 
01898   if (TAO_debug_level > 3)
01899     {
01900       ACE_DEBUG ((LM_DEBUG,
01901          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ")
01902          ACE_TEXT ("enter\n"),
01903          this->id ()));
01904     }
01905 
01906 
01907   // The buffer on the stack which will be used to hold the input
01908   // messages, ACE_CDR::MAX_ALIGNMENT compensates the
01909   // memory-alignment. This improves performance with SUN-Java-ORB-1.4
01910   // and higher that sends fragmented requests of size 1024 bytes.
01911   char buf [TAO_MAXBUFSIZE + ACE_CDR::MAX_ALIGNMENT];
01912 
01913 #if defined (ACE_INITIALIZE_MEMORY_BEFORE_USE)
01914   (void) ACE_OS::memset (buf,
01915                          '\0',
01916                          sizeof buf);
01917 #endif /* ACE_INITIALIZE_MEMORY_BEFORE_USE */
01918 
01919   // Create a data block
01920   ACE_Data_Block db (sizeof (buf),
01921                      ACE_Message_Block::MB_DATA,
01922                      buf,
01923                      this->orb_core_->input_cdr_buffer_allocator (),
01924                      this->orb_core_->locking_strategy (),
01925                      ACE_Message_Block::DONT_DELETE,
01926                      this->orb_core_->input_cdr_dblock_allocator ());
01927 
01928   // Create a message block
01929   ACE_Message_Block message_block (&db,
01930                                    ACE_Message_Block::DONT_DELETE,
01931                                    this->orb_core_->input_cdr_msgblock_allocator ());
01932 
01933 
01934   // Align the message block
01935   ACE_CDR::mb_align (&message_block);
01936 
01937   size_t recv_size = 0; // Note: unsigned integer
01938 
01939   // Pointer to newly parsed message
01940   TAO_Queued_Data *q_data = 0;
01941 
01942   // optimizing access of constants
01943   size_t const header_length = this->messaging_object ()->header_length ();
01944 
01945   // paranoid check
01946   if (header_length > message_block.space ())
01947     {
01948       return -1;
01949     }
01950 
01951   if (this->orb_core_->orb_params ()->single_read_optimization ())
01952     {
01953       recv_size = message_block.space ();
01954     }
01955   else
01956     {
01957       // Single read optimization has been de-activated. That means
01958       // that we need to read from transport the GIOP header first
01959       // before the payload. This codes first checks the incoming
01960       // stack for partial messages which needs to be
01961       // consolidated. Otherwise we are in new cycle, reading complete
01962       // GIOP header of new incoming message.
01963       if (this->incoming_message_stack_.top (q_data) != -1
01964            && q_data->missing_data () == TAO_MISSING_DATA_UNDEFINED)
01965         {
01966           // There is a partial message on incoming_message_stack_
01967           // whose length is unknown so far. We need to consolidate
01968           // the GIOP header to get to know the payload size,
01969           recv_size = header_length - q_data->msg_block ()->length ();
01970         }
01971       else
01972         {
01973           // Read amount of data forming GIOP header of new incoming
01974           // message.
01975           recv_size = header_length;
01976         }
01977       // POST: 0 <= recv_size <= header_length
01978     }
01979   // POST: 0 <= recv_size <= message_block->space ()
01980 
01981   // If we have a partial message, copy it into our message block and
01982   // clear out the partial message.
01983   if (this->partial_message_ != 0 && this->partial_message_->length () > 0)
01984     {
01985       // (*) Copy back the partial message into current read-buffer,
01986       // verify that the read-strategy of "recv_size" bytes is not
01987       // exceeded. The latter check guarantees that recv_size does not
01988       // roll-over and keeps in range
01989       // 0<=recv_size<=message_block->space()
01990       if (this->partial_message_->length () <= recv_size &&
01991           message_block.copy (this->partial_message_->rd_ptr (),
01992                               this->partial_message_->length ()) == 0)
01993         {
01994 
01995           recv_size -= this->partial_message_->length ();
01996           this->partial_message_->reset ();
01997         }
01998       else
01999         {
02000           return -1;
02001         }
02002     }
02003   // POST: 0 <= recv_size <= buffer_space
02004 
02005   if (0 >= recv_size) // paranoid: the check above (*) guarantees recv_size>=0
02006     {
02007       // This event would cause endless looping, trying frequently to
02008       // read zero bytes from stream.  This might happen, if TAOs
02009       // protocol implementation is not correct and tries to read data
02010       // beyond header without "single_read_optimazation" being
02011       // activated.
02012       if (TAO_debug_level > 0)
02013         {
02014           ACE_ERROR ((LM_ERROR,
02015              ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ")
02016              ACE_TEXT ("Error - endless loop detection, closing connection"),
02017              this->id ()));
02018         }
02019       return -1;
02020     }
02021 
02022   // Saving the size of the received buffer in case any one needs to
02023   // get the size of the message thats received in the
02024   // context. Obviously the value will be changed for each recv call
02025   // and the user is supposed to invoke the accessor only in the
02026   // invocation context to get meaningful information.
02027   this->recv_buffer_size_ = recv_size;
02028 
02029   // Read the message into the  message block that we have created on
02030   // the stack.
02031   ssize_t const n = this->recv (message_block.wr_ptr (),
02032                                 recv_size,
02033                                 max_wait_time);
02034 
02035   // If there is an error return to the reactor..
02036   if (n <= 0)
02037     {
02038       return n;
02039     }
02040 
02041   if (TAO_debug_level > 3)
02042     {
02043       ACE_DEBUG ((LM_DEBUG,
02044          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ")
02045          ACE_TEXT ("read %d bytes\n"),
02046          this->id (), n));
02047     }
02048 
02049   // Set the write pointer in the stack buffer
02050   message_block.wr_ptr (n);
02051 
02052   //
02053   // STACK PROCESSING OR MESSAGE CONSOLIDATION
02054   //
02055 
02056   // PRE: data in buffer is aligned && message_block.length() > 0
02057 
02058   if (this->incoming_message_stack_.top (q_data) != -1
02059       && q_data->missing_data () == TAO_MISSING_DATA_UNDEFINED)
02060     {
02061       //
02062       // MESSAGE CONSOLIDATION
02063       //
02064 
02065       // Partial message on incoming_message_stack_ needs to be
02066       // consolidated.  The message header could not be parsed so far
02067       // and therefor the message size is unknown yet. Consolidating
02068       // the message destroys the memory alignment of succeeding
02069       // messages sharing the buffer, for that reason consolidation
02070       // and stack based processing are mutial exclusive.
02071       if (this->messaging_object ()->consolidate_node (q_data,
02072                                                        message_block) == -1)
02073         {
02074            if (TAO_debug_level > 0)
02075             {
02076                 ACE_ERROR ((LM_ERROR,
02077                    ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ")
02078                    ACE_TEXT ("error consolidating message from input buffer\n"),
02079                    this->id () ));
02080              }
02081            return -1;
02082         }
02083 
02084       // Complete message are to be enqueued and later processed
02085       if (q_data->missing_data () == 0)
02086         {
02087           if (this->incoming_message_stack_.pop (q_data) == -1)
02088             {
02089               return -1;
02090             }
02091 
02092           if (this->consolidate_enqueue_message (q_data) == -1)
02093             {
02094               return -1;
02095             }
02096         }
02097 
02098       if (message_block.length () > 0
02099           && this->handle_input_parse_extra_messages (message_block) == -1)
02100         {
02101           return -1;
02102         }
02103 
02104       // In any case try to process the enqueued messages
02105       if (this->process_queue_head (rh) == -1)
02106         {
02107           return -1;
02108         }
02109     }
02110   else
02111     {
02112       //
02113       // STACK PROCESSING (critical path)
02114       //
02115 
02116       // Process the first message in buffer on stack
02117 
02118       // (PRE: first message resides in aligned memory) Make a node of
02119       // the message-block..
02120 
02121       TAO_Queued_Data qd (&message_block,
02122                           this->orb_core_->transport_message_buffer_allocator ());
02123 
02124       size_t mesg_length  = 0;
02125 
02126       if (this->messaging_object ()->parse_next_message (qd,
02127                                                          mesg_length) == -1
02128           || (qd.missing_data () == 0
02129               && mesg_length > message_block.length ()) )
02130         {
02131           // extracting message failed
02132           return -1;
02133         }
02134       // POST: qd.missing_data_ == 0 --> mesg_length <= message_block.length()
02135       // This prevents seeking rd_ptr behind the wr_ptr
02136 
02137       if (qd.missing_data () != 0 ||
02138           qd.more_fragments ()   ||
02139           qd.msg_type () == GIOP::Fragment)
02140         {
02141           if (qd.missing_data () == 0)
02142             {
02143               // Dealing with a fragment
02144               TAO_Queued_Data *nqd =
02145                 TAO_Queued_Data::duplicate (qd);
02146 
02147               if (nqd == 0)
02148                 {
02149                   return -1;
02150                 }
02151 
02152               // mark the end of message in new buffer
02153               char* end_mark = nqd->msg_block ()->rd_ptr ()
02154                              + mesg_length;
02155               nqd->msg_block ()->wr_ptr (end_mark);
02156 
02157               // move the read pointer forward in old buffer
02158               message_block.rd_ptr (mesg_length);
02159 
02160               // enqueue the message
02161               if (this->consolidate_enqueue_message (nqd) == -1)
02162                 {
02163                   return -1;
02164                 }
02165 
02166               if (message_block.length () > 0
02167                   && this->handle_input_parse_extra_messages (message_block) == -1)
02168                 {
02169                   return -1;
02170                 }
02171 
02172               // In any case try to process the enqueued messages
02173               if (this->process_queue_head (rh) == -1)
02174                 {
02175                   return -1;
02176                 }
02177             }
02178           else if (qd.missing_data () != TAO_MISSING_DATA_UNDEFINED)
02179             {
02180               // Incomplete message, must be the last one in buffer
02181 
02182               if (qd.missing_data () != TAO_MISSING_DATA_UNDEFINED &&
02183                   qd.missing_data () > message_block.space ())
02184                 {
02185                   // Re-Allocate correct size on heap
02186                   if (ACE_CDR::grow (qd.msg_block (),
02187                                      message_block.length ()
02188                                      + qd.missing_data ()) == -1)
02189                     {
02190                       return -1;
02191                     }
02192                 }
02193 
02194               TAO_Queued_Data *nqd =
02195                 TAO_Queued_Data::duplicate (qd);
02196 
02197               if (nqd == 0)
02198                 {
02199                   return -1;
02200                 }
02201 
02202               // move read-pointer to end of buffer
02203               message_block.rd_ptr (message_block.length());
02204 
02205               this->incoming_message_stack_.push (nqd);
02206             }
02207         }
02208       else
02209         {
02210           //
02211           // critical path
02212           //
02213 
02214           // We cant process the message on stack right now. First we
02215           // have got to parse extra messages from message_block,
02216           // putting them into queue.  When this is done we can return
02217           // to process this message, and notifying other threads to
02218           // process the messages in queue.
02219 
02220           char * end_marker = message_block.rd_ptr ()
02221                             + mesg_length;
02222 
02223           if (message_block.length () > mesg_length)
02224             {
02225               // There are more message in data stream to be parsed.
02226               // Safe the rd_ptr to restore later.
02227               char *rd_ptr_stack_mesg = message_block.rd_ptr ();
02228 
02229               // Skip parsed message, jump to next message in buffer
02230               // PRE: mesg_length <= message_block.length ()
02231               message_block.rd_ptr (mesg_length);
02232 
02233               // Extract remaining messages and enqueue them for later
02234               // heap processing
02235               if (this->handle_input_parse_extra_messages (message_block) == -1)
02236                 {
02237                   return -1;
02238                 }
02239 
02240               // correct the end_marker
02241               end_marker = message_block.rd_ptr ();
02242 
02243               // Restore rd_ptr
02244               message_block.rd_ptr (rd_ptr_stack_mesg);
02245             }
02246 
02247           // The following if-else has been copied from
02248           // process_queue_head().  While process_queue_head()
02249           // processes message on heap, here we will process a message
02250           // on stack.
02251 
02252           // Now that we have one message on stack to be processed,
02253           // check whether we have one more message in the queue...
02254           if (this->incoming_message_queue_.queue_length () > 0)
02255             {
02256               if (TAO_debug_level > 0)
02257                 {
02258                   ACE_DEBUG ((LM_DEBUG,
02259                      ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ")
02260                      ACE_TEXT ("notify reactor\n"),
02261                      this->id ()));
02262 
02263                 }
02264 
02265               const int retval = this->notify_reactor ();
02266 
02267               if (retval == 1)
02268                 {
02269                   // Let the class know that it doesn't need to resume  the
02270                   // handle..
02271                   rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_LEAVE_SUSPENDED);
02272                 }
02273               else if (retval < 0)
02274                 return -1;
02275             }
02276           else
02277             {
02278               // As there are no further messages in queue just resume
02279               // the handle. Set the flag incase someone had reset the flag..
02280               rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_RESUMABLE);
02281             }
02282 
02283           // PRE: incoming_message_queue is empty
02284           if (this->process_parsed_messages (&qd,
02285                                              rh) == -1)
02286             {
02287               return -1;
02288             }
02289 
02290           // move the rd_ptr tp position of end_marker
02291           message_block.rd_ptr (end_marker);
02292         }
02293     }
02294 
02295   // Now that all cases have been processed, there might be kept some data
02296   // in buffer that needs to be safed for next "handle_input" invocations.
02297    if (message_block.length () > 0)
02298      {
02299        if (this->partial_message_ == 0)
02300          {
02301            this->allocate_partial_message_block ();
02302          }
02303 
02304        if (this->partial_message_ != 0 &&
02305            this->partial_message_->copy (message_block.rd_ptr (),
02306                                          message_block.length ()) == 0)
02307          {
02308            message_block.rd_ptr (message_block.length ());
02309          }
02310        else
02311          {
02312            return -1;
02313          }
02314      }
02315 
02316    return 0;
02317 }
02318 
02319 
02320 int
02321 TAO_Transport::process_parsed_messages (TAO_Queued_Data *qd,
02322                                         TAO_Resume_Handle &rh)
02323 {
02324   if (TAO_debug_level > 7)
02325     {
02326       ACE_DEBUG ((LM_DEBUG,
02327          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ")
02328          ACE_TEXT ("entering (missing data == %d)\n"),
02329          this->id(), qd->missing_data ()));
02330     }
02331 
02332 #if TAO_HAS_TRANSPORT_CURRENT == 1
02333   // Update stats, if any
02334   if (this->stats_ != 0)
02335     this->stats_->messages_received (qd->msg_block ()->length ());
02336 #endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */
02337 
02338   switch (qd->msg_type ())
02339   {
02340     case GIOP::CloseConnection:
02341     {
02342       if (TAO_debug_level > 0)
02343         ACE_DEBUG ((LM_DEBUG,
02344            ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ")
02345            ACE_TEXT ("received CloseConnection message - %m\n"),
02346            this->id()));
02347 
02348       // Return a "-1" so that the next stage can take care of
02349       // closing connection and the necessary memory management.
02350       return -1;
02351     }
02352     break;
02353     case GIOP::Request:
02354     case GIOP::LocateRequest:
02355     {
02356       // Let us resume the handle before we go ahead to process the
02357       // request. This will open up the handle for other threads.
02358       rh.resume_handle ();
02359 
02360       if (this->messaging_object ()->process_request_message (
02361             this,
02362             qd) == -1)
02363         {
02364           // Return a "-1" so that the next stage can take care of
02365           // closing connection and the necessary memory management.
02366           return -1;
02367         }
02368     }
02369     break;
02370     case GIOP::Reply:
02371     case GIOP::LocateReply:
02372     {
02373       rh.resume_handle ();
02374 
02375       TAO_Pluggable_Reply_Params params (this);
02376 
02377       if (this->messaging_object ()->process_reply_message (params, qd) == -1)
02378         {
02379           if (TAO_debug_level > 0)
02380             ACE_DEBUG ((LM_DEBUG,
02381                ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ")
02382                ACE_TEXT ("error in process_reply_message - %m\n"),
02383                this->id ()));
02384 
02385           return -1;
02386         }
02387 
02388     }
02389     break;
02390     case GIOP::CancelRequest:
02391     {
02392       // The associated request might be incomplete residing
02393       // fragmented in messaging object. We must make sure the
02394       // resources allocated by fragments are released.
02395       if (this->messaging_object ()->discard_fragmented_message (qd) == -1)
02396         {
02397           if (TAO_debug_level > 0)
02398             {
02399               ACE_ERROR ((LM_ERROR,
02400                  ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ")
02401                  ACE_TEXT ("error processing CancelRequest\n"),
02402                  this->id ()));
02403             }
02404         }
02405 
02406       // We are not able to cancel requests being processed already;
02407       // this is declared as optional feature by CORBA, and TAO does
02408       // not support this currently.
02409 
02410       // Just continue processing, CancelRequest does not mean to cut
02411       // off the connection.
02412     }
02413     break;
02414     case GIOP::MessageError:
02415     {
02416       if (TAO_debug_level > 0)
02417         {
02418           ACE_ERROR ((LM_ERROR,
02419              ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ")
02420              ACE_TEXT ("received MessageError, closing connection\n"),
02421              this->id ()));
02422         }
02423       return -1;
02424     }
02425     break;
02426     case GIOP::Fragment:
02427     {
02428       // Nothing to be done.
02429     }
02430     break;
02431   }
02432 
02433   // If not, just return back..
02434   return 0;
02435 }
02436 
02437 int
02438 TAO_Transport::process_queue_head (TAO_Resume_Handle &rh)
02439 {
02440   if (TAO_debug_level > 3)
02441     {
02442       ACE_DEBUG ((LM_DEBUG,
02443          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_queue_head, %d enqueued\n"),
02444          this->id (), this->incoming_message_queue_.queue_length () ));
02445     }
02446 
02447   // See if  message in queue ...
02448   if (this->incoming_message_queue_.queue_length () > 0)
02449     {
02450       // Get the message on the head of the queue..
02451       TAO_Queued_Data *qd =
02452         this->incoming_message_queue_.dequeue_head ();
02453 
02454       if (TAO_debug_level > 3)
02455         {
02456           ACE_DEBUG ((LM_DEBUG,
02457              ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_queue_head, ")
02458              ACE_TEXT ("the size of the queue is [%d]\n"),
02459              this->id (),
02460              this->incoming_message_queue_.queue_length()));
02461         }
02462       // Now that we have pulled out out one message out of the queue,
02463       // check whether we have one more message in the queue...
02464       if (this->incoming_message_queue_.queue_length () > 0)
02465         {
02466           if (TAO_debug_level > 0)
02467             {
02468               ACE_DEBUG ((LM_DEBUG,
02469                  ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_queue_head, ")
02470                  ACE_TEXT ("notify reactor\n"),
02471                  this->id ()));
02472             }
02473 
02474           int const retval = this->notify_reactor ();
02475 
02476           if (retval == 1)
02477             {
02478               // Let the class know that it doesn't need to resume  the
02479               // handle..
02480               rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_LEAVE_SUSPENDED);
02481             }
02482           else if (retval < 0)
02483             return -1;
02484         }
02485       else
02486         {
02487           // As we are ready to process the last message just resume
02488           // the handle. Set the flag incase someone had reset the flag..
02489           rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_RESUMABLE);
02490         }
02491 
02492       // Process the message...
02493       if (this->process_parsed_messages (qd, rh) == -1)
02494         {
02495           return -1;
02496         }
02497 
02498       // Delete the Queued_Data..
02499       TAO_Queued_Data::release (qd);
02500 
02501       return 0;
02502     }
02503 
02504   return 1;
02505 }
02506 
02507 int
02508 TAO_Transport::notify_reactor (void)
02509 {
02510   if (!this->ws_->is_registered ())
02511     {
02512       return 0;
02513     }
02514 
02515   ACE_Event_Handler *eh = this->event_handler_i ();
02516 
02517   // Get the reactor associated with the event handler
02518   ACE_Reactor *reactor = this->orb_core ()->reactor ();
02519 
02520   if (TAO_debug_level > 0)
02521     {
02522       ACE_DEBUG ((LM_DEBUG,
02523          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::notify_reactor, ")
02524          ACE_TEXT ("notify to Reactor\n"),
02525          this->id ()));
02526     }
02527 
02528 
02529   // Send a notification to the reactor...
02530   int const retval = reactor->notify (eh, ACE_Event_Handler::READ_MASK);
02531 
02532   if (retval < 0 && TAO_debug_level > 2)
02533     {
02534       // @todo: need to think about what is the action that
02535       // we can take when we get here.
02536       ACE_DEBUG ((LM_DEBUG,
02537          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::notify_reactor, ")
02538          ACE_TEXT ("notify to the reactor failed..\n"),
02539          this->id ()));
02540     }
02541 
02542   return 1;
02543 }
02544 
02545 TAO::Transport_Cache_Manager &
02546 TAO_Transport::transport_cache_manager (void)
02547 {
02548   return this->orb_core_->lane_resources ().transport_cache ();
02549 }
02550 
02551 void
02552 TAO_Transport::assign_translators (TAO_InputCDR *inp, TAO_OutputCDR *outp)
02553 {
02554   if (this->char_translator_)
02555     {
02556       this->char_translator_->assign (inp);
02557       this->char_translator_->assign (outp);
02558     }
02559   if (this->wchar_translator_)
02560     {
02561       this->wchar_translator_->assign (inp);
02562       this->wchar_translator_->assign (outp);
02563     }
02564 }
02565 
02566 void
02567 TAO_Transport::clear_translators (TAO_InputCDR *inp, TAO_OutputCDR *outp)
02568 {
02569   if (inp)
02570     {
02571       inp->char_translator (0);
02572       inp->wchar_translator (0);
02573     }
02574   if (outp)
02575     {
02576       outp->char_translator (0);
02577       outp->wchar_translator (0);
02578     }
02579 }
02580 
02581 ACE_Event_Handler::Reference_Count
02582 TAO_Transport::add_reference (void)
02583 {
02584   return this->event_handler_i ()->add_reference ();
02585 }
02586 
02587 ACE_Event_Handler::Reference_Count
02588 TAO_Transport::remove_reference (void)
02589 {
02590   return this->event_handler_i ()->remove_reference ();
02591 }
02592 
02593 TAO_OutputCDR &
02594 TAO_Transport::out_stream (void)
02595 {
02596   return this->messaging_object ()->out_stream ();
02597 }
02598 
02599 void
02600 TAO_Transport::messaging_init (TAO_GIOP_Message_Version const &version)
02601 {
02602   this->messaging_object ()->init (version.major, version.minor);
02603 }
02604 
02605 void
02606 TAO_Transport::pre_close (void)
02607 {
02608   this->is_connected_ = false;
02609   this->purge_entry ();
02610   {
02611     ACE_MT (ACE_GUARD (ACE_Lock, guard, *this->handler_lock_));
02612     this->cleanup_queue_i ();
02613   }
02614 }
02615 
02616 bool
02617 TAO_Transport::post_open (size_t id)
02618 {
02619   this->id_ = id;
02620 
02621   {
02622     ACE_GUARD_RETURN (ACE_Lock,
02623                       ace_mon,
02624                       *this->handler_lock_,
02625                       false);
02626     this->is_connected_ = true;
02627   }
02628 
02629   // When we have data in our outgoing queue schedule ourselves
02630   // for output
02631   if (this->queue_is_empty_i ())
02632     return true;
02633 
02634   // If the wait strategy wants us to be registered with the reactor
02635   // then we do so. If registeration is required and it succeeds,
02636   // #REFCOUNT# becomes two.
02637   if (this->wait_strategy ()->register_handler () != 0)
02638     {
02639       // Registration failures.
02640 
02641       // Purge from the connection cache, if we are not in the cache, this
02642       // just does nothing.
02643       (void) this->purge_entry ();
02644 
02645       // Close the handler.
02646       (void) this->close_connection ();
02647 
02648       if (TAO_debug_level > 0)
02649         ACE_ERROR ((LM_ERROR,
02650            ACE_TEXT ("TAO (%P|%t) - Transport[%d]::post_connect , ")
02651            ACE_TEXT ("could not register the transport ")
02652            ACE_TEXT ("in the reactor.\n"),
02653            this->id ()));
02654 
02655       return false;
02656     }
02657 
02658   return true;
02659 }
02660 
02661 void
02662 TAO_Transport::allocate_partial_message_block (void)
02663 {
02664   if (this->partial_message_ == 0)
02665     {
02666       // This value must be at least large enough to hold a GIOP message
02667       // header plus a GIOP fragment header
02668       size_t const partial_message_size =
02669         this->messaging_object ()->header_length ();
02670        // + this->messaging_object ()->fragment_header_length ();
02671        // deprecated, conflicts with not-single_read_opt.
02672 
02673       ACE_NEW (this->partial_message_,
02674                ACE_Message_Block (partial_message_size));
02675     }
02676 }
02677 
02678 /*
02679  * Hook to add concrete implementations from the derived class onto
02680  * TAO's transport.
02681  */
02682 
02683 //@@ TAO_TRANSPORT_SPL_METHODS_ADD_HOOK
02684 
02685 TAO_END_VERSIONED_NAMESPACE_DECL

Generated on Tue Feb 2 17:37:53 2010 for TAO by  doxygen 1.4.7