Transport.cpp

Go to the documentation of this file.
00001 // Transport.cpp,v 1.150 2006/06/20 06:33:40 jwillemsen Exp
00002 
00003 #include "tao/Transport.h"
00004 
00005 #include "tao/LF_Follower.h"
00006 #include "tao/Leader_Follower.h"
00007 #include "tao/Client_Strategy_Factory.h"
00008 #include "tao/Wait_Strategy.h"
00009 #include "tao/Transport_Mux_Strategy.h"
00010 #include "tao/Stub.h"
00011 #include "tao/Transport_Queueing_Strategies.h"
00012 #include "tao/Connection_Handler.h"
00013 #include "tao/Pluggable_Messaging.h"
00014 #include "tao/Synch_Queued_Message.h"
00015 #include "tao/Asynch_Queued_Message.h"
00016 #include "tao/Flushing_Strategy.h"
00017 #include "tao/Thread_Lane_Resources.h"
00018 #include "tao/Resume_Handle.h"
00019 #include "tao/Codeset_Manager.h"
00020 #include "tao/Codeset_Translator_Base.h"
00021 #include "tao/debug.h"
00022 #include "tao/CDR.h"
00023 #include "tao/ORB_Core.h"
00024 #include "tao/MMAP_Allocator.h"
00025 
00026 #include "ace/OS_NS_sys_time.h"
00027 #include "ace/OS_NS_stdio.h"
00028 #include "ace/Reactor.h"
00029 #include "ace/os_include/sys/os_uio.h"
00030 
00031 /*
00032  * Specialization hook to add include files from
00033  * concrete transport implementation.
00034  */
00035 //@@ TAO_TRANSPORT_SPL_INCLUDE_FORWARD_DECL_ADD_HOOK
00036 
00037 #if !defined (__ACE_INLINE__)
00038 # include "tao/Transport.inl"
00039 #endif /* __ACE_INLINE__ */
00040 
00041 
00042 ACE_RCSID (tao,
00043            Transport,
00044            "Transport.cpp,v 1.150 2006/06/20 06:33:40 jwillemsen Exp")
00045 
00046 /*
00047  * Static function in file scope
00048  */
00049 static void
00050 dump_iov (iovec *iov, int iovcnt, size_t id,
00051           size_t current_transfer,
00052           const char *location)
00053 {
00054   ACE_Log_Msg::instance ()->acquire ();
00055 
00056   ACE_DEBUG ((LM_DEBUG,
00057               ACE_TEXT ("TAO (%P|%t) - Transport[%d]::%s, ")
00058               ACE_TEXT ("sending %d buffers\n"),
00059               id, ACE_TEXT_CHAR_TO_TCHAR (location), iovcnt));
00060 
00061   for (int i = 0; i != iovcnt && 0 < current_transfer; ++i)
00062     {
00063       size_t iov_len = iov[i].iov_len;
00064 
00065       // Possibly a partially sent iovec entry.
00066       if (current_transfer < iov_len)
00067         {
00068           iov_len = current_transfer;
00069         }
00070 
00071       ACE_DEBUG ((LM_DEBUG,
00072                   ACE_TEXT ("TAO (%P|%t) - Transport[%d]::%s, ")
00073                   ACE_TEXT ("buffer %d/%d has %d bytes\n"),
00074                   id, ACE_TEXT_CHAR_TO_TCHAR(location),
00075                   i, iovcnt,
00076                   iov_len));
00077 
00078       size_t len;
00079 
00080       for (size_t offset = 0; offset < iov_len; offset += len)
00081         {
00082           ACE_TCHAR header[1024];
00083           ACE_OS::sprintf (header,
00084                            ACE_TEXT("TAO - ")
00085                            ACE_TEXT("Transport[")
00086                            ACE_SIZE_T_FORMAT_SPECIFIER
00087                            ACE_TEXT("]::%s")
00088                            ACE_TEXT(" (")
00089                            ACE_SIZE_T_FORMAT_SPECIFIER ACE_TEXT("/")
00090                            ACE_SIZE_T_FORMAT_SPECIFIER ACE_TEXT(")"),
00091                            id, location, offset, iov_len);
00092 
00093           len = iov_len - offset;
00094 
00095           if (len > 512)
00096             {
00097               len = 512;
00098             }
00099 
00100           ACE_HEX_DUMP ((LM_DEBUG,
00101                          static_cast<char*> (iov[i].iov_base) + offset,
00102                          len,
00103                          header));
00104         }
00105       current_transfer -= iov_len;
00106     }
00107 
00108   ACE_DEBUG ((LM_DEBUG,
00109               ACE_TEXT ("TAO (%P|%t) - Transport[%d]::%s, ")
00110               ACE_TEXT ("end of data\n"),
00111               id, ACE_TEXT_CHAR_TO_TCHAR(location)));
00112 
00113   ACE_Log_Msg::instance ()->release ();
00114 }
00115 
00116 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00117 
00118 TAO_Transport::TAO_Transport (CORBA::ULong tag,
00119                               TAO_ORB_Core *orb_core)
00120   : tag_ (tag)
00121   , orb_core_ (orb_core)
00122   , cache_map_entry_ (0)
00123   , bidirectional_flag_ (-1)
00124   , opening_connection_role_ (TAO::TAO_UNSPECIFIED_ROLE)
00125   , head_ (0)
00126   , tail_ (0)
00127   , incoming_message_queue_ (orb_core)
00128   , current_deadline_ (ACE_Time_Value::zero)
00129   , flush_timer_id_ (-1)
00130   , transport_timer_ (this)
00131   , handler_lock_ (orb_core->resource_factory ()->create_cached_connection_lock ())
00132   , id_ ((size_t) this)
00133   , purging_order_ (0)
00134   , recv_buffer_size_ (0)
00135   , sent_byte_count_ (0)
00136   , is_connected_ (false)
00137   , char_translator_ (0)
00138   , wchar_translator_ (0)
00139   , tcs_set_ (0)
00140   , first_request_ (1)
00141   , partial_message_ (0)
00142 #ifdef ACE_HAS_SENDFILE
00143     // The ORB has been configured to use the MMAP allocator, meaning
00144     // we could/should use sendfile() to send data.  Cast once rather
00145     // here rather than during each send.  This assumes that all
00146     // TAO_OutputCDR instances are using the same TAO_MMAP_Allocator
00147     // instance as the underlying output CDR buffer allocator.
00148   , mmap_allocator_ (
00149       dynamic_cast<TAO_MMAP_Allocator *> (
00150         orb_core->output_cdr_buffer_allocator ()))
00151 #endif  /* ACE_HAS_SENDFILE */
00152 {
00153   TAO_Client_Strategy_Factory *cf =
00154     this->orb_core_->client_factory ();
00155 
00156   // Create WS now.
00157   this->ws_ = cf->create_wait_strategy (this);
00158 
00159   // Create TMS now.
00160   this->tms_ = cf->create_transport_mux_strategy (this);
00161 
00162   /*
00163    * Hook to add code that initializes components that
00164    * belong to the concrete protocol implementation.
00165    * Further additions to this Transport class will
00166    * need to add code *before* this hook.
00167    */
00168   //@@ TAO_TRANSPORT_SPL_CONSTRUCTOR_ADD_HOOK
00169 }
00170 
00171 TAO_Transport::~TAO_Transport (void)
00172 {
00173   delete this->ws_;
00174 
00175   delete this->tms_;
00176 
00177   delete this->handler_lock_;
00178 
00179   if (!this->is_connected_)
00180     {
00181       // When we have a not connected transport we could have buffered
00182       // messages on this transport which we have to cleanup now.
00183       this->cleanup_queue_i();
00184 
00185       // Cleanup our cache entry
00186       this->purge_entry();
00187     }
00188 
00189   // Release the partial message block, however we may
00190   // have never allocated one.
00191   ACE_Message_Block::release (this->partial_message_);
00192 
00193   // By the time the destructor is reached here all the connection stuff
00194   // *must* have been cleaned up.
00195 
00196   // The following assert is needed for the test "Bug_2494_Regression".
00197   // See the bugzilla bug #2494 for details.
00198   ACE_ASSERT (this->head_ == 0);
00199   ACE_ASSERT (this->cache_map_entry_ == 0);
00200 
00201   /*
00202    * Hook to add code that cleans up components
00203    * belong to the concrete protocol implementation.
00204    * Further additions to this Transport class will
00205    * need to add code *before* this hook.
00206    */
00207   //@@ TAO_TRANSPORT_SPL_DESTRUCTOR_ADD_HOOK
00208 }
00209 
00210 void
00211 TAO_Transport::provide_handler (TAO::Connection_Handler_Set &handlers)
00212 {
00213   (void) this->add_reference ();
00214 
00215   handlers.insert (this->connection_handler_i ());
00216 }
00217 
00218 bool
00219 TAO_Transport::provide_blockable_handler (TAO::Connection_Handler_Set &h)
00220 {
00221   if (this->ws_->non_blocking () ||
00222       this->opening_connection_role_ == TAO::TAO_SERVER_ROLE)
00223     return false;
00224 
00225   (void) this->add_reference ();
00226 
00227   h.insert (this->connection_handler_i ());
00228 
00229   return true;
00230 }
00231 
00232 bool
00233 TAO_Transport::idle_after_send (void)
00234 {
00235   return this->tms ()->idle_after_send ();
00236 }
00237 
00238 bool
00239 TAO_Transport::idle_after_reply (void)
00240 {
00241   return this->tms ()->idle_after_reply ();
00242 }
00243 
00244 /*
00245  * A concrete transport class specializes this
00246  * method. This hook allows commenting this function
00247  * when TAO's transport is specialized. Note: All
00248  * functions that have an implementation that does
00249  * nothing should be added within this hook to
00250  * enable specialization.
00251  */
00252 //@@ TAO_TRANSPORT_SPL_COMMENT_HOOK_START
00253 
00254 int
00255 TAO_Transport::tear_listen_point_list (TAO_InputCDR &)
00256 {
00257   ACE_NOTSUP_RETURN (-1);
00258 }
00259 
00260 int
00261 TAO_Transport::send_message_shared (TAO_Stub *stub,
00262                                     int message_semantics,
00263                                     const ACE_Message_Block *message_block,
00264                                     ACE_Time_Value *max_wait_time)
00265 {
00266   int result = 0;
00267 
00268   {
00269     ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1);
00270 
00271     result =
00272       this->send_message_shared_i (stub, message_semantics,
00273                                    message_block, max_wait_time);
00274   }
00275 
00276   if (result == -1)
00277     {
00278       this->close_connection ();
00279     }
00280 
00281   return result;
00282 }
00283 
00284 //@@ TAO_TRANSPORT_SPL_COMMENT_HOOK_END
00285 
00286 bool
00287 TAO_Transport::post_connect_hook (void)
00288 {
00289   return true;
00290 }
00291 
00292 void
00293 TAO_Transport::close_connection (void)
00294 {
00295   this->connection_handler_i ()->close_connection ();
00296 }
00297 
00298 int
00299 TAO_Transport::register_handler (void)
00300 {
00301   if (TAO_debug_level > 4)
00302     {
00303       ACE_DEBUG ((LM_DEBUG,
00304                   ACE_TEXT ("TAO (%P|%t) - Transport[%d]::register_handler\n"),
00305                   this->id ()));
00306     }
00307 
00308   ACE_Reactor * const r = this->orb_core_->reactor ();
00309 
00310   // @@note: This should be okay since the register handler call will
00311   // not make a nested call into the transport.
00312   ACE_GUARD_RETURN (ACE_Lock,
00313                     ace_mon,
00314                     *this->handler_lock_,
00315                     false);
00316 
00317   if (r == this->event_handler_i ()->reactor ())
00318     {
00319       return 0;
00320     }
00321 
00322   // Set the flag in the Connection Handler and in the Wait Strategy
00323   // @@Maybe we should set these flags after registering with the
00324   // reactor. What if the  registration fails???
00325   this->ws_->is_registered (true);
00326 
00327   // Register the handler with the reactor
00328   return r->register_handler (this->event_handler_i (),
00329                               ACE_Event_Handler::READ_MASK);
00330 }
00331 
00332 #ifdef ACE_HAS_SENDFILE
00333 ssize_t
00334 TAO_Transport::sendfile (TAO_MMAP_Allocator * /* allocator */,
00335                          iovec * iov,
00336                          int iovcnt,
00337                          size_t &bytes_transferred,
00338                          ACE_Time_Value const * timeout)
00339 {
00340   // Concrete pluggable transport doesn't implement sendfile().
00341   // Fallback on TAO_Transport::send().
00342 
00343   // @@ We can probably refactor the TAO_IIOP_Transport::sendfile()
00344   //    implementation to this base class method, and leave any TCP
00345   //    specific configuration out of this base class method.
00346   //      -Ossama
00347   return this->send (iov, iovcnt, bytes_transferred, timeout);
00348 }
00349 #endif  /* ACE_HAS_SENDFILE */
00350 
00351 int
00352 TAO_Transport::generate_locate_request (
00353     TAO_Target_Specification &spec,
00354     TAO_Operation_Details &opdetails,
00355     TAO_OutputCDR &output)
00356 {
00357   if (this->messaging_object ()->generate_locate_request_header (opdetails,
00358                                                                  spec,
00359                                                                  output)
00360        == -1)
00361     {
00362       if (TAO_debug_level > 0)
00363         {
00364           ACE_DEBUG ((LM_DEBUG,
00365                       ACE_TEXT ("TAO (%P|%t) - Transport[%d]::generate_locate_request, ")
00366                       ACE_TEXT ("error while marshalling the LocateRequest header\n"),
00367                       this->id ()));
00368         }
00369 
00370       return -1;
00371     }
00372 
00373   return 0;
00374 }
00375 
00376 int
00377 TAO_Transport::generate_request_header (
00378     TAO_Operation_Details &opdetails,
00379     TAO_Target_Specification &spec,
00380     TAO_OutputCDR &output)
00381 {
00382   // codeset service context is only supposed to be sent in the first request
00383   // on a particular connection.
00384   if (this->first_request_)
00385     {
00386       TAO_Codeset_Manager * const csm = this->orb_core ()->codeset_manager ();
00387       if (csm)
00388         csm->generate_service_context (opdetails,*this);
00389     }
00390 
00391   if (this->messaging_object ()->generate_request_header (opdetails,
00392                                                           spec,
00393                                                           output) == -1)
00394     {
00395       if (TAO_debug_level > 0)
00396         {
00397         ACE_DEBUG ((LM_DEBUG,
00398                    ACE_TEXT ("(%P|%t) - Transport[%d]::generate_request_header, ")
00399                    ACE_TEXT ("error while marshalling the Request header\n"),
00400                       this->id()));
00401         }
00402 
00403       return -1;
00404     }
00405 
00406   return 0;
00407 }
00408 
00409 /// @todo Ideally the following should be inline.
00410 /// @todo purge_entry has a return value, use it
00411 int
00412 TAO_Transport::recache_transport (TAO_Transport_Descriptor_Interface *desc)
00413 {
00414   // First purge our entry
00415   this->purge_entry ();
00416 
00417   // Then add ourselves to the cache
00418   return this->transport_cache_manager ().cache_transport (desc,
00419                                                            this);
00420 }
00421 
00422 int
00423 TAO_Transport::purge_entry (void)
00424 {
00425   return this->transport_cache_manager ().purge_entry (this->cache_map_entry_);
00426 }
00427 
00428 int
00429 TAO_Transport::make_idle (void)
00430 {
00431   if (TAO_debug_level > 3)
00432     {
00433       ACE_DEBUG ((LM_DEBUG,
00434                   ACE_TEXT ("TAO (%P|%t) - Transport[%d]::make_idle\n"),
00435                   this->id ()));
00436     }
00437 
00438   return this->transport_cache_manager ().make_idle (this->cache_map_entry_);
00439 }
00440 
00441 int
00442 TAO_Transport::update_transport (void)
00443 {
00444   return this->transport_cache_manager ().update_entry (this->cache_map_entry_);
00445 }
00446 
00447 /*
00448  *
00449  *  Methods called and used in the output path of the ORB.
00450  *
00451  */
00452 int
00453 TAO_Transport::handle_output (void)
00454 {
00455   if (TAO_debug_level > 3)
00456     {
00457       ACE_DEBUG ((LM_DEBUG,
00458                   ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_output\n"),
00459                   this->id ()));
00460     }
00461 
00462   // The flushing strategy (potentially via the Reactor) wants to send
00463   // more data, first check if there is a current message that needs
00464   // more sending...
00465   int const retval = this->drain_queue ();
00466 
00467   if (TAO_debug_level > 3)
00468     {
00469       ACE_DEBUG ((LM_DEBUG,
00470                   ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_output, ")
00471                   ACE_TEXT ("drain_queue returns %d/%d\n"),
00472                   this->id (),
00473                   retval, errno));
00474     }
00475 
00476   // Any errors are returned directly to the Reactor
00477   return retval;
00478 }
00479 
00480 int
00481 TAO_Transport::format_queue_message (TAO_OutputCDR &stream)
00482 {
00483   if (this->messaging_object ()->format_message (stream) != 0)
00484     return -1;
00485 
00486   return this->queue_message_i (stream.begin());
00487 }
00488 
00489 int
00490 TAO_Transport::send_message_block_chain (const ACE_Message_Block *mb,
00491                                          size_t &bytes_transferred,
00492                                          ACE_Time_Value *max_wait_time)
00493 {
00494   ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1);
00495 
00496   return this->send_message_block_chain_i (mb,
00497                                            bytes_transferred,
00498                                            max_wait_time);
00499 }
00500 
00501 int
00502 TAO_Transport::send_message_block_chain_i (const ACE_Message_Block *mb,
00503                                            size_t &bytes_transferred,
00504                                            ACE_Time_Value *)
00505 {
00506   size_t const total_length = mb->total_length ();
00507 
00508   // We are going to block, so there is no need to clone
00509   // the message block.
00510   TAO_Synch_Queued_Message synch_message (mb,
00511                                           this->orb_core_);
00512 
00513   synch_message.push_back (this->head_, this->tail_);
00514 
00515   int const n = this->drain_queue_i ();
00516 
00517   if (n == -1)
00518     {
00519       synch_message.remove_from_list (this->head_, this->tail_);
00520       return -1; // Error while sending...
00521     }
00522   else if (n == 1)
00523     {
00524       bytes_transferred = total_length;
00525       return 1;  // Empty queue, message was sent..
00526     }
00527 
00528   // Remove the temporary message from the queue...
00529   synch_message.remove_from_list (this->head_, this->tail_);
00530 
00531   bytes_transferred =
00532     total_length - synch_message.message_length ();
00533 
00534   return 0;
00535 }
00536 
00537 int
00538 TAO_Transport::send_synchronous_message_i (const ACE_Message_Block *mb,
00539                                            ACE_Time_Value *max_wait_time)
00540 {
00541   // We are going to block, so there is no need to clone
00542   // the message block.
00543   TAO_Synch_Queued_Message synch_message (mb, this->orb_core_);
00544 
00545   // Push synch_message on to the back of the queue.
00546   synch_message.push_back (this->head_, this->tail_);
00547 
00548   int const n =
00549     this->send_synch_message_helper_i (synch_message,
00550                                        max_wait_time);
00551 
00552   if (n == -1 || n == 1)
00553     {
00554       return n;
00555     }
00556 
00557   // @todo: Check for timeouts!
00558   // if (max_wait_time != 0 && errno == ETIME) return -1;
00559   TAO_Flushing_Strategy *flushing_strategy =
00560     this->orb_core ()->flushing_strategy ();
00561   (void) flushing_strategy->schedule_output (this);
00562 
00563   // Release the mutex, other threads may modify the queue as we
00564   // block for a long time writing out data.
00565   int result;
00566   {
00567     typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK;
00568     TAO_REVERSE_LOCK reverse (*this->handler_lock_);
00569     ACE_GUARD_RETURN (TAO_REVERSE_LOCK,
00570                       ace_mon,
00571                       reverse,
00572                       -1);
00573 
00574     result = flushing_strategy->flush_message (this,
00575                                                &synch_message,
00576                                                max_wait_time);
00577   }
00578 
00579   if (result == -1)
00580     {
00581       synch_message.remove_from_list (this->head_, this->tail_);
00582 
00583       if (errno == ETIME)
00584         {
00585           if (this->head_ == &synch_message)
00586             {
00587               // This is a timeout, there is only one nasty case: the
00588               // message has been partially sent!  We simply cannot take
00589               // the message out of the queue, because that would corrupt
00590               // the connection.
00591               //
00592               // What we do is replace the queued message with an
00593               // asynchronous message, that contains only what remains of
00594               // the timed out request.  If you think about sending
00595               // CancelRequests in this case: there is no much point in
00596               // doing that: the receiving ORB would probably ignore it,
00597               // and figuring out the request ID would be a bit of a
00598               // nightmare.
00599               //
00600 
00601               synch_message.remove_from_list (this->head_, this->tail_);
00602               TAO_Queued_Message *queued_message = 0;
00603               ACE_NEW_RETURN (queued_message,
00604                               TAO_Asynch_Queued_Message (
00605                                   synch_message.current_block (),
00606                                   this->orb_core_,
00607                                   0,
00608                                   1),
00609                               -1);
00610               queued_message->push_front (this->head_, this->tail_);
00611             }
00612         }
00613 
00614       if (TAO_debug_level > 0)
00615         {
00616           ACE_ERROR ((LM_ERROR,
00617              ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_synchronous_message_i, ")
00618              ACE_TEXT ("error while flushing message - %m\n"),
00619              this->id ()));
00620         }
00621 
00622       return -1;
00623     }
00624 
00625   return 1;
00626 }
00627 
00628 
00629 int
00630 TAO_Transport::send_reply_message_i (const ACE_Message_Block *mb,
00631                                      ACE_Time_Value *max_wait_time)
00632 {
00633   // Dont clone now.. We could be sent in one shot!
00634   TAO_Synch_Queued_Message synch_message (mb, this->orb_core_);
00635 
00636   synch_message.push_back (this->head_,
00637                            this->tail_);
00638 
00639   int const n =
00640     this->send_synch_message_helper_i (synch_message,
00641                                        max_wait_time);
00642 
00643   if (n == -1 || n == 1)
00644     {
00645       return n;
00646     }
00647 
00648   if (TAO_debug_level > 3)
00649     {
00650       ACE_DEBUG ((LM_DEBUG,
00651          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_reply_message_i, ")
00652          ACE_TEXT ("preparing to add to queue before leaving \n"),
00653          this->id ()));
00654     }
00655 
00656   // Till this point we shouldn't have any copying and that is the
00657   // point anyway. Now, remove the node from the list
00658   synch_message.remove_from_list (this->head_,
00659                                   this->tail_);
00660 
00661   // Clone the node that we have.
00662   TAO_Queued_Message *msg =
00663     synch_message.clone (this->orb_core_->transport_message_buffer_allocator ());
00664 
00665   // Stick it in the queue
00666   msg->push_back (this->head_,
00667                   this->tail_);
00668 
00669   TAO_Flushing_Strategy *flushing_strategy =
00670     this->orb_core ()->flushing_strategy ();
00671 
00672   int result = flushing_strategy->schedule_output (this);
00673 
00674   if (result == -1)
00675     {
00676       if (TAO_debug_level > 5)
00677         {
00678           ACE_DEBUG ((LM_DEBUG, "TAO (%P|%t) - Transport[%d]::send_reply_"
00679                       "message_i dequeuing msg due to schedule_output "
00680                       "failure\n", this->id ()));
00681         }
00682       msg->remove_from_list (this->head_, this->tail_);
00683       msg->destroy ();
00684     }
00685 
00686   return 1;
00687 }
00688 
00689 int
00690 TAO_Transport::send_synch_message_helper_i (TAO_Synch_Queued_Message &synch_message,
00691                                             ACE_Time_Value * /*max_wait_time*/)
00692 {
00693   // @@todo: Need to send timeouts for writing..
00694   int const n = this->drain_queue_i ();
00695 
00696   if (n == -1)
00697     {
00698       synch_message.remove_from_list (this->head_, this->tail_);
00699       return -1; // Error while sending...
00700     }
00701   else if (n == 1)
00702     {
00703       return 1;  // Empty queue, message was sent..
00704     }
00705 
00706   if (synch_message.all_data_sent ())
00707     {
00708       return 1;
00709     }
00710 
00711   return 0;
00712 }
00713 
00714 int
00715 TAO_Transport::queue_is_empty_i (void)
00716 {
00717   return (this->head_ == 0);
00718 }
00719 
00720 
00721 int
00722 TAO_Transport::schedule_output_i (void)
00723 {
00724   ACE_Event_Handler *eh = this->event_handler_i ();
00725   ACE_Reactor *reactor = eh->reactor ();
00726 
00727   // Check to see if our event handler is still registered with the
00728   // reactor.  It's possible for another thread to have run close_connection()
00729   // since we last used the event handler.
00730   ACE_Event_Handler * const found = reactor->find_handler (eh->get_handle ());
00731   if (found != eh)
00732     {
00733       if(TAO_debug_level > 3)
00734         {
00735           ACE_DEBUG ((LM_DEBUG,
00736                       "TAO (%P|%t) - Transport[%d]::schedule_output_i "
00737                       "event handler not found in reactor, returning -1\n",
00738                       this->id ()));
00739         }
00740       if (found)
00741         {
00742           found->remove_reference ();
00743         }
00744       return -1;
00745     }
00746   found->remove_reference ();
00747 
00748   if (TAO_debug_level > 3)
00749     {
00750       ACE_DEBUG ((LM_DEBUG,
00751          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::schedule_output_i\n"),
00752          this->id ()));
00753     }
00754 
00755   return reactor->schedule_wakeup (eh, ACE_Event_Handler::WRITE_MASK);
00756 }
00757 
00758 int
00759 TAO_Transport::cancel_output_i (void)
00760 {
00761   ACE_Event_Handler * const eh = this->event_handler_i ();
00762   ACE_Reactor *const reactor = eh->reactor ();
00763 
00764   if (TAO_debug_level > 3)
00765     {
00766       ACE_DEBUG ((LM_DEBUG,
00767          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cancel_output_i\n"),
00768          this->id ()));
00769     }
00770 
00771   return reactor->cancel_wakeup (eh, ACE_Event_Handler::WRITE_MASK);
00772 }
00773 
00774 int
00775 TAO_Transport::handle_timeout (const ACE_Time_Value & /* current_time */,
00776                                const void *act)
00777 {
00778   if (TAO_debug_level > 6)
00779     {
00780       ACE_DEBUG ((LM_DEBUG,
00781          ACE_TEXT ("TAO (%P|%t) - TAO_Transport[%d]::handle_timeout, ")
00782          ACE_TEXT ("timer expired\n"),
00783          this->id ()));
00784     }
00785 
00786   /// This is the only legal ACT in the current configuration....
00787   if (act != &this->current_deadline_)
00788     {
00789       return -1;
00790     }
00791 
00792   if (this->flush_timer_pending ())
00793     {
00794       // The timer is always a oneshot timer, so mark is as not
00795       // pending.
00796       this->reset_flush_timer ();
00797 
00798       TAO_Flushing_Strategy *flushing_strategy =
00799         this->orb_core ()->flushing_strategy ();
00800       (void) flushing_strategy->schedule_output (this);
00801     }
00802 
00803   return 0;
00804 }
00805 
00806 int
00807 TAO_Transport::drain_queue (void)
00808 {
00809   ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1);
00810   int const retval = this->drain_queue_i ();
00811 
00812   if (retval == 1)
00813     {
00814       // ... there is no current message or it was completely
00815       // sent, cancel output...
00816       TAO_Flushing_Strategy *flushing_strategy =
00817         this->orb_core ()->flushing_strategy ();
00818 
00819       flushing_strategy->cancel_output (this);
00820 
00821       return 0;
00822     }
00823 
00824   return retval;
00825 }
00826 
00827 int
00828 TAO_Transport::drain_queue_helper (int &iovcnt, iovec iov[])
00829 {
00830   size_t byte_count = 0;
00831 
00832   // ... send the message ...
00833   ssize_t retval = -1;
00834 
00835 #ifdef ACE_HAS_SENDFILE
00836   if (this->mmap_allocator_)
00837     retval = this->sendfile (this->mmap_allocator_,
00838                              iov,
00839                              iovcnt,
00840                              byte_count);
00841   else
00842 #endif  /* ACE_HAS_SENDFILE */
00843     retval = this->send (iov, iovcnt, byte_count);
00844 
00845   if (TAO_debug_level == 5)
00846     {
00847       dump_iov (iov, iovcnt, this->id (),
00848                 byte_count, "drain_queue_helper");
00849     }
00850 
00851   // ... now we need to update the queue, removing elements
00852   // that have been sent, and updating the last element if it
00853   // was only partially sent ...
00854   this->cleanup_queue (byte_count);
00855   iovcnt = 0;
00856 
00857   if (retval == 0)
00858     {
00859       if (TAO_debug_level > 4)
00860         {
00861           ACE_DEBUG ((LM_DEBUG,
00862              ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_helper, ")
00863              ACE_TEXT ("send() returns 0\n"),
00864              this->id ()));
00865         }
00866       return -1;
00867     }
00868   else if (retval == -1)
00869     {
00870       if (TAO_debug_level > 4)
00871         {
00872           ACE_DEBUG ((LM_DEBUG,
00873              ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_helper, ")
00874              ACE_TEXT ("error during %p\n"),
00875              this->id (), ACE_TEXT ("send()")));
00876         }
00877 
00878       if (errno == EWOULDBLOCK || errno == EAGAIN)
00879         {
00880           return 0;
00881         }
00882 
00883       return -1;
00884     }
00885 
00886   // ... start over, how do we guarantee progress?  Because if
00887   // no bytes are sent send() can only return 0 or -1
00888 
00889   // Total no. of bytes sent for a send call
00890   this->sent_byte_count_ += byte_count;
00891 
00892   if (TAO_debug_level > 4)
00893     {
00894       ACE_DEBUG ((LM_DEBUG,
00895          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_helper, ")
00896          ACE_TEXT ("byte_count = %d, head_is_empty = %d\n"),
00897          this->id(), byte_count, (this->head_ == 0)));
00898     }
00899 
00900   return 1;
00901 }
00902 
00903 int
00904 TAO_Transport::drain_queue_i (void)
00905 {
00906   // This is the vector used to send data, it must be declared outside
00907   // the loop because after the loop there may still be data to be
00908   // sent
00909   int iovcnt = 0;
00910 #if defined (ACE_INITIALIZE_MEMORY_BEFORE_USE)
00911   iovec iov[ACE_IOV_MAX] = { 0 , 0 };
00912 #else
00913   iovec iov[ACE_IOV_MAX];
00914 #endif /* ACE_INITIALIZE_MEMORY_BEFORE_USE */
00915 
00916   // We loop over all the elements in the queue ...
00917   TAO_Queued_Message *i = this->head_;
00918 
00919   // Reset the value so that the counting is done for each new send
00920   // call.
00921   this->sent_byte_count_ = 0;
00922 
00923   while (i != 0)
00924     {
00925       // ... each element fills the iovector ...
00926       i->fill_iov (ACE_IOV_MAX, iovcnt, iov);
00927 
00928       // ... the vector is full, no choice but to send some data out.
00929       // We need to loop because a single message can span multiple
00930       // IOV_MAX elements ...
00931       if (iovcnt == ACE_IOV_MAX)
00932         {
00933           int const retval =
00934             this->drain_queue_helper (iovcnt, iov);
00935 
00936           if (TAO_debug_level > 4)
00937             {
00938               ACE_DEBUG ((LM_DEBUG,
00939                  ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_i, ")
00940                  ACE_TEXT ("helper retval = %d\n"),
00941                  this->id (), retval));
00942             }
00943 
00944           if (retval != 1)
00945             {
00946               return retval;
00947             }
00948 
00949           i = this->head_;
00950           continue;
00951         }
00952       // ... notice that this line is only reached if there is still
00953       // room in the iovector ...
00954       i = i->next ();
00955     }
00956 
00957   if (iovcnt != 0)
00958     {
00959       int const retval = this->drain_queue_helper (iovcnt, iov);
00960 
00961       if (TAO_debug_level > 4)
00962         {
00963           ACE_DEBUG ((LM_DEBUG,
00964               ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_i, ")
00965               ACE_TEXT ("helper retval = %d\n"),
00966               this->id (), retval));
00967         }
00968 
00969       if (retval != 1)
00970         {
00971           return retval;
00972         }
00973     }
00974 
00975   if (this->head_ == 0)
00976     {
00977       if (this->flush_timer_pending ())
00978         {
00979           ACE_Event_Handler *eh = this->event_handler_i ();
00980           ACE_Reactor * const reactor = eh->reactor ();
00981           reactor->cancel_timer (this->flush_timer_id_);
00982           this->reset_flush_timer ();
00983         }
00984 
00985       return 1;
00986     }
00987 
00988   return 0;
00989 }
00990 
00991 void
00992 TAO_Transport::cleanup_queue_i ()
00993 {
00994   if (TAO_debug_level > 4)
00995     {
00996       ACE_DEBUG ((LM_DEBUG,
00997          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue_i, ")
00998          ACE_TEXT ("cleaning up complete queue\n"),
00999          this->id ()));
01000     }
01001 
01002   // Cleanup all messages
01003   while (this->head_ != 0)
01004     {
01005       TAO_Queued_Message *i = this->head_;
01006 
01007        // @@ This is a good point to insert a flag to indicate that a
01008        //    CloseConnection message was successfully received.
01009       i->state_changed (TAO_LF_Event::LFS_CONNECTION_CLOSED,
01010                         this->orb_core_->leader_follower ());
01011 
01012       i->remove_from_list (this->head_, this->tail_);
01013 
01014       i->destroy ();
01015    }
01016 }
01017 
01018 void
01019 TAO_Transport::cleanup_queue (size_t byte_count)
01020 {
01021   while (this->head_ != 0 && byte_count > 0)
01022     {
01023       TAO_Queued_Message *i = this->head_;
01024 
01025       if (TAO_debug_level > 4)
01026         {
01027           ACE_DEBUG ((LM_DEBUG,
01028              ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue, ")
01029              ACE_TEXT ("byte_count = %d\n"),
01030              this->id (), byte_count));
01031         }
01032 
01033       // Update the state of the first message
01034       i->bytes_transferred (byte_count);
01035 
01036       if (TAO_debug_level > 4)
01037         {
01038           ACE_DEBUG ((LM_DEBUG,
01039              ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue, ")
01040              ACE_TEXT ("after transfer, bc = %d, all_sent = %d, ml = %d\n"),
01041              this->id (), byte_count, i->all_data_sent (),
01042              i->message_length ()));
01043         }
01044 
01045       // ... if all the data was sent the message must be removed from
01046       // the queue...
01047       if (i->all_data_sent ())
01048         {
01049           i->remove_from_list (this->head_, this->tail_);
01050           i->destroy ();
01051         }
01052     }
01053 }
01054 
01055 int
01056 TAO_Transport::check_buffering_constraints_i (TAO_Stub *stub,
01057                                               bool &must_flush)
01058 {
01059   // First let's compute the size of the queue:
01060   size_t msg_count = 0;
01061   size_t total_bytes = 0;
01062 
01063   for (TAO_Queued_Message *i = this->head_; i != 0; i = i->next ())
01064     {
01065       ++msg_count;
01066       total_bytes += i->message_length ();
01067     }
01068 
01069   bool set_timer;
01070   ACE_Time_Value new_deadline;
01071 
01072   bool constraints_reached =
01073     stub->transport_queueing_strategy ().
01074       buffering_constraints_reached (stub,
01075                                      msg_count,
01076                                      total_bytes,
01077                                      must_flush,
01078                                      this->current_deadline_,
01079                                      set_timer,
01080                                      new_deadline);
01081 
01082   // ... set the new timer, also cancel any previous timers ...
01083   if (set_timer)
01084     {
01085       ACE_Event_Handler *eh = this->event_handler_i ();
01086       ACE_Reactor *reactor = eh->reactor ();
01087       this->current_deadline_ = new_deadline;
01088       ACE_Time_Value delay =
01089         new_deadline - ACE_OS::gettimeofday ();
01090 
01091       if (this->flush_timer_pending ())
01092         {
01093           reactor->cancel_timer (this->flush_timer_id_);
01094         }
01095 
01096       this->flush_timer_id_ =
01097         reactor->schedule_timer (&this->transport_timer_,
01098                                  &this->current_deadline_,
01099                                  delay);
01100     }
01101 
01102   return constraints_reached;
01103 }
01104 
01105 void
01106 TAO_Transport::report_invalid_event_handler (const char *caller)
01107 {
01108   if (TAO_debug_level > 0)
01109     {
01110       ACE_DEBUG ((LM_DEBUG,
01111          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::report_invalid_event_handler")
01112          ACE_TEXT ("(%s) no longer associated with handler [tag=%d]\n"),
01113          this->id (), ACE_TEXT_CHAR_TO_TCHAR (caller), this->tag_));
01114     }
01115 }
01116 
01117 void
01118 TAO_Transport::send_connection_closed_notifications (void)
01119 {
01120   {
01121     ACE_MT (ACE_GUARD (ACE_Lock, guard, *this->handler_lock_));
01122 
01123     this->send_connection_closed_notifications_i ();
01124   }
01125 
01126   this->tms ()->connection_closed ();
01127 }
01128 
01129 void
01130 TAO_Transport::send_connection_closed_notifications_i (void)
01131 {
01132   this->cleanup_queue_i ();
01133 
01134   this->messaging_object ()->reset ();
01135 }
01136 
01137 int
01138 TAO_Transport::send_message_shared_i (TAO_Stub *stub,
01139                                       int message_semantics,
01140                                       const ACE_Message_Block *message_block,
01141                                       ACE_Time_Value *max_wait_time)
01142 {
01143   switch (message_semantics)
01144     {
01145       case TAO_Transport::TAO_TWOWAY_REQUEST:
01146         return this->send_synchronous_message_i (message_block,
01147                                                  max_wait_time);
01148       case TAO_Transport::TAO_REPLY:
01149         return this->send_reply_message_i (message_block,
01150                                            max_wait_time);
01151       case TAO_Transport::TAO_ONEWAY_REQUEST:
01152         return this->send_asynchronous_message_i (stub,
01153                                                   message_block,
01154                                                   max_wait_time);
01155     }
01156 
01157   return -1;
01158 }
01159 
01160 int
01161 TAO_Transport::send_asynchronous_message_i (TAO_Stub *stub,
01162                                             const ACE_Message_Block *message_block,
01163                                             ACE_Time_Value *max_wait_time)
01164 {
01165   // Let's figure out if the message should be queued without trying
01166   // to send first:
01167   bool try_sending_first = true;
01168 
01169   const bool queue_empty = (this->head_ == 0);
01170 
01171   if (!queue_empty)
01172     {
01173       try_sending_first = false;
01174     }
01175   else if (stub->transport_queueing_strategy ().must_queue (queue_empty))
01176     {
01177       try_sending_first = false;
01178     }
01179 
01180   if (try_sending_first)
01181     {
01182       ssize_t n = 0;
01183       size_t byte_count = 0;
01184       // ... in this case we must try to send the message first ...
01185 
01186       const size_t total_length = message_block->total_length ();
01187 
01188       if (TAO_debug_level > 6)
01189         {
01190           ACE_DEBUG ((LM_DEBUG,
01191              ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ")
01192              ACE_TEXT ("trying to send the message (ml = %d)\n"),
01193              this->id (), total_length));
01194         }
01195 
01196       // @@ I don't think we want to hold the mutex here, however if
01197       // we release it we need to recheck the status of the transport
01198       // after we return... once I understand the final form for this
01199       // code I will re-visit this decision
01200       n = this->send_message_block_chain_i (message_block,
01201                                             byte_count,
01202                                             max_wait_time);
01203       if (n == -1)
01204         {
01205           // ... if this is just an EWOULDBLOCK we must schedule the
01206           // message for later, if it is ETIME we still have to send
01207           // the complete message, because cutting off the message at
01208           // this point will destroy the synchronization with the
01209           // server ...
01210           if (errno != EWOULDBLOCK && errno != ETIME)
01211             {
01212               if (TAO_debug_level > 0)
01213                 {
01214                   ACE_ERROR ((LM_ERROR,
01215                      ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ")
01216                      ACE_TEXT ("fatal error in ")
01217                      ACE_TEXT ("send_message_block_chain_i - %m\n"),
01218                      this->id ()));
01219                 }
01220               return -1;
01221             }
01222         }
01223 
01224       // ... let's figure out if the complete message was sent ...
01225       if (total_length == byte_count)
01226         {
01227           // Done, just return.  Notice that there are no allocations
01228           // or copies up to this point (though some fancy calling
01229           // back and forth).
01230           // This is the common case for the critical path, it should
01231           // be fast.
01232           return 0;
01233         }
01234 
01235       if (TAO_debug_level > 6)
01236         {
01237           ACE_DEBUG ((LM_DEBUG,
01238              ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ")
01239              ACE_TEXT ("partial send %d / %d bytes\n"),
01240              this->id (), byte_count, total_length));
01241         }
01242 
01243       // ... part of the data was sent, need to figure out what piece
01244       // of the message block chain must be queued ...
01245       while (message_block != 0 && message_block->length () == 0)
01246         {
01247           message_block = message_block->cont ();
01248         }
01249 
01250       // ... at least some portion of the message block chain should
01251       // remain ...
01252     }
01253 
01254   // ... either the message must be queued or we need to queue it
01255   // because it was not completely sent out ...
01256 
01257   if (TAO_debug_level > 6)
01258     {
01259       ACE_DEBUG ((LM_DEBUG,
01260          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ")
01261          ACE_TEXT ("message is queued\n"),
01262          this->id ()));
01263     }
01264 
01265   if (this->queue_message_i(message_block) == -1)
01266   {
01267     if (TAO_debug_level > 0)
01268     ACE_DEBUG ((LM_DEBUG,
01269        ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ")
01270        ACE_TEXT ("cannot queue message for ")
01271        ACE_TEXT (" - %m\n"),
01272        this->id ()));
01273     return -1;
01274   }
01275 
01276   // ... if the queue is full we need to activate the output on the
01277   // queue ...
01278   bool must_flush = false;
01279   const bool constraints_reached =
01280     this->check_buffering_constraints_i (stub,
01281                                          must_flush);
01282 
01283   // ... but we also want to activate it if the message was partially
01284   // sent.... Plus, when we use the blocking flushing strategy the
01285   // queue is flushed as a side-effect of 'schedule_output()'
01286 
01287   TAO_Flushing_Strategy *flushing_strategy =
01288     this->orb_core ()->flushing_strategy ();
01289 
01290   if (constraints_reached || try_sending_first)
01291     {
01292       (void) flushing_strategy->schedule_output (this);
01293     }
01294 
01295   if (must_flush)
01296     {
01297       typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK;
01298       TAO_REVERSE_LOCK reverse (*this->handler_lock_);
01299       ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1);
01300 
01301       (void) flushing_strategy->flush_transport (this);
01302     }
01303 
01304   return 0;
01305 }
01306 
01307 int
01308 TAO_Transport::queue_message_i(const ACE_Message_Block *message_block)
01309 {
01310   TAO_Queued_Message *queued_message = 0;
01311   ACE_NEW_RETURN (queued_message,
01312                   TAO_Asynch_Queued_Message (message_block,
01313                                              this->orb_core_,
01314                                              0,
01315                                              1),
01316                   -1);
01317   queued_message->push_back (this->head_, this->tail_);
01318 
01319   return 0;
01320 }
01321 
01322 /*
01323  *
01324  * All the methods relevant to the incoming data path of the ORB are
01325  * defined below
01326  *
01327  */
01328 int
01329 TAO_Transport::handle_input (TAO_Resume_Handle &rh,
01330                              ACE_Time_Value * max_wait_time,
01331                              int /* block */ /* deprecated parameter */ )
01332 {
01333   if (TAO_debug_level > 3)
01334     {
01335       ACE_DEBUG ((LM_DEBUG,
01336          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input\n"),
01337          this->id ()));
01338     }
01339 
01340   // First try to process messages of the head of the incoming queue.
01341   int const retval = this->process_queue_head (rh);
01342 
01343   if (retval <= 0)
01344     {
01345       if (retval == -1)
01346         {
01347           if (TAO_debug_level > 2)
01348             {
01349               ACE_DEBUG ((LM_DEBUG,
01350                  ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input, ")
01351                  ACE_TEXT ("error while parsing the head of the queue\n"),
01352                  this->id()));
01353 
01354             }
01355           return -1;
01356         }
01357       else
01358         {
01359           // retval == 0
01360 
01361           // Processed a message in queue successfully. This
01362           // thread must return to thread-pool now.
01363           return 0;
01364         }
01365     }
01366 
01367   TAO_Queued_Data *q_data = 0;
01368 
01369   if (this->incoming_message_stack_.top (q_data) != -1
01370       && q_data->missing_data_ != TAO_MISSING_DATA_UNDEFINED)
01371     {
01372       /* PRE: q_data->missing_data_ > 0 as all QD on stack must be incomplete  */
01373       if (this->handle_input_missing_data (rh, max_wait_time, q_data) == -1)
01374         {
01375           if (TAO_debug_level > 0)
01376             {
01377               ACE_ERROR ((LM_ERROR,
01378                  ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input, ")
01379                  ACE_TEXT ("error consolidating incoming message\n"),
01380                  this->id ()));
01381             }
01382           return -1;
01383         }
01384     }
01385   else
01386     {
01387       if (this->handle_input_parse_data (rh, max_wait_time) == -1)
01388         {
01389           if (TAO_debug_level > 0)
01390             {
01391               ACE_ERROR ((LM_ERROR,
01392                  ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input, ")
01393                  ACE_TEXT ("error parsing incoming message\n"),
01394                  this->id ()));
01395             }
01396           return -1;
01397         }
01398     }
01399 
01400   return 0;
01401 }
01402 
01403 int
01404 TAO_Transport::consolidate_process_message (TAO_Queued_Data *q_data,
01405                                             TAO_Resume_Handle &rh)
01406 {
01407   // paranoid check
01408   if (q_data->missing_data_ != 0)
01409     {
01410       if (TAO_debug_level > 0)
01411         {
01412            ACE_ERROR ((LM_ERROR,
01413               ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ")
01414               ACE_TEXT ("missing data\n"),
01415               this->id ()));
01416         }
01417        return -1;
01418     }
01419 
01420   if (q_data->more_fragments_ ||
01421       q_data->msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT)
01422     {
01423       // consolidate message on top of stack, only for fragmented messages
01424       TAO_Queued_Data *new_q_data = 0;
01425 
01426       switch (this->messaging_object()->consolidate_fragmented_message (q_data, new_q_data))
01427         {
01428         case -1: // error
01429           return -1;
01430 
01431         case 0:  // returning consolidated message in q_data
01432           if (!new_q_data)
01433             {
01434               if (TAO_debug_level > 0)
01435                 {
01436                   ACE_ERROR ((LM_ERROR,
01437                      ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ")
01438                      ACE_TEXT ("error, consolidated message is NULL\n"),
01439                      this->id ()));
01440                 }
01441               return -1;
01442             }
01443 
01444 
01445           if (this->process_parsed_messages (new_q_data, rh) == -1)
01446             {
01447               TAO_Queued_Data::release (new_q_data);
01448 
01449               if (TAO_debug_level > 0)
01450                 {
01451                   ACE_ERROR ((LM_ERROR,
01452                      ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ")
01453                      ACE_TEXT ("error processing consolidated message\n"),
01454                      this->id ()));
01455                 }
01456               return -1;
01457             }
01458 
01459           TAO_Queued_Data::release (new_q_data);
01460 
01461           break;
01462 
01463         case 1:  // fragment has been stored in messaging_oject()
01464           break;
01465         }
01466     }
01467   else
01468     {
01469       if (this->process_parsed_messages (q_data, rh) == -1)
01470         {
01471           TAO_Queued_Data::release (q_data);
01472 
01473           if (TAO_debug_level > 0)
01474             {
01475               ACE_ERROR ((LM_ERROR,
01476                  ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ")
01477                  ACE_TEXT ("error processing message\n"),
01478                  this->id ()));
01479             }
01480           return -1;
01481         }
01482 
01483       TAO_Queued_Data::release (q_data);
01484 
01485     }
01486 
01487   return 0;
01488 }
01489 
01490 int
01491 TAO_Transport::consolidate_enqueue_message (TAO_Queued_Data *q_data)
01492 {
01493   // consolidate message on top of stack, only for fragmented messages
01494 
01495   // paranoid check
01496   if (q_data->missing_data_ != 0)
01497     {
01498        return -1;
01499     }
01500 
01501   if (q_data->more_fragments_ ||
01502       q_data->msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT)
01503     {
01504       TAO_Queued_Data *new_q_data = 0;
01505 
01506       switch (this->messaging_object()->consolidate_fragmented_message (q_data, new_q_data))
01507         {
01508         case -1: // error
01509           return -1;
01510 
01511         case 0:  // returning consolidated message in new_q_data
01512           if (!new_q_data)
01513             {
01514               if (TAO_debug_level > 0)
01515                 {
01516                   ACE_ERROR ((LM_ERROR,
01517                      ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_enqueue_message, ")
01518                      ACE_TEXT ("error, consolidated message is NULL\n"),
01519                      this->id ()));
01520                 }
01521               return -1;
01522             }
01523 
01524           if (this->incoming_message_queue_.enqueue_tail (new_q_data) != 0)
01525             {
01526               TAO_Queued_Data::release (new_q_data);
01527               return -1;
01528             }
01529           break;
01530 
01531         case 1:  // fragment has been stored in messaging_oject()
01532           break;
01533         }
01534     }
01535   else
01536     {
01537       if (this->incoming_message_queue_.enqueue_tail (q_data) != 0)
01538         {
01539           TAO_Queued_Data::release (q_data);
01540           return -1;
01541         }
01542     }
01543 
01544   return 0; // success
01545 }
01546 
01547 int
01548 TAO_Transport::handle_input_missing_data (TAO_Resume_Handle &rh,
01549                                           ACE_Time_Value * max_wait_time,
01550                                           TAO_Queued_Data *q_data)
01551 {
01552   // paranoid check
01553   if (q_data == 0)
01554     {
01555       return -1;
01556     }
01557 
01558   if (TAO_debug_level > 3)
01559     {
01560       ACE_DEBUG ((LM_DEBUG,
01561          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_missing_data_message, ")
01562          ACE_TEXT ("enter (missing data == %d)\n"),
01563          this->id (), q_data->missing_data_));
01564     }
01565 
01566   size_t const recv_size = q_data->missing_data_;
01567 
01568   // make sure the message_block has enough space
01569   size_t const message_size =  recv_size
01570                                + q_data->msg_block_->length();
01571 
01572   if (q_data->msg_block_->space() < recv_size)
01573     {
01574       if (ACE_CDR::grow (q_data->msg_block_, message_size) == -1)
01575         {
01576           return -1;
01577         }
01578     }
01579 
01580   // Saving the size of the received buffer in case any one needs to
01581   // get the size of the message thats received in the
01582   // context. Obviously the value will be changed for each recv call
01583   // and the user is supposed to invoke the accessor only in the
01584   // invocation context to get meaningful information.
01585   this->recv_buffer_size_ = recv_size;
01586 
01587   // Read the message into the existing message block on heap
01588   ssize_t const n = this->recv (q_data->msg_block_->wr_ptr(),
01589                                 recv_size,
01590                                 max_wait_time);
01591 
01592 
01593   if (n <= 0)
01594     {
01595       return n;
01596     }
01597 
01598   if (TAO_debug_level > 3)
01599     {
01600       ACE_DEBUG ((LM_DEBUG,
01601          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_missing_data_message, ")
01602          ACE_TEXT ("read bytes %d\n"),
01603          this->id (), n));
01604     }
01605 
01606   q_data->msg_block_->wr_ptr(n);
01607   q_data->missing_data_ -= n;
01608 
01609   if (q_data->missing_data_ == 0)
01610     {
01611       // paranoid check
01612       if (this->incoming_message_stack_.pop (q_data) == -1)
01613         {
01614           return -1;
01615         }
01616 
01617       if (this->consolidate_process_message (q_data, rh) == -1)
01618         {
01619           return -1;
01620         }
01621     }
01622 
01623   return 0;
01624 }
01625 
01626 
01627 int
01628 TAO_Transport::handle_input_parse_extra_messages (ACE_Message_Block &message_block)
01629 {
01630 
01631   // store buffer status of last extraction: -1 parse error, 0
01632   // incomplete message header in buffer, 1 complete messages header
01633   // parsed
01634   int buf_status = 0;
01635 
01636   TAO_Queued_Data *q_data = 0;     // init
01637 
01638   // parse buffer until all messages have been extracted, consolidate
01639   // and enqueue complete messages, if the last message being parsed
01640   // has missin data, it is stays on top of incoming_message_stack.
01641   while (message_block.length () > 0 &&
01642          (buf_status = this->messaging_object ()->extract_next_message
01643           (message_block, q_data)) != -1 &&
01644          q_data != 0) // paranoid check
01645     {
01646       if (q_data->missing_data_ == 0)
01647         {
01648           if (this->consolidate_enqueue_message (q_data) == -1)
01649             {
01650               return -1;
01651             }
01652         }
01653       else  // incomplete message read, probably the last message in buffer
01654         {
01655           // can not fail
01656           this->incoming_message_stack_.push (q_data);
01657         }
01658 
01659       q_data = 0; // reset
01660     } // while
01661 
01662   if (buf_status == -1)
01663     {
01664       return -1;
01665     }
01666 
01667   return 0;
01668 }
01669 
01670 int
01671 TAO_Transport::handle_input_parse_data  (TAO_Resume_Handle &rh,
01672                                          ACE_Time_Value * max_wait_time)
01673 {
01674 
01675   if (TAO_debug_level > 3)
01676     {
01677       ACE_DEBUG ((LM_DEBUG,
01678          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ")
01679          ACE_TEXT ("enter\n"),
01680          this->id ()));
01681     }
01682 
01683 
01684   // The buffer on the stack which will be used to hold the input
01685   // messages, ACE_CDR::MAX_ALIGNMENT compensates the
01686   // memory-alignment. This improves performance with SUN-Java-ORB-1.4
01687   // and higher that sends fragmented requests of size 1024 bytes.
01688   char buf [TAO_MAXBUFSIZE + ACE_CDR::MAX_ALIGNMENT];
01689 
01690 #if defined (ACE_INITIALIZE_MEMORY_BEFORE_USE)
01691   (void) ACE_OS::memset (buf,
01692                          '\0',
01693                          sizeof buf);
01694 #endif /* ACE_INITIALIZE_MEMORY_BEFORE_USE */
01695 
01696   // Create a data block
01697   ACE_Data_Block db (sizeof (buf),
01698                      ACE_Message_Block::MB_DATA,
01699                      buf,
01700                      this->orb_core_->input_cdr_buffer_allocator (),
01701                      this->orb_core_->locking_strategy (),
01702                      ACE_Message_Block::DONT_DELETE,
01703                      this->orb_core_->input_cdr_dblock_allocator ());
01704 
01705   // Create a message block
01706   ACE_Message_Block message_block (&db,
01707                                    ACE_Message_Block::DONT_DELETE,
01708                                    this->orb_core_->input_cdr_msgblock_allocator ());
01709 
01710 
01711   // Align the message block
01712   ACE_CDR::mb_align (&message_block);
01713 
01714   size_t recv_size = 0; // Note: unsigned integer
01715 
01716   // Pointer to newly parsed message
01717   TAO_Queued_Data *q_data = 0;
01718 
01719   // optimizing access of constants
01720   const size_t header_length =
01721             this->messaging_object ()->header_length ();
01722 
01723   // paranoid check
01724   if (header_length > message_block.space ())
01725     {
01726       return -1;
01727     }
01728 
01729   if (this->orb_core_->orb_params ()->single_read_optimization ())
01730     {
01731       recv_size =
01732         message_block.space ();
01733     }
01734   else
01735     {
01736       // Single read optimization has been de-activated. That means
01737       // that we need to read from transport the GIOP header first
01738       // before the payload. This codes first checks the incoming
01739       // stack for partial messages which needs to be
01740       // consolidated. Otherwise we are in new cycle, reading complete
01741       // GIOP header of new incoming message.
01742       if (this->incoming_message_stack_.top (q_data) != -1
01743            && q_data->missing_data_ == TAO_MISSING_DATA_UNDEFINED)
01744         {
01745           // There is a partial message on incoming_message_stack_
01746           // whose length is unknown so far. We need to consolidate
01747           // the GIOP header to get to know the payload size,
01748           recv_size = header_length - q_data->msg_block_->length ();
01749         }
01750       else
01751         {
01752           // Read amount of data forming GIOP header of new incoming
01753           // message.
01754           recv_size = header_length;
01755         }
01756       // POST: 0 <= recv_size <= header_length
01757     }
01758   // POST: 0 <= recv_size <= message_block->space ()
01759 
01760   // If we have a partial message, copy it into our message block and
01761   // clear out the partial message.
01762   if (this->partial_message_ != 0 && this->partial_message_->length () > 0)
01763     {
01764       // (*) Copy back the partial message into current read-buffer,
01765       // verify that the read-strategy of "recv_size" bytes is not
01766       // exceeded. The latter check guarantees that recv_size does not
01767       // roll-over and keeps in range
01768       // 0<=recv_size<=message_block->space()
01769       if (this->partial_message_->length () <= recv_size &&
01770           message_block.copy (this->partial_message_->rd_ptr (),
01771                               this->partial_message_->length ()) == 0)
01772         {
01773 
01774           recv_size -= this->partial_message_->length ();
01775           this->partial_message_->reset ();
01776         }
01777       else
01778         {
01779           return -1;
01780         }
01781     }
01782   // POST: 0 <= recv_size <= buffer_space
01783 
01784   if (0 >= recv_size) // paranoid: the check above (*) guarantees recv_size>=0
01785     {
01786       // This event would cause endless looping, trying frequently to
01787       // read zero bytes from stream.  This might happen, if TAOs
01788       // protocol implementation is not correct and tries to read data
01789       // beyond header without "single_read_optimazation" being
01790       // activated.
01791       if (TAO_debug_level > 0)
01792         {
01793           ACE_ERROR ((LM_ERROR,
01794              ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ")
01795              ACE_TEXT ("Error - endless loop detection, closing connection"),
01796              this->id ()));
01797         }
01798       return -1;
01799     }
01800 
01801   // Saving the size of the received buffer in case any one needs to
01802   // get the size of the message thats received in the
01803   // context. Obviously the value will be changed for each recv call
01804   // and the user is supposed to invoke the accessor only in the
01805   // invocation context to get meaningful information.
01806   this->recv_buffer_size_ = recv_size;
01807 
01808   // Read the message into the  message block that we have created on
01809   // the stack.
01810   const ssize_t n = this->recv (message_block.wr_ptr (),
01811                                 recv_size,
01812                                 max_wait_time);
01813 
01814   // If there is an error return to the reactor..
01815   if (n <= 0)
01816     {
01817       return n;
01818     }
01819 
01820   if (TAO_debug_level > 3)
01821     {
01822       ACE_DEBUG ((LM_DEBUG,
01823          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ")
01824          ACE_TEXT ("read %d bytes\n"),
01825          this->id (), n));
01826     }
01827 
01828   // Set the write pointer in the stack buffer
01829   message_block.wr_ptr (n);
01830 
01831   //
01832   // STACK PROCESSING OR MESSAGE CONSOLIDATION
01833   //
01834 
01835   // PRE: data in buffer is aligned && message_block.length() > 0
01836 
01837   if (this->incoming_message_stack_.top (q_data) != -1
01838       && q_data->missing_data_ == TAO_MISSING_DATA_UNDEFINED)
01839     {
01840       //
01841       // MESSAGE CONSOLIDATION
01842       //
01843 
01844       // Partial message on incoming_message_stack_ needs to be
01845       // consolidated.  The message header could not be parsed so far
01846       // and therefor the message size is unknown yet. Consolidating
01847       // the message destroys the memory alignment of succeeding
01848       // messages sharing the buffer, for that reason consolidation
01849       // and stack based processing are mutial exclusive.
01850       if (this->messaging_object ()->consolidate_node (q_data,
01851                                                        message_block) == -1)
01852         {
01853            if (TAO_debug_level > 0)
01854             {
01855                 ACE_ERROR ((LM_ERROR,
01856                    ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ")
01857                    ACE_TEXT ("error consolidating message from input buffer\n"),
01858                    this->id () ));
01859              }
01860            return -1;
01861         }
01862 
01863       // Complete message are to be enqueued and later processed
01864       if (q_data->missing_data_ == 0)
01865         {
01866           if (this->incoming_message_stack_.pop (q_data) == -1)
01867             {
01868               return -1;
01869             }
01870 
01871           if (this->consolidate_enqueue_message (q_data) == -1)
01872             {
01873               return -1;
01874             }
01875         }
01876 
01877       if (message_block.length () > 0
01878           && this->handle_input_parse_extra_messages (message_block) == -1)
01879         {
01880           return -1;
01881         }
01882 
01883       // In any case try to process the enqueued messages
01884       if (this->process_queue_head (rh) == -1)
01885         {
01886           return -1;
01887         }
01888     }
01889   else
01890     {
01891       //
01892       // STACK PROCESSING (critical path)
01893       //
01894 
01895       // Process the first message in buffer on stack
01896 
01897       // (PRE: first message resides in aligned memory) Make a node of
01898       // the message-block..
01899 
01900       TAO_Queued_Data qd (&message_block,
01901                           this->orb_core_->transport_message_buffer_allocator ());
01902 
01903       size_t mesg_length  = 0;
01904 
01905       if (this->messaging_object ()->parse_next_message (message_block,
01906                                                          qd,
01907                                                          mesg_length) == -1
01908           || (qd.missing_data_ == 0
01909               && mesg_length > message_block.length ()) )
01910         {
01911           // extracting message failed
01912           return -1;
01913         }
01914       // POST: qd.missing_data_ == 0 --> mesg_length <= message_block.length()
01915       // This prevents seeking rd_ptr behind the wr_ptr
01916 
01917       if (qd.missing_data_ != 0 ||
01918           qd.more_fragments_   ||
01919           qd.msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT)
01920         {
01921           if (qd.missing_data_ == 0)
01922             {
01923               // Dealing with a fragment
01924               TAO_Queued_Data *nqd =
01925                 TAO_Queued_Data::duplicate (qd);
01926 
01927               if (nqd == 0)
01928                 {
01929                   return -1;
01930                 }
01931 
01932               // mark the end of message in new buffer
01933               char* end_mark = nqd->msg_block_->rd_ptr ()
01934                              + mesg_length;
01935               nqd->msg_block_->wr_ptr (end_mark);
01936 
01937               // move the read pointer forward in old buffer
01938               message_block.rd_ptr (mesg_length);
01939 
01940               // enqueue the message
01941               if (this->consolidate_enqueue_message (nqd) == -1)
01942                 {
01943                   return -1;
01944                 }
01945 
01946               if (message_block.length () > 0
01947                   && this->handle_input_parse_extra_messages (message_block) == -1)
01948                 {
01949                   return -1;
01950                 }
01951 
01952               // In any case try to process the enqueued messages
01953               if (this->process_queue_head (rh) == -1)
01954                 {
01955                   return -1;
01956                 }
01957             }
01958           else if (qd.missing_data_ != TAO_MISSING_DATA_UNDEFINED)
01959             {
01960               // Incomplete message, must be the last one in buffer
01961 
01962               if (qd.missing_data_ != TAO_MISSING_DATA_UNDEFINED &&
01963                   qd.missing_data_ > message_block.space ())
01964                 {
01965                   // Re-Allocate correct size on heap
01966                   if (ACE_CDR::grow (qd.msg_block_,
01967                                      message_block.length ()
01968                                      + qd.missing_data_) == -1)
01969                     {
01970                       return -1;
01971                     }
01972                 }
01973 
01974               TAO_Queued_Data *nqd =
01975                 TAO_Queued_Data::duplicate (qd);
01976 
01977               if (nqd == 0)
01978                 {
01979                   return -1;
01980                 }
01981 
01982               // move read-pointer to end of buffer
01983               message_block.rd_ptr (message_block.length());
01984 
01985               this->incoming_message_stack_.push (nqd);
01986             }
01987         }
01988       else
01989         {
01990           //
01991           // critical path
01992           //
01993 
01994           // We cant process the message on stack right now. First we
01995           // have got to parse extra messages from message_block,
01996           // putting them into queue.  When this is done we can return
01997           // to process this message, and notifying other threads to
01998           // process the messages in queue.
01999 
02000           char * end_marker = message_block.rd_ptr ()
02001                             + mesg_length;
02002 
02003           if (message_block.length () > mesg_length)
02004             {
02005               // There are more message in data stream to be parsed.
02006               // Safe the rd_ptr to restore later.
02007               char *rd_ptr_stack_mesg = message_block.rd_ptr ();
02008 
02009               // Skip parsed message, jump to next message in buffer
02010               // PRE: mesg_length <= message_block.length ()
02011               message_block.rd_ptr (mesg_length);
02012 
02013               // Extract remaining messages and enqueue them for later
02014               // heap processing
02015               if (this->handle_input_parse_extra_messages (message_block) == -1)
02016                 {
02017                   return -1;
02018                 }
02019 
02020               // correct the end_marker
02021               end_marker = message_block.rd_ptr ();
02022 
02023               // Restore rd_ptr
02024               message_block.rd_ptr (rd_ptr_stack_mesg);
02025             }
02026 
02027           // The following if-else has been copied from
02028           // process_queue_head().  While process_queue_head()
02029           // processes message on heap, here we will process a message
02030           // on stack.
02031 
02032           // Now that we have one message on stack to be processed,
02033           // check whether we have one more message in the queue...
02034           if (this->incoming_message_queue_.queue_length () > 0)
02035             {
02036               if (TAO_debug_level > 0)
02037                 {
02038                   ACE_DEBUG ((LM_DEBUG,
02039                      ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ")
02040                      ACE_TEXT ("notify reactor\n"),
02041                      this->id ()));
02042 
02043                 }
02044 
02045               const int retval = this->notify_reactor ();
02046 
02047               if (retval == 1)
02048                 {
02049                   // Let the class know that it doesn't need to resume  the
02050                   // handle..
02051                   rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_LEAVE_SUSPENDED);
02052                 }
02053               else if (retval < 0)
02054                 return -1;
02055             }
02056           else
02057             {
02058               // As there are no further messages in queue just resume
02059               // the handle. Set the flag incase someone had reset the flag..
02060               rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_RESUMABLE);
02061             }
02062 
02063           // PRE: incoming_message_queue is empty
02064           if (this->process_parsed_messages (&qd,
02065                                              rh) == -1)
02066             {
02067               return -1;
02068             }
02069 
02070           // move the rd_ptr tp position of end_marker
02071           message_block.rd_ptr (end_marker);
02072         }
02073     }
02074 
02075   // Now that all cases have been processed, there might be kept some data
02076   // in buffer that needs to be safed for next "handle_input" invocations.
02077    if (message_block.length () > 0)
02078      {
02079        if (this->partial_message_ == 0)
02080          {
02081            this->allocate_partial_message_block ();
02082          }
02083 
02084        if (this->partial_message_ != 0 &&
02085            this->partial_message_->copy (message_block.rd_ptr (),
02086                                          message_block.length ()) == 0)
02087          {
02088            message_block.rd_ptr (message_block.length ());
02089          }
02090        else
02091          {
02092            return -1;
02093          }
02094      }
02095 
02096    return 0;
02097 }
02098 
02099 
02100 int
02101 TAO_Transport::process_parsed_messages (TAO_Queued_Data *qd,
02102                                         TAO_Resume_Handle &rh)
02103 {
02104   if (TAO_debug_level > 7)
02105     {
02106       ACE_DEBUG ((LM_DEBUG,
02107          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ")
02108          ACE_TEXT ("entering (missing data == %d)\n"),
02109          this->id(), qd->missing_data_));
02110     }
02111 
02112   // Get the <message_type> that we have received
02113   const TAO_Pluggable_Message_Type t = qd->msg_type_;
02114 
02115   // int result = 0;
02116 
02117   if (t == TAO_PLUGGABLE_MESSAGE_CLOSECONNECTION)
02118     {
02119       if (TAO_debug_level > 0)
02120         ACE_DEBUG ((LM_DEBUG,
02121            ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ")
02122            ACE_TEXT ("received CloseConnection message - %m\n"),
02123            this->id()));
02124 
02125       // Return a "-1" so that the next stage can take care of
02126       // closing connection and the necessary memory management.
02127       return -1;
02128     }
02129   else if (t == TAO_PLUGGABLE_MESSAGE_REQUEST ||
02130            t == TAO_PLUGGABLE_MESSAGE_LOCATEREQUEST)
02131     {
02132       // Let us resume the handle before we go ahead to process the
02133       // request. This will open up the handle for other threads.
02134       rh.resume_handle ();
02135 
02136       if (this->messaging_object ()->process_request_message (
02137             this,
02138             qd) == -1)
02139         {
02140           // Return a "-1" so that the next stage can take care of
02141           // closing connection and the necessary memory management.
02142           return -1;
02143         }
02144     }
02145   else if (t == TAO_PLUGGABLE_MESSAGE_REPLY ||
02146            t == TAO_PLUGGABLE_MESSAGE_LOCATEREPLY)
02147     {
02148       rh.resume_handle ();
02149 
02150       TAO_Pluggable_Reply_Params params (this);
02151 
02152       if (this->messaging_object ()->process_reply_message (params,
02153                                                             qd) == -1)
02154         {
02155           if (TAO_debug_level > 0)
02156             ACE_DEBUG ((LM_DEBUG,
02157                ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ")
02158                ACE_TEXT ("error in process_reply_message - %m\n"),
02159                this->id ()));
02160 
02161           return -1;
02162         }
02163 
02164     }
02165   else if (t == TAO_PLUGGABLE_MESSAGE_CANCELREQUEST)
02166     {
02167       // The associated request might be incomplpete residing
02168       // fragmented in messaging object. We must make sure the
02169       // resources allocated by fragments are released.
02170 
02171       if (this->messaging_object ()->discard_fragmented_message (qd) == -1)
02172         {
02173           if (TAO_debug_level > 0)
02174             {
02175               ACE_ERROR ((LM_ERROR,
02176                  ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ")
02177                  ACE_TEXT ("error processing CancelRequest\n"),
02178                  this->id ()));
02179             }
02180         }
02181 
02182       // We are not able to cancel requests being processed already;
02183       // this is declared as optional feature by CORBA, and TAO does
02184       // not support this currently.
02185 
02186       // Just continue processing, CancelRequest does not mean to cut
02187       // off the connection.
02188     }
02189   else if (t == TAO_PLUGGABLE_MESSAGE_MESSAGERROR)
02190     {
02191       if (TAO_debug_level > 0)
02192         {
02193           ACE_ERROR ((LM_ERROR,
02194              ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ")
02195              ACE_TEXT ("received MessageError, closing connection\n"),
02196              this->id ()));
02197         }
02198       return -1;
02199     }
02200 
02201   // If not, just return back..
02202   return 0;
02203 }
02204 
02205 int
02206 TAO_Transport::process_queue_head (TAO_Resume_Handle &rh)
02207 {
02208   if (TAO_debug_level > 3)
02209     {
02210       ACE_DEBUG ((LM_DEBUG,
02211          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_queue_head, %d enqueued\n"),
02212          this->id (), this->incoming_message_queue_.queue_length () ));
02213     }
02214 
02215   // See if  message in queue ...
02216   if (this->incoming_message_queue_.queue_length () > 0)
02217     {
02218       // Get the message on the head of the queue..
02219       TAO_Queued_Data *qd =
02220         this->incoming_message_queue_.dequeue_head ();
02221 
02222       if (TAO_debug_level > 3)
02223         {
02224           ACE_DEBUG ((LM_DEBUG,
02225              ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_queue_head, ")
02226              ACE_TEXT ("the size of the queue is [%d]\n"),
02227              this->id (),
02228              this->incoming_message_queue_.queue_length()));
02229         }
02230       // Now that we have pulled out out one message out of the queue,
02231       // check whether we have one more message in the queue...
02232       if (this->incoming_message_queue_.queue_length () > 0)
02233         {
02234           if (TAO_debug_level > 0)
02235             {
02236               ACE_DEBUG ((LM_DEBUG,
02237                  ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_queue_head, ")
02238                  ACE_TEXT ("notify reactor\n"),
02239                  this->id ()));
02240 
02241             }
02242 
02243           const int retval = this->notify_reactor ();
02244 
02245           if (retval == 1)
02246             {
02247               // Let the class know that it doesn't need to resume  the
02248               // handle..
02249               rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_LEAVE_SUSPENDED);
02250             }
02251           else if (retval < 0)
02252             return -1;
02253         }
02254       else
02255         {
02256           // As we are ready to process the last message just resume
02257           // the handle. Set the flag incase someone had reset the flag..
02258           rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_RESUMABLE);
02259         }
02260 
02261       // Process the message...
02262       if (this->process_parsed_messages (qd, rh) == -1)
02263         {
02264           return -1;
02265         }
02266 
02267       // Delete the Queued_Data..
02268       TAO_Queued_Data::release (qd);
02269 
02270       return 0;
02271     }
02272 
02273   return 1;
02274 }
02275 
02276 int
02277 TAO_Transport::notify_reactor (void)
02278 {
02279   if (!this->ws_->is_registered ())
02280     {
02281       return 0;
02282     }
02283 
02284   ACE_Event_Handler *eh = this->event_handler_i ();
02285 
02286   // Get the reactor associated with the event handler
02287   ACE_Reactor *reactor = this->orb_core ()->reactor ();
02288 
02289   if (TAO_debug_level > 0)
02290     {
02291       ACE_DEBUG ((LM_DEBUG,
02292          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::notify_reactor, ")
02293          ACE_TEXT ("notify to Reactor\n"),
02294          this->id ()));
02295     }
02296 
02297 
02298   // Send a notification to the reactor...
02299   const int retval = reactor->notify (eh,
02300                                       ACE_Event_Handler::READ_MASK);
02301 
02302   if (retval < 0 && TAO_debug_level > 2)
02303     {
02304       // @@todo: need to think about what is the action that
02305       // we can take when we get here.
02306       ACE_DEBUG ((LM_DEBUG,
02307          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::notify_reactor, ")
02308          ACE_TEXT ("notify to the reactor failed..\n"),
02309          this->id ()));
02310     }
02311 
02312   return 1;
02313 }
02314 
02315 TAO::Transport_Cache_Manager &
02316 TAO_Transport::transport_cache_manager (void)
02317 {
02318   return this->orb_core_->lane_resources ().transport_cache ();
02319 }
02320 
02321 void
02322 TAO_Transport::assign_translators (TAO_InputCDR *inp, TAO_OutputCDR *outp)
02323 {
02324   if (this->char_translator_)
02325     {
02326       this->char_translator_->assign (inp);
02327       this->char_translator_->assign (outp);
02328     }
02329   if (this->wchar_translator_)
02330     {
02331       this->wchar_translator_->assign (inp);
02332       this->wchar_translator_->assign (outp);
02333     }
02334 }
02335 
02336 void
02337 TAO_Transport::clear_translators (TAO_InputCDR *inp, TAO_OutputCDR *outp)
02338 {
02339   if (inp)
02340     {
02341       inp->char_translator (0);
02342       inp->wchar_translator (0);
02343     }
02344   if (outp)
02345     {
02346       outp->char_translator (0);
02347       outp->wchar_translator (0);
02348     }
02349 }
02350 
02351 ACE_Event_Handler::Reference_Count
02352 TAO_Transport::add_reference (void)
02353 {
02354   return this->event_handler_i ()->add_reference ();
02355 }
02356 
02357 ACE_Event_Handler::Reference_Count
02358 TAO_Transport::remove_reference (void)
02359 {
02360   return this->event_handler_i ()->remove_reference ();
02361 }
02362 
02363 TAO_OutputCDR &
02364 TAO_Transport::out_stream (void)
02365 {
02366   return this->messaging_object ()->out_stream ();
02367 }
02368 
02369 bool
02370 TAO_Transport::post_open (size_t id)
02371 {
02372   this->id_ = id;
02373 
02374   {
02375     ACE_GUARD_RETURN (ACE_Lock,
02376                       ace_mon,
02377                       *this->handler_lock_,
02378                       false);
02379     this->is_connected_ = true;
02380   }
02381 
02382   // When we have data in our outgoing queue schedule ourselves
02383   // for output
02384   if (this->queue_is_empty_i ())
02385     return true;
02386 
02387   // If the wait strategy wants us to be registered with the reactor
02388   // then we do so. If registeration is required and it succeeds,
02389   // #REFCOUNT# becomes two.
02390   if (this->wait_strategy ()->register_handler () == 0)
02391     {
02392       TAO_Flushing_Strategy *flushing_strategy =
02393         this->orb_core ()->flushing_strategy ();
02394       (void) flushing_strategy->schedule_output (this);
02395     }
02396   else
02397     {
02398       // Registration failures.
02399 
02400       // Purge from the connection cache, if we are not in the cache, this
02401       // just does nothing.
02402       (void) this->purge_entry ();
02403 
02404       // Close the handler.
02405       (void) this->close_connection ();
02406 
02407       if (TAO_debug_level > 0)
02408         ACE_ERROR ((LM_ERROR,
02409            ACE_TEXT ("TAO (%P|%t) - Transport[%d]::post_connect , ")
02410            ACE_TEXT ("could not register the transport ")
02411            ACE_TEXT ("in the reactor.\n"),
02412            this->id ()));
02413 
02414       return false;
02415     }
02416 
02417   return true;
02418 }
02419 
02420 void
02421 TAO_Transport::allocate_partial_message_block (void)
02422 {
02423   if (this->partial_message_ == 0)
02424     {
02425       // This value must be at least large enough to hold a GIOP message
02426       // header plus a GIOP fragment header
02427       const size_t partial_message_size =
02428         this->messaging_object ()->header_length ();
02429        // + this->messaging_object ()->fragment_header_length ();
02430        // deprecated, conflicts with not-single_read_opt.
02431 
02432       ACE_NEW (this->partial_message_,
02433                ACE_Message_Block (partial_message_size));
02434     }
02435 }
02436 
02437 /*
02438  * Hook to add concrete implementations from the derived class onto
02439  * TAO's transport.
02440  */
02441 
02442 //@@ TAO_TRANSPORT_SPL_METHODS_ADD_HOOK
02443 
02444 TAO_END_VERSIONED_NAMESPACE_DECL

Generated on Thu Nov 9 11:54:26 2006 for TAO by doxygen 1.3.6