ACE_TP_Reactor Class Reference

Specialization of ACE_Select_Reactor to support thread-pool based event dispatching.

#include <TP_Reactor.h>

Public Member Functions

 ACE_TP_Reactor (ACE_Sig_Handler *=0, ACE_Timer_Queue *=0, int mask_signals=1, int s_queue=ACE_Select_Reactor_Token::FIFO)
 Initialize ACE_TP_Reactor with the default size.

 ACE_TP_Reactor (size_t max_number_of_handles, int restart=0, ACE_Sig_Handler *sh=0, ACE_Timer_Queue *tq=0, int mask_signals=1, int s_queue=ACE_Select_Reactor_Token::FIFO)
virtual int handle_events (ACE_Time_Value *max_wait_time=0)
virtual int handle_events (ACE_Time_Value &max_wait_time)
virtual int resumable_handler (void)
virtual int owner (ACE_thread_t n_id, ACE_thread_t *o_id=0)
virtual int owner (ACE_thread_t *t_id)
 Return the thread ID of the current Leader.


Static Public Member Functions

void no_op_sleep_hook (void *)
 Called from handle events.


Public Attributes

 ACE_ALLOC_HOOK_DECLARE
 Declare the dynamic allocation hooks.


Protected Member Functions

virtual void clear_dispatch_mask (ACE_HANDLE handle, ACE_Reactor_Mask mask)
 Template method from the base class.

int dispatch_i (ACE_Time_Value *max_wait_time, ACE_TP_Token_Guard &guard)
 Dispatch just one signal, timer, or notification handler.

int get_event_for_dispatching (ACE_Time_Value *max_wait_time)
int handle_timer_events (int &event_count, ACE_TP_Token_Guard &g)
 Handle timer events.

int handle_notify_events (int &event_count, ACE_TP_Token_Guard &g)
 Handle notify events.

int handle_socket_events (int &event_count, ACE_TP_Token_Guard &g)
 Handle socket events.

virtual void notify_handle (ACE_HANDLE handle, ACE_Reactor_Mask mask, ACE_Handle_Set &, ACE_Event_Handler *eh, ACE_EH_PTMF callback)
 This method shouldn't get called.


Private Member Functions

ACE_HANDLE get_notify_handle (void)
int get_socket_event_info (ACE_EH_Dispatch_Info &info)
 Get socket event dispatch information.

int dispatch_socket_event (ACE_EH_Dispatch_Info &dispatch_info)
void clear_handle_read_set (ACE_HANDLE handle)
 Clear the handle from the read_set.

int post_process_socket_event (ACE_EH_Dispatch_Info &dispatch_info, int status)
 ACE_TP_Reactor (const ACE_TP_Reactor &)
 Deny access since member-wise copying won't work.

ACE_TP_Reactor & operator= (const ACE_TP_Reactor &)

Detailed Description

Specialization of ACE_Select_Reactor to support thread-pool based event dispatching.

One of the shortcomings of the ACE_Select_Reactor is that it does not support a thread pool-based event dispatching model, similar to the one in ACE_WFMO_Reactor. In ACE_Select_Reactor, only one thread can call handle_events() at any given time. ACE_TP_Reactor removes this shortcoming.

ACE_TP_Reactor is a specialization of ACE_Select_Reactor to support thread pool-based event dispatching. This reactor takes advantage of the fact that events reported by select() are persistent if not acted upon immediately. It works by remembering the event handler which was just activated, suspending it for further I/O activities, releasing the internal lock (so that another thread can start waiting in the event loop) and then dispatching the event's handler outside the scope of the reactor lock. After the event handler has been dispatched the event handler is resumed for further I/O activity.
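The sequence described above (remember the handler, suspend it, release the lock, dispatch, resume) can be sketched with the C++ standard library alone. This is a simplified model and not the ACE implementation: ToyHandler, ToyReactor, and run_pool are hypothetical names, a std::mutex stands in for the reactor token, and a per-handler counter of pending events stands in for the persistent readiness reported by select().

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-in for an event handler bound to one handle.
struct ToyHandler {
  int pending = 0;                // events still readable on the "handle"
  bool suspended = false;         // guarded by the reactor token
  std::atomic<int> in_upcall{0};  // threads currently inside the upcall
  int max_seen = 0;               // highest in_upcall value observed
  int handled = 0;                // completed upcalls
};

class ToyReactor {
public:
  ToyReactor(std::size_t n, int events_each) : handlers_(n) {
    for (auto &h : handlers_) h.pending = events_each;
  }

  // One iteration of the event loop. Returns false once no events remain.
  bool handle_events() {
    ToyHandler *h = nullptr;
    {
      // Holding the token makes this thread the leader.
      std::lock_guard<std::mutex> token(token_);
      bool work_left = false;
      for (auto &cand : handlers_) {
        if (cand.pending == 0) continue;
        work_left = true;
        if (cand.suspended) continue;  // another thread is dispatching it
        --cand.pending;                // remember the activated handler
        cand.suspended = true;         // suspend it for further I/O
        h = &cand;
        break;
      }
      if (h == nullptr) return work_left;
    }  // token released: another thread may now wait in the loop

    // Upcall outside the scope of the reactor lock.
    int const now = ++h->in_upcall;
    if (now > h->max_seen) h->max_seen = now;
    ++h->handled;
    --h->in_upcall;

    // Resume the handler for further I/O activity.
    std::lock_guard<std::mutex> token(token_);
    h->suspended = false;
    return true;
  }

  std::vector<ToyHandler> &handlers() { return handlers_; }

private:
  std::mutex token_;
  std::vector<ToyHandler> handlers_;
};

// Runs a pool of threads until all events are consumed. Returns true if
// every handler saw all of its events and never ran on two threads at once.
bool run_pool(std::size_t handlers, int events_each, unsigned threads) {
  ToyReactor reactor(handlers, events_each);
  std::vector<std::thread> pool;
  for (unsigned i = 0; i < threads; ++i)
    pool.emplace_back([&reactor] {
      while (reactor.handle_events()) std::this_thread::yield();
    });
  for (auto &t : pool) t.join();
  for (auto &h : reactor.handlers())
    if (h.handled != events_each || h.max_seen > 1) return false;
  return true;
}
```

In this model, calling run_pool (4, 50, 3) drives four handlers with fifty events each from three threads. Because a handler is marked suspended before the token is released, no handler is ever entered by two threads at the same time, which is the property the description relies on.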

This reactor implementation is best suited for situations when the callbacks to event handlers can take arbitrarily long and/or a number of threads are available to run the event loop. Note that I/O-processing callback code in event handlers (e.g. handle_input()) does not have to be modified or made thread-safe for this reactor. This is because before an I/O event is dispatched to an event handler, the handler is suspended; it is resumed by the reactor after the upcall completes. Therefore, multiple I/O events will not be dispatched to one event handler from multiple threads simultaneously. This suspend/resume protection applies neither to timers scheduled with the reactor nor to notifications requested via the reactor. When using timers and/or notifications you must provide proper protection for your class in the context of multiple threads.
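Since timer and notification upcalls are not covered by the suspend/resume protection, a handler that keeps state touched by those upcalls needs its own locking. The following is a minimal sketch using only the C++ standard library; StatsHandler and fire_timers_concurrently are hypothetical names, and plain threads stand in for reactor pool threads that deliver timeouts to the same handler at the same time.

```cpp
#include <cassert>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical handler; in an ACE application this role would be played by
// a class derived from ACE_Event_Handler.
class StatsHandler {
public:
  // Modelled on a timer upcall: it may run on several pool threads at once,
  // so the shared counter is updated under a lock.
  int handle_timeout() {
    std::lock_guard<std::mutex> guard(lock_);
    ++ticks_;
    return 0;
  }

  int ticks() const {
    std::lock_guard<std::mutex> guard(lock_);
    return ticks_;
  }

private:
  mutable std::mutex lock_;
  int ticks_ = 0;
};

// Delivers timeouts to one handler from several threads concurrently and
// returns the number of timeouts the handler recorded.
int fire_timers_concurrently(unsigned threads, int timeouts_per_thread) {
  StatsHandler handler;
  std::vector<std::thread> pool;
  for (unsigned i = 0; i < threads; ++i)
    pool.emplace_back([&handler, timeouts_per_thread] {
      for (int n = 0; n < timeouts_per_thread; ++n) handler.handle_timeout();
    });
  for (auto &t : pool) t.join();
  return handler.ticks();
}
```

With the lock in place, fire_timers_concurrently (4, 1000) records every timeout. Without it, the increments would race and the count could come up short.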

Definition at line 176 of file TP_Reactor.h.


Constructor & Destructor Documentation

ACE_TP_Reactor::ACE_TP_Reactor (ACE_Sig_Handler * = 0,
                                ACE_Timer_Queue * = 0,
                                int mask_signals = 1,
                                int s_queue = ACE_Select_Reactor_Token::FIFO)

Initialize ACE_TP_Reactor with the default size.

Definition at line 101 of file TP_Reactor.cpp.

References ACE_DISABLE_NOTIFY_PIPE_DEFAULT, ACE_Select_Reactor, ACE_Timer_Queue, ACE_TRACE, and ACE_Select_Reactor_Impl::supress_notify_renew().

  : ACE_Select_Reactor (sh, tq, ACE_DISABLE_NOTIFY_PIPE_DEFAULT, 0, mask_signals, s_queue)
{
  ACE_TRACE ("ACE_TP_Reactor::ACE_TP_Reactor");
  this->supress_notify_renew (1);
}

ACE_TP_Reactor::ACE_TP_Reactor (size_t max_number_of_handles,
                                int restart = 0,
                                ACE_Sig_Handler *sh = 0,
                                ACE_Timer_Queue *tq = 0,
                                int mask_signals = 1,
                                int s_queue = ACE_Select_Reactor_Token::FIFO)

Initialize the ACE_TP_Reactor to manage max_number_of_handles. If restart is non-0 then the ACE_Reactor's handle_events() method will be restarted automatically when EINTR occurs. If sh or tq are non-0 they are used as the signal handler and timer queue, respectively.

Definition at line 111 of file TP_Reactor.cpp.

References ACE_DISABLE_NOTIFY_PIPE_DEFAULT, ACE_Select_Reactor, ACE_Timer_Queue, ACE_TRACE, and ACE_Select_Reactor_Impl::supress_notify_renew().

  : ACE_Select_Reactor (max_number_of_handles, restart, sh, tq, ACE_DISABLE_NOTIFY_PIPE_DEFAULT, 0, mask_signals, s_queue)
{
  ACE_TRACE ("ACE_TP_Reactor::ACE_TP_Reactor");
  this->supress_notify_renew (1);
}

ACE_TP_Reactor::ACE_TP_Reactor (const ACE_TP_Reactor &)  [private]

Deny access since member-wise copying won't work.


Member Function Documentation

ACE_INLINE void ACE_TP_Reactor::clear_dispatch_mask (ACE_HANDLE handle,
                                                     ACE_Reactor_Mask mask)  [protected, virtual]

Template method from the base class.

Reimplemented from ACE_Select_Reactor_Impl.

Definition at line 111 of file TP_Reactor.inl.

References ACE_Reactor_Mask, ACE_Select_Reactor_Handle_Set::ex_mask_, ACE_Select_Reactor_Handle_Set::rd_mask_, ACE_Handle_Set::reset(), and ACE_Select_Reactor_Handle_Set::wr_mask_.

{
  this->ready_set_.rd_mask_.reset ();
  this->ready_set_.wr_mask_.reset ();
  this->ready_set_.ex_mask_.reset ();
}

ACE_INLINE void ACE_TP_Reactor::clear_handle_read_set (ACE_HANDLE handle)  [private]

Clear the handle from the read_set.

Definition at line 103 of file TP_Reactor.inl.

References ACE_Handle_Set::clr_bit(), ACE_Select_Reactor_Handle_Set::ex_mask_, ACE_Select_Reactor_Handle_Set::rd_mask_, and ACE_Select_Reactor_Handle_Set::wr_mask_.

Referenced by get_socket_event_info().

{
  this->ready_set_.wr_mask_.clr_bit (handle);
  this->ready_set_.ex_mask_.clr_bit (handle);
  this->ready_set_.rd_mask_.clr_bit (handle);
}

int ACE_TP_Reactor::dispatch_i (ACE_Time_Value *max_wait_time,
                                ACE_TP_Token_Guard &guard)  [protected]

Dispatch just one signal, timer, or notification handler.

Definition at line 178 of file TP_Reactor.cpp.

References get_event_for_dispatching(), handle_notify_events(), handle_socket_events(), and handle_timer_events().

Referenced by handle_events().

{
  int event_count =
    this->get_event_for_dispatching (max_wait_time);

  // We use this count to detect potential infinite loops as described
  // in bug 2540.
  int initial_event_count = event_count;

  int result = 0;

  // Note: We are passing the <event_count> around, to have record of
  // how many events still need processing. Maybe this could be
  // useful in the future.

#if 0
  // @Ciju
  // signal handling isn't in a production state yet.
  // Commenting it out for now.

  // Dispatch signals
  if (event_count == -1)
    {
      // Looks like we don't do any upcalls in dispatch signals. If at
      // a later point of time, we decide to handle signals we have to
      // release the lock before we make any upcalls.. What is here
      // now is not the right thing...
      //
      // @@ We need to do better..
      return this->handle_signals (event_count,
                                   guard);
    }
#endif // #if 0

  // If there are no signals and if we had received a proper
  // event_count then first look at dispatching timeouts. We need to
  // handle timers early since they may have higher latency
  // constraints than I/O handlers.  Ideally, the order of dispatching
  // should be a strategy...

  // NOTE: The event count does not have the number of timers that
  // needs dispatching. But we are still passing this along. We don't
  // need to do that. In the future we *may* have the timers also
  // returned through the <event_count>. Just passing that along for
  // that day.
  result = this->handle_timer_events (event_count,
                                      guard);

  if (result > 0)
    return result;

  // Else just go ahead fall through for further handling.

  if (event_count > 0)
    {
      // Next dispatch the notification handlers (if there are any to
      // dispatch).  These are required to handle multiple-threads
      // that are trying to update the <Reactor>.
      result = this->handle_notify_events (event_count,
                                           guard);

      if (result > 0)
        return result;

      // Else just fall through for further handling
    }

  if (event_count > 0)
    {
      // Handle socket events
      result = this->handle_socket_events (event_count,
                                           guard);
    }

  if (event_count != 0
      && event_count == initial_event_count)
    {
      this->state_changed_ = true;
    }

  return result;
}
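The dispatch order in the listing (timers first, then notifications, then socket events, with at most one upcall per call) can be illustrated by a small standalone model. This is only a sketch under simplifying assumptions: Pending, Kind, and dispatch_one are hypothetical names, and integer counters replace the timer queue, the notify pipe, and the ready set.

```cpp
#include <cassert>

// Hypothetical model: counts of work items that are ready right now.
struct Pending {
  int timers;    // expired timers
  int notifies;  // queued notifications
  int sockets;   // handles with I/O ready
};

enum class Kind { None, Timer, Notify, Socket };

// Dispatches at most one item per call, mirroring the order in dispatch_i():
// timers first, then notifications, then socket events.
Kind dispatch_one(Pending &p) {
  if (p.timers > 0) {
    --p.timers;
    return Kind::Timer;
  }
  // event_count plays the role of the value returned by the select() wait;
  // as in the listing, it does not include timers.
  int const event_count = p.notifies + p.sockets;
  if (event_count > 0 && p.notifies > 0) {
    --p.notifies;
    return Kind::Notify;
  }
  if (event_count > 0 && p.sockets > 0) {
    --p.sockets;
    return Kind::Socket;
  }
  return Kind::None;
}
```

Each pool thread that becomes leader performs one such step and then gives up the token, so a backlog of work is drained by repeated calls from different threads rather than by one long loop.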

int ACE_TP_Reactor::dispatch_socket_event (ACE_EH_Dispatch_Info &dispatch_info)  [private]

Notify the appropriate callback in the context of the event handler associated with the handle that a particular event has occurred.

Definition at line 573 of file TP_Reactor.cpp.

References ACE_EH_PTMF, ACE_TRACE, ACE_EH_Dispatch_Info::callback_, ACE_EH_Dispatch_Info::event_handler_, ACE_EH_Dispatch_Info::handle_, and post_process_socket_event().

Referenced by handle_socket_events().

{
  ACE_TRACE ("ACE_TP_Reactor::dispatch_socket_event");

  ACE_Event_Handler * const event_handler = dispatch_info.event_handler_;
  ACE_EH_PTMF const callback = dispatch_info.callback_;

  // Check for removed handlers.
  if (event_handler == 0)
    return -1;

  // Upcall. If the handler returns positive value (requesting a
  // reactor callback) don't set the ready-bit because it will be
  // ignored if the reactor state has changed. Just call back
  // as many times as the handler requests it. Other threads are off
  // handling other things.
  int status = 1;
  while (status > 0)
    status = (event_handler->*callback) (dispatch_info.handle_);

  // Post process socket event
  return this->post_process_socket_event (dispatch_info, status);
}
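The upcall loop in the listing keeps calling the handler for as long as it returns a positive value. A standalone sketch of that control flow follows; dispatch_until_done is a hypothetical helper, and a std::function replaces the pointer-to-member callback used by ACE.

```cpp
#include <cassert>
#include <functional>

// Hypothetical model of the upcall loop: call back repeatedly while the
// handler returns a positive value, then hand back the final status
// (zero or negative). The number of upcalls is reported through calls.
int dispatch_until_done(const std::function<int()> &callback, int &calls) {
  int status = 1;
  calls = 0;
  while (status > 0) {
    status = callback();
    ++calls;
  }
  return status;
}
```

A callback that returns 1 twice and then 0 is invoked three times on the same thread. A callback that returns -1 is invoked once, and the negative status is what later leads post-processing to remove the handler.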

int ACE_TP_Reactor::get_event_for_dispatching (ACE_Time_Value *max_wait_time)  [protected]

Get the event that needs dispatching. It can be a signal, timer, or notification handler, or possibly one I/O handler. In the most common use case, this returns one I/O handler for dispatching.

Definition at line 467 of file TP_Reactor.cpp.

References ACE_Select_Reactor_Handle_Set::ex_mask_, ACE_Select_Reactor_Handle_Set::rd_mask_, ACE_Handle_Set::reset(), ACE_Handle_Set::sync(), ACE_Select_Reactor_T< ACE_SELECT_REACTOR_TOKEN >::wait_for_multiple_events(), and ACE_Select_Reactor_Handle_Set::wr_mask_.

Referenced by dispatch_i().

{
  // If the reactor handler state has changed, clear any remembered
  // ready bits and re-scan from the master wait_set.
  if (this->state_changed_)
    {
      this->ready_set_.rd_mask_.reset ();
      this->ready_set_.wr_mask_.reset ();
      this->ready_set_.ex_mask_.reset ();

      this->state_changed_ = false;
    }
  else
    {
      // This is a hack... somewhere, under certain conditions (which
      // I don't understand...) the mask will have all of its bits clear,
      // yet have a size_ > 0. This is an attempt to remedy the affect,
      // without knowing why it happens.

      this->ready_set_.rd_mask_.sync (this->ready_set_.rd_mask_.max_set ());
      this->ready_set_.wr_mask_.sync (this->ready_set_.wr_mask_.max_set ());
      this->ready_set_.ex_mask_.sync (this->ready_set_.ex_mask_.max_set ());
    }

  return this->wait_for_multiple_events (this->ready_set_,
                                         max_wait_time);
}

ACE_HANDLE ACE_TP_Reactor::get_notify_handle (void)  [private]

Get the handle of the notify pipe from the ready set if there is an event in the notify pipe.

Definition at line 683 of file TP_Reactor.cpp.

References ACE_Handle_Set::is_set(), ACE_Reactor_Notify::notify_handle(), and ACE_Select_Reactor_Handle_Set::rd_mask_.

Referenced by handle_notify_events().

{
  // Call the notify handler to get a handle on which we would have a
  // notify waiting
  ACE_HANDLE const read_handle =
    this->notify_handler_->notify_handle ();

  // Check whether the rd_mask has been set on that handle. If so
  // return the handle.
  if (read_handle != ACE_INVALID_HANDLE &&
      this->ready_set_.rd_mask_.is_set (read_handle))
    {
      return read_handle;
    }

  // None found..
  return ACE_INVALID_HANDLE;
}

int ACE_TP_Reactor::get_socket_event_info (ACE_EH_Dispatch_Info &info)  [private]

Get socket event dispatch information.

Definition at line 496 of file TP_Reactor.cpp.

References clear_handle_read_set(), ACE_Select_Reactor_T< ACE_SELECT_REACTOR_TOKEN >::is_suspended_i(), and ACE_EH_Dispatch_Info::set().

Referenced by handle_socket_events().

{
  // Check for dispatch in write, except, read. Only catch one, but if
  // one is caught, be sure to clear the handle from each mask in case
  // there is more than one mask set for it. This would cause problems
  // if the handler is suspended for dispatching, but its set bit in
  // another part of ready_set_ kept it from being dispatched.
  int found_io = 0;
  ACE_HANDLE handle;

  // @@todo: We can do quite a bit of code reduction here. Let me get
  // it to work before I do this.
  {
    ACE_Handle_Set_Iterator handle_iter (this->ready_set_.wr_mask_);

    while (!found_io && (handle = handle_iter ()) != ACE_INVALID_HANDLE)
      {
        if (this->is_suspended_i (handle))
          continue;

        // Remember this info
        event.set (handle,
                   this->handler_rep_.find (handle),
                   ACE_Event_Handler::WRITE_MASK,
                   &ACE_Event_Handler::handle_output);

        this->clear_handle_read_set (handle);
        found_io = 1;
      }
  }

  if (!found_io)
    {
      ACE_Handle_Set_Iterator handle_iter (this->ready_set_.ex_mask_);

      while (!found_io && (handle = handle_iter ()) != ACE_INVALID_HANDLE)
        {
          if (this->is_suspended_i (handle))
            continue;

          // Remember this info
          event.set (handle,
                     this->handler_rep_.find (handle),
                     ACE_Event_Handler::EXCEPT_MASK,
                     &ACE_Event_Handler::handle_exception);

          this->clear_handle_read_set (handle);

          found_io = 1;
        }
    }

  if (!found_io)
    {
      ACE_Handle_Set_Iterator handle_iter (this->ready_set_.rd_mask_);

      while (!found_io && (handle = handle_iter ()) != ACE_INVALID_HANDLE)
        {
          if (this->is_suspended_i (handle))
            continue;

          // Remember this info
          event.set (handle,
                     this->handler_rep_.find (handle),
                     ACE_Event_Handler::READ_MASK,
                     &ACE_Event_Handler::handle_input);

          this->clear_handle_read_set (handle);
          found_io = 1;
        }
    }

  return found_io;
}
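The selection rule in the listing (scan write, then exception, then read; skip suspended handles; clear the chosen handle from all three sets) can be modelled with standard containers. This is a sketch, not the ACE code: ReadySet, Pick, Mask, and pick_one are hypothetical names, and std::set<int> stands in for ACE_Handle_Set.

```cpp
#include <cassert>
#include <set>

enum class Mask { None, Write, Except, Read };

// Hypothetical model of the three ready masks.
struct ReadySet {
  std::set<int> wr, ex, rd;
};

struct Pick {
  int handle = -1;  // -1 means nothing was dispatchable
  Mask mask = Mask::None;
};

// Picks one dispatchable handle in write, except, read order and clears it
// from every mask so it cannot be picked again under another mask while
// its handler is suspended.
Pick pick_one(ReadySet &ready, const std::set<int> &suspended) {
  struct Source {
    std::set<int> *set;
    Mask mask;
  };
  Source const order[] = {{&ready.wr, Mask::Write},
                          {&ready.ex, Mask::Except},
                          {&ready.rd, Mask::Read}};
  for (const Source &src : order) {
    for (int h : *src.set) {
      if (suspended.count(h)) continue;  // handler is mid-upcall elsewhere
      Pick p;
      p.handle = h;
      p.mask = src.mask;
      ready.wr.erase(h);
      ready.ex.erase(h);
      ready.rd.erase(h);
      return p;  // return right after erasing; iteration does not continue
    }
  }
  return Pick{};
}
```

Clearing the handle from every set matters because the handler is about to be suspended: a leftover bit in another set would still be counted as ready even though it could not be dispatched.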

int ACE_TP_Reactor::handle_events (ACE_Time_Value &max_wait_time)  [virtual]

This method is just like handle_events (ACE_Time_Value *max_wait_time), except the max_wait_time value is a reference and can therefore never be NULL.

Current is identical to .

Reimplemented from ACE_Select_Reactor_T< ACE_SELECT_REACTOR_TOKEN >.

Definition at line 662 of file TP_Reactor.cpp.

References handle_events().

{
  return this->handle_events (&max_wait_time);
}

int ACE_TP_Reactor::handle_events (ACE_Time_Value *max_wait_time = 0)  [virtual]

This event loop driver blocks for up to max_wait_time before returning. It will return earlier if timer events, I/O events, or signal events occur. Note that max_wait_time can be 0, in which case this method blocks indefinitely until events occur.

max_wait_time is decremented to reflect how much time this call took. For instance, if a time value of 3 seconds is passed to handle_events and an event occurs after 2 seconds, max_wait_time will equal 1 second. This can be used if an application wishes to handle events for some fixed amount of time.
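The decrementing behaviour described above can be modelled by a small RAII helper that subtracts the elapsed time from the caller's budget when it goes out of scope. The sketch below is loosely modelled on the role ACE_Countdown_Time plays in the listing but is not its implementation: Countdown and remaining_after are hypothetical names, std::chrono::milliseconds replaces ACE_Time_Value, and the clock is injected so that the example is deterministic.

```cpp
#include <cassert>
#include <chrono>
#include <functional>
#include <utility>

using Millis = std::chrono::milliseconds;

// Hypothetical RAII helper: on update() or destruction it subtracts the
// time elapsed since construction (or the last update) from *max_wait.
class Countdown {
public:
  Countdown(Millis *max_wait, std::function<Millis()> now)
    : max_wait_(max_wait), now_(std::move(now)), start_(now_()) {}

  ~Countdown() { update(); }

  void update() {
    if (max_wait_ == nullptr) return;  // a null pointer means "wait forever"
    Millis const t = now_();
    Millis const elapsed = t - start_;
    *max_wait_ = elapsed < *max_wait_ ? *max_wait_ - elapsed : Millis(0);
    start_ = t;
  }

private:
  Millis *max_wait_;
  std::function<Millis()> now_;
  Millis start_;
};

// Simulates a wait with the given budget in which an event arrives after
// event_delay of fake time, and returns what is left of the budget.
Millis remaining_after(Millis budget, Millis event_delay) {
  Millis fake_now(0);
  {
    Countdown countdown(&budget, [&fake_now] { return fake_now; });
    fake_now += event_delay;  // "block" until the event occurs
  }                           // destructor subtracts the elapsed time
  return budget;
}
```

With a budget of 3000 ms and an event after 2000 ms, 1000 ms remain, matching the 3 second / 2 second example in the text. An application that wants to handle events for a fixed period can keep passing the same value back in until it reaches zero.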

Returns:
The total number of events that were dispatched; 0 if the max_wait_time elapsed without dispatching any handlers, or -1 if an error occurs (check errno for more information).

Reimplemented from ACE_Select_Reactor_T< ACE_SELECT_REACTOR_TOKEN >.

Definition at line 143 of file TP_Reactor.cpp.

References ACE_TRACE, ACE_TP_Token_Guard::acquire_read_token(), dispatch_i(), ACE_TP_Token_Guard::is_owner(), and ACE_Countdown_Time::update().

Referenced by handle_events().

{
  ACE_TRACE ("ACE_TP_Reactor::handle_events");

  // Stash the current time -- the destructor of this object will
  // automatically compute how much time elapsed since this method was
  // called.
  ACE_Countdown_Time countdown (max_wait_time);

  //
  // The order of these events is very subtle, modify with care.
  //

  // Instantiate the token guard which will try grabbing the token for
  // this thread.
  ACE_TP_Token_Guard guard (this->token_);

  int const result = guard.acquire_read_token (max_wait_time);

  // If the guard is NOT the owner just return the retval
  if (!guard.is_owner ())
    return result;

  // After getting the lock, just check for deactivation.
  if (this->deactivated_)
    return -1;

  // Update the countdown to reflect time waiting for the token.
  countdown.update ();

  return this->dispatch_i (max_wait_time,
                           guard);
}

int ACE_TP_Reactor::handle_notify_events (int &event_count,
                                          ACE_TP_Token_Guard &g)  [protected]

Handle notify events.

Definition at line 360 of file TP_Reactor.cpp.

References ACE_Handle_Set::clr_bit(), ACE_Reactor_Notify::dispatch_notify(), get_notify_handle(), ACE_Reactor_Notify::is_dispatchable(), ACE_Select_Reactor_Handle_Set::rd_mask_, ACE_Reactor_Notify::read_notify_pipe(), and ACE_TP_Token_Guard::release_token().

Referenced by dispatch_i().

{
  // Get the handle on which notify calls could have occurred
  ACE_HANDLE notify_handle =
    this->get_notify_handle ();

  int result = 0;

  // The notify was not in the list returned by
  // wait_for_multiple_events ().
  if (notify_handle == ACE_INVALID_HANDLE)
    return result;

  // Now just do a read on the pipe..
  ACE_Notification_Buffer buffer;

  // Clear the handle of the read_mask of our <ready_set_>
  this->ready_set_.rd_mask_.clr_bit (notify_handle);

  // Keep reading notifies till we empty it or till we have a
  // dispatchable buffer
  while (this->notify_handler_->read_notify_pipe (notify_handle,
                                                  buffer) > 0)
    {
      // Just figure out whether we can read any buffer that has
      // dispatchable info. If not we have just been unblocked by
      // another thread trying to update the reactor. If we get any
      // buffer that needs dispatching we will dispatch that after
      // releasing the lock
      if (this->notify_handler_->is_dispatchable (buffer) > 0)
        {
          // Release the token before dispatching notifies...
          guard.release_token ();

          // Dispatch the upcall for the notify
          this->notify_handler_->dispatch_notify (buffer);

          // We had a successful dispatch.
          result = 1;

          // break out of the while loop
          break;
        }
    }

  // If we did some work, then we just return 1 which will allow us
  // to get out of here. If we return 0, then we will be asked to do
  // some work, i.e. dispatch socket events
  return result;
}

int ACE_TP_Reactor::handle_socket_events (int &event_count,
                                          ACE_TP_Token_Guard &g)  [protected]

Handle socket events.

Definition at line 413 of file TP_Reactor.cpp.

References ACE_Event_Handler::add_reference(), ACE_EH_Dispatch_Info::dispatch(), dispatch_socket_event(), ACE_EH_Dispatch_Info::event_handler_, get_socket_event_info(), ACE_EH_Dispatch_Info::handle_, ACE_EH_Dispatch_Info::mask_, ACE_EH_Dispatch_Info::reference_counting_required_, ACE_TP_Token_Guard::release_token(), ACE_Select_Reactor_T< ACE_SELECT_REACTOR_TOKEN >::suspend_i(), and ACE_Select_Reactor_Handler_Repository::unbind().

Referenced by dispatch_i().

{

  // We got the lock, let's handle some I/O events.
  ACE_EH_Dispatch_Info dispatch_info;

  this->get_socket_event_info (dispatch_info);

  // If there is any event handler that is ready to be dispatched, the
  // dispatch information is recorded in dispatch_info.
  if (!dispatch_info.dispatch ())
    {
      // Check for removed handlers.
      if (dispatch_info.event_handler_ == 0)
        {
          this->handler_rep_.unbind(dispatch_info.handle_,
                                    dispatch_info.mask_);
        }


      return 0;
    }

  // Suspend the handler so that other threads don't start dispatching
  // it, if we can't suspend then return directly
  //
  // NOTE: This check was performed in older versions of the
  // TP_Reactor. Looks like it is a waste..
  if (dispatch_info.event_handler_ != this->notify_handler_)
    if (this->suspend_i (dispatch_info.handle_) == -1)
      return 0;

  // Call add_reference() if needed.
  if (dispatch_info.reference_counting_required_)
    dispatch_info.event_handler_->add_reference ();

  // Release the lock.  Other threads can start waiting.
  guard.release_token ();

  int result = 0;

  // If there was an event handler ready, dispatch it.
  // Decrement the event left
  --event_count;

  // Dispatched an event
  if (this->dispatch_socket_event (dispatch_info) == 0)
    ++result;

  return result;
}

int ACE_TP_Reactor::handle_timer_events (int &event_count,
                                         ACE_TP_Token_Guard &g)  [protected]

Handle timer events.

Definition at line 314 of file TP_Reactor.cpp.

References ACE_Timer_Node_Dispatch_Info, ACE_Timer_Queue_T< TYPE, FUNCTOR, ACE_LOCK >::dispatch_info(), ACE_Timer_Queue_T< TYPE, FUNCTOR, ACE_LOCK >::is_empty(), ACE_Timer_Queue_T< TYPE, FUNCTOR, ACE_LOCK >::postinvoke(), ACE_Timer_Queue_T< TYPE, FUNCTOR, ACE_LOCK >::preinvoke(), ACE_TP_Token_Guard::release_token(), ACE_Timer_Queue_T< TYPE, FUNCTOR, ACE_LOCK >::timer_skew(), and ACE_Timer_Queue_T< TYPE, FUNCTOR, ACE_LOCK >::upcall().

Referenced by dispatch_i().

{
  if (this->timer_queue_ == 0 || this->timer_queue_->is_empty())
    { // Empty timer queue so cannot have any expired timers.
      return 0;
    }

  // Get the current time
  ACE_Time_Value cur_time (this->timer_queue_->gettimeofday () +
                           this->timer_queue_->timer_skew ());

  // Look for a node in the timer queue whose timer <= the present
  // time.
  ACE_Timer_Node_Dispatch_Info info;

  if (this->timer_queue_->dispatch_info (cur_time,
                                         info))
    {
      const void *upcall_act = 0;

      // Preinvoke.
      this->timer_queue_->preinvoke (info,
                                     cur_time,
                                     upcall_act);

      // Release the token before dispatching notifies...
      guard.release_token ();

      // call the functor
      this->timer_queue_->upcall (info,
                                  cur_time);

      // Postinvoke
      this->timer_queue_->postinvoke (info,
                                      cur_time,
                                      upcall_act);

      // We have dispatched a timer
      return 1;
    }

  return 0;
}

ACE_INLINE void ACE_TP_Reactor::no_op_sleep_hook (void *)  [static]

Called from handle events.

Definition at line 98 of file TP_Reactor.inl.

{
}

void ACE_TP_Reactor::notify_handle (ACE_HANDLE handle,
                                    ACE_Reactor_Mask mask,
                                    ACE_Handle_Set &,
                                    ACE_Event_Handler *eh,
                                    ACE_EH_PTMF callback)  [protected, virtual]

This method shouldn't get called.

Reimplemented from ACE_Select_Reactor_T< ACE_SELECT_REACTOR_TOKEN >.

Definition at line 668 of file TP_Reactor.cpp.

References ACE_ASSERT, ACE_EH_PTMF, ACE_ERROR, ACE_Reactor_Mask, ACE_TEXT, and LM_ERROR.

{
  ACE_ERROR ((LM_ERROR,
              ACE_TEXT ("ACE_TP_Reactor::notify_handle: ")
              ACE_TEXT ("Wrong version of notify_handle() got called \n")));

  ACE_ASSERT (eh == 0);
  ACE_UNUSED_ARG (eh);
}

ACE_TP_Reactor& ACE_TP_Reactor::operator= (const ACE_TP_Reactor &)  [private]

int ACE_TP_Reactor::owner (ACE_thread_t *t_id)  [virtual]

Return the thread ID of the current Leader.

Reimplemented from ACE_Select_Reactor_T< ACE_SELECT_REACTOR_TOKEN >.

Definition at line 134 of file TP_Reactor.cpp.

References ACE_TRACE, and ACE_Thread::self().

{
  ACE_TRACE ("ACE_TP_Reactor::owner");
  *t_id = ACE_Thread::self ();

  return 0;
}

int ACE_TP_Reactor::owner (ACE_thread_t n_id,
                           ACE_thread_t *o_id = 0)  [virtual]

The ACE_TP_Reactor implementation does not have a single owner thread. Attempts to set the owner explicitly are ignored. The reported owner thread is the current Leader in the pattern.

Reimplemented from ACE_Select_Reactor_T< ACE_SELECT_REACTOR_TOKEN >.

Definition at line 124 of file TP_Reactor.cpp.

References ACE_TRACE, and ACE_Thread::self().

{
  ACE_TRACE ("ACE_TP_Reactor::owner");
  if (o_id)
    *o_id = ACE_Thread::self ();

  return 0;
}

int ACE_TP_Reactor::post_process_socket_event (ACE_EH_Dispatch_Info &dispatch_info,
                                               int status)  [private]

Definition at line 598 of file TP_Reactor.cpp.

References ACE_TP_Token_Guard::acquire_token(), ACE_EH_Dispatch_Info::event_handler_, ACE_Select_Reactor_Handler_Repository::find(), ACE_EH_Dispatch_Info::handle_, ACE_TP_Token_Guard::is_owner(), ACE_EH_Dispatch_Info::mask_, ACE_EH_Dispatch_Info::reference_counting_required_, ACE_Select_Reactor_T< ACE_SELECT_REACTOR_TOKEN >::remove_handler_i(), ACE_Event_Handler::remove_reference(), ACE_EH_Dispatch_Info::resume_flag_, and ACE_Select_Reactor_T< ACE_SELECT_REACTOR_TOKEN >::resume_i().

Referenced by dispatch_socket_event().

{
  int result = 0;

  // First check if we really have to post process something, if not, then
  // we don't acquire the token which saves us a lot of time.
  if (status < 0 ||
     (dispatch_info.event_handler_ != this->notify_handler_ &&
      dispatch_info.resume_flag_ ==
        ACE_Event_Handler::ACE_REACTOR_RESUMES_HANDLER))
    {
      // Get the reactor token and with this token acquired remove first the
      // handler and resume it at the same time. This must be atomic, see also
      // bugzilla 2395. When this is not atomic it can be that we resume the
      // handle after it is reused by the OS.
      ACE_TP_Token_Guard guard (this->token_);

      result = guard.acquire_token ();

      // If the guard is NOT the owner just return the retval
      if (!guard.is_owner ())
        return result;

      // A different event handler may have been registered during the
      // upcall if the handle was closed and then reopened, for
      // example.  Make sure we're removing and/or resuming the event
      // handler used during the upcall.
      ACE_Event_Handler const * const eh =
        this->handler_rep_.find (dispatch_info.handle_);

      // Only remove or resume the event handler used during the
      // upcall.
      if (eh == dispatch_info.event_handler_)
        {
          if (status < 0)
            {
              result =
                this->remove_handler_i (dispatch_info.handle_,
                                        dispatch_info.mask_);
            }

          // Resume handler if required.
          if (dispatch_info.event_handler_ != this->notify_handler_ &&
              dispatch_info.resume_flag_ ==
              ACE_Event_Handler::ACE_REACTOR_RESUMES_HANDLER)
            this->resume_i (dispatch_info.handle_);
        }
    }

  // Call remove_reference() if needed.
  if (dispatch_info.reference_counting_required_)
    dispatch_info.event_handler_->remove_reference ();

  return result;
}

int ACE_TP_Reactor::resumable_handler (void)  [virtual]

Does the reactor allow the application to resume the handle on its own, i.e., can it pass control of handle resumption to the application? The TP reactor can allow applications to resume handles, so this returns a positive value.

Reimplemented from ACE_Select_Reactor_Impl.

Definition at line 656 of file TP_Reactor.cpp.

{
  return 1;
}


Member Data Documentation

ACE_TP_Reactor::ACE_ALLOC_HOOK_DECLARE
 

Declare the dynamic allocation hooks.

Reimplemented from ACE_Select_Reactor_T< ACE_SELECT_REACTOR_TOKEN >.

Definition at line 240 of file TP_Reactor.h.


The documentation for this class was generated from the following files:
TP_Reactor.h
TP_Reactor.inl
TP_Reactor.cpp
Generated on Sun Jan 27 12:59:10 2008 for ACE by doxygen 1.3.6