00001
00002
00003 #include "tao/Connection_Handler.h"
00004 #include "tao/ORB_Core.h"
00005 #include "tao/debug.h"
00006 #include "tao/Resume_Handle.h"
00007 #include "tao/Transport.h"
00008 #include "tao/Wait_Strategy.h"
00009
00010 #include "ace/SOCK.h"
00011 #include "ace/Reactor.h"
00012 #include "ace/os_include/sys/os_socket.h"
00013 #include "ace/Svc_Handler.h"
00014
00015
00016
00017 #if !defined (__ACE_INLINE__)
00018 #include "tao/Connection_Handler.inl"
00019 #endif
00020
00021 ACE_RCSID (tao,
00022 Connection_Handler,
00023 "$Id: Connection_Handler.cpp 86593 2009-09-02 14:58:00Z vzykov $")
00024
00025 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00026
00027 TAO_Connection_Handler::TAO_Connection_Handler (TAO_ORB_Core *orb_core)
00028 : orb_core_ (orb_core),
00029 transport_ (0),
00030 connection_pending_ (false),
00031 is_closed_ (false)
00032 {
00033
00034
00035 this->state_changed (TAO_LF_Event::LFS_CONNECTION_WAIT,
00036 this->orb_core_->leader_follower ());
00037 }
00038
00039 TAO_Connection_Handler::~TAO_Connection_Handler (void)
00040 {
00041
00042 }
00043
00044 int
00045 TAO_Connection_Handler::shared_open (void)
00046 {
00047
00048
00049
00050
00051
00052 this->cancel_pending_connection();
00053
00054 return 0;
00055 }
00056
00057 int
00058 TAO_Connection_Handler::set_socket_option (ACE_SOCK &sock,
00059 int snd_size,
00060 int rcv_size)
00061 {
00062 #if !defined (ACE_LACKS_SO_SNDBUF)
00063 if (snd_size != 0
00064 && sock.set_option (SOL_SOCKET,
00065 SO_SNDBUF,
00066 (void *) &snd_size,
00067 sizeof (snd_size)) == -1
00068 && errno != ENOTSUP)
00069 {
00070 return -1;
00071 }
00072 #endif
00073
00074 #if !defined (ACE_LACKS_SO_RCVBUF)
00075 if (rcv_size != 0
00076 && sock.set_option (SOL_SOCKET,
00077 SO_RCVBUF,
00078 (void *) &rcv_size,
00079 sizeof (int)) == -1
00080 && errno != ENOTSUP)
00081 {
00082 return -1;
00083 }
00084 #endif
00085
00086 #if defined (ACE_LACKS_SO_SNDBUF) && defined (ACE_LACKS_SO_RCVBUF)
00087 ACE_UNUSED_ARG (snd_size);
00088 ACE_UNUSED_ARG (rcv_size);
00089 #endif
00090
00091
00092
00093
00094 (void) sock.enable (ACE_CLOEXEC);
00095
00096 return 0;
00097 }
00098
00099 int
00100 TAO_Connection_Handler::svc_i (void)
00101 {
00102 int result = 0;
00103
00104 if (TAO_debug_level > 0)
00105 ACE_DEBUG ((LM_DEBUG,
00106 ACE_TEXT ("TAO (%P|%t) - Connection_Handler::svc_i begin\n")));
00107
00108
00109
00110
00111
00112 ACE_Time_Value *max_wait_time = 0;
00113 ACE_Time_Value timeout;
00114 ACE_Time_Value current_timeout;
00115
00116 if (this->orb_core_->thread_per_connection_timeout (timeout))
00117 {
00118 current_timeout = timeout;
00119 max_wait_time = ¤t_timeout;
00120 }
00121
00122 TAO_Resume_Handle rh (this->orb_core_,
00123 ACE_INVALID_HANDLE);
00124
00125
00126
00127
00128
00129
00130 while (!this->orb_core_->has_shutdown ()
00131 && result >= 0)
00132 {
00133
00134 (void) this->transport ()->update_transport ();
00135
00136 result = this->transport ()->handle_input (rh, max_wait_time);
00137
00138 if (result == -1 && errno == ETIME)
00139 {
00140
00141
00142 result = 0;
00143
00144
00145
00146
00147 errno = 0;
00148 }
00149 else if (result == -1)
00150 {
00151
00152 return result;
00153 }
00154
00155 current_timeout = timeout;
00156
00157 if (TAO_debug_level > 0)
00158 ACE_DEBUG ((LM_DEBUG,
00159 "TAO (%P|%t) - Connection_Handler::svc_i - "
00160 "loop <%d>\n", current_timeout.msec ()));
00161 }
00162
00163 if (TAO_debug_level > 0)
00164 ACE_DEBUG ((LM_DEBUG,
00165 "TAO (%P|%t) - Connection_Handler::svc_i end\n"));
00166
00167 return result;
00168 }
00169
00170 void
00171 TAO_Connection_Handler::transport (TAO_Transport* transport)
00172 {
00173 this->transport_ = transport;
00174
00175
00176 this->transport_->event_handler_i ()->reference_counting_policy ().value (
00177 ACE_Event_Handler::Reference_Counting_Policy::ENABLED
00178 );
00179 }
00180
00181 int
00182 TAO_Connection_Handler::handle_output_eh (
00183 ACE_HANDLE, ACE_Event_Handler * eh)
00184 {
00185
00186 (void) this->transport ()->update_transport ();
00187
00188
00189
00190 TAO_Resume_Handle resume_handle (this->orb_core (),
00191 eh->get_handle ());
00192
00193 int return_value = 0;
00194 this->pre_io_hook (return_value);
00195 if (return_value != 0)
00196 {
00197 resume_handle.set_flag (TAO_Resume_Handle::TAO_HANDLE_LEAVE_SUSPENDED);
00198 return return_value;
00199 }
00200
00201
00202 TAO::Transport::Drain_Constraints dc;
00203 if (this->transport ()->handle_output (dc) == TAO_Transport::DR_ERROR)
00204 {
00205 return_value = -1;
00206 }
00207
00208 this->pos_io_hook (return_value);
00209
00210 if (return_value != 0)
00211 {
00212 resume_handle.set_flag (TAO_Resume_Handle::TAO_HANDLE_LEAVE_SUSPENDED);
00213 }
00214
00215 return return_value;
00216 }
00217
00218 int
00219 TAO_Connection_Handler::handle_input_eh (ACE_HANDLE h, ACE_Event_Handler *eh)
00220 {
00221
00222 if (!this->transport ()->wait_strategy ()->can_process_upcalls ())
00223 {
00224 if (TAO_debug_level > 6)
00225 ACE_DEBUG ((LM_DEBUG,
00226 "TAO (%P|%t) - Connection_Handler[%d]::handle_input_eh, "
00227 "not going to handle_input on transport "
00228 "because upcalls temporarily suspended on this thread\n",
00229 this->transport()->id()));
00230 return 0;
00231 }
00232
00233 int const result = this->handle_input_internal (h, eh);
00234
00235 if (result == -1)
00236 {
00237 this->close_connection ();
00238 return 0;
00239 }
00240
00241 return result;
00242 }
00243
00244 int
00245 TAO_Connection_Handler::handle_input_internal (
00246 ACE_HANDLE h, ACE_Event_Handler * eh)
00247 {
00248
00249 (void) this->transport ()->update_transport ();
00250
00251
00252
00253
00254 size_t const t_id = this->transport ()->id ();
00255
00256 if (TAO_debug_level > 6)
00257 {
00258 ACE_HANDLE handle = eh->get_handle();
00259 ACE_DEBUG ((LM_DEBUG,
00260 "TAO (%P|%t) - Connection_Handler[%d]::handle_input, "
00261 "handle = %d/%d\n",
00262 t_id, handle, h));
00263 }
00264
00265 TAO_Resume_Handle resume_handle (this->orb_core (), eh->get_handle ());
00266
00267 int return_value = 0;
00268
00269 this->pre_io_hook (return_value);
00270
00271 if (return_value != 0)
00272 return return_value;
00273
00274 return_value = this->transport ()->handle_input (resume_handle);
00275
00276 this->pos_io_hook (return_value);
00277
00278
00279
00280 resume_handle.handle_input_return_value_hook(return_value);
00281
00282 if (TAO_debug_level > 6)
00283 {
00284 ACE_HANDLE handle = eh->get_handle ();
00285 ACE_DEBUG ((LM_DEBUG,
00286 "TAO (%P|%t) - Connection_Handler[%d]::handle_input, "
00287 "handle = %d/%d, retval = %d\n",
00288 t_id, handle, h, return_value));
00289 }
00290
00291 if (return_value == -1)
00292 resume_handle.set_flag (TAO_Resume_Handle::TAO_HANDLE_LEAVE_SUSPENDED);
00293 return return_value;
00294 }
00295
00296 int
00297 TAO_Connection_Handler::close_connection_eh (ACE_Event_Handler *eh)
00298 {
00299
00300 this->is_closed_ = true;
00301
00302
00303 ACE_HANDLE const handle = eh->get_handle ();
00304
00305 size_t const id = this->transport ()->id ();
00306 if (TAO_debug_level)
00307 {
00308 ACE_DEBUG ((LM_DEBUG,
00309 "TAO (%P|%t) - Connection_Handler[%d]::"
00310 "close_connection_eh, purging entry from cache\n",
00311 handle));
00312 }
00313
00314 this->transport ()->pre_close ();
00315
00316
00317
00318 if (this->transport ()->wait_strategy ()->is_registered ())
00319 {
00320 ACE_Reactor *eh_reactor = eh->reactor ();
00321
00322 if (this->orb_core_->has_shutdown () == 0)
00323 {
00324
00325
00326 if (eh_reactor == 0)
00327 eh_reactor = this->transport()->orb_core()->reactor ();
00328 }
00329
00330
00331
00332 ACE_ASSERT (eh_reactor != 0);
00333
00334 if (TAO_debug_level)
00335 {
00336 ACE_DEBUG ((LM_DEBUG,
00337 "TAO (%P|%t) - Connection_Handler[%d]::"
00338 "close_connection_eh, removing from the reactor\n",
00339 handle));
00340 }
00341
00342
00343
00344
00345
00346
00347
00348 ACE_HANDLE tmp_handle = handle;
00349 if (this->orb_core_->has_shutdown ())
00350 tmp_handle = (ACE_HANDLE) id;
00351
00352 eh_reactor->remove_handler (tmp_handle,
00353 ACE_Event_Handler::ALL_EVENTS_MASK |
00354 ACE_Event_Handler::DONT_CALL);
00355
00356
00357
00358 if (TAO_debug_level)
00359 {
00360 ACE_DEBUG ((LM_DEBUG,
00361 "TAO (%P|%t) - Connection_Handler[%d]::"
00362 "close_connection_eh, cancel all timers\n",
00363 handle));
00364 }
00365
00366 eh_reactor->cancel_timer (eh);
00367
00368
00369
00370 this->transport ()->wait_strategy ()->is_registered (false);
00371 }
00372
00373
00374
00375
00376
00377
00378
00379 this->transport ()->send_connection_closed_notifications ();
00380 this->state_changed (TAO_LF_Event::LFS_CONNECTION_CLOSED,
00381 this->orb_core_->leader_follower ());
00382
00383 if (TAO_debug_level)
00384 {
00385 ACE_DEBUG ((LM_DEBUG,
00386 "TAO (%P|%t) - Connection_Handler[%d]::"
00387 "close_connection_eh\n",
00388 id));
00389 }
00390
00391 return 1;
00392 }
00393
00394
00395
00396
00397
00398
00399
00400
00401
00402
00403 int
00404 TAO_Connection_Handler::set_dscp_codepoint (CORBA::Boolean)
00405 {
00406 return 0;
00407 }
00408
00409 int
00410 TAO_Connection_Handler::set_dscp_codepoint (CORBA::Long)
00411 {
00412 return 0;
00413 }
00414
00415 int
00416 TAO_Connection_Handler::release_os_resources (void)
00417 {
00418 return 0;
00419 }
00420
00421
00422
00423 void
00424 TAO_Connection_Handler::pre_io_hook (int &)
00425 {
00426 }
00427
00428 void
00429 TAO_Connection_Handler::pos_io_hook (int &)
00430 {
00431 }
00432
00433 int
00434 TAO_Connection_Handler::close_handler (u_long)
00435 {
00436 this->is_closed_ = true;
00437 this->state_changed (TAO_LF_Event::LFS_CONNECTION_CLOSED,
00438 this->orb_core_->leader_follower ());
00439
00440
00441 this->cancel_pending_connection ();
00442
00443
00444 this->transport ()->purge_entry();
00445
00446 return 0;
00447 }
00448
00449
00450
00451 TAO_END_VERSIONED_NAMESPACE_DECL