ACE_TP_Reactor Class Reference

Specialization of ACE_Select_Reactor to support thread-pool based event dispatching.

#include <TP_Reactor.h>


Public Member Functions

 ACE_TP_Reactor (ACE_Sig_Handler *=0, ACE_Timer_Queue *=0, int mask_signals=1, int s_queue=ACE_Select_Reactor_Token::FIFO)
 Initialize ACE_TP_Reactor with the default size.
 ACE_TP_Reactor (size_t max_number_of_handles, int restart=0, ACE_Sig_Handler *sh=0, ACE_Timer_Queue *tq=0, int mask_signals=1, int s_queue=ACE_Select_Reactor_Token::FIFO)
virtual int handle_events (ACE_Time_Value *max_wait_time=0)
virtual int handle_events (ACE_Time_Value &max_wait_time)
virtual int resumable_handler (void)
virtual int owner (ACE_thread_t n_id, ACE_thread_t *o_id=0)
virtual int owner (ACE_thread_t *t_id)
 Return the thread ID of the current Leader.

Static Public Member Functions

static void no_op_sleep_hook (void *)
 Called from handle events.

Public Attributes

 ACE_ALLOC_HOOK_DECLARE
 Declare the dynamic allocation hooks.

Protected Member Functions

virtual void clear_dispatch_mask (ACE_HANDLE handle, ACE_Reactor_Mask mask)
 Template method from the base class.
int dispatch_i (ACE_Time_Value *max_wait_time, ACE_TP_Token_Guard &guard)
 Dispatch just one signal, timer, or notification handler.
int get_event_for_dispatching (ACE_Time_Value *max_wait_time)
int handle_timer_events (int &event_count, ACE_TP_Token_Guard &g)
 Handle timer events.
int handle_notify_events (int &event_count, ACE_TP_Token_Guard &g)
 Handle notify events.
int handle_socket_events (int &event_count, ACE_TP_Token_Guard &g)
 Handle socket events.
virtual void notify_handle (ACE_HANDLE handle, ACE_Reactor_Mask mask, ACE_Handle_Set &, ACE_Event_Handler *eh, ACE_EH_PTMF callback)
 This method shouldn't get called.

Private Member Functions

ACE_HANDLE get_notify_handle (void)
int get_socket_event_info (ACE_EH_Dispatch_Info &info)
 Get socket event dispatch information.
int dispatch_socket_event (ACE_EH_Dispatch_Info &dispatch_info)
void clear_handle_read_set (ACE_HANDLE handle)
 Clear the handle from the read_set.
int post_process_socket_event (ACE_EH_Dispatch_Info &dispatch_info, int status)
 ACE_TP_Reactor (const ACE_TP_Reactor &)
 Deny access since member-wise won't work...
ACE_TP_Reactor & operator= (const ACE_TP_Reactor &)

Detailed Description

Specialization of ACE_Select_Reactor to support thread-pool based event dispatching.

One of the shortcomings of the ACE_Select_Reactor is that it does not support a thread pool-based event dispatching model, similar to the one in ACE_WFMO_Reactor. In ACE_Select_Reactor, only one thread can call handle_events() at any given time. ACE_TP_Reactor removes this shortcoming.

ACE_TP_Reactor is a specialization of ACE_Select_Reactor to support thread pool-based event dispatching. This reactor takes advantage of the fact that events reported by select() are persistent if not acted upon immediately. It works by remembering the event handler which was just activated, suspending it for further I/O activities, releasing the internal lock (so that another thread can start waiting in the event loop) and then dispatching the event's handler outside the scope of the reactor lock. After the event handler has been dispatched the event handler is resumed for further I/O activity.
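The cycle described above can be illustrated with a small toy model. This is a sketch in plain standard C++, not ACE code: ToyReactor, register_handler, is_suspended and dispatch_one are hypothetical names, a std::mutex stands in for the reactor token, and integer handles stand in for ACE_HANDLE.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <mutex>
#include <set>
#include <utility>

// Toy model (assumption: not the ACE implementation) of the
// suspend / release / upcall / resume cycle.
class ToyReactor {
public:
  using Handler = std::function<void(int)>;

  void register_handler(int handle, Handler h) {
    assert(h && "handler must be callable");
    std::lock_guard<std::mutex> lock(token_);
    handlers_[handle] = std::move(h);
  }

  bool is_suspended(int handle) {
    std::lock_guard<std::mutex> lock(token_);
    return suspended_.count(handle) != 0;
  }

  // Dispatch one ready handle. Returns true if an upcall was made.
  bool dispatch_one(int ready_handle) {
    Handler upcall;
    {
      std::lock_guard<std::mutex> lock(token_);
      auto it = handlers_.find(ready_handle);
      if (it == handlers_.end() || suspended_.count(ready_handle) != 0)
        return false;                    // unknown, or already being dispatched
      upcall = it->second;               // 1. remember the activated handler
      suspended_.insert(ready_handle);   // 2. suspend it for further I/O
    }                                    // 3. release the lock
    upcall(ready_handle);                // 4. upcall outside the lock
    {
      std::lock_guard<std::mutex> lock(token_);
      suspended_.erase(ready_handle);    // 5. resume the handler
    }
    return true;
  }

private:
  std::mutex token_;                     // stand-in for the reactor token
  std::map<int, Handler> handlers_;
  std::set<int> suspended_;
};
```

Because the lock is not held during step 4, another thread could enter dispatch_one() for a different handle while the upcall is still running; a second attempt on the same handle is rejected until step 5 completes.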

This reactor implementation is best suited for situations when the callbacks to event handlers can take arbitrarily long and/or a number of threads are available to run the event loop. Note that I/O-processing callback code in event handlers (e.g. handle_input()) does not have to be modified or made thread-safe for this reactor. This is because before an I/O event is dispatched to an event handler, the handler is suspended; it is resumed by the reactor after the upcall completes. Therefore, multiple I/O upcalls will not be made to one event handler from multiple threads simultaneously. This suspend/resume protection does not apply to either timers scheduled with the reactor or to notifications requested via the reactor. When using timers and/or notifications you must provide proper protection for your class in the context of multiple threads.
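One way to provide that protection is to guard the state shared between the I/O callback and the timer callback with a lock. The following is a minimal sketch in plain standard C++, not ACE code: StatsHandler is a hypothetical class whose method names only echo the ACE callbacks, and the simplified signatures are assumptions made for illustration.

```cpp
#include <cassert>
#include <mutex>

// Hypothetical handler whose I/O callback and timer callback may run in
// different pool threads at the same time, so shared state is locked.
class StatsHandler {
public:
  // Stand-in for an I/O upcall: account for bytes that were read.
  int handle_input(int bytes) {
    assert(bytes >= 0);
    std::lock_guard<std::mutex> lock(mutex_);
    bytes_seen_ += bytes;
    return 0;
  }

  // Stand-in for a timer upcall: publish the total and reset it.
  int handle_timeout() {
    std::lock_guard<std::mutex> lock(mutex_);
    last_report_ = bytes_seen_;
    bytes_seen_ = 0;
    return 0;
  }

  long last_report() const {
    std::lock_guard<std::mutex> lock(mutex_);
    return last_report_;
  }

private:
  mutable std::mutex mutex_;
  long bytes_seen_ = 0;
  long last_report_ = 0;
};
```

Without the lock, a timer upcall that resets the counter could interleave with an I/O upcall that is in the middle of updating it.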

Definition at line 176 of file TP_Reactor.h.


Constructor & Destructor Documentation

ACE_TP_Reactor::ACE_TP_Reactor ( ACE_Sig_Handler * = 0,
ACE_Timer_Queue * = 0,
int  mask_signals = 1,
int  s_queue = ACE_Select_Reactor_Token::FIFO 
)

Initialize ACE_TP_Reactor with the default size.

Definition at line 101 of file TP_Reactor.cpp.

References ACE_TRACE, and ACE_Select_Reactor_Impl::supress_notify_renew().

00105   : ACE_Select_Reactor (sh, tq, ACE_DISABLE_NOTIFY_PIPE_DEFAULT, 0, mask_signals, s_queue)
00106 {
00107   ACE_TRACE ("ACE_TP_Reactor::ACE_TP_Reactor");
00108   this->supress_notify_renew (1);
00109 }

ACE_TP_Reactor::ACE_TP_Reactor ( size_t  max_number_of_handles,
int  restart = 0,
ACE_Sig_Handler * sh = 0,
ACE_Timer_Queue * tq = 0,
int  mask_signals = 1,
int  s_queue = ACE_Select_Reactor_Token::FIFO 
)

Initialize the ACE_TP_Reactor to manage max_number_of_handles. If restart is non-0 then the ACE_Reactor's handle_events() method will be restarted automatically when EINTR occurs. If sh or tq are non-0 they are used as the signal handler and timer queue, respectively.

Definition at line 111 of file TP_Reactor.cpp.

References ACE_TRACE, and ACE_Select_Reactor_Impl::supress_notify_renew().

00117   : ACE_Select_Reactor (max_number_of_handles, restart, sh, tq, ACE_DISABLE_NOTIFY_PIPE_DEFAULT, 0, mask_signals, s_queue)
00118 {
00119   ACE_TRACE ("ACE_TP_Reactor::ACE_TP_Reactor");
00120   this->supress_notify_renew (1);
00121 }

ACE_TP_Reactor::ACE_TP_Reactor ( const ACE_TP_Reactor &  )  [private]

Deny access since member-wise won't work...


Member Function Documentation

ACE_INLINE void ACE_TP_Reactor::clear_dispatch_mask ( ACE_HANDLE  handle,
ACE_Reactor_Mask  mask 
) [protected, virtual]

Template method from the base class.

Reimplemented from ACE_Select_Reactor_Impl.

Definition at line 111 of file TP_Reactor.inl.

References ACE_Select_Reactor_Handle_Set::ex_mask_, ACE_Select_Reactor_Handle_Set::rd_mask_, ACE_Select_Reactor_Impl::ready_set_, ACE_Handle_Set::reset(), and ACE_Select_Reactor_Handle_Set::wr_mask_.

00113 {
00114   this->ready_set_.rd_mask_.reset ();
00115   this->ready_set_.wr_mask_.reset ();
00116   this->ready_set_.ex_mask_.reset ();
00117 }

ACE_INLINE void ACE_TP_Reactor::clear_handle_read_set ( ACE_HANDLE  handle  )  [private]

Clear the handle from the read_set.

Definition at line 103 of file TP_Reactor.inl.

References ACE_Handle_Set::clr_bit(), ACE_Select_Reactor_Handle_Set::ex_mask_, ACE_Select_Reactor_Handle_Set::rd_mask_, ACE_Select_Reactor_Impl::ready_set_, and ACE_Select_Reactor_Handle_Set::wr_mask_.

Referenced by get_socket_event_info().

00104 {
00105   this->ready_set_.wr_mask_.clr_bit (handle);
00106   this->ready_set_.ex_mask_.clr_bit (handle);
00107   this->ready_set_.rd_mask_.clr_bit (handle);
00108 }

int ACE_TP_Reactor::dispatch_i ( ACE_Time_Value * max_wait_time,
ACE_TP_Token_Guard & guard 
) [protected]

Dispatch just one signal, timer, or notification handler.

Definition at line 177 of file TP_Reactor.cpp.

References get_event_for_dispatching(), handle_notify_events(), handle_socket_events(), handle_timer_events(), and ACE_Select_Reactor_Impl::state_changed_.

Referenced by handle_events().

00179 {
00180   int event_count = this->get_event_for_dispatching (max_wait_time);
00181 
00182   // We use this count to detect potential infinite loops as described
00183   // in bug 2540.
00184   int const initial_event_count = event_count;
00185 
00186   int result = 0;
00187 
00188   // Note: We are passing the <event_count> around, to have record of
00189   // how many events still need processing. May be this could be
00190   // useful in future.
00191 
00192 #if 0
00193   // @Ciju
00194   // signal handling isn't in a production state yet.
00195   // Commenting it out for now.
00196 
00197   // Dispatch signals
00198   if (event_count == -1)
00199     {
00200       // Looks like we dont do any upcalls in dispatch signals. If at
00201       // a later point of time, we decide to handle signals we have to
00202       // release the lock before we make any upcalls.. What is here
00203       // now is not the right thing...
00204       //
00205       // @@ We need to do better..
00206       return this->handle_signals (event_count, guard);
00207     }
00208 #endif // #if 0
00209 
00210   // If there are no signals and if we had received a proper
00211   // event_count then first look at dispatching timeouts. We need to
00212   // handle timers early since they may have higher latency
00213   // constraints than I/O handlers.  Ideally, the order of dispatching
00214   // should be a strategy...
00215 
00216   // NOTE: The event count does not have the number of timers that
00217   // needs dispatching. But we are still passing this along. We dont
00218   // need to do that. In the future we *may* have the timers also
00219   // returned through the <event_count>. Just passing that along for
00220   // that day.
00221   result = this->handle_timer_events (event_count, guard);
00222 
00223   if (result > 0)
00224     return result;
00225 
00226   // Else just go ahead fall through for further handling.
00227 
00228   if (event_count > 0)
00229     {
00230       // Next dispatch the notification handlers (if there are any to
00231       // dispatch).  These are required to handle multiple-threads
00232       // that are trying to update the <Reactor>.
00233       result = this->handle_notify_events (event_count, guard);
00234 
00235       if (result > 0)
00236         return result;
00237 
00238       // Else just fall through for further handling
00239     }
00240 
00241   if (event_count > 0)
00242     {
00243       // Handle socket events
00244       result = this->handle_socket_events (event_count, guard);
00245     }
00246 
00247   if (event_count != 0 && event_count == initial_event_count)
00248     {
00249       this->state_changed_ = true;
00250     }
00251 
00252   return result;
00253 }
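The control flow of the listing above can be condensed into a short sketch. This is plain standard C++ written for illustration, not ACE code: Stages and dispatch_once are hypothetical names, each stage is a callable that returns a value greater than 0 when it dispatched something, and the sketch passes event_count by value, whereas the real method passes it by reference so that stages can decrement it.

```cpp
#include <cassert>
#include <functional>

// Hypothetical stand-ins for handle_timer_events(),
// handle_notify_events() and handle_socket_events().
struct Stages {
  std::function<int()> timers;
  std::function<int()> notifies;
  std::function<int()> sockets;
};

// At most one stage dispatches per call: timers first, then
// notifications, then socket events.
inline int dispatch_once(int event_count, const Stages &s) {
  assert(s.timers && s.notifies && s.sockets);

  // Timers are tried regardless of event_count, because the count
  // does not include timers.
  int result = s.timers();
  if (result > 0)
    return result;

  if (event_count > 0) {
    result = s.notifies();
    if (result > 0)
      return result;
  }

  if (event_count > 0)
    result = s.sockets();

  return result;
}
```

The early returns are what make each call dispatch a single upcall: once a stage reports work done, the thread leaves so that another thread can take over the event loop.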

int ACE_TP_Reactor::dispatch_socket_event ( ACE_EH_Dispatch_Info & dispatch_info  )  [private]

Notify the appropriate <callback> in the context of the <eh> associated with <handle> that a particular event has occurred.

Definition at line 557 of file TP_Reactor.cpp.

References ACE_TRACE, ACE_EH_Dispatch_Info::callback_, ACE_EH_Dispatch_Info::event_handler_, ACE_EH_Dispatch_Info::handle_, and post_process_socket_event().

00558 {
00559   ACE_TRACE ("ACE_TP_Reactor::dispatch_socket_event");
00560 
00561   ACE_Event_Handler * const event_handler = dispatch_info.event_handler_;
00562   ACE_EH_PTMF const callback = dispatch_info.callback_;
00563 
00564   // Check for removed handlers.
00565   if (event_handler == 0)
00566     return -1;
00567 
00568   // Upcall. If the handler returns positive value (requesting a
00569   // reactor callback) don't set the ready-bit because it will be
00570   // ignored if the reactor state has changed. Just call back
00571   // as many times as the handler requests it. Other threads are off
00572   // handling other things.
00573   int status = 1;
00574   while (status > 0)
00575     status = (event_handler->*callback) (dispatch_info.handle_);
00576 
00577   // Post process socket event
00578   return this->post_process_socket_event (dispatch_info, status);
00579 }
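The re-invocation loop in the listing above is small enough to isolate. The sketch below is plain standard C++, not ACE code; upcall_until_done is a hypothetical helper, and the calls out-parameter exists only so the behaviour can be observed.

```cpp
#include <cassert>
#include <functional>

// Keep calling `callback` while it returns a positive value, which is
// how a handler asks to be called back again. Returns the final
// status (0 or negative) and reports the number of upcalls made.
inline int upcall_until_done(const std::function<int(int)> &callback,
                             int handle, int &calls) {
  assert(callback);
  calls = 0;
  int status = 1;
  while (status > 0) {
    status = callback(handle);
    ++calls;
  }
  return status;
}
```

A handler that returns a positive value on every call would keep its thread in this loop indefinitely, so positive return values are best reserved for cases where more data is known to be pending.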

int ACE_TP_Reactor::get_event_for_dispatching ( ACE_Time_Value * max_wait_time  )  [protected]

Get the event that needs dispatching. It could be a signal, a timer, or a notification handler, or possibly one I/O handler. In the most common use case, this returns one I/O handler for dispatching.

Definition at line 452 of file TP_Reactor.cpp.

References ACE_Select_Reactor_Handle_Set::ex_mask_, ACE_Select_Reactor_Handle_Set::rd_mask_, ACE_Select_Reactor_Impl::ready_set_, ACE_Handle_Set::reset(), ACE_Select_Reactor_Impl::state_changed_, ACE_Handle_Set::sync(), ACE_Select_Reactor_T< ACE_Select_Reactor_Token >::wait_for_multiple_events(), and ACE_Select_Reactor_Handle_Set::wr_mask_.

Referenced by dispatch_i().

00453 {
00454   // If the reactor handler state has changed, clear any remembered
00455   // ready bits and re-scan from the master wait_set.
00456   if (this->state_changed_)
00457     {
00458       this->ready_set_.rd_mask_.reset ();
00459       this->ready_set_.wr_mask_.reset ();
00460       this->ready_set_.ex_mask_.reset ();
00461 
00462       this->state_changed_ = false;
00463     }
00464   else
00465     {
00466       // This is a hack... somewhere, under certain conditions (which
00467       // I don't understand...) the mask will have all of its bits clear,
00468       // yet have a size_ > 0. This is an attempt to remedy the effect,
00469       // without knowing why it happens.
00470 
00471       this->ready_set_.rd_mask_.sync (this->ready_set_.rd_mask_.max_set ());
00472       this->ready_set_.wr_mask_.sync (this->ready_set_.wr_mask_.max_set ());
00473       this->ready_set_.ex_mask_.sync (this->ready_set_.ex_mask_.max_set ());
00474     }
00475 
00476   return this->wait_for_multiple_events (this->ready_set_, max_wait_time);
00477 }

ACE_HANDLE ACE_TP_Reactor::get_notify_handle ( void   )  [private]

Get the handle of the notify pipe from the ready set if there is an event in the notify pipe.

Definition at line 667 of file TP_Reactor.cpp.

References ACE_Reactor_Notify::notify_handle(), and ACE_Select_Reactor_Impl::notify_handler_.

Referenced by handle_notify_events().

00668 {
00669   // Call the notify handler to get a handle on which we would have a
00670   // notify waiting
00671   ACE_HANDLE const read_handle =
00672     this->notify_handler_->notify_handle ();
00673 
00674   // Check whether the rd_mask has been set on that handle. If so
00675   // return the handle.
00676   if (read_handle != ACE_INVALID_HANDLE &&
00677       this->ready_set_.rd_mask_.is_set (read_handle))
00678     {
00679       return read_handle;
00680     }
00681 
00682   // None found..
00683   return ACE_INVALID_HANDLE;
00684 }

int ACE_TP_Reactor::get_socket_event_info ( ACE_EH_Dispatch_Info & info  )  [private]

Get socket event dispatch information.

Definition at line 480 of file TP_Reactor.cpp.

References clear_handle_read_set(), ACE_Event_Handler::EXCEPT_MASK, ACE_Event_Handler::handle_exception(), ACE_Event_Handler::handle_input(), ACE_Event_Handler::handle_output(), ACE_Event_Handler::READ_MASK, ACE_EH_Dispatch_Info::set(), and ACE_Event_Handler::WRITE_MASK.

Referenced by handle_socket_events().

00481 {
00482   // Check for dispatch in write, except, read. Only catch one, but if
00483   // one is caught, be sure to clear the handle from each mask in case
00484   // there is more than one mask set for it. This would cause problems
00485   // if the handler is suspended for dispatching, but its set bit in
00486   // another part of ready_set_ kept it from being dispatched.
00487   int found_io = 0;
00488   ACE_HANDLE handle;
00489 
00490   // @@todo: We can do quite a bit of code reduction here. Let me get
00491   // it to work before I do this.
00492   {
00493     ACE_Handle_Set_Iterator handle_iter (this->ready_set_.wr_mask_);
00494 
00495     while (!found_io && (handle = handle_iter ()) != ACE_INVALID_HANDLE)
00496       {
00497         if (this->is_suspended_i (handle))
00498           continue;
00499 
00500         // Remember this info
00501         event.set (handle,
00502                    this->handler_rep_.find (handle),
00503                    ACE_Event_Handler::WRITE_MASK,
00504                    &ACE_Event_Handler::handle_output);
00505 
00506         this->clear_handle_read_set (handle);
00507         found_io = 1;
00508       }
00509   }
00510 
00511   if (!found_io)
00512     {
00513       ACE_Handle_Set_Iterator handle_iter (this->ready_set_.ex_mask_);
00514 
00515       while (!found_io && (handle = handle_iter ()) != ACE_INVALID_HANDLE)
00516         {
00517           if (this->is_suspended_i (handle))
00518             continue;
00519 
00520           // Remember this info
00521           event.set (handle,
00522                      this->handler_rep_.find (handle),
00523                      ACE_Event_Handler::EXCEPT_MASK,
00524                      &ACE_Event_Handler::handle_exception);
00525 
00526           this->clear_handle_read_set (handle);
00527 
00528           found_io = 1;
00529         }
00530     }
00531 
00532   if (!found_io)
00533     {
00534       ACE_Handle_Set_Iterator handle_iter (this->ready_set_.rd_mask_);
00535 
00536       while (!found_io && (handle = handle_iter ()) != ACE_INVALID_HANDLE)
00537         {
00538           if (this->is_suspended_i (handle))
00539             continue;
00540 
00541           // Remember this info
00542           event.set (handle,
00543                      this->handler_rep_.find (handle),
00544                      ACE_Event_Handler::READ_MASK,
00545                      &ACE_Event_Handler::handle_input);
00546 
00547           this->clear_handle_read_set (handle);
00548           found_io = 1;
00549         }
00550     }
00551 
00552   return found_io;
00553 }
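The selection rule in the listing above (scan the write, exception, then read sets; skip suspended handles; clear the chosen handle from all three sets) can be sketched compactly. This is plain standard C++ for illustration, not ACE code: ReadySets, Selection, Mask and pick_one are hypothetical names, and std::set<int> stands in for ACE_Handle_Set.

```cpp
#include <cassert>
#include <set>

enum class Mask { None, Write, Except, Read };

struct ReadySets {
  std::set<int> wr, ex, rd;   // stand-ins for wr_mask_, ex_mask_, rd_mask_
};

struct Selection {
  int handle = -1;
  Mask mask = Mask::None;
};

// Pick the first ready, non-suspended handle, scanning write, then
// exception, then read. The chosen handle is cleared from all three
// sets so a leftover bit cannot trigger a second dispatch.
inline Selection pick_one(ReadySets &ready, const std::set<int> &suspended) {
  struct Source { std::set<int> *set; Mask mask; };
  const Source order[] = {{&ready.wr, Mask::Write},
                          {&ready.ex, Mask::Except},
                          {&ready.rd, Mask::Read}};
  for (const Source &src : order) {
    for (int h : *src.set) {
      assert(h >= 0);            // toy handles are non-negative
      if (suspended.count(h) != 0)
        continue;                // being dispatched by another thread
      Selection sel;
      sel.handle = h;
      sel.mask = src.mask;
      ready.wr.erase(h);
      ready.ex.erase(h);
      ready.rd.erase(h);
      return sel;                // return at once: iterators are now stale
    }
  }
  return Selection{};
}
```

Ready bits of suspended handles are left in place, so those handles are still considered on a later call once they have been resumed.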

int ACE_TP_Reactor::handle_events ( ACE_Time_Value & max_wait_time  )  [virtual]

This method is just like the handle_events() overload that takes a pointer, except the max_wait_time value is a reference and can therefore never be NULL.

Current <alertable_handle_events> is identical to <handle_events>.

Reimplemented from ACE_Select_Reactor_T< ACE_Select_Reactor_Token >.

Definition at line 646 of file TP_Reactor.cpp.

References handle_events().

00647 {
00648   return this->handle_events (&max_wait_time);
00649 }

int ACE_TP_Reactor::handle_events ( ACE_Time_Value * max_wait_time = 0  )  [virtual]

This event loop driver blocks for up to max_wait_time before returning. It will return earlier if timer events, I/O events, or signal events occur. Note that max_wait_time can be 0, in which case this method blocks indefinitely until events occur.

max_wait_time is decremented to reflect how much time this call took. For instance, if a time value of 3 seconds is passed to handle_events and an event occurs after 2 seconds, max_wait_time will equal 1 second. This can be used if an application wishes to handle events for some fixed amount of time.

Returns:
The total number of events that were dispatched; 0 if the max_wait_time elapsed without dispatching any handlers, or -1 if an error occurs (check errno for more information).
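The decrementing behaviour of max_wait_time lends itself to a fixed-duration event loop. The sketch below models only the arithmetic, in plain standard C++ rather than ACE code: Millis, remaining_after and run_for are hypothetical names, and the step callable is a stand-in for a handle_events() call that reduces the remaining budget.

```cpp
#include <cassert>
#include <chrono>

using Millis = std::chrono::milliseconds;

// Remaining budget after a call that took `elapsed`, clamped at zero.
inline Millis remaining_after(Millis budget, Millis elapsed) {
  assert(budget >= Millis(0) && elapsed >= Millis(0));
  return elapsed >= budget ? Millis(0) : budget - elapsed;
}

// Call `step` until the budget is used up. `step` receives the
// remaining budget by reference and is expected to reduce it.
// Returns the number of calls made.
template <typename Step>
int run_for(Millis budget, Step step) {
  int calls = 0;
  while (budget > Millis(0)) {
    const Millis before = budget;
    step(budget);
    ++calls;
    if (budget >= before)
      break;                     // guard against a step that makes no progress
  }
  return calls;
}
```

With the figures from the text, remaining_after(Millis(3000), Millis(2000)) yields one second.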

Reimplemented from ACE_Select_Reactor_T< ACE_Select_Reactor_Token >.

Definition at line 143 of file TP_Reactor.cpp.

References ACE_TRACE, ACE_TP_Token_Guard::acquire_read_token(), dispatch_i(), ACE_TP_Token_Guard::is_owner(), and ACE_Countdown_Time::update().

Referenced by handle_events().

00144 {
00145   ACE_TRACE ("ACE_TP_Reactor::handle_events");
00146 
00147   // Stash the current time -- the destructor of this object will
00148   // automatically compute how much time elapsed since this method was
00149   // called.
00150   ACE_Countdown_Time countdown (max_wait_time);
00151 
00152   //
00153   // The order of these events is very subtle, modify with care.
00154   //
00155 
00156   // Instantiate the token guard which will try grabbing the token for
00157   // this thread.
00158   ACE_TP_Token_Guard guard (this->token_);
00159 
00160   int const result = guard.acquire_read_token (max_wait_time);
00161 
00162   // If the guard is NOT the owner just return the retval
00163   if (!guard.is_owner ())
00164     return result;
00165 
00166   // After getting the lock, just check for deactivation.
00167   if (this->deactivated_)
00168     return -1;
00169 
00170   // Update the countdown to reflect time waiting for the token.
00171   countdown.update ();
00172 
00173   return this->dispatch_i (max_wait_time, guard);
00174 }

int ACE_TP_Reactor::handle_notify_events ( int &  event_count,
ACE_TP_Token_Guard & g 
) [protected]

Handle notify events.

Definition at line 347 of file TP_Reactor.cpp.

References ACE_Handle_Set::clr_bit(), ACE_Reactor_Notify::dispatch_notify(), get_notify_handle(), notify_handle(), ACE_Select_Reactor_Impl::notify_handler_, ACE_Select_Reactor_Handle_Set::rd_mask_, ACE_Select_Reactor_Impl::ready_set_, and ACE_TP_Token_Guard::release_token().

Referenced by dispatch_i().

00349 {
00350   // Get the handle on which notify calls could have occurred
00351   ACE_HANDLE notify_handle = this->get_notify_handle ();
00352 
00353   int result = 0;
00354 
00355   // The notify was not in the list returned by
00356   // wait_for_multiple_events ().
00357   if (notify_handle == ACE_INVALID_HANDLE)
00358     return result;
00359 
00360   // Now just do a read on the pipe..
00361   ACE_Notification_Buffer buffer;
00362 
00363   // Clear the handle of the read_mask of our <ready_set_>
00364   this->ready_set_.rd_mask_.clr_bit (notify_handle);
00365 
00366   // Keep reading notifies till we empty it or till we have a
00367   // dispatchable buffer
00368   while (this->notify_handler_->read_notify_pipe (notify_handle, buffer) > 0)
00369     {
00370       // Just figure out whether we can read any buffer that has
00371       // dispatchable info. If not we have just been unblocked by
00372       // another thread trying to update the reactor. If we get any
00373       // buffer that needs dispatching we will dispatch that after
00374       // releasing the lock
00375       if (this->notify_handler_->is_dispatchable (buffer) > 0)
00376         {
00377           // Release the token before dispatching notifies...
00378           guard.release_token ();
00379 
00380           // Dispatch the upcall for the notify
00381           this->notify_handler_->dispatch_notify (buffer);
00382 
00383           // We had a successful dispatch.
00384           result = 1;
00385 
00386           // break out of the while loop
00387           break;
00388         }
00389     }
00390 
00391   // If we did some work, then we just return 1 which will allow us
00392   // to get out of here. If we return 0, then we will be asked to do
00393   // some work, i.e. dispatch socket events
00394   return result;
00395 }
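The drain-until-dispatchable loop in the listing above can be modelled with a queue. This is an illustrative sketch in plain standard C++, not ACE code: Notification and dispatch_first_notification are hypothetical, an empty action stands in for a buffer that only exists to wake the reactor, and the token release before the upcall is omitted.

```cpp
#include <cassert>
#include <functional>
#include <queue>
#include <utility>

// Hypothetical notification buffer. An empty `action` models a buffer
// that merely unblocked the reactor and carries nothing to dispatch.
struct Notification {
  std::function<void()> action;
};

// Read buffers until one is dispatchable, dispatch exactly that one and
// stop. Returns 1 if an upcall was made, 0 if the pipe was drained
// without finding anything to dispatch.
inline int dispatch_first_notification(std::queue<Notification> &pipe) {
  while (!pipe.empty()) {
    Notification n = std::move(pipe.front());
    pipe.pop();
    if (n.action) {
      n.action();     // the real code releases the token before this upcall
      return 1;
    }
  }
  assert(pipe.empty());
  return 0;
}
```

Any notifications behind the dispatched one stay queued and are picked up by a later pass through the event loop.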

int ACE_TP_Reactor::handle_socket_events ( int &  event_count,
ACE_TP_Token_Guard & g 
) [protected]

Handle socket events.

Definition at line 398 of file TP_Reactor.cpp.

References ACE_Event_Handler::add_reference(), ACE_EH_Dispatch_Info::dispatch(), ACE_EH_Dispatch_Info::event_handler_, get_socket_event_info(), ACE_EH_Dispatch_Info::handle_, ACE_Select_Reactor_Impl::handler_rep_, ACE_EH_Dispatch_Info::mask_, ACE_EH_Dispatch_Info::reference_counting_required_, ACE_TP_Token_Guard::release_token(), and ACE_Select_Reactor_Handler_Repository::unbind().

Referenced by dispatch_i().

00400 {
00401 
00402   // We got the lock, lets handle some I/O events.
00403   ACE_EH_Dispatch_Info dispatch_info;
00404 
00405   this->get_socket_event_info (dispatch_info);
00406 
00407   // If there is any event handler that is ready to be dispatched, the
00408   // dispatch information is recorded in dispatch_info.
00409   if (!dispatch_info.dispatch ())
00410     {
00411       // Check for removed handlers.
00412       if (dispatch_info.event_handler_ == 0)
00413         {
00414           this->handler_rep_.unbind(dispatch_info.handle_,
00415                                     dispatch_info.mask_);
00416         }
00417 
00418 
00419       return 0;
00420     }
00421 
00422   // Suspend the handler so that other threads don't start dispatching
00423   // it, if we can't suspend then return directly
00424   //
00425   // NOTE: This check was performed in older versions of the
00426   // TP_Reactor. Looks like it is a waste..
00427   if (dispatch_info.event_handler_ != this->notify_handler_)
00428     if (this->suspend_i (dispatch_info.handle_) == -1)
00429       return 0;
00430 
00431   // Call add_reference() if needed.
00432   if (dispatch_info.reference_counting_required_)
00433     dispatch_info.event_handler_->add_reference ();
00434 
00435   // Release the lock.  Other threads can start waiting.
00436   guard.release_token ();
00437 
00438   int result = 0;
00439 
00440   // If there was an event handler ready, dispatch it.
00441   // Decrement the event left
00442   --event_count;
00443 
00444   // Dispatched an event
00445   if (this->dispatch_socket_event (dispatch_info) == 0)
00446     ++result;
00447 
00448   return result;
00449 }

int ACE_TP_Reactor::handle_timer_events ( int &  event_count,
ACE_TP_Token_Guard & g 
) [protected]

Handle timer events.

Definition at line 307 of file TP_Reactor.cpp.

References ACE_Timer_Node_Dispatch_Info, ACE_Timer_Queue_T< TYPE, FUNCTOR, ACE_LOCK >::postinvoke(), ACE_Timer_Queue_T< TYPE, FUNCTOR, ACE_LOCK >::preinvoke(), ACE_TP_Token_Guard::release_token(), ACE_Select_Reactor_Impl::timer_queue_, ACE_Timer_Queue_T< TYPE, FUNCTOR, ACE_LOCK >::timer_skew(), and ACE_Timer_Queue_T< TYPE, FUNCTOR, ACE_LOCK >::upcall().

Referenced by dispatch_i().

00309 {
00310   if (this->timer_queue_ == 0 || this->timer_queue_->is_empty())
00311     { // Empty timer queue so cannot have any expired timers.
00312       return 0;
00313     }
00314 
00315   // Get the current time
00316   ACE_Time_Value cur_time (this->timer_queue_->gettimeofday () +
00317                            this->timer_queue_->timer_skew ());
00318 
00319   // Look for a node in the timer queue whose timer <= the present
00320   // time.
00321   ACE_Timer_Node_Dispatch_Info info;
00322 
00323   if (this->timer_queue_->dispatch_info (cur_time, info))
00324     {
00325       const void *upcall_act = 0;
00326 
00327       // Preinvoke.
00328       this->timer_queue_->preinvoke (info, cur_time, upcall_act);
00329 
00330       // Release the token before dispatching the timer upcall...
00331       guard.release_token ();
00332 
00333       // call the functor
00334       this->timer_queue_->upcall (info, cur_time);
00335 
00336       // Postinvoke
00337       this->timer_queue_->postinvoke (info, cur_time, upcall_act);
00338 
00339       // We have dispatched a timer
00340       return 1;
00341     }
00342 
00343   return 0;
00344 }

ACE_INLINE void ACE_TP_Reactor::no_op_sleep_hook ( void *   )  [static]

Called from handle events.

Definition at line 98 of file TP_Reactor.inl.

Referenced by ACE_TP_Token_Guard::acquire_read_token().

00099 {
00100 }

void ACE_TP_Reactor::notify_handle ( ACE_HANDLE  handle,
ACE_Reactor_Mask  mask,
ACE_Handle_Set & ,
ACE_Event_Handler * eh,
ACE_EH_PTMF  callback 
) [protected, virtual]

This method shouldn't get called.

Reimplemented from ACE_Select_Reactor_T< ACE_Select_Reactor_Token >.

Definition at line 652 of file TP_Reactor.cpp.

References ACE_ASSERT, ACE_ERROR, ACE_TEXT, and LM_ERROR.

Referenced by handle_notify_events().

00657 {
00658   ACE_ERROR ((LM_ERROR,
00659               ACE_TEXT ("ACE_TP_Reactor::notify_handle: ")
00660               ACE_TEXT ("Wrong version of notify_handle() got called \n")));
00661 
00662   ACE_ASSERT (eh == 0);
00663   ACE_UNUSED_ARG (eh);
00664 }

ACE_TP_Reactor& ACE_TP_Reactor::operator= ( const ACE_TP_Reactor &  )  [private]

int ACE_TP_Reactor::owner ( ACE_thread_t * t_id  )  [virtual]

Return the thread ID of the current Leader.

Reimplemented from ACE_Select_Reactor_T< ACE_Select_Reactor_Token >.

Definition at line 134 of file TP_Reactor.cpp.

References ACE_TRACE, and ACE_Thread::self().

00135 {
00136   ACE_TRACE ("ACE_TP_Reactor::owner");
00137   *t_id = ACE_Thread::self ();
00138 
00139   return 0;
00140 }

int ACE_TP_Reactor::owner ( ACE_thread_t  n_id,
ACE_thread_t * o_id = 0 
) [virtual]

The ACE_TP_Reactor implementation does not have a single owner thread. Attempts to set the owner explicitly are ignored. The reported owner thread is the current Leader in the pattern.

Reimplemented from ACE_Select_Reactor_T< ACE_Select_Reactor_Token >.

Definition at line 124 of file TP_Reactor.cpp.

References ACE_TRACE, and ACE_Thread::self().

00125 {
00126   ACE_TRACE ("ACE_TP_Reactor::owner");
00127   if (o_id)
00128     *o_id = ACE_Thread::self ();
00129 
00130   return 0;
00131 }

int ACE_TP_Reactor::post_process_socket_event ( ACE_EH_Dispatch_Info & dispatch_info,
int  status 
) [private]

Definition at line 582 of file TP_Reactor.cpp.

References ACE_Event_Handler::ACE_REACTOR_RESUMES_HANDLER, ACE_TP_Token_Guard::acquire_token(), ACE_EH_Dispatch_Info::event_handler_, ACE_Select_Reactor_Handler_Repository::find(), ACE_EH_Dispatch_Info::handle_, ACE_Select_Reactor_Impl::handler_rep_, ACE_TP_Token_Guard::is_owner(), ACE_EH_Dispatch_Info::mask_, ACE_EH_Dispatch_Info::reference_counting_required_, ACE_Select_Reactor_T< ACE_Select_Reactor_Token >::remove_handler_i(), ACE_Event_Handler::remove_reference(), ACE_EH_Dispatch_Info::resume_flag_, and ACE_Select_Reactor_T< ACE_Select_Reactor_Token >::resume_i().

Referenced by dispatch_socket_event().

00584 {
00585   int result = 0;
00586 
00587   // First check if we really have to post process something, if not, then
00588   // we don't acquire the token which saves us a lot of time.
00589   if (status < 0 ||
00590      (dispatch_info.event_handler_ != this->notify_handler_ &&
00591       dispatch_info.resume_flag_ ==
00592         ACE_Event_Handler::ACE_REACTOR_RESUMES_HANDLER))
00593     {
00594       // Get the reactor token and with this token acquired remove first the
00595       // handler and resume it at the same time. This must be atomic, see also
00596       // bugzilla 2395. When this is not atomic it can be that we resume the
00597       // handle after it is reused by the OS.
00598       ACE_TP_Token_Guard guard (this->token_);
00599 
00600       result = guard.acquire_token ();
00601 
00602       // If the guard is NOT the owner just return the retval
00603       if (!guard.is_owner ())
00604         return result;
00605 
00606       // A different event handler may have been registered during the
00607       // upcall if the handle was closed and then reopened, for
00608       // example.  Make sure we're removing and/or resuming the event
00609       // handler used during the upcall.
00610       ACE_Event_Handler const * const eh =
00611         this->handler_rep_.find (dispatch_info.handle_);
00612 
00613       // Only remove or resume the event handler used during the
00614       // upcall.
00615       if (eh == dispatch_info.event_handler_)
00616         {
00617           if (status < 0)
00618             {
00619               result =
00620                 this->remove_handler_i (dispatch_info.handle_,
00621                                         dispatch_info.mask_);
00622             }
00623 
00624           // Resume handler if required.
00625           if (dispatch_info.event_handler_ != this->notify_handler_ &&
00626               dispatch_info.resume_flag_ ==
00627               ACE_Event_Handler::ACE_REACTOR_RESUMES_HANDLER)
00628             this->resume_i (dispatch_info.handle_);
00629         }
00630     }
00631 
00632   // Call remove_reference() if needed.
00633   if (dispatch_info.reference_counting_required_)
00634     dispatch_info.event_handler_->remove_reference ();
00635 
00636   return result;
00637 }
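The identity check in the listing above guards against a handle that was closed and re-registered during the upcall. The sketch below shows the idea in plain standard C++, not ACE code: Registry, Handler, Action and post_process are hypothetical names, and the sketch folds the listing's separate remove and resume steps into a single outcome.

```cpp
#include <cassert>
#include <map>
#include <set>

struct Handler { int id; };

struct Registry {
  std::map<int, const Handler *> by_handle;  // handle -> registered handler
  std::set<int> suspended;
};

enum class Action { Skipped, Resumed, Removed };

// Act only if the handler currently registered for `handle` is the one
// that was dispatched. Otherwise the handle now belongs to a different
// handler and must be left alone.
inline Action post_process(Registry &reg, int handle,
                           const Handler *dispatched, int status) {
  assert(dispatched != nullptr);
  auto it = reg.by_handle.find(handle);
  if (it == reg.by_handle.end() || it->second != dispatched)
    return Action::Skipped;

  reg.suspended.erase(handle);     // the handle is no longer suspended
  if (status < 0) {
    reg.by_handle.erase(it);       // the upcall asked to be removed
    return Action::Removed;
  }
  return Action::Resumed;
}
```

In the real method the lookup, removal and resumption all happen while the reactor token is held, which is what makes the sequence atomic with respect to other threads.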

int ACE_TP_Reactor::resumable_handler ( void   )  [virtual]

Does the reactor allow the application to resume the handle on its own, i.e., can it pass control of handle resumption to the application? The TP reactor can allow applications to resume handles, so this method returns a positive value.

Reimplemented from ACE_Select_Reactor_Impl.

Definition at line 640 of file TP_Reactor.cpp.

00641 {
00642   return 1;
00643 }


Member Data Documentation

ACE_TP_Reactor::ACE_ALLOC_HOOK_DECLARE

Declare the dynamic allocation hooks.

Reimplemented from ACE_Select_Reactor_T< ACE_Select_Reactor_Token >.

Definition at line 240 of file TP_Reactor.h.


The documentation for this class was generated from the following files:
Generated on Tue Feb 2 17:35:49 2010 for ACE by  doxygen 1.4.7