00001
00002
00003 #include "tao/Connection_Handler.h"
00004 #include "tao/ORB_Core.h"
00005 #include "tao/debug.h"
00006 #include "tao/Resume_Handle.h"
00007 #include "tao/Transport.h"
00008 #include "tao/Wait_Strategy.h"
00009
00010 #include "ace/SOCK.h"
00011 #include "ace/Reactor.h"
00012 #include "ace/os_include/sys/os_socket.h"
00013
00014
00015
00016 #if !defined (__ACE_INLINE__)
00017 #include "tao/Connection_Handler.inl"
00018 #endif
00019
00020 ACE_RCSID (tao,
00021 Connection_Handler,
00022 "$Id: Connection_Handler.cpp 79169 2007-08-02 08:41:23Z johnnyw $")
00023
00024 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00025
00026 TAO_Connection_Handler::TAO_Connection_Handler (TAO_ORB_Core *orb_core)
00027 : orb_core_ (orb_core),
00028 transport_ (0),
00029 connection_pending_ (false)
00030 {
00031
00032
00033 this->lock_ =
00034 this->orb_core_->resource_factory ()->create_cached_connection_lock ();
00035
00036
00037
00038 this->state_changed (TAO_LF_Event::LFS_CONNECTION_WAIT,
00039 this->orb_core_->leader_follower ());
00040 }
00041
00042 TAO_Connection_Handler::~TAO_Connection_Handler (void)
00043 {
00044
00045 delete this->lock_;
00046
00047
00048 }
00049
00050 int
00051 TAO_Connection_Handler::shared_open (void)
00052 {
00053
00054
00055
00056
00057
00058 this->cancel_pending_connection();
00059
00060 return 0;
00061 }
00062
00063 int
00064 TAO_Connection_Handler::set_socket_option (ACE_SOCK &sock,
00065 int snd_size,
00066 int rcv_size)
00067 {
00068 #if !defined (ACE_LACKS_SOCKET_BUFSIZ)
00069
00070 if (snd_size != 0
00071 && sock.set_option (SOL_SOCKET,
00072 SO_SNDBUF,
00073 (void *) &snd_size,
00074 sizeof (snd_size)) == -1
00075 && errno != ENOTSUP)
00076 {
00077 return -1;
00078 }
00079
00080 if (rcv_size != 0
00081 && sock.set_option (SOL_SOCKET,
00082 SO_RCVBUF,
00083 (void *) &rcv_size,
00084 sizeof (int)) == -1
00085 && errno != ENOTSUP)
00086 {
00087 return -1;
00088 }
00089 #else
00090 ACE_UNUSED_ARG (snd_size);
00091 ACE_UNUSED_ARG (rcv_size);
00092 #endif
00093
00094
00095
00096
00097 (void) sock.enable (ACE_CLOEXEC);
00098
00099 return 0;
00100 }
00101
00102 int
00103 TAO_Connection_Handler::svc_i (void)
00104 {
00105 int result = 0;
00106
00107 if (TAO_debug_level > 0)
00108 ACE_DEBUG ((LM_DEBUG,
00109 ACE_TEXT ("TAO (%P|%t) - Connection_Handler::svc_i begin\n")));
00110
00111
00112
00113
00114
00115 ACE_Time_Value *max_wait_time = 0;
00116 ACE_Time_Value timeout;
00117 ACE_Time_Value current_timeout;
00118
00119 if (this->orb_core_->thread_per_connection_timeout (timeout))
00120 {
00121 current_timeout = timeout;
00122 max_wait_time = ¤t_timeout;
00123 }
00124
00125 TAO_Resume_Handle rh (this->orb_core_,
00126 ACE_INVALID_HANDLE);
00127
00128
00129
00130
00131
00132
00133 while (!this->orb_core_->has_shutdown ()
00134 && result >= 0)
00135 {
00136
00137 (void) this->transport ()->update_transport ();
00138
00139 result =
00140 this->transport ()->handle_input (rh,
00141 max_wait_time);
00142
00143 if (result == -1 && errno == ETIME)
00144 {
00145
00146
00147 result = 0;
00148
00149
00150
00151
00152 errno = 0;
00153 }
00154 else if (result == -1)
00155 {
00156
00157 return result;
00158 }
00159
00160 current_timeout = timeout;
00161
00162 if (TAO_debug_level > 0)
00163 ACE_DEBUG ((LM_DEBUG,
00164 "TAO (%P|%t) - Connection_Handler::svc_i - "
00165 "loop <%d>\n", current_timeout.msec ()));
00166 }
00167
00168 if (TAO_debug_level > 0)
00169 ACE_DEBUG ((LM_DEBUG,
00170 "TAO (%P|%t) - Connection_Handler::svc_i end\n"));
00171
00172 return result;
00173 }
00174
00175 void
00176 TAO_Connection_Handler::transport (TAO_Transport* transport)
00177 {
00178 this->transport_ = transport;
00179
00180
00181 this->transport_->event_handler_i ()->reference_counting_policy ().value (
00182 ACE_Event_Handler::Reference_Counting_Policy::ENABLED
00183 );
00184 }
00185
00186 int
00187 TAO_Connection_Handler::handle_output_eh (
00188 ACE_HANDLE, ACE_Event_Handler * eh)
00189 {
00190
00191 (void) this->transport ()->update_transport ();
00192
00193
00194
00195 TAO_Resume_Handle resume_handle (this->orb_core (),
00196 eh->get_handle ());
00197
00198 int return_value = 0;
00199 this->pre_io_hook (return_value);
00200 if (return_value != 0)
00201 {
00202 resume_handle.set_flag (TAO_Resume_Handle::TAO_HANDLE_LEAVE_SUSPENDED);
00203 return return_value;
00204 }
00205
00206 return_value = this->transport ()->handle_output ();
00207
00208 this->pos_io_hook (return_value);
00209
00210 if (return_value != 0)
00211 {
00212 resume_handle.set_flag (TAO_Resume_Handle::TAO_HANDLE_LEAVE_SUSPENDED);
00213 }
00214
00215 return return_value;
00216 }
00217
00218 int
00219 TAO_Connection_Handler::handle_input_eh (
00220 ACE_HANDLE h, ACE_Event_Handler *eh)
00221 {
00222
00223 if (!this->transport ()->wait_strategy ()->can_process_upcalls ())
00224 {
00225 if (TAO_debug_level > 6)
00226 ACE_DEBUG ((LM_DEBUG,
00227 "TAO (%P|%t) - Connection_Handler[%d]::handle_input_eh, "
00228 "not going to handle_input on transport "
00229 "because upcalls temporarily suspended on this thread\n",
00230 this->transport()->id()));
00231 return 0;
00232 }
00233
00234 int const result = this->handle_input_internal (h, eh);
00235
00236 if (result == -1)
00237 {
00238 this->close_connection ();
00239 return 0;
00240 }
00241
00242 return result;
00243 }
00244
00245 int
00246 TAO_Connection_Handler::handle_input_internal (
00247 ACE_HANDLE h, ACE_Event_Handler * eh)
00248 {
00249
00250 (void) this->transport ()->update_transport ();
00251
00252
00253
00254
00255 size_t const t_id =
00256 this->transport ()->id ();
00257
00258 if (TAO_debug_level > 6)
00259 {
00260 ACE_HANDLE handle = eh->get_handle();
00261 ACE_DEBUG ((LM_DEBUG,
00262 "TAO (%P|%t) - Connection_Handler[%d]::handle_input, "
00263 "handle = %d/%d\n",
00264 t_id, handle, h));
00265 }
00266
00267 TAO_Resume_Handle resume_handle (this->orb_core (), eh->get_handle ());
00268
00269 int return_value = 0;
00270
00271 this->pre_io_hook (return_value);
00272 if (return_value != 0)
00273 return return_value;
00274
00275 return_value = this->transport ()->handle_input (resume_handle);
00276
00277 this->pos_io_hook (return_value);
00278
00279
00280
00281 resume_handle.handle_input_return_value_hook(return_value);
00282
00283 if (TAO_debug_level > 6)
00284 {
00285 ACE_HANDLE handle = eh->get_handle ();
00286 ACE_DEBUG ((LM_DEBUG,
00287 "TAO (%P|%t) - Connection_Handler[%d]::handle_input, "
00288 "handle = %d/%d, retval = %d\n",
00289 t_id, handle, h, return_value));
00290 }
00291
00292 if (return_value == -1)
00293 resume_handle.set_flag (TAO_Resume_Handle::TAO_HANDLE_LEAVE_SUSPENDED);
00294 return return_value;
00295 }
00296
00297 int
00298 TAO_Connection_Handler::close_connection_eh (ACE_Event_Handler *eh)
00299 {
00300
00301 ACE_HANDLE handle = eh->get_handle ();
00302
00303 size_t const id = this->transport ()->id ();
00304 if (TAO_debug_level)
00305 {
00306 ACE_DEBUG ((LM_DEBUG,
00307 "TAO (%P|%t) - Connection_Handler[%d]::"
00308 "close_connection_eh, purging entry from cache\n",
00309 handle));
00310 }
00311
00312 this->transport ()->pre_close ();
00313
00314
00315
00316 if (this->transport ()->wait_strategy ()->is_registered ())
00317 {
00318 ACE_Reactor *eh_reactor = eh->reactor ();
00319
00320 if (this->orb_core_->has_shutdown () == 0)
00321 {
00322
00323
00324 if (eh_reactor == 0)
00325 eh_reactor = this->transport()->orb_core()->reactor ();
00326 }
00327
00328
00329
00330 ACE_ASSERT (eh_reactor != 0);
00331
00332 if (TAO_debug_level)
00333 {
00334 ACE_DEBUG ((LM_DEBUG,
00335 "TAO (%P|%t) - Connection_Handler[%d]::"
00336 "close_connection_eh, removing from the reactor\n",
00337 handle));
00338 }
00339
00340
00341
00342
00343
00344
00345
00346 ACE_HANDLE tmp_handle = handle;
00347 if (this->orb_core_->has_shutdown ())
00348 tmp_handle = (ACE_HANDLE) id;
00349
00350 eh_reactor->remove_handler (tmp_handle,
00351 ACE_Event_Handler::ALL_EVENTS_MASK |
00352 ACE_Event_Handler::DONT_CALL);
00353
00354
00355
00356 if (TAO_debug_level)
00357 {
00358 ACE_DEBUG ((LM_DEBUG,
00359 "TAO (%P|%t) - Connection_Handler[%d]::"
00360 "close_connection_eh, cancel all timers\n",
00361 handle));
00362 }
00363
00364 eh_reactor->cancel_timer (eh);
00365
00366
00367
00368 this->transport ()->wait_strategy ()->is_registered (false);
00369 }
00370
00371
00372
00373
00374
00375
00376
00377 this->transport ()->send_connection_closed_notifications ();
00378 this->state_changed (TAO_LF_Event::LFS_CONNECTION_CLOSED,
00379 this->orb_core_->leader_follower ());
00380
00381 if (TAO_debug_level)
00382 {
00383 ACE_DEBUG ((LM_DEBUG,
00384 "TAO (%P|%t) - Connection_Handler[%d]::"
00385 "close_connection_eh\n",
00386 id));
00387 }
00388
00389 return 1;
00390 }
00391
00392
00393
00394
00395
00396
00397
00398
00399
00400
00401 int
00402 TAO_Connection_Handler::set_dscp_codepoint (CORBA::Boolean)
00403 {
00404 return 0;
00405 }
00406
00407 int
00408 TAO_Connection_Handler::set_dscp_codepoint (CORBA::Long)
00409 {
00410 return 0;
00411 }
00412
00413 int
00414 TAO_Connection_Handler::release_os_resources (void)
00415 {
00416 return 0;
00417 }
00418
00419
00420
00421 void
00422 TAO_Connection_Handler::pre_io_hook (int &)
00423 {
00424 }
00425
00426 void
00427 TAO_Connection_Handler::pos_io_hook (int &)
00428 {
00429 }
00430
00431 int
00432 TAO_Connection_Handler::close_handler (void)
00433 {
00434 this->state_changed (TAO_LF_Event::LFS_CONNECTION_CLOSED,
00435 this->orb_core_->leader_follower ());
00436 this->transport ()->purge_entry();
00437 this->transport ()->remove_reference ();
00438
00439
00440
00441
00442
00443
00444 return 0;
00445 }
00446
00447
00448
00449 TAO_END_VERSIONED_NAMESPACE_DECL