00001
00002
00003 #include "ace/TP_Reactor.h"
00004 #include "ace/Thread.h"
00005 #include "ace/Timer_Queue.h"
00006 #include "ace/Sig_Handler.h"
00007 #include "ace/Log_Msg.h"
00008 #include "ace/OS_NS_sys_time.h"
00009
00010 #if !defined (__ACE_INLINE__)
00011 #include "ace/TP_Reactor.inl"
00012 #endif
00013
00014 ACE_RCSID (ace,
00015 TP_Reactor,
00016 "$Id: TP_Reactor.cpp 79134 2007-07-31 18:23:50Z johnnyw $")
00017
00018 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
00019
00020 ACE_ALLOC_HOOK_DEFINE (ACE_TP_Reactor)
00021
00022 int
00023 ACE_TP_Token_Guard::acquire_read_token (ACE_Time_Value *max_wait_time)
00024 {
00025 ACE_TRACE ("ACE_TP_Token_Guard::acquire_read_token");
00026
00027
00028
00029
00030
00031 int result = 0;
00032
00033 if (max_wait_time)
00034 {
00035 ACE_Time_Value tv = ACE_OS::gettimeofday ();
00036 tv += *max_wait_time;
00037
00038 ACE_MT (result = this->token_.acquire_read (&ACE_TP_Reactor::no_op_sleep_hook,
00039 0,
00040 &tv));
00041 }
00042 else
00043 {
00044 ACE_MT (result = this->token_.acquire_read (&ACE_TP_Reactor::no_op_sleep_hook));
00045 }
00046
00047
00048 if (result == -1)
00049 {
00050 if (errno == ETIME)
00051 return 0;
00052 else
00053 return -1;
00054 }
00055
00056
00057 this->owner_ = 1;
00058
00059 return result;
00060 }
00061
00062 int
00063 ACE_TP_Token_Guard::acquire_token (ACE_Time_Value *max_wait_time)
00064 {
00065 ACE_TRACE ("ACE_TP_Token_Guard::acquire_token");
00066
00067
00068
00069 int result = 0;
00070
00071 if (max_wait_time)
00072 {
00073 ACE_Time_Value tv = ACE_OS::gettimeofday ();
00074 tv += *max_wait_time;
00075
00076 ACE_MT (result = this->token_.acquire (0,
00077 0,
00078 &tv));
00079 }
00080 else
00081 {
00082 ACE_MT (result = this->token_.acquire ());
00083 }
00084
00085
00086 if (result == -1)
00087 {
00088 if (errno == ETIME)
00089 return 0;
00090 else
00091 return -1;
00092 }
00093
00094
00095 this->owner_ = 1;
00096
00097 return result;
00098 }
00099
00100
00101 ACE_TP_Reactor::ACE_TP_Reactor (ACE_Sig_Handler *sh,
00102 ACE_Timer_Queue *tq,
00103 int mask_signals,
00104 int s_queue)
00105 : ACE_Select_Reactor (sh, tq, ACE_DISABLE_NOTIFY_PIPE_DEFAULT, 0, mask_signals, s_queue)
00106 {
00107 ACE_TRACE ("ACE_TP_Reactor::ACE_TP_Reactor");
00108 this->supress_notify_renew (1);
00109 }
00110
00111 ACE_TP_Reactor::ACE_TP_Reactor (size_t max_number_of_handles,
00112 int restart,
00113 ACE_Sig_Handler *sh,
00114 ACE_Timer_Queue *tq,
00115 int mask_signals,
00116 int s_queue)
00117 : ACE_Select_Reactor (max_number_of_handles, restart, sh, tq, ACE_DISABLE_NOTIFY_PIPE_DEFAULT, 0, mask_signals, s_queue)
00118 {
00119 ACE_TRACE ("ACE_TP_Reactor::ACE_TP_Reactor");
00120 this->supress_notify_renew (1);
00121 }
00122
00123 int
00124 ACE_TP_Reactor::owner (ACE_thread_t, ACE_thread_t *o_id)
00125 {
00126 ACE_TRACE ("ACE_TP_Reactor::owner");
00127 if (o_id)
00128 *o_id = ACE_Thread::self ();
00129
00130 return 0;
00131 }
00132
00133 int
00134 ACE_TP_Reactor::owner (ACE_thread_t *t_id)
00135 {
00136 ACE_TRACE ("ACE_TP_Reactor::owner");
00137 *t_id = ACE_Thread::self ();
00138
00139 return 0;
00140 }
00141
00142 int
00143 ACE_TP_Reactor::handle_events (ACE_Time_Value *max_wait_time)
00144 {
00145 ACE_TRACE ("ACE_TP_Reactor::handle_events");
00146
00147
00148
00149
00150 ACE_Countdown_Time countdown (max_wait_time);
00151
00152
00153
00154
00155
00156
00157
00158 ACE_TP_Token_Guard guard (this->token_);
00159
00160 int const result = guard.acquire_read_token (max_wait_time);
00161
00162
00163 if (!guard.is_owner ())
00164 return result;
00165
00166
00167 if (this->deactivated_)
00168 return -1;
00169
00170
00171 countdown.update ();
00172
00173 return this->dispatch_i (max_wait_time,
00174 guard);
00175 }
00176
00177 int
00178 ACE_TP_Reactor::dispatch_i (ACE_Time_Value *max_wait_time,
00179 ACE_TP_Token_Guard &guard)
00180 {
00181 int event_count =
00182 this->get_event_for_dispatching (max_wait_time);
00183
00184
00185
00186 int initial_event_count = event_count;
00187
00188 int result = 0;
00189
00190
00191
00192
00193
00194 #if 0
00195
00196
00197
00198
00199
00200 if (event_count == -1)
00201 {
00202
00203
00204
00205
00206
00207
00208 return this->handle_signals (event_count,
00209 guard);
00210 }
00211 #endif // #if 0
00212
00213
00214
00215
00216
00217
00218
00219
00220
00221
00222
00223
00224 result = this->handle_timer_events (event_count,
00225 guard);
00226
00227 if (result > 0)
00228 return result;
00229
00230
00231
00232 if (event_count > 0)
00233 {
00234
00235
00236
00237 result = this->handle_notify_events (event_count,
00238 guard);
00239
00240 if (result > 0)
00241 return result;
00242
00243
00244 }
00245
00246 if (event_count > 0)
00247 {
00248
00249 result = this->handle_socket_events (event_count,
00250 guard);
00251 }
00252
00253 if (event_count != 0
00254 && event_count == initial_event_count)
00255 {
00256 this->state_changed_ = true;
00257 }
00258
00259 return result;
00260 }
00261
00262
00263 #if 0
00264
00265
00266
00267
00268 int
00269 ACE_TP_Reactor::handle_signals (int & ,
00270 ACE_TP_Token_Guard & )
00271 {
00272 ACE_TRACE ("ACE_TP_Reactor::handle_signals");
00273
00274
00275
00276
00277
00278
00279
00280
00281
00282 if (ACE_Sig_Handler::sig_pending () != 0)
00283 {
00284 ACE_Sig_Handler::sig_pending (0);
00285
00286
00287
00288
00289
00290 if 0
00291
00292
00293
00294
00295
00296
00297 active_handle_count = this->any_ready (dispatch_set);
00298 else
00299
00300 endif
00301
00302
00303
00304
00305 return 1;
00306 }
00307
00308 return -1;
00309 }
00310 #endif // #if 0
00311
00312
00313 int
00314 ACE_TP_Reactor::handle_timer_events (int & ,
00315 ACE_TP_Token_Guard &guard)
00316 {
00317 if (this->timer_queue_ == 0 || this->timer_queue_->is_empty())
00318 {
00319 return 0;
00320 }
00321
00322
00323 ACE_Time_Value cur_time (this->timer_queue_->gettimeofday () +
00324 this->timer_queue_->timer_skew ());
00325
00326
00327
00328 ACE_Timer_Node_Dispatch_Info info;
00329
00330 if (this->timer_queue_->dispatch_info (cur_time,
00331 info))
00332 {
00333 const void *upcall_act = 0;
00334
00335
00336 this->timer_queue_->preinvoke (info,
00337 cur_time,
00338 upcall_act);
00339
00340
00341 guard.release_token ();
00342
00343
00344 this->timer_queue_->upcall (info,
00345 cur_time);
00346
00347
00348 this->timer_queue_->postinvoke (info,
00349 cur_time,
00350 upcall_act);
00351
00352
00353 return 1;
00354 }
00355
00356 return 0;
00357 }
00358
00359 int
00360 ACE_TP_Reactor::handle_notify_events (int & ,
00361 ACE_TP_Token_Guard &guard)
00362 {
00363
00364 ACE_HANDLE notify_handle =
00365 this->get_notify_handle ();
00366
00367 int result = 0;
00368
00369
00370
00371 if (notify_handle == ACE_INVALID_HANDLE)
00372 return result;
00373
00374
00375 ACE_Notification_Buffer buffer;
00376
00377
00378 this->ready_set_.rd_mask_.clr_bit (notify_handle);
00379
00380
00381
00382 while (this->notify_handler_->read_notify_pipe (notify_handle,
00383 buffer) > 0)
00384 {
00385
00386
00387
00388
00389
00390 if (this->notify_handler_->is_dispatchable (buffer) > 0)
00391 {
00392
00393 guard.release_token ();
00394
00395
00396 this->notify_handler_->dispatch_notify (buffer);
00397
00398
00399 result = 1;
00400
00401
00402 break;
00403 }
00404 }
00405
00406
00407
00408
00409 return result;
00410 }
00411
00412 int
00413 ACE_TP_Reactor::handle_socket_events (int &event_count,
00414 ACE_TP_Token_Guard &guard)
00415 {
00416
00417
00418 ACE_EH_Dispatch_Info dispatch_info;
00419
00420 this->get_socket_event_info (dispatch_info);
00421
00422
00423
00424 if (!dispatch_info.dispatch ())
00425 {
00426
00427 if (dispatch_info.event_handler_ == 0)
00428 {
00429 this->handler_rep_.unbind(dispatch_info.handle_,
00430 dispatch_info.mask_);
00431 }
00432
00433
00434 return 0;
00435 }
00436
00437
00438
00439
00440
00441
00442 if (dispatch_info.event_handler_ != this->notify_handler_)
00443 if (this->suspend_i (dispatch_info.handle_) == -1)
00444 return 0;
00445
00446
00447 if (dispatch_info.reference_counting_required_)
00448 dispatch_info.event_handler_->add_reference ();
00449
00450
00451 guard.release_token ();
00452
00453 int result = 0;
00454
00455
00456
00457 --event_count;
00458
00459
00460 if (this->dispatch_socket_event (dispatch_info) == 0)
00461 ++result;
00462
00463 return result;
00464 }
00465
00466 int
00467 ACE_TP_Reactor::get_event_for_dispatching (ACE_Time_Value *max_wait_time)
00468 {
00469
00470
00471 if (this->state_changed_)
00472 {
00473 this->ready_set_.rd_mask_.reset ();
00474 this->ready_set_.wr_mask_.reset ();
00475 this->ready_set_.ex_mask_.reset ();
00476
00477 this->state_changed_ = false;
00478 }
00479 else
00480 {
00481
00482
00483
00484
00485
00486 this->ready_set_.rd_mask_.sync (this->ready_set_.rd_mask_.max_set ());
00487 this->ready_set_.wr_mask_.sync (this->ready_set_.wr_mask_.max_set ());
00488 this->ready_set_.ex_mask_.sync (this->ready_set_.ex_mask_.max_set ());
00489 }
00490
00491 return this->wait_for_multiple_events (this->ready_set_,
00492 max_wait_time);
00493 }
00494
00495 int
00496 ACE_TP_Reactor::get_socket_event_info (ACE_EH_Dispatch_Info &event)
00497 {
00498
00499
00500
00501
00502
00503 int found_io = 0;
00504 ACE_HANDLE handle;
00505
00506
00507
00508 {
00509 ACE_Handle_Set_Iterator handle_iter (this->ready_set_.wr_mask_);
00510
00511 while (!found_io && (handle = handle_iter ()) != ACE_INVALID_HANDLE)
00512 {
00513 if (this->is_suspended_i (handle))
00514 continue;
00515
00516
00517 event.set (handle,
00518 this->handler_rep_.find (handle),
00519 ACE_Event_Handler::WRITE_MASK,
00520 &ACE_Event_Handler::handle_output);
00521
00522 this->clear_handle_read_set (handle);
00523 found_io = 1;
00524 }
00525 }
00526
00527 if (!found_io)
00528 {
00529 ACE_Handle_Set_Iterator handle_iter (this->ready_set_.ex_mask_);
00530
00531 while (!found_io && (handle = handle_iter ()) != ACE_INVALID_HANDLE)
00532 {
00533 if (this->is_suspended_i (handle))
00534 continue;
00535
00536
00537 event.set (handle,
00538 this->handler_rep_.find (handle),
00539 ACE_Event_Handler::EXCEPT_MASK,
00540 &ACE_Event_Handler::handle_exception);
00541
00542 this->clear_handle_read_set (handle);
00543
00544 found_io = 1;
00545 }
00546 }
00547
00548 if (!found_io)
00549 {
00550 ACE_Handle_Set_Iterator handle_iter (this->ready_set_.rd_mask_);
00551
00552 while (!found_io && (handle = handle_iter ()) != ACE_INVALID_HANDLE)
00553 {
00554 if (this->is_suspended_i (handle))
00555 continue;
00556
00557
00558 event.set (handle,
00559 this->handler_rep_.find (handle),
00560 ACE_Event_Handler::READ_MASK,
00561 &ACE_Event_Handler::handle_input);
00562
00563 this->clear_handle_read_set (handle);
00564 found_io = 1;
00565 }
00566 }
00567
00568 return found_io;
00569 }
00570
00571
00572 int
00573 ACE_TP_Reactor::dispatch_socket_event (ACE_EH_Dispatch_Info &dispatch_info)
00574 {
00575 ACE_TRACE ("ACE_TP_Reactor::dispatch_socket_event");
00576
00577 ACE_Event_Handler * const event_handler = dispatch_info.event_handler_;
00578 ACE_EH_PTMF const callback = dispatch_info.callback_;
00579
00580
00581 if (event_handler == 0)
00582 return -1;
00583
00584
00585
00586
00587
00588
00589 int status = 1;
00590 while (status > 0)
00591 status = (event_handler->*callback) (dispatch_info.handle_);
00592
00593
00594 return this->post_process_socket_event (dispatch_info, status);
00595 }
00596
00597 int
00598 ACE_TP_Reactor::post_process_socket_event (ACE_EH_Dispatch_Info &dispatch_info,
00599 int status)
00600 {
00601 int result = 0;
00602
00603
00604
00605 if (status < 0 ||
00606 (dispatch_info.event_handler_ != this->notify_handler_ &&
00607 dispatch_info.resume_flag_ ==
00608 ACE_Event_Handler::ACE_REACTOR_RESUMES_HANDLER))
00609 {
00610
00611
00612
00613
00614 ACE_TP_Token_Guard guard (this->token_);
00615
00616 result = guard.acquire_token ();
00617
00618
00619 if (!guard.is_owner ())
00620 return result;
00621
00622
00623
00624
00625
00626 ACE_Event_Handler const * const eh =
00627 this->handler_rep_.find (dispatch_info.handle_);
00628
00629
00630
00631 if (eh == dispatch_info.event_handler_)
00632 {
00633 if (status < 0)
00634 {
00635 result =
00636 this->remove_handler_i (dispatch_info.handle_,
00637 dispatch_info.mask_);
00638 }
00639
00640
00641 if (dispatch_info.event_handler_ != this->notify_handler_ &&
00642 dispatch_info.resume_flag_ ==
00643 ACE_Event_Handler::ACE_REACTOR_RESUMES_HANDLER)
00644 this->resume_i (dispatch_info.handle_);
00645 }
00646 }
00647
00648
00649 if (dispatch_info.reference_counting_required_)
00650 dispatch_info.event_handler_->remove_reference ();
00651
00652 return result;
00653 }
00654
00655 int
00656 ACE_TP_Reactor::resumable_handler (void)
00657 {
00658 return 1;
00659 }
00660
00661 int
00662 ACE_TP_Reactor::handle_events (ACE_Time_Value &max_wait_time)
00663 {
00664 return this->handle_events (&max_wait_time);
00665 }
00666
00667 void
00668 ACE_TP_Reactor::notify_handle (ACE_HANDLE,
00669 ACE_Reactor_Mask,
00670 ACE_Handle_Set &,
00671 ACE_Event_Handler *eh,
00672 ACE_EH_PTMF)
00673 {
00674 ACE_ERROR ((LM_ERROR,
00675 ACE_TEXT ("ACE_TP_Reactor::notify_handle: ")
00676 ACE_TEXT ("Wrong version of notify_handle() got called \n")));
00677
00678 ACE_ASSERT (eh == 0);
00679 ACE_UNUSED_ARG (eh);
00680 }
00681
00682 ACE_HANDLE
00683 ACE_TP_Reactor::get_notify_handle (void)
00684 {
00685
00686
00687 ACE_HANDLE const read_handle =
00688 this->notify_handler_->notify_handle ();
00689
00690
00691
00692 if (read_handle != ACE_INVALID_HANDLE &&
00693 this->ready_set_.rd_mask_.is_set (read_handle))
00694 {
00695 return read_handle;
00696 }
00697
00698
00699 return ACE_INVALID_HANDLE;
00700 }
00701
00702 ACE_END_VERSIONED_NAMESPACE_DECL