Connection_Handler.cpp

Go to the documentation of this file.
00001 //$Id: Connection_Handler.cpp 79169 2007-08-02 08:41:23Z johnnyw $
00002 
00003 #include "tao/Connection_Handler.h"
00004 #include "tao/ORB_Core.h"
00005 #include "tao/debug.h"
00006 #include "tao/Resume_Handle.h"
00007 #include "tao/Transport.h"
00008 #include "tao/Wait_Strategy.h"
00009 
00010 #include "ace/SOCK.h"
00011 #include "ace/Reactor.h"
00012 #include "ace/os_include/sys/os_socket.h"
00013 
00014 //@@ CONNECTION_HANDLER_SPL_INCLUDE_FORWARD_DECL_ADD_HOOK
00015 
00016 #if !defined (__ACE_INLINE__)
00017 #include "tao/Connection_Handler.inl"
00018 #endif /* __ACE_INLINE__ */
00019 
00020 ACE_RCSID (tao,
00021            Connection_Handler,
00022            "$Id: Connection_Handler.cpp 79169 2007-08-02 08:41:23Z johnnyw $")
00023 
00024 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00025 
00026 TAO_Connection_Handler::TAO_Connection_Handler (TAO_ORB_Core *orb_core)
00027   : orb_core_ (orb_core),
00028     transport_ (0),
00029     connection_pending_ (false)
00030 {
00031   // @todo: We need to have a distinct option/ method in the resource
00032   // factory for this and TAO_Transport.
00033   this->lock_ =
00034     this->orb_core_->resource_factory ()->create_cached_connection_lock ();
00035 
00036   // Put ourselves in the connection wait state as soon as we get
00037   // created
00038   this->state_changed (TAO_LF_Event::LFS_CONNECTION_WAIT,
00039                        this->orb_core_->leader_follower ());
00040 }
00041 
00042 TAO_Connection_Handler::~TAO_Connection_Handler (void)
00043 {
00044   // @@ TODO Use auto_ptr<>
00045   delete this->lock_;
00046 
00047   //@@ CONNECTION_HANDLER_DESTRUCTOR_ADD_HOOK
00048 }
00049 
00050 int
00051 TAO_Connection_Handler::shared_open (void)
00052 {
00053   // This reference counting is related to asynch connections.  It
00054   // should probably be managed by the ACE_Strategy_Connector, since
00055   // that's really the reference being managed here.  also, whether
00056   // open ultimately succeeds or fails, the connection attempted is
00057   // ending, so the reference must be removed in any case.
00058   this->cancel_pending_connection();
00059 
00060   return 0;
00061 }
00062 
00063 int
00064 TAO_Connection_Handler::set_socket_option (ACE_SOCK &sock,
00065                                            int snd_size,
00066                                            int rcv_size)
00067 {
00068 #if !defined (ACE_LACKS_SOCKET_BUFSIZ)
00069 
00070   if (snd_size != 0
00071       && sock.set_option (SOL_SOCKET,
00072                           SO_SNDBUF,
00073                           (void *) &snd_size,
00074                           sizeof (snd_size)) == -1
00075       && errno != ENOTSUP)
00076   {
00077     return -1;
00078   }
00079 
00080   if (rcv_size != 0
00081       && sock.set_option (SOL_SOCKET,
00082                           SO_RCVBUF,
00083                           (void *) &rcv_size,
00084                           sizeof (int)) == -1
00085       && errno != ENOTSUP)
00086   {
00087     return -1;
00088   }
00089 #else
00090    ACE_UNUSED_ARG (snd_size);
00091    ACE_UNUSED_ARG (rcv_size);
00092 #endif /* !ACE_LACKS_SOCKET_BUFSIZ */
00093 
00094   // Set the close-on-exec flag for that file descriptor. If the
00095   // operation fails we are out of luck (some platforms do not support
00096   // it and return -1).
00097   (void) sock.enable (ACE_CLOEXEC);
00098 
00099   return 0;
00100 }
00101 
00102 int
00103 TAO_Connection_Handler::svc_i (void)
00104 {
00105   int result = 0;
00106 
00107   if (TAO_debug_level > 0)
00108     ACE_DEBUG ((LM_DEBUG,
00109                 ACE_TEXT ("TAO (%P|%t) - Connection_Handler::svc_i begin\n")));
00110 
00111   // Here we simply synthesize the "typical" event loop one might find
00112   // in a reactive handler, except that this can simply block waiting
00113   // for input.
00114 
00115   ACE_Time_Value *max_wait_time = 0;
00116   ACE_Time_Value timeout;
00117   ACE_Time_Value current_timeout;
00118 
00119   if (this->orb_core_->thread_per_connection_timeout (timeout))
00120     {
00121       current_timeout = timeout;
00122       max_wait_time = &current_timeout;
00123     }
00124 
00125   TAO_Resume_Handle rh (this->orb_core_,
00126                         ACE_INVALID_HANDLE);
00127 
00128   // We exit of the loop if
00129   // - If the ORB core is shutdown by another thread
00130   // - Or if the transport is null. This could happen if an error
00131   // occured.
00132   // - Or if during processing a return value of -1 is received.
00133   while (!this->orb_core_->has_shutdown ()
00134          && result >= 0)
00135     {
00136       // Let the transport know that it is used
00137       (void) this->transport ()->update_transport ();
00138 
00139       result =
00140         this->transport ()->handle_input (rh,
00141                                           max_wait_time);
00142 
00143       if (result == -1 && errno == ETIME)
00144         {
00145           // Ignore timeouts, they are only used to wake up and
00146           // shutdown.
00147           result = 0;
00148 
00149           // Reset errno to make sure we don't trip over an old value
00150           // of errno in case it is not reset when the recv() call
00151           // fails if the socket has been closed.
00152           errno = 0;
00153         }
00154       else if (result == -1)
00155         {
00156           // Something went wrong with the socket. Just quit
00157           return result;
00158         }
00159 
00160       current_timeout = timeout;
00161 
00162       if (TAO_debug_level > 0)
00163         ACE_DEBUG ((LM_DEBUG,
00164                     "TAO (%P|%t) - Connection_Handler::svc_i - "
00165                     "loop <%d>\n", current_timeout.msec ()));
00166     }
00167 
00168   if (TAO_debug_level > 0)
00169     ACE_DEBUG  ((LM_DEBUG,
00170                  "TAO (%P|%t) - Connection_Handler::svc_i end\n"));
00171 
00172   return result;
00173 }
00174 
00175 void
00176 TAO_Connection_Handler::transport (TAO_Transport* transport)
00177 {
00178   this->transport_ = transport;
00179 
00180   // Enable reference counting on the event handler.
00181   this->transport_->event_handler_i ()->reference_counting_policy ().value (
00182       ACE_Event_Handler::Reference_Counting_Policy::ENABLED
00183     );
00184 }
00185 
00186 int
00187 TAO_Connection_Handler::handle_output_eh (
00188     ACE_HANDLE, ACE_Event_Handler * eh)
00189 {
00190   // Let the transport that it is going to be used
00191   (void) this->transport ()->update_transport ();
00192 
00193   // Instantiate the resume handle here.. This will automatically
00194   // resume the handle once data is written..
00195   TAO_Resume_Handle resume_handle (this->orb_core (),
00196                                    eh->get_handle ());
00197 
00198   int return_value = 0;
00199   this->pre_io_hook (return_value);
00200   if (return_value != 0)
00201     {
00202       resume_handle.set_flag (TAO_Resume_Handle::TAO_HANDLE_LEAVE_SUSPENDED);
00203       return return_value;
00204     }
00205 
00206   return_value = this->transport ()->handle_output ();
00207 
00208   this->pos_io_hook (return_value);
00209 
00210   if (return_value != 0)
00211     {
00212       resume_handle.set_flag (TAO_Resume_Handle::TAO_HANDLE_LEAVE_SUSPENDED);
00213     }
00214 
00215   return return_value;
00216 }
00217 
00218 int
00219 TAO_Connection_Handler::handle_input_eh (
00220   ACE_HANDLE h, ACE_Event_Handler *eh)
00221 {
00222   // If we can't process upcalls just return
00223   if (!this->transport ()->wait_strategy ()->can_process_upcalls ())
00224     {
00225       if (TAO_debug_level > 6)
00226         ACE_DEBUG ((LM_DEBUG,
00227                     "TAO (%P|%t) - Connection_Handler[%d]::handle_input_eh, "
00228                     "not going to handle_input on transport "
00229                     "because upcalls temporarily suspended on this thread\n",
00230                     this->transport()->id()));
00231       return 0;
00232     }
00233 
00234   int const result = this->handle_input_internal (h, eh);
00235 
00236   if (result == -1)
00237     {
00238       this->close_connection ();
00239       return 0;
00240     }
00241 
00242   return result;
00243 }
00244 
00245 int
00246 TAO_Connection_Handler::handle_input_internal (
00247     ACE_HANDLE h, ACE_Event_Handler * eh)
00248 {
00249   // Let the transport know that it is used
00250   (void) this->transport ()->update_transport ();
00251 
00252   // Grab the transport id now and use the cached value for printing
00253   // since the  transport could dissappear by the time the thread
00254   // returns.
00255   size_t const t_id =
00256     this->transport ()->id ();
00257 
00258   if (TAO_debug_level > 6)
00259     {
00260       ACE_HANDLE handle = eh->get_handle();
00261       ACE_DEBUG ((LM_DEBUG,
00262                   "TAO (%P|%t) - Connection_Handler[%d]::handle_input, "
00263                   "handle = %d/%d\n",
00264                   t_id, handle, h));
00265     }
00266 
00267   TAO_Resume_Handle resume_handle (this->orb_core (), eh->get_handle ());
00268 
00269   int return_value = 0;
00270 
00271   this->pre_io_hook (return_value);
00272   if (return_value != 0)
00273     return return_value;
00274 
00275   return_value = this->transport ()->handle_input (resume_handle);
00276 
00277   this->pos_io_hook (return_value);
00278 
00279   // Bug 1647; might need to change resume_handle's flag or
00280   // change handle_input return value.
00281   resume_handle.handle_input_return_value_hook(return_value);
00282 
00283   if (TAO_debug_level > 6)
00284     {
00285       ACE_HANDLE handle = eh->get_handle ();
00286       ACE_DEBUG ((LM_DEBUG,
00287                   "TAO (%P|%t) - Connection_Handler[%d]::handle_input, "
00288                   "handle = %d/%d, retval = %d\n",
00289                   t_id, handle, h, return_value));
00290     }
00291 
00292   if (return_value == -1)
00293     resume_handle.set_flag (TAO_Resume_Handle::TAO_HANDLE_LEAVE_SUSPENDED);
00294   return return_value;
00295 }
00296 
00297 int
00298 TAO_Connection_Handler::close_connection_eh (ACE_Event_Handler *eh)
00299 {
00300   // Save the ID for debugging messages
00301   ACE_HANDLE handle = eh->get_handle ();
00302 
00303   size_t const id = this->transport ()->id ();
00304   if (TAO_debug_level)
00305     {
00306       ACE_DEBUG  ((LM_DEBUG,
00307                    "TAO (%P|%t) - Connection_Handler[%d]::"
00308                    "close_connection_eh, purging entry from cache\n",
00309                    handle));
00310     }
00311 
00312   this->transport ()->pre_close ();
00313 
00314   // @@ This seems silly, but if we have no reason to be in the
00315   // reactor, then we dont remove ourselves.
00316   if (this->transport ()->wait_strategy ()->is_registered ())
00317     {
00318       ACE_Reactor *eh_reactor = eh->reactor ();
00319 
00320       if (this->orb_core_->has_shutdown () == 0)
00321         {
00322           // If the ORB is nil, get the reactor from orb_core which gets it
00323           // from LF.
00324           if (eh_reactor == 0)
00325             eh_reactor = this->transport()->orb_core()->reactor ();
00326         }
00327 
00328       // The Reactor must not be null, otherwise something else is
00329       // horribly broken.
00330       ACE_ASSERT (eh_reactor != 0);
00331 
00332       if (TAO_debug_level)
00333         {
00334           ACE_DEBUG  ((LM_DEBUG,
00335                        "TAO (%P|%t) - Connection_Handler[%d]::"
00336                        "close_connection_eh, removing from the reactor\n",
00337                        handle));
00338         }
00339 
00340       // Use id instead of handle. Why? "handle" may be invalid for RW
00341       // cases  when drop_reply_on_shutdown is on, and when the
00342       // orb_core is shutting down. This means that the handler will
00343       // be left behind in the reactor which would create problems
00344       // later. Just forcefully remove them. If none exists reactor
00345       // will make things safer.
00346       ACE_HANDLE tmp_handle = handle;
00347       if (this->orb_core_->has_shutdown ())
00348         tmp_handle = (ACE_HANDLE) id;
00349 
00350       eh_reactor->remove_handler (tmp_handle,
00351                                   ACE_Event_Handler::ALL_EVENTS_MASK |
00352                                   ACE_Event_Handler::DONT_CALL);
00353 
00354       // Also cancel any timers, we may create those for time-limited
00355       // buffering
00356       if (TAO_debug_level)
00357         {
00358           ACE_DEBUG  ((LM_DEBUG,
00359                        "TAO (%P|%t) - Connection_Handler[%d]::"
00360                        "close_connection_eh, cancel all timers\n",
00361                        handle));
00362         }
00363 
00364       eh_reactor->cancel_timer (eh);
00365 
00366       // @@ This seems silly, the reactor is a much better authority to
00367       //    find out if a handle is registered...
00368       this->transport ()->wait_strategy ()->is_registered (false);
00369     }
00370 
00371   // This call should be made only after the cache and reactor are
00372   // cleaned up. This call can make upcalls to the application which
00373   // in turn can make remote calls (Bug 1551 and Bug 1482). The remote
00374   // calls from the application can try to use this handler from the
00375   // cache or from the reactor. So clean them up before this is
00376   // called.
00377   this->transport ()->send_connection_closed_notifications ();
00378   this->state_changed (TAO_LF_Event::LFS_CONNECTION_CLOSED,
00379                        this->orb_core_->leader_follower ());
00380 
00381   if (TAO_debug_level)
00382     {
00383       ACE_DEBUG  ((LM_DEBUG,
00384                    "TAO (%P|%t) - Connection_Handler[%d]::"
00385                    "close_connection_eh\n",
00386                    id));
00387     }
00388 
00389   return 1;
00390 }
00391 
00392 /*
00393  * Comment hook to comment the base class implementations
00394  * that do nothing. Specialized versions from derived
00395  * class will directly override these methods. Add
00396  * all methods that are virtual, have do nothing implementations
00397  * within this hook for later specialization.
00398  */
00399 //@@ CONNECTION_HANDLER_SPL_COMMENT_HOOK_START
00400 
00401 int
00402 TAO_Connection_Handler::set_dscp_codepoint (CORBA::Boolean)
00403 {
00404   return 0;
00405 }
00406 
00407 int
00408 TAO_Connection_Handler::set_dscp_codepoint (CORBA::Long)
00409 {
00410   return 0;
00411 }
00412 
00413 int
00414 TAO_Connection_Handler::release_os_resources (void)
00415 {
00416   return 0;
00417 }
00418 
00419 //@@ CONNECTION_HANDLER_SPL_COMMENT_HOOK_END
00420 
00421 void
00422 TAO_Connection_Handler::pre_io_hook (int &)
00423 {
00424 }
00425 
00426 void
00427 TAO_Connection_Handler::pos_io_hook (int &)
00428 {
00429 }
00430 
00431 int
00432 TAO_Connection_Handler::close_handler (void)
00433 {
00434   this->state_changed (TAO_LF_Event::LFS_CONNECTION_CLOSED,
00435                        this->orb_core_->leader_follower ());
00436   this->transport ()->purge_entry();
00437   this->transport ()->remove_reference ();
00438 
00439   // @@ I think that if the connection_pending state is true
00440   // when close_handler is calld, we should probably release
00441   // another reference so that the connector doesn't have to
00442   // worry about it.
00443 
00444   return 0;
00445 }
00446 
00447 //@@ CONNECTION_HANDLER_SPL_METHODS_ADD_HOOK
00448 
00449 TAO_END_VERSIONED_NAMESPACE_DECL

Generated on Sun Jan 27 13:07:31 2008 for TAO by doxygen 1.3.6