00001
00002
00003 #include "ace/TP_Reactor.h"
00004 #include "ace/Thread.h"
00005 #include "ace/Timer_Queue.h"
00006 #include "ace/Sig_Handler.h"
00007 #include "ace/Log_Msg.h"
00008 #include "ace/OS_NS_sys_time.h"
00009
00010 #if !defined (__ACE_INLINE__)
00011 #include "ace/TP_Reactor.inl"
00012 #endif
00013
00014 ACE_RCSID (ace,
00015 TP_Reactor,
00016 "$Id: TP_Reactor.cpp 85604 2009-06-11 13:26:00Z johnnyw $")
00017
00018 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
00019
00020 ACE_ALLOC_HOOK_DEFINE (ACE_TP_Reactor)
00021
00022 int
00023 ACE_TP_Token_Guard::acquire_read_token (ACE_Time_Value *max_wait_time)
00024 {
00025 ACE_TRACE ("ACE_TP_Token_Guard::acquire_read_token");
00026
00027
00028
00029
00030
00031 int result = 0;
00032
00033 if (max_wait_time)
00034 {
00035 ACE_Time_Value tv = ACE_OS::gettimeofday ();
00036 tv += *max_wait_time;
00037
00038 ACE_MT (result = this->token_.acquire_read (&ACE_TP_Reactor::no_op_sleep_hook,
00039 0,
00040 &tv));
00041 }
00042 else
00043 {
00044 ACE_MT (result = this->token_.acquire_read (&ACE_TP_Reactor::no_op_sleep_hook));
00045 }
00046
00047
00048 if (result == -1)
00049 {
00050 if (errno == ETIME)
00051 return 0;
00052 else
00053 return -1;
00054 }
00055
00056
00057 this->owner_ = true;
00058
00059 return result;
00060 }
00061
00062 int
00063 ACE_TP_Token_Guard::acquire_token (ACE_Time_Value *max_wait_time)
00064 {
00065 ACE_TRACE ("ACE_TP_Token_Guard::acquire_token");
00066
00067
00068
00069 int result = 0;
00070
00071 if (max_wait_time)
00072 {
00073 ACE_Time_Value tv = ACE_OS::gettimeofday ();
00074 tv += *max_wait_time;
00075
00076 ACE_MT (result = this->token_.acquire (0,
00077 0,
00078 &tv));
00079 }
00080 else
00081 {
00082 ACE_MT (result = this->token_.acquire ());
00083 }
00084
00085
00086 if (result == -1)
00087 {
00088 if (errno == ETIME)
00089 return 0;
00090 else
00091 return -1;
00092 }
00093
00094
00095 this->owner_ = true;
00096
00097 return result;
00098 }
00099
00100
00101 ACE_TP_Reactor::ACE_TP_Reactor (ACE_Sig_Handler *sh,
00102 ACE_Timer_Queue *tq,
00103 bool mask_signals,
00104 int s_queue)
00105 : ACE_Select_Reactor (sh, tq, ACE_DISABLE_NOTIFY_PIPE_DEFAULT, 0, mask_signals, s_queue)
00106 {
00107 ACE_TRACE ("ACE_TP_Reactor::ACE_TP_Reactor");
00108 this->supress_notify_renew (1);
00109 }
00110
00111 ACE_TP_Reactor::ACE_TP_Reactor (size_t max_number_of_handles,
00112 bool restart,
00113 ACE_Sig_Handler *sh,
00114 ACE_Timer_Queue *tq,
00115 bool mask_signals,
00116 int s_queue)
00117 : ACE_Select_Reactor (max_number_of_handles, restart, sh, tq, ACE_DISABLE_NOTIFY_PIPE_DEFAULT, 0, mask_signals, s_queue)
00118 {
00119 ACE_TRACE ("ACE_TP_Reactor::ACE_TP_Reactor");
00120 this->supress_notify_renew (1);
00121 }
00122
00123 int
00124 ACE_TP_Reactor::owner (ACE_thread_t, ACE_thread_t *o_id)
00125 {
00126 ACE_TRACE ("ACE_TP_Reactor::owner");
00127 if (o_id)
00128 *o_id = ACE_Thread::self ();
00129
00130 return 0;
00131 }
00132
00133 int
00134 ACE_TP_Reactor::owner (ACE_thread_t *t_id)
00135 {
00136 ACE_TRACE ("ACE_TP_Reactor::owner");
00137 *t_id = ACE_Thread::self ();
00138
00139 return 0;
00140 }
00141
00142 int
00143 ACE_TP_Reactor::handle_events (ACE_Time_Value *max_wait_time)
00144 {
00145 ACE_TRACE ("ACE_TP_Reactor::handle_events");
00146
00147
00148
00149
00150 ACE_Countdown_Time countdown (max_wait_time);
00151
00152
00153
00154
00155
00156
00157
00158 ACE_TP_Token_Guard guard (this->token_);
00159
00160 int const result = guard.acquire_read_token (max_wait_time);
00161
00162
00163 if (!guard.is_owner ())
00164 return result;
00165
00166
00167 if (this->deactivated_)
00168 return -1;
00169
00170
00171 countdown.update ();
00172
00173 return this->dispatch_i (max_wait_time, guard);
00174 }
00175
00176 int
00177 ACE_TP_Reactor::dispatch_i (ACE_Time_Value *max_wait_time,
00178 ACE_TP_Token_Guard &guard)
00179 {
00180 int event_count = this->get_event_for_dispatching (max_wait_time);
00181
00182
00183
00184 int const initial_event_count = event_count;
00185
00186 int result = 0;
00187
00188
00189
00190
00191
00192 #if 0
00193
00194
00195
00196
00197
00198 if (event_count == -1)
00199 {
00200
00201
00202
00203
00204
00205
00206 return this->handle_signals (event_count, guard);
00207 }
00208 #endif // #if 0
00209
00210
00211
00212
00213
00214
00215
00216
00217
00218
00219
00220
00221 result = this->handle_timer_events (event_count, guard);
00222
00223 if (result > 0)
00224 return result;
00225
00226
00227
00228 if (event_count > 0)
00229 {
00230
00231
00232
00233 result = this->handle_notify_events (event_count, guard);
00234
00235 if (result > 0)
00236 return result;
00237
00238
00239 }
00240
00241 if (event_count > 0)
00242 {
00243
00244 result = this->handle_socket_events (event_count, guard);
00245 }
00246
00247 if (event_count != 0 && event_count == initial_event_count)
00248 {
00249 this->state_changed_ = true;
00250 }
00251
00252 return result;
00253 }
00254
00255
00256 #if 0
00257
00258
00259
00260
00261 int
00262 ACE_TP_Reactor::handle_signals (int & ,
00263 ACE_TP_Token_Guard & )
00264 {
00265 ACE_TRACE ("ACE_TP_Reactor::handle_signals");
00266
00267
00268
00269
00270
00271
00272
00273
00274
00275 if (ACE_Sig_Handler::sig_pending () != 0)
00276 {
00277 ACE_Sig_Handler::sig_pending (0);
00278
00279
00280
00281
00282
00283 if 0
00284
00285
00286
00287
00288
00289
00290 active_handle_count = this->any_ready (dispatch_set);
00291 else
00292
00293 endif
00294
00295
00296
00297
00298 return 1;
00299 }
00300
00301 return -1;
00302 }
00303 #endif // #if 0
00304
00305
00306 int
00307 ACE_TP_Reactor::handle_timer_events (int & ,
00308 ACE_TP_Token_Guard &guard)
00309 {
00310 if (this->timer_queue_ == 0 || this->timer_queue_->is_empty())
00311 {
00312 return 0;
00313 }
00314
00315
00316 ACE_Time_Value cur_time (this->timer_queue_->gettimeofday () +
00317 this->timer_queue_->timer_skew ());
00318
00319
00320
00321 ACE_Timer_Node_Dispatch_Info info;
00322
00323 if (this->timer_queue_->dispatch_info (cur_time, info))
00324 {
00325 const void *upcall_act = 0;
00326
00327
00328 this->timer_queue_->preinvoke (info, cur_time, upcall_act);
00329
00330
00331 guard.release_token ();
00332
00333
00334 this->timer_queue_->upcall (info, cur_time);
00335
00336
00337 this->timer_queue_->postinvoke (info, cur_time, upcall_act);
00338
00339
00340 return 1;
00341 }
00342
00343 return 0;
00344 }
00345
00346 int
00347 ACE_TP_Reactor::handle_notify_events (int & ,
00348 ACE_TP_Token_Guard &guard)
00349 {
00350
00351 ACE_HANDLE notify_handle = this->get_notify_handle ();
00352
00353 int result = 0;
00354
00355
00356
00357 if (notify_handle == ACE_INVALID_HANDLE)
00358 return result;
00359
00360
00361 ACE_Notification_Buffer buffer;
00362
00363
00364 this->ready_set_.rd_mask_.clr_bit (notify_handle);
00365
00366
00367
00368 while (this->notify_handler_->read_notify_pipe (notify_handle, buffer) > 0)
00369 {
00370
00371
00372
00373
00374
00375 if (this->notify_handler_->is_dispatchable (buffer) > 0)
00376 {
00377
00378 guard.release_token ();
00379
00380
00381 this->notify_handler_->dispatch_notify (buffer);
00382
00383
00384 result = 1;
00385
00386
00387 break;
00388 }
00389 }
00390
00391
00392
00393
00394 return result;
00395 }
00396
00397 int
00398 ACE_TP_Reactor::handle_socket_events (int &event_count,
00399 ACE_TP_Token_Guard &guard)
00400 {
00401
00402
00403 ACE_EH_Dispatch_Info dispatch_info;
00404
00405 this->get_socket_event_info (dispatch_info);
00406
00407
00408
00409 if (!dispatch_info.dispatch ())
00410 {
00411
00412 if (dispatch_info.event_handler_ == 0)
00413 {
00414 this->handler_rep_.unbind(dispatch_info.handle_,
00415 dispatch_info.mask_);
00416 }
00417
00418
00419 return 0;
00420 }
00421
00422
00423
00424
00425
00426
00427 if (dispatch_info.event_handler_ != this->notify_handler_)
00428 if (this->suspend_i (dispatch_info.handle_) == -1)
00429 return 0;
00430
00431
00432 if (dispatch_info.reference_counting_required_)
00433 dispatch_info.event_handler_->add_reference ();
00434
00435
00436 guard.release_token ();
00437
00438 int result = 0;
00439
00440
00441
00442 --event_count;
00443
00444
00445 if (this->dispatch_socket_event (dispatch_info) == 0)
00446 ++result;
00447
00448 return result;
00449 }
00450
00451 int
00452 ACE_TP_Reactor::get_event_for_dispatching (ACE_Time_Value *max_wait_time)
00453 {
00454
00455
00456 if (this->state_changed_)
00457 {
00458 this->ready_set_.rd_mask_.reset ();
00459 this->ready_set_.wr_mask_.reset ();
00460 this->ready_set_.ex_mask_.reset ();
00461
00462 this->state_changed_ = false;
00463 }
00464 else
00465 {
00466
00467
00468
00469
00470
00471 this->ready_set_.rd_mask_.sync (this->ready_set_.rd_mask_.max_set ());
00472 this->ready_set_.wr_mask_.sync (this->ready_set_.wr_mask_.max_set ());
00473 this->ready_set_.ex_mask_.sync (this->ready_set_.ex_mask_.max_set ());
00474 }
00475
00476 return this->wait_for_multiple_events (this->ready_set_, max_wait_time);
00477 }
00478
00479 int
00480 ACE_TP_Reactor::get_socket_event_info (ACE_EH_Dispatch_Info &event)
00481 {
00482
00483
00484
00485
00486
00487 int found_io = 0;
00488 ACE_HANDLE handle;
00489
00490
00491
00492 {
00493 ACE_Handle_Set_Iterator handle_iter (this->ready_set_.wr_mask_);
00494
00495 while (!found_io && (handle = handle_iter ()) != ACE_INVALID_HANDLE)
00496 {
00497 if (this->is_suspended_i (handle))
00498 continue;
00499
00500
00501 event.set (handle,
00502 this->handler_rep_.find (handle),
00503 ACE_Event_Handler::WRITE_MASK,
00504 &ACE_Event_Handler::handle_output);
00505
00506 this->clear_handle_read_set (handle);
00507 found_io = 1;
00508 }
00509 }
00510
00511 if (!found_io)
00512 {
00513 ACE_Handle_Set_Iterator handle_iter (this->ready_set_.ex_mask_);
00514
00515 while (!found_io && (handle = handle_iter ()) != ACE_INVALID_HANDLE)
00516 {
00517 if (this->is_suspended_i (handle))
00518 continue;
00519
00520
00521 event.set (handle,
00522 this->handler_rep_.find (handle),
00523 ACE_Event_Handler::EXCEPT_MASK,
00524 &ACE_Event_Handler::handle_exception);
00525
00526 this->clear_handle_read_set (handle);
00527
00528 found_io = 1;
00529 }
00530 }
00531
00532 if (!found_io)
00533 {
00534 ACE_Handle_Set_Iterator handle_iter (this->ready_set_.rd_mask_);
00535
00536 while (!found_io && (handle = handle_iter ()) != ACE_INVALID_HANDLE)
00537 {
00538 if (this->is_suspended_i (handle))
00539 continue;
00540
00541
00542 event.set (handle,
00543 this->handler_rep_.find (handle),
00544 ACE_Event_Handler::READ_MASK,
00545 &ACE_Event_Handler::handle_input);
00546
00547 this->clear_handle_read_set (handle);
00548 found_io = 1;
00549 }
00550 }
00551
00552 return found_io;
00553 }
00554
00555
00556 int
00557 ACE_TP_Reactor::dispatch_socket_event (ACE_EH_Dispatch_Info &dispatch_info)
00558 {
00559 ACE_TRACE ("ACE_TP_Reactor::dispatch_socket_event");
00560
00561 ACE_Event_Handler * const event_handler = dispatch_info.event_handler_;
00562 ACE_EH_PTMF const callback = dispatch_info.callback_;
00563
00564
00565 if (event_handler == 0)
00566 return -1;
00567
00568
00569
00570
00571
00572
00573 int status = 1;
00574 while (status > 0)
00575 status = (event_handler->*callback) (dispatch_info.handle_);
00576
00577
00578 return this->post_process_socket_event (dispatch_info, status);
00579 }
00580
00581 int
00582 ACE_TP_Reactor::post_process_socket_event (ACE_EH_Dispatch_Info &dispatch_info,
00583 int status)
00584 {
00585 int result = 0;
00586
00587
00588
00589 if (status < 0 ||
00590 (dispatch_info.event_handler_ != this->notify_handler_ &&
00591 dispatch_info.resume_flag_ ==
00592 ACE_Event_Handler::ACE_REACTOR_RESUMES_HANDLER))
00593 {
00594
00595
00596
00597
00598 ACE_TP_Token_Guard guard (this->token_);
00599
00600 result = guard.acquire_token ();
00601
00602
00603 if (!guard.is_owner ())
00604 return result;
00605
00606
00607
00608
00609
00610 ACE_Event_Handler const * const eh =
00611 this->handler_rep_.find (dispatch_info.handle_);
00612
00613
00614
00615 if (eh == dispatch_info.event_handler_)
00616 {
00617 if (status < 0)
00618 {
00619 result =
00620 this->remove_handler_i (dispatch_info.handle_,
00621 dispatch_info.mask_);
00622 }
00623
00624
00625 if (dispatch_info.event_handler_ != this->notify_handler_ &&
00626 dispatch_info.resume_flag_ ==
00627 ACE_Event_Handler::ACE_REACTOR_RESUMES_HANDLER)
00628 this->resume_i (dispatch_info.handle_);
00629 }
00630 }
00631
00632
00633 if (dispatch_info.reference_counting_required_)
00634 dispatch_info.event_handler_->remove_reference ();
00635
00636 return result;
00637 }
00638
00639 int
00640 ACE_TP_Reactor::resumable_handler (void)
00641 {
00642 return 1;
00643 }
00644
00645 int
00646 ACE_TP_Reactor::handle_events (ACE_Time_Value &max_wait_time)
00647 {
00648 return this->handle_events (&max_wait_time);
00649 }
00650
00651 void
00652 ACE_TP_Reactor::notify_handle (ACE_HANDLE,
00653 ACE_Reactor_Mask,
00654 ACE_Handle_Set &,
00655 ACE_Event_Handler *eh,
00656 ACE_EH_PTMF)
00657 {
00658 ACE_ERROR ((LM_ERROR,
00659 ACE_TEXT ("ACE_TP_Reactor::notify_handle: ")
00660 ACE_TEXT ("Wrong version of notify_handle() got called\n")));
00661
00662 ACE_ASSERT (eh == 0);
00663 ACE_UNUSED_ARG (eh);
00664 }
00665
00666 ACE_HANDLE
00667 ACE_TP_Reactor::get_notify_handle (void)
00668 {
00669
00670
00671 ACE_HANDLE const read_handle =
00672 this->notify_handler_->notify_handle ();
00673
00674
00675
00676 if (read_handle != ACE_INVALID_HANDLE &&
00677 this->ready_set_.rd_mask_.is_set (read_handle))
00678 {
00679 return read_handle;
00680 }
00681
00682
00683 return ACE_INVALID_HANDLE;
00684 }
00685
00686 ACE_END_VERSIONED_NAMESPACE_DECL