#include <TP_Reactor.h>
Inheritance diagram for ACE_TP_Reactor:
Public Member Functions | |
ACE_TP_Reactor (ACE_Sig_Handler *=0, ACE_Timer_Queue *=0, int mask_signals=1, int s_queue=ACE_Select_Reactor_Token::FIFO) | |
Initialize ACE_TP_Reactor with the default size. | |
ACE_TP_Reactor (size_t max_number_of_handles, int restart=0, ACE_Sig_Handler *sh=0, ACE_Timer_Queue *tq=0, int mask_signals=1, int s_queue=ACE_Select_Reactor_Token::FIFO) | |
virtual int | handle_events (ACE_Time_Value *max_wait_time=0) |
virtual int | handle_events (ACE_Time_Value &max_wait_time) |
virtual int | resumable_handler (void) |
virtual int | owner (ACE_thread_t n_id, ACE_thread_t *o_id=0) |
virtual int | owner (ACE_thread_t *t_id) |
Return the thread ID of the current Leader. | |
Static Public Member Functions | |
static void | no_op_sleep_hook (void *) |
Called from handle events. | |
Public Attributes | |
ACE_ALLOC_HOOK_DECLARE | |
Declare the dynamic allocation hooks. | |
Protected Member Functions | |
virtual void | clear_dispatch_mask (ACE_HANDLE handle, ACE_Reactor_Mask mask) |
Template method from the base class. | |
int | dispatch_i (ACE_Time_Value *max_wait_time, ACE_TP_Token_Guard &guard) |
Dispatch just 1 signal, timer, notification handlers. | |
int | get_event_for_dispatching (ACE_Time_Value *max_wait_time) |
int | handle_timer_events (int &event_count, ACE_TP_Token_Guard &g) |
Handle timer events. | |
int | handle_notify_events (int &event_count, ACE_TP_Token_Guard &g) |
Handle notify events. | |
int | handle_socket_events (int &event_count, ACE_TP_Token_Guard &g) |
handle socket events | |
virtual void | notify_handle (ACE_HANDLE handle, ACE_Reactor_Mask mask, ACE_Handle_Set &, ACE_Event_Handler *eh, ACE_EH_PTMF callback) |
This method shouldn't get called. | |
Private Member Functions | |
ACE_HANDLE | get_notify_handle (void) |
int | get_socket_event_info (ACE_EH_Dispatch_Info &info) |
Get socket event dispatch information. | |
int | dispatch_socket_event (ACE_EH_Dispatch_Info &dispatch_info) |
void | clear_handle_read_set (ACE_HANDLE handle) |
Clear the handle from the read_set. | |
int | post_process_socket_event (ACE_EH_Dispatch_Info &dispatch_info, int status) |
ACE_TP_Reactor (const ACE_TP_Reactor &) | |
Deny access since member-wise won't work... | |
ACE_TP_Reactor & | operator= (const ACE_TP_Reactor &) |
One of the shortcomings of the ACE_Select_Reactor is that it does not support a thread pool-based event dispatching model, similar to the one in ACE_WFMO_Reactor. In ACE_Select_Reactor, only thread can call handle_events() at any given time. ACE_TP_Reactor removes this short-coming.
ACE_TP_Reactor is a specialization of ACE_Select_Reactor to support thread pool-based event dispatching. This reactor takes advantage of the fact that events reported by select()
are persistent if not acted upon immediately. It works by remembering the event handler which was just activated, suspending it for further I/O activities, releasing the internal lock (so that another thread can start waiting in the event loop) and then dispatching the event's handler outside the scope of the reactor lock. After the event handler has been dispatched the event handler is resumed for further I/O activity.
This reactor implementation is best suited for situations when the callbacks to event handlers can take arbitrarily long and/or a number of threads are available to run the event loop. Note that I/O-processing callback code in event handlers (e.g. handle_input()) does not have to be modified or made thread-safe for this reactor. This is because before an I/O event is dispatched to an event handler, the handler is suspended; it is resumed by the reactor after the upcall completes. Therefore, multiple I/O events will not be made to one event handler multiple threads simultaneously. This suspend/resume protection does not apply to either timers scheduled with the reactor or to notifications requested via the reactor. When using timers and/or notifications you must provide proper protection for your class in the context of multiple threads.
Definition at line 176 of file TP_Reactor.h.
ACE_TP_Reactor::ACE_TP_Reactor | ( | ACE_Sig_Handler * | = 0 , |
|
ACE_Timer_Queue * | = 0 , |
|||
int | mask_signals = 1 , |
|||
int | s_queue = ACE_Select_Reactor_Token::FIFO | |||
) |
Initialize ACE_TP_Reactor with the default size.
Definition at line 101 of file TP_Reactor.cpp.
References ACE_TRACE, and ACE_Select_Reactor_Impl::supress_notify_renew().
00105 : ACE_Select_Reactor (sh, tq, ACE_DISABLE_NOTIFY_PIPE_DEFAULT, 0, mask_signals, s_queue) 00106 { 00107 ACE_TRACE ("ACE_TP_Reactor::ACE_TP_Reactor"); 00108 this->supress_notify_renew (1); 00109 }
ACE_TP_Reactor::ACE_TP_Reactor | ( | size_t | max_number_of_handles, | |
int | restart = 0 , |
|||
ACE_Sig_Handler * | sh = 0 , |
|||
ACE_Timer_Queue * | tq = 0 , |
|||
int | mask_signals = 1 , |
|||
int | s_queue = ACE_Select_Reactor_Token::FIFO | |||
) |
Initialize the ACE_TP_Reactor to manage max_number_of_handles. If restart is non-0 then the ACE_Reactor's handle_events()
method will be restarted automatically when EINTR
occurs. If sh or tq are non-0 they are used as the signal handler and timer queue, respectively.
Definition at line 111 of file TP_Reactor.cpp.
References ACE_TRACE, and ACE_Select_Reactor_Impl::supress_notify_renew().
00117 : ACE_Select_Reactor (max_number_of_handles, restart, sh, tq, ACE_DISABLE_NOTIFY_PIPE_DEFAULT, 0, mask_signals, s_queue) 00118 { 00119 ACE_TRACE ("ACE_TP_Reactor::ACE_TP_Reactor"); 00120 this->supress_notify_renew (1); 00121 }
ACE_TP_Reactor::ACE_TP_Reactor | ( | const ACE_TP_Reactor & | ) | [private] |
Deny access since member-wise won't work...
ACE_INLINE void ACE_TP_Reactor::clear_dispatch_mask | ( | ACE_HANDLE | handle, | |
ACE_Reactor_Mask | mask | |||
) | [protected, virtual] |
Template method from the base class.
Reimplemented from ACE_Select_Reactor_Impl.
Definition at line 111 of file TP_Reactor.inl.
References ACE_Select_Reactor_Handle_Set::ex_mask_, ACE_Select_Reactor_Handle_Set::rd_mask_, ACE_Select_Reactor_Impl::ready_set_, ACE_Handle_Set::reset(), and ACE_Select_Reactor_Handle_Set::wr_mask_.
00113 { 00114 this->ready_set_.rd_mask_.reset (); 00115 this->ready_set_.wr_mask_.reset (); 00116 this->ready_set_.ex_mask_.reset (); 00117 }
ACE_INLINE void ACE_TP_Reactor::clear_handle_read_set | ( | ACE_HANDLE | handle | ) | [private] |
Clear the handle from the read_set.
Definition at line 103 of file TP_Reactor.inl.
References ACE_Handle_Set::clr_bit(), ACE_Select_Reactor_Handle_Set::ex_mask_, ACE_Select_Reactor_Handle_Set::rd_mask_, ACE_Select_Reactor_Impl::ready_set_, and ACE_Select_Reactor_Handle_Set::wr_mask_.
Referenced by get_socket_event_info().
00104 { 00105 this->ready_set_.wr_mask_.clr_bit (handle); 00106 this->ready_set_.ex_mask_.clr_bit (handle); 00107 this->ready_set_.rd_mask_.clr_bit (handle); 00108 }
int ACE_TP_Reactor::dispatch_i | ( | ACE_Time_Value * | max_wait_time, | |
ACE_TP_Token_Guard & | guard | |||
) | [protected] |
Dispatch just 1 signal, timer, notification handlers.
Definition at line 177 of file TP_Reactor.cpp.
References get_event_for_dispatching(), handle_notify_events(), handle_socket_events(), handle_timer_events(), and ACE_Select_Reactor_Impl::state_changed_.
Referenced by handle_events().
00179 { 00180 int event_count = this->get_event_for_dispatching (max_wait_time); 00181 00182 // We use this count to detect potential infinite loops as described 00183 // in bug 2540. 00184 int const initial_event_count = event_count; 00185 00186 int result = 0; 00187 00188 // Note: We are passing the <event_count> around, to have record of 00189 // how many events still need processing. May be this could be 00190 // useful in future. 00191 00192 #if 0 00193 // @Ciju 00194 // signal handling isn't in a production state yet. 00195 // Commenting it out for now. 00196 00197 // Dispatch signals 00198 if (event_count == -1) 00199 { 00200 // Looks like we dont do any upcalls in dispatch signals. If at 00201 // a later point of time, we decide to handle signals we have to 00202 // release the lock before we make any upcalls.. What is here 00203 // now is not the right thing... 00204 // 00205 // @@ We need to do better.. 00206 return this->handle_signals (event_count, guard); 00207 } 00208 #endif // #if 0 00209 00210 // If there are no signals and if we had received a proper 00211 // event_count then first look at dispatching timeouts. We need to 00212 // handle timers early since they may have higher latency 00213 // constraints than I/O handlers. Ideally, the order of dispatching 00214 // should be a strategy... 00215 00216 // NOTE: The event count does not have the number of timers that 00217 // needs dispatching. But we are still passing this along. We dont 00218 // need to do that. In the future we *may* have the timers also 00219 // returned through the <event_count>. Just passing that along for 00220 // that day. 00221 result = this->handle_timer_events (event_count, guard); 00222 00223 if (result > 0) 00224 return result; 00225 00226 // Else just go ahead fall through for further handling. 00227 00228 if (event_count > 0) 00229 { 00230 // Next dispatch the notification handlers (if there are any to 00231 // dispatch). These are required to handle multiple-threads 00232 // that are trying to update the <Reactor>. 00233 result = this->handle_notify_events (event_count, guard); 00234 00235 if (result > 0) 00236 return result; 00237 00238 // Else just fall through for further handling 00239 } 00240 00241 if (event_count > 0) 00242 { 00243 // Handle socket events 00244 result = this->handle_socket_events (event_count, guard); 00245 } 00246 00247 if (event_count != 0 && event_count == initial_event_count) 00248 { 00249 this->state_changed_ = true; 00250 } 00251 00252 return result; 00253 }
int ACE_TP_Reactor::dispatch_socket_event | ( | ACE_EH_Dispatch_Info & | dispatch_info | ) | [private] |
Notify the appropriate <callback> in the context of the <eh> associated with <handle> that a particular event has occurred.
Definition at line 557 of file TP_Reactor.cpp.
References ACE_TRACE, ACE_EH_Dispatch_Info::callback_, ACE_EH_Dispatch_Info::event_handler_, ACE_EH_Dispatch_Info::handle_, and post_process_socket_event().
00558 { 00559 ACE_TRACE ("ACE_TP_Reactor::dispatch_socket_event"); 00560 00561 ACE_Event_Handler * const event_handler = dispatch_info.event_handler_; 00562 ACE_EH_PTMF const callback = dispatch_info.callback_; 00563 00564 // Check for removed handlers. 00565 if (event_handler == 0) 00566 return -1; 00567 00568 // Upcall. If the handler returns positive value (requesting a 00569 // reactor callback) don't set the ready-bit because it will be 00570 // ignored if the reactor state has changed. Just call back 00571 // as many times as the handler requests it. Other threads are off 00572 // handling other things. 00573 int status = 1; 00574 while (status > 0) 00575 status = (event_handler->*callback) (dispatch_info.handle_); 00576 00577 // Post process socket event 00578 return this->post_process_socket_event (dispatch_info, status); 00579 }
int ACE_TP_Reactor::get_event_for_dispatching | ( | ACE_Time_Value * | max_wait_time | ) | [protected] |
Get the event that needs dispatching. It could be either a signal, timer, notification handlers or return possibly 1 I/O handler for dispatching. In the most common use case, this would return 1 I/O handler for dispatching
Definition at line 452 of file TP_Reactor.cpp.
References ACE_Select_Reactor_Handle_Set::ex_mask_, ACE_Select_Reactor_Handle_Set::rd_mask_, ACE_Select_Reactor_Impl::ready_set_, ACE_Handle_Set::reset(), ACE_Select_Reactor_Impl::state_changed_, ACE_Handle_Set::sync(), ACE_Select_Reactor_T< ACE_Select_Reactor_Token >::wait_for_multiple_events(), and ACE_Select_Reactor_Handle_Set::wr_mask_.
Referenced by dispatch_i().
00453 { 00454 // If the reactor handler state has changed, clear any remembered 00455 // ready bits and re-scan from the master wait_set. 00456 if (this->state_changed_) 00457 { 00458 this->ready_set_.rd_mask_.reset (); 00459 this->ready_set_.wr_mask_.reset (); 00460 this->ready_set_.ex_mask_.reset (); 00461 00462 this->state_changed_ = false; 00463 } 00464 else 00465 { 00466 // This is a hack... somewhere, under certain conditions (which 00467 // I don't understand...) the mask will have all of its bits clear, 00468 // yet have a size_ > 0. This is an attempt to remedy the affect, 00469 // without knowing why it happens. 00470 00471 this->ready_set_.rd_mask_.sync (this->ready_set_.rd_mask_.max_set ()); 00472 this->ready_set_.wr_mask_.sync (this->ready_set_.wr_mask_.max_set ()); 00473 this->ready_set_.ex_mask_.sync (this->ready_set_.ex_mask_.max_set ()); 00474 } 00475 00476 return this->wait_for_multiple_events (this->ready_set_, max_wait_time); 00477 }
ACE_HANDLE ACE_TP_Reactor::get_notify_handle | ( | void | ) | [private] |
Get the handle of the notify pipe from the ready set if there is an event in the notify pipe.
Definition at line 667 of file TP_Reactor.cpp.
References ACE_Reactor_Notify::notify_handle(), and ACE_Select_Reactor_Impl::notify_handler_.
Referenced by handle_notify_events().
00668 { 00669 // Call the notify handler to get a handle on which we would have a 00670 // notify waiting 00671 ACE_HANDLE const read_handle = 00672 this->notify_handler_->notify_handle (); 00673 00674 // Check whether the rd_mask has been set on that handle. If so 00675 // return the handle. 00676 if (read_handle != ACE_INVALID_HANDLE && 00677 this->ready_set_.rd_mask_.is_set (read_handle)) 00678 { 00679 return read_handle; 00680 } 00681 00682 // None found.. 00683 return ACE_INVALID_HANDLE; 00684 }
int ACE_TP_Reactor::get_socket_event_info | ( | ACE_EH_Dispatch_Info & | info | ) | [private] |
Get socket event dispatch information.
Definition at line 480 of file TP_Reactor.cpp.
References clear_handle_read_set(), ACE_Event_Handler::EXCEPT_MASK, ACE_Event_Handler::handle_exception(), ACE_Event_Handler::handle_input(), ACE_Event_Handler::handle_output(), ACE_Event_Handler::READ_MASK, ACE_EH_Dispatch_Info::set(), and ACE_Event_Handler::WRITE_MASK.
Referenced by handle_socket_events().
00481 { 00482 // Check for dispatch in write, except, read. Only catch one, but if 00483 // one is caught, be sure to clear the handle from each mask in case 00484 // there is more than one mask set for it. This would cause problems 00485 // if the handler is suspended for dispatching, but its set bit in 00486 // another part of ready_set_ kept it from being dispatched. 00487 int found_io = 0; 00488 ACE_HANDLE handle; 00489 00490 // @@todo: We can do quite a bit of code reduction here. Let me get 00491 // it to work before I do this. 00492 { 00493 ACE_Handle_Set_Iterator handle_iter (this->ready_set_.wr_mask_); 00494 00495 while (!found_io && (handle = handle_iter ()) != ACE_INVALID_HANDLE) 00496 { 00497 if (this->is_suspended_i (handle)) 00498 continue; 00499 00500 // Remember this info 00501 event.set (handle, 00502 this->handler_rep_.find (handle), 00503 ACE_Event_Handler::WRITE_MASK, 00504 &ACE_Event_Handler::handle_output); 00505 00506 this->clear_handle_read_set (handle); 00507 found_io = 1; 00508 } 00509 } 00510 00511 if (!found_io) 00512 { 00513 ACE_Handle_Set_Iterator handle_iter (this->ready_set_.ex_mask_); 00514 00515 while (!found_io && (handle = handle_iter ()) != ACE_INVALID_HANDLE) 00516 { 00517 if (this->is_suspended_i (handle)) 00518 continue; 00519 00520 // Remember this info 00521 event.set (handle, 00522 this->handler_rep_.find (handle), 00523 ACE_Event_Handler::EXCEPT_MASK, 00524 &ACE_Event_Handler::handle_exception); 00525 00526 this->clear_handle_read_set (handle); 00527 00528 found_io = 1; 00529 } 00530 } 00531 00532 if (!found_io) 00533 { 00534 ACE_Handle_Set_Iterator handle_iter (this->ready_set_.rd_mask_); 00535 00536 while (!found_io && (handle = handle_iter ()) != ACE_INVALID_HANDLE) 00537 { 00538 if (this->is_suspended_i (handle)) 00539 continue; 00540 00541 // Remember this info 00542 event.set (handle, 00543 this->handler_rep_.find (handle), 00544 ACE_Event_Handler::READ_MASK, 00545 &ACE_Event_Handler::handle_input); 00546 00547 this->clear_handle_read_set (handle); 00548 found_io = 1; 00549 } 00550 } 00551 00552 return found_io; 00553 }
int ACE_TP_Reactor::handle_events | ( | ACE_Time_Value & | max_wait_time | ) | [virtual] |
This method is just like the one above, except the max_wait_time value is a reference and can therefore never be NULL.
Current <alertable_handle_events> is identical to <handle_events>.
Reimplemented from ACE_Select_Reactor_T< ACE_Select_Reactor_Token >.
Definition at line 646 of file TP_Reactor.cpp.
References handle_events().
00647 { 00648 return this->handle_events (&max_wait_time); 00649 }
int ACE_TP_Reactor::handle_events | ( | ACE_Time_Value * | max_wait_time = 0 |
) | [virtual] |
This event loop driver that blocks for max_wait_time before returning. It will return earlier if timer events, I/O events, or signal events occur. Note that max_wait_time can be 0, in which case this method blocks indefinitely until events occur.
max_wait_time is decremented to reflect how much time this call took. For instance, if a time value of 3 seconds is passed to handle_events and an event occurs after 2 seconds, max_wait_time will equal 1 second. This can be used if an application wishes to handle events for some fixed amount of time.
errno
for more information). Reimplemented from ACE_Select_Reactor_T< ACE_Select_Reactor_Token >.
Definition at line 143 of file TP_Reactor.cpp.
References ACE_TRACE, ACE_TP_Token_Guard::acquire_read_token(), dispatch_i(), ACE_TP_Token_Guard::is_owner(), and ACE_Countdown_Time::update().
Referenced by handle_events().
00144 { 00145 ACE_TRACE ("ACE_TP_Reactor::handle_events"); 00146 00147 // Stash the current time -- the destructor of this object will 00148 // automatically compute how much time elapsed since this method was 00149 // called. 00150 ACE_Countdown_Time countdown (max_wait_time); 00151 00152 // 00153 // The order of these events is very subtle, modify with care. 00154 // 00155 00156 // Instantiate the token guard which will try grabbing the token for 00157 // this thread. 00158 ACE_TP_Token_Guard guard (this->token_); 00159 00160 int const result = guard.acquire_read_token (max_wait_time); 00161 00162 // If the guard is NOT the owner just return the retval 00163 if (!guard.is_owner ()) 00164 return result; 00165 00166 // After getting the lock just just for deactivation.. 00167 if (this->deactivated_) 00168 return -1; 00169 00170 // Update the countdown to reflect time waiting for the token. 00171 countdown.update (); 00172 00173 return this->dispatch_i (max_wait_time, guard); 00174 }
int ACE_TP_Reactor::handle_notify_events | ( | int & | event_count, | |
ACE_TP_Token_Guard & | g | |||
) | [protected] |
Handle notify events.
Definition at line 347 of file TP_Reactor.cpp.
References ACE_Handle_Set::clr_bit(), ACE_Reactor_Notify::dispatch_notify(), get_notify_handle(), notify_handle(), ACE_Select_Reactor_Impl::notify_handler_, ACE_Select_Reactor_Handle_Set::rd_mask_, ACE_Select_Reactor_Impl::ready_set_, and ACE_TP_Token_Guard::release_token().
Referenced by dispatch_i().
00349 { 00350 // Get the handle on which notify calls could have occured 00351 ACE_HANDLE notify_handle = this->get_notify_handle (); 00352 00353 int result = 0; 00354 00355 // The notify was not in the list returned by 00356 // wait_for_multiple_events (). 00357 if (notify_handle == ACE_INVALID_HANDLE) 00358 return result; 00359 00360 // Now just do a read on the pipe.. 00361 ACE_Notification_Buffer buffer; 00362 00363 // Clear the handle of the read_mask of our <ready_set_> 00364 this->ready_set_.rd_mask_.clr_bit (notify_handle); 00365 00366 // Keep reading notifies till we empty it or till we have a 00367 // dispatchable buffer 00368 while (this->notify_handler_->read_notify_pipe (notify_handle, buffer) > 0) 00369 { 00370 // Just figure out whether we can read any buffer that has 00371 // dispatchable info. If not we have just been unblocked by 00372 // another thread trying to update the reactor. If we get any 00373 // buffer that needs dispatching we will dispatch that after 00374 // releasing the lock 00375 if (this->notify_handler_->is_dispatchable (buffer) > 0) 00376 { 00377 // Release the token before dispatching notifies... 00378 guard.release_token (); 00379 00380 // Dispatch the upcall for the notify 00381 this->notify_handler_->dispatch_notify (buffer); 00382 00383 // We had a successful dispatch. 00384 result = 1; 00385 00386 // break out of the while loop 00387 break; 00388 } 00389 } 00390 00391 // If we did some work, then we just return 1 which will allow us 00392 // to get out of here. If we return 0, then we will be asked to do 00393 // some work ie. dispacth socket events 00394 return result; 00395 }
int ACE_TP_Reactor::handle_socket_events | ( | int & | event_count, | |
ACE_TP_Token_Guard & | g | |||
) | [protected] |
handle socket events
Definition at line 398 of file TP_Reactor.cpp.
References ACE_Event_Handler::add_reference(), ACE_EH_Dispatch_Info::dispatch(), ACE_EH_Dispatch_Info::event_handler_, get_socket_event_info(), ACE_EH_Dispatch_Info::handle_, ACE_Select_Reactor_Impl::handler_rep_, ACE_EH_Dispatch_Info::mask_, ACE_EH_Dispatch_Info::reference_counting_required_, ACE_TP_Token_Guard::release_token(), and ACE_Select_Reactor_Handler_Repository::unbind().
Referenced by dispatch_i().
00400 { 00401 00402 // We got the lock, lets handle some I/O events. 00403 ACE_EH_Dispatch_Info dispatch_info; 00404 00405 this->get_socket_event_info (dispatch_info); 00406 00407 // If there is any event handler that is ready to be dispatched, the 00408 // dispatch information is recorded in dispatch_info. 00409 if (!dispatch_info.dispatch ()) 00410 { 00411 // Check for removed handlers. 00412 if (dispatch_info.event_handler_ == 0) 00413 { 00414 this->handler_rep_.unbind(dispatch_info.handle_, 00415 dispatch_info.mask_); 00416 } 00417 00418 00419 return 0; 00420 } 00421 00422 // Suspend the handler so that other threads don't start dispatching 00423 // it, if we can't suspend then return directly 00424 // 00425 // NOTE: This check was performed in older versions of the 00426 // TP_Reactor. Looks like it is a waste.. 00427 if (dispatch_info.event_handler_ != this->notify_handler_) 00428 if (this->suspend_i (dispatch_info.handle_) == -1) 00429 return 0; 00430 00431 // Call add_reference() if needed. 00432 if (dispatch_info.reference_counting_required_) 00433 dispatch_info.event_handler_->add_reference (); 00434 00435 // Release the lock. Others threads can start waiting. 00436 guard.release_token (); 00437 00438 int result = 0; 00439 00440 // If there was an event handler ready, dispatch it. 00441 // Decrement the event left 00442 --event_count; 00443 00444 // Dispatched an event 00445 if (this->dispatch_socket_event (dispatch_info) == 0) 00446 ++result; 00447 00448 return result; 00449 }
int ACE_TP_Reactor::handle_timer_events | ( | int & | event_count, | |
ACE_TP_Token_Guard & | g | |||
) | [protected] |
Handle timer events.
Definition at line 307 of file TP_Reactor.cpp.
References ACE_Timer_Node_Dispatch_Info, ACE_Timer_Queue_T< TYPE, FUNCTOR, ACE_LOCK >::postinvoke(), ACE_Timer_Queue_T< TYPE, FUNCTOR, ACE_LOCK >::preinvoke(), ACE_TP_Token_Guard::release_token(), ACE_Select_Reactor_Impl::timer_queue_, ACE_Timer_Queue_T< TYPE, FUNCTOR, ACE_LOCK >::timer_skew(), and ACE_Timer_Queue_T< TYPE, FUNCTOR, ACE_LOCK >::upcall().
Referenced by dispatch_i().
00309 { 00310 if (this->timer_queue_ == 0 || this->timer_queue_->is_empty()) 00311 { // Empty timer queue so cannot have any expired timers. 00312 return 0; 00313 } 00314 00315 // Get the current time 00316 ACE_Time_Value cur_time (this->timer_queue_->gettimeofday () + 00317 this->timer_queue_->timer_skew ()); 00318 00319 // Look for a node in the timer queue whose timer <= the present 00320 // time. 00321 ACE_Timer_Node_Dispatch_Info info; 00322 00323 if (this->timer_queue_->dispatch_info (cur_time, info)) 00324 { 00325 const void *upcall_act = 0; 00326 00327 // Preinvoke. 00328 this->timer_queue_->preinvoke (info, cur_time, upcall_act); 00329 00330 // Release the token before dispatching notifies... 00331 guard.release_token (); 00332 00333 // call the functor 00334 this->timer_queue_->upcall (info, cur_time); 00335 00336 // Postinvoke 00337 this->timer_queue_->postinvoke (info, cur_time, upcall_act); 00338 00339 // We have dispatched a timer 00340 return 1; 00341 } 00342 00343 return 0; 00344 }
ACE_INLINE void ACE_TP_Reactor::no_op_sleep_hook | ( | void * | ) | [static] |
Called from handle events.
Definition at line 98 of file TP_Reactor.inl.
Referenced by ACE_TP_Token_Guard::acquire_read_token().
void ACE_TP_Reactor::notify_handle | ( | ACE_HANDLE | handle, | |
ACE_Reactor_Mask | mask, | |||
ACE_Handle_Set & | , | |||
ACE_Event_Handler * | eh, | |||
ACE_EH_PTMF | callback | |||
) | [protected, virtual] |
This method shouldn't get called.
Reimplemented from ACE_Select_Reactor_T< ACE_Select_Reactor_Token >.
Definition at line 652 of file TP_Reactor.cpp.
References ACE_ASSERT, ACE_ERROR, ACE_TEXT, and LM_ERROR.
Referenced by handle_notify_events().
00657 { 00658 ACE_ERROR ((LM_ERROR, 00659 ACE_TEXT ("ACE_TP_Reactor::notify_handle: ") 00660 ACE_TEXT ("Wrong version of notify_handle() got called \n"))); 00661 00662 ACE_ASSERT (eh == 0); 00663 ACE_UNUSED_ARG (eh); 00664 }
ACE_TP_Reactor& ACE_TP_Reactor::operator= | ( | const ACE_TP_Reactor & | ) | [private] |
int ACE_TP_Reactor::owner | ( | ACE_thread_t * | t_id | ) | [virtual] |
Return the thread ID of the current Leader.
Reimplemented from ACE_Select_Reactor_T< ACE_Select_Reactor_Token >.
Definition at line 134 of file TP_Reactor.cpp.
References ACE_TRACE, and ACE_Thread::self().
00135 { 00136 ACE_TRACE ("ACE_TP_Reactor::owner"); 00137 *t_id = ACE_Thread::self (); 00138 00139 return 0; 00140 }
int ACE_TP_Reactor::owner | ( | ACE_thread_t | n_id, | |
ACE_thread_t * | o_id = 0 | |||
) | [virtual] |
The ACE_TP_Reactor implementation does not have a single owner thread. Attempts to set the owner explicitly are ignored. The reported owner thread is the current Leader in the pattern.
Reimplemented from ACE_Select_Reactor_T< ACE_Select_Reactor_Token >.
Definition at line 124 of file TP_Reactor.cpp.
References ACE_TRACE, and ACE_Thread::self().
00125 { 00126 ACE_TRACE ("ACE_TP_Reactor::owner"); 00127 if (o_id) 00128 *o_id = ACE_Thread::self (); 00129 00130 return 0; 00131 }
int ACE_TP_Reactor::post_process_socket_event | ( | ACE_EH_Dispatch_Info & | dispatch_info, | |
int | status | |||
) | [private] |
Definition at line 582 of file TP_Reactor.cpp.
References ACE_Event_Handler::ACE_REACTOR_RESUMES_HANDLER, ACE_TP_Token_Guard::acquire_token(), ACE_EH_Dispatch_Info::event_handler_, ACE_Select_Reactor_Handler_Repository::find(), ACE_EH_Dispatch_Info::handle_, ACE_Select_Reactor_Impl::handler_rep_, ACE_TP_Token_Guard::is_owner(), ACE_EH_Dispatch_Info::mask_, ACE_EH_Dispatch_Info::reference_counting_required_, ACE_Select_Reactor_T< ACE_Select_Reactor_Token >::remove_handler_i(), ACE_Event_Handler::remove_reference(), ACE_EH_Dispatch_Info::resume_flag_, and ACE_Select_Reactor_T< ACE_Select_Reactor_Token >::resume_i().
Referenced by dispatch_socket_event().
00584 { 00585 int result = 0; 00586 00587 // First check if we really have to post process something, if not, then 00588 // we don't acquire the token which saves us a lot of time. 00589 if (status < 0 || 00590 (dispatch_info.event_handler_ != this->notify_handler_ && 00591 dispatch_info.resume_flag_ == 00592 ACE_Event_Handler::ACE_REACTOR_RESUMES_HANDLER)) 00593 { 00594 // Get the reactor token and with this token acquired remove first the 00595 // handler and resume it at the same time. This must be atomic, see also 00596 // bugzilla 2395. When this is not atomic it can be that we resume the 00597 // handle after it is reused by the OS. 00598 ACE_TP_Token_Guard guard (this->token_); 00599 00600 result = guard.acquire_token (); 00601 00602 // If the guard is NOT the owner just return the retval 00603 if (!guard.is_owner ()) 00604 return result; 00605 00606 // A different event handler may have been registered during the 00607 // upcall if the handle was closed and then reopened, for 00608 // example. Make sure we're removing and/or resuming the event 00609 // handler used during the upcall. 00610 ACE_Event_Handler const * const eh = 00611 this->handler_rep_.find (dispatch_info.handle_); 00612 00613 // Only remove or resume the event handler used during the 00614 // upcall. 00615 if (eh == dispatch_info.event_handler_) 00616 { 00617 if (status < 0) 00618 { 00619 result = 00620 this->remove_handler_i (dispatch_info.handle_, 00621 dispatch_info.mask_); 00622 } 00623 00624 // Resume handler if required. 00625 if (dispatch_info.event_handler_ != this->notify_handler_ && 00626 dispatch_info.resume_flag_ == 00627 ACE_Event_Handler::ACE_REACTOR_RESUMES_HANDLER) 00628 this->resume_i (dispatch_info.handle_); 00629 } 00630 } 00631 00632 // Call remove_reference() if needed. 00633 if (dispatch_info.reference_counting_required_) 00634 dispatch_info.event_handler_->remove_reference (); 00635 00636 return result; 00637 }
int ACE_TP_Reactor::resumable_handler | ( | void | ) | [virtual] |
Does the reactor allow the application to resume the handle on its own ie. can it pass on the control of handle resumption to the application. The TP reactor has can allow applications to resume handles. So return a positive value.
Reimplemented from ACE_Select_Reactor_Impl.
Definition at line 640 of file TP_Reactor.cpp.
Declare the dynamic allocation hooks.
Reimplemented from ACE_Select_Reactor_T< ACE_Select_Reactor_Token >.
Definition at line 240 of file TP_Reactor.h.