00001
00002
00003 #include "ace/TP_Reactor.h"
00004 #include "ace/Thread.h"
00005 #include "ace/Timer_Queue.h"
00006 #include "ace/Signal.h"
00007 #include "ace/Log_Msg.h"
00008 #include "ace/OS_NS_sys_time.h"
00009
00010 #if !defined (__ACE_INLINE__)
00011 #include "ace/TP_Reactor.inl"
00012 #endif
00013
00014 ACE_RCSID (ace,
00015 TP_Reactor,
00016 "TP_Reactor.cpp,v 4.79 2006/03/20 10:10:49 jwillemsen Exp")
00017
00018 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
00019
00020 ACE_ALLOC_HOOK_DEFINE (ACE_TP_Reactor)
00021
00022 int
00023 ACE_TP_Token_Guard::acquire_read_token (ACE_Time_Value *max_wait_time)
00024 {
00025 ACE_TRACE ("ACE_TP_Token_Guard::acquire_read_token");
00026
00027
00028
00029
00030
00031 int result = 0;
00032
00033 if (max_wait_time)
00034 {
00035 ACE_Time_Value tv = ACE_OS::gettimeofday ();
00036 tv += *max_wait_time;
00037
00038 ACE_MT (result = this->token_.acquire_read (&ACE_TP_Reactor::no_op_sleep_hook,
00039 0,
00040 &tv));
00041 }
00042 else
00043 {
00044 ACE_MT (result = this->token_.acquire_read (&ACE_TP_Reactor::no_op_sleep_hook));
00045 }
00046
00047
00048 if (result == -1)
00049 {
00050 if (errno == ETIME)
00051 return 0;
00052 else
00053 return -1;
00054 }
00055
00056
00057 this->owner_ = 1;
00058
00059 return result;
00060 }
00061
00062 int
00063 ACE_TP_Token_Guard::acquire_token (ACE_Time_Value *max_wait_time)
00064 {
00065 ACE_TRACE ("ACE_TP_Token_Guard::acquire_token");
00066
00067
00068
00069 int result = 0;
00070
00071 if (max_wait_time)
00072 {
00073 ACE_Time_Value tv = ACE_OS::gettimeofday ();
00074 tv += *max_wait_time;
00075
00076 ACE_MT (result = this->token_.acquire (0,
00077 0,
00078 &tv));
00079 }
00080 else
00081 {
00082 ACE_MT (result = this->token_.acquire ());
00083 }
00084
00085
00086 if (result == -1)
00087 {
00088 if (errno == ETIME)
00089 return 0;
00090 else
00091 return -1;
00092 }
00093
00094
00095 this->owner_ = 1;
00096
00097 return result;
00098 }
00099
00100
00101 ACE_TP_Reactor::ACE_TP_Reactor (ACE_Sig_Handler *sh,
00102 ACE_Timer_Queue *tq,
00103 int mask_signals,
00104 int s_queue)
00105 : ACE_Select_Reactor (sh, tq, 0, 0, mask_signals, s_queue)
00106 {
00107 ACE_TRACE ("ACE_TP_Reactor::ACE_TP_Reactor");
00108 this->supress_notify_renew (1);
00109 }
00110
00111 ACE_TP_Reactor::ACE_TP_Reactor (size_t max_number_of_handles,
00112 int restart,
00113 ACE_Sig_Handler *sh,
00114 ACE_Timer_Queue *tq,
00115 int mask_signals,
00116 int s_queue)
00117 : ACE_Select_Reactor (max_number_of_handles, restart, sh, tq, 0, 0, mask_signals, s_queue)
00118 {
00119 ACE_TRACE ("ACE_TP_Reactor::ACE_TP_Reactor");
00120 this->supress_notify_renew (1);
00121 }
00122
00123 int
00124 ACE_TP_Reactor::owner (ACE_thread_t, ACE_thread_t *o_id)
00125 {
00126 ACE_TRACE ("ACE_TP_Reactor::owner");
00127 if (o_id)
00128 *o_id = ACE_Thread::self ();
00129
00130 return 0;
00131 }
00132
00133 int
00134 ACE_TP_Reactor::owner (ACE_thread_t *t_id)
00135 {
00136 ACE_TRACE ("ACE_TP_Reactor::owner");
00137 *t_id = ACE_Thread::self ();
00138
00139 return 0;
00140 }
00141
00142 int
00143 ACE_TP_Reactor::handle_events (ACE_Time_Value *max_wait_time)
00144 {
00145 ACE_TRACE ("ACE_TP_Reactor::handle_events");
00146
00147
00148
00149
00150 ACE_Countdown_Time countdown (max_wait_time);
00151
00152
00153
00154
00155
00156
00157
00158 ACE_TP_Token_Guard guard (this->token_);
00159
00160 int const result = guard.acquire_read_token (max_wait_time);
00161
00162
00163 if (!guard.is_owner ())
00164 return result;
00165
00166
00167 if (this->deactivated_)
00168 return -1;
00169
00170
00171 countdown.update ();
00172
00173 return this->dispatch_i (max_wait_time,
00174 guard);
00175 }
00176
00177
00178
00179
00180
00181 int
00182 ACE_TP_Reactor::register_handler (int,
00183 ACE_Event_Handler *,
00184 ACE_Sig_Action *,
00185 ACE_Event_Handler **,
00186 ACE_Sig_Action *)
00187 {
00188 ACE_NOTSUP_RETURN (-1);
00189 }
00190
00191
00192 int
00193 ACE_TP_Reactor::register_handler (const ACE_Sig_Set &,
00194 ACE_Event_Handler *,
00195 ACE_Sig_Action *)
00196 {
00197 ACE_NOTSUP_RETURN (-1);
00198 }
00199
00200 int
00201 ACE_TP_Reactor::register_handler (ACE_Event_Handler *eh,
00202 ACE_Reactor_Mask mask)
00203 {
00204 return ACE_Select_Reactor::register_handler (eh,
00205 mask);
00206 }
00207
00208 int
00209 ACE_TP_Reactor::register_handler (ACE_HANDLE handle,
00210 ACE_Event_Handler *eh,
00211 ACE_Reactor_Mask mask)
00212 {
00213 return ACE_Select_Reactor::register_handler (handle,
00214 eh,
00215 mask);
00216 }
00217
#if defined (ACE_WIN32)

// Only compiled when ACE_WIN32 is defined.  Plain forwarder to
// ACE_Select_Reactor::register_handler (eh, h); returns whatever the
// base class returns.
int
ACE_TP_Reactor::register_handler (ACE_Event_Handler *eh,
                                  ACE_HANDLE h)
{
  return ACE_Select_Reactor::register_handler (eh, h);
}

#endif /* ACE_WIN32 */
00229 int
00230 ACE_TP_Reactor::register_handler (ACE_HANDLE event_handle,
00231 ACE_HANDLE io_handle,
00232 ACE_Event_Handler *event_handler,
00233 ACE_Reactor_Mask mask)
00234 {
00235 return ACE_Select_Reactor::register_handler (event_handle,
00236 io_handle,
00237 event_handler,
00238 mask);
00239 }
00240
00241 int
00242 ACE_TP_Reactor::register_handler (const ACE_Handle_Set &handles,
00243 ACE_Event_Handler *eh,
00244 ACE_Reactor_Mask mask)
00245 {
00246 return ACE_Select_Reactor::register_handler (handles,
00247 eh,
00248 mask);
00249 }
00250
00251 int
00252 ACE_TP_Reactor::dispatch_i (ACE_Time_Value *max_wait_time,
00253 ACE_TP_Token_Guard &guard)
00254 {
00255 int event_count =
00256 this->get_event_for_dispatching (max_wait_time);
00257
00258 int result = 0;
00259
00260
00261
00262
00263
00264
00265 if (event_count == -1)
00266 {
00267
00268
00269
00270
00271
00272
00273 return this->handle_signals (event_count,
00274 guard);
00275 }
00276
00277
00278
00279
00280
00281
00282
00283
00284
00285
00286
00287
00288 result = this->handle_timer_events (event_count,
00289 guard);
00290
00291 if (result > 0)
00292 return result;
00293
00294
00295
00296 if (event_count > 0)
00297 {
00298
00299
00300
00301 result = this->handle_notify_events (event_count,
00302 guard);
00303
00304 if (result > 0)
00305 return result;
00306
00307
00308 }
00309
00310 if (event_count > 0)
00311 {
00312
00313 return this->handle_socket_events (event_count,
00314 guard);
00315 }
00316
00317 return 0;
00318 }
00319
00320 int
00321 ACE_TP_Reactor::handle_signals (int & ,
00322 ACE_TP_Token_Guard & )
00323 {
00324 ACE_TRACE ("ACE_TP_Reactor::handle_signals");
00325
00326
00327
00328
00329
00330
00331
00332
00333
00334 if (ACE_Sig_Handler::sig_pending () != 0)
00335 {
00336 ACE_Sig_Handler::sig_pending (0);
00337
00338
00339
00340
00341
00342 #if 0
00343
00344
00345
00346
00347
00348
00349 active_handle_count = this->any_ready (dispatch_set);
00350 #else
00351
00352 #endif
00353
00354
00355
00356
00357 return 1;
00358 }
00359
00360 return -1;
00361 }
00362
00363
00364 int
00365 ACE_TP_Reactor::handle_timer_events (int & ,
00366 ACE_TP_Token_Guard &guard)
00367 {
00368 if (this->timer_queue_->is_empty())
00369 {
00370 return 0;
00371 }
00372
00373
00374 ACE_Time_Value cur_time (this->timer_queue_->gettimeofday () +
00375 this->timer_queue_->timer_skew ());
00376
00377
00378
00379 ACE_Timer_Node_Dispatch_Info info;
00380
00381 if (this->timer_queue_->dispatch_info (cur_time,
00382 info))
00383 {
00384 const void *upcall_act = 0;
00385
00386
00387 this->timer_queue_->preinvoke (info,
00388 cur_time,
00389 upcall_act);
00390
00391
00392 guard.release_token ();
00393
00394
00395 this->timer_queue_->upcall (info,
00396 cur_time);
00397
00398
00399 this->timer_queue_->postinvoke (info,
00400 cur_time,
00401 upcall_act);
00402
00403
00404 return 1;
00405 }
00406
00407 return 0;
00408 }
00409
00410 int
00411 ACE_TP_Reactor::handle_notify_events (int & ,
00412 ACE_TP_Token_Guard &guard)
00413 {
00414
00415 ACE_HANDLE notify_handle =
00416 this->get_notify_handle ();
00417
00418 int result = 0;
00419
00420
00421
00422 if (notify_handle == ACE_INVALID_HANDLE)
00423 return result;
00424
00425
00426 ACE_Notification_Buffer buffer;
00427
00428
00429 this->ready_set_.rd_mask_.clr_bit (notify_handle);
00430
00431
00432
00433 while (this->notify_handler_->read_notify_pipe (notify_handle,
00434 buffer) > 0)
00435 {
00436
00437
00438
00439
00440
00441 if (this->notify_handler_->is_dispatchable (buffer) > 0)
00442 {
00443
00444 guard.release_token ();
00445
00446
00447 this->notify_handler_->dispatch_notify (buffer);
00448
00449
00450 result = 1;
00451
00452
00453 break;
00454 }
00455 }
00456
00457
00458
00459
00460 return result;
00461 }
00462
00463 int
00464 ACE_TP_Reactor::handle_socket_events (int &event_count,
00465 ACE_TP_Token_Guard &guard)
00466 {
00467
00468
00469 ACE_EH_Dispatch_Info dispatch_info;
00470
00471 this->get_socket_event_info (dispatch_info);
00472
00473
00474
00475 if (!dispatch_info.dispatch ())
00476 {
00477 return 0;
00478 }
00479
00480
00481
00482
00483
00484
00485 if (dispatch_info.event_handler_ != this->notify_handler_)
00486 if (this->suspend_i (dispatch_info.handle_) == -1)
00487 return 0;
00488
00489
00490 if (dispatch_info.reference_counting_required_)
00491 dispatch_info.event_handler_->add_reference ();
00492
00493
00494 guard.release_token ();
00495
00496 int result = 0;
00497
00498
00499
00500 --event_count;
00501
00502
00503 if (this->dispatch_socket_event (dispatch_info) == 0)
00504 ++result;
00505
00506 return result;
00507 }
00508
00509 int
00510 ACE_TP_Reactor::get_event_for_dispatching (ACE_Time_Value *max_wait_time)
00511 {
00512
00513
00514 if (this->state_changed_)
00515 {
00516 this->ready_set_.rd_mask_.reset ();
00517 this->ready_set_.wr_mask_.reset ();
00518 this->ready_set_.ex_mask_.reset ();
00519
00520 this->state_changed_ = false;
00521 }
00522 else
00523 {
00524
00525
00526
00527
00528
00529 this->ready_set_.rd_mask_.sync (this->ready_set_.rd_mask_.max_set ());
00530 this->ready_set_.wr_mask_.sync (this->ready_set_.wr_mask_.max_set ());
00531 this->ready_set_.ex_mask_.sync (this->ready_set_.ex_mask_.max_set ());
00532 }
00533
00534 return this->wait_for_multiple_events (this->ready_set_,
00535 max_wait_time);
00536 }
00537
00538 int
00539 ACE_TP_Reactor::get_socket_event_info (ACE_EH_Dispatch_Info &event)
00540 {
00541
00542
00543
00544
00545
00546 int found_io = 0;
00547 ACE_HANDLE handle;
00548
00549
00550
00551 {
00552 ACE_Handle_Set_Iterator handle_iter (this->ready_set_.wr_mask_);
00553
00554 while (!found_io && (handle = handle_iter ()) != ACE_INVALID_HANDLE)
00555 {
00556 if (this->is_suspended_i (handle))
00557 continue;
00558
00559
00560 event.set (handle,
00561 this->handler_rep_.find (handle),
00562 ACE_Event_Handler::WRITE_MASK,
00563 &ACE_Event_Handler::handle_output);
00564
00565 this->clear_handle_read_set (handle);
00566 found_io = 1;
00567 }
00568 }
00569
00570 if (!found_io)
00571 {
00572 ACE_Handle_Set_Iterator handle_iter (this->ready_set_.ex_mask_);
00573
00574 while (!found_io && (handle = handle_iter ()) != ACE_INVALID_HANDLE)
00575 {
00576 if (this->is_suspended_i (handle))
00577 continue;
00578
00579
00580 event.set (handle,
00581 this->handler_rep_.find (handle),
00582 ACE_Event_Handler::EXCEPT_MASK,
00583 &ACE_Event_Handler::handle_exception);
00584
00585 this->clear_handle_read_set (handle);
00586
00587 found_io = 1;
00588 }
00589 }
00590
00591 if (!found_io)
00592 {
00593 ACE_Handle_Set_Iterator handle_iter (this->ready_set_.rd_mask_);
00594
00595 while (!found_io && (handle = handle_iter ()) != ACE_INVALID_HANDLE)
00596 {
00597 if (this->is_suspended_i (handle))
00598 continue;
00599
00600
00601 event.set (handle,
00602 this->handler_rep_.find (handle),
00603 ACE_Event_Handler::READ_MASK,
00604 &ACE_Event_Handler::handle_input);
00605
00606 this->clear_handle_read_set (handle);
00607 found_io = 1;
00608 }
00609 }
00610
00611 return found_io;
00612 }
00613
00614
00615 int
00616 ACE_TP_Reactor::dispatch_socket_event (ACE_EH_Dispatch_Info &dispatch_info)
00617 {
00618 ACE_TRACE ("ACE_TP_Reactor::dispatch_socket_event");
00619
00620 ACE_Event_Handler * const event_handler = dispatch_info.event_handler_;
00621 ACE_EH_PTMF const callback = dispatch_info.callback_;
00622
00623
00624 if (event_handler == 0)
00625 return -1;
00626
00627
00628
00629
00630
00631
00632 int status = 1;
00633 while (status > 0)
00634 status = (event_handler->*callback) (dispatch_info.handle_);
00635
00636
00637 return this->post_process_socket_event (dispatch_info, status);
00638 }
00639
00640 int
00641 ACE_TP_Reactor::post_process_socket_event (ACE_EH_Dispatch_Info &dispatch_info,
00642 int status)
00643 {
00644 int result = 0;
00645
00646
00647
00648 if (status < 0 ||
00649 (dispatch_info.event_handler_ != this->notify_handler_ &&
00650 dispatch_info.resume_flag_ ==
00651 ACE_Event_Handler::ACE_REACTOR_RESUMES_HANDLER))
00652 {
00653
00654
00655
00656
00657 ACE_TP_Token_Guard guard (this->token_);
00658
00659 result = guard.acquire_token ();
00660
00661
00662 if (!guard.is_owner ())
00663 return result;
00664
00665 if (status < 0)
00666 {
00667 result =
00668 this->remove_handler_i (dispatch_info.handle_, dispatch_info.mask_);
00669 }
00670
00671
00672 if (dispatch_info.event_handler_ != this->notify_handler_ &&
00673 dispatch_info.resume_flag_ ==
00674 ACE_Event_Handler::ACE_REACTOR_RESUMES_HANDLER)
00675 this->resume_i (dispatch_info.handle_);
00676 }
00677
00678
00679 if (dispatch_info.reference_counting_required_)
00680 dispatch_info.event_handler_->remove_reference ();
00681
00682 return result;
00683 }
00684
00685 int
00686 ACE_TP_Reactor::resumable_handler (void)
00687 {
00688 return 1;
00689 }
00690
00691 int
00692 ACE_TP_Reactor::handle_events (ACE_Time_Value &max_wait_time)
00693 {
00694 return this->handle_events (&max_wait_time);
00695 }
00696
00697 void
00698 ACE_TP_Reactor::notify_handle (ACE_HANDLE,
00699 ACE_Reactor_Mask,
00700 ACE_Handle_Set &,
00701 ACE_Event_Handler *eh,
00702 ACE_EH_PTMF)
00703 {
00704 ACE_ERROR ((LM_ERROR,
00705 ACE_LIB_TEXT ("ACE_TP_Reactor::notify_handle: ")
00706 ACE_LIB_TEXT ("Wrong version of notify_handle() got called \n")));
00707
00708 ACE_ASSERT (eh == 0);
00709 ACE_UNUSED_ARG (eh);
00710 }
00711
00712 ACE_HANDLE
00713 ACE_TP_Reactor::get_notify_handle (void)
00714 {
00715
00716
00717 ACE_HANDLE const read_handle =
00718 this->notify_handler_->notify_handle ();
00719
00720
00721
00722 if (read_handle != ACE_INVALID_HANDLE &&
00723 this->ready_set_.rd_mask_.is_set (read_handle))
00724 {
00725 return read_handle;
00726 }
00727
00728
00729 return ACE_INVALID_HANDLE;
00730 }
00731
00732 ACE_END_VERSIONED_NAMESPACE_DECL