TP_Reactor.h

Go to the documentation of this file.
00001 // -*- C++ -*-
00002 
00003 //=============================================================================
00004 /**
00005  *  @file    TP_Reactor.h
00006  *
00007  *  $Id: TP_Reactor.h 80826 2008-03-04 14:51:23Z wotte $
00008  *
00009  *  The ACE_TP_Reactor (aka, Thread Pool Reactor) uses the
00010  *  Leader/Followers pattern to demultiplex events among a pool of
00011  *  threads.  When using a thread pool reactor, an application
00012  *  pre-spawns a fixed number of threads.  When these threads
00013  *  invoke the ACE_TP_Reactor's handle_events() method, one thread
00014  *  will become the leader and wait for an event.  The other
00015  *  follower threads will queue up waiting for their turn to become
00016  *  the leader.  When an event occurs, the leader will pick a
00017  *  follower to become the leader and go on to handle the event.
00018  *  The consequence of using ACE_TP_Reactor is the amortization of
00019  *  the costs used to create threads.  The context switching cost
00020  *  will also reduce.  Moreover, the total resources used by
00021  *  threads are bounded because there are a fixed number of threads.
00022  *
00023  *  @author Irfan Pyarali <irfan@cs.wustl.edu>
00024  *  @author Nanbor Wang <nanbor@cs.wustl.edu>
00025  */
00026 //=============================================================================
00027 
00028 
00029 #ifndef ACE_TP_REACTOR_H
00030 #define ACE_TP_REACTOR_H
00031 
00032 #include /**/ "ace/pre.h"
00033 
00034 #include "ace/Select_Reactor.h"
00035 #include "ace/Timer_Queue.h"    /* Simple forward decl won't work... */
00036 
00037 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00038 # pragma once
00039 #endif /* ACE_LACKS_PRAGMA_ONCE */
00040 
00041 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
00042 
00043 /**
00044  * @class ACE_EH_Dispatch_Info
00045  *
00046  * @brief This structure contains information of the activated event
00047  * handler.
00048  */
00049 class ACE_EH_Dispatch_Info
00050 {
00051 public:
00052   ACE_EH_Dispatch_Info (void);
00053 
00054   void set (ACE_HANDLE handle,
00055             ACE_Event_Handler *event_handler,
00056             ACE_Reactor_Mask mask,
00057             ACE_EH_PTMF callback);
00058 
00059   bool dispatch (void) const;
00060 
00061   ACE_HANDLE handle_;
00062   ACE_Event_Handler *event_handler_;
00063   ACE_Reactor_Mask mask_;
00064   ACE_EH_PTMF callback_;
00065   int resume_flag_;
00066   bool reference_counting_required_;
00067 
00068 private:
00069   bool dispatch_;
00070 
00071   // Disallow copying and assignment.
00072   ACE_EH_Dispatch_Info (const ACE_EH_Dispatch_Info &);
00073   ACE_EH_Dispatch_Info &operator= (const ACE_EH_Dispatch_Info &);
00074 };
00075 
00076 
00077 /**
00078  * @class ACE_TP_Token_Guard
00079  *
00080  * @brief A helper class that helps grabbing, releasing and waiting
00081  * on tokens for a thread that tries calling handle_events ().
00082  *
00083  * In short, this class will be owned by one thread by creating on the
00084  * stack. This class gives the status of the ownership of the token
00085  * and manages the ownership
00086  */
00087 
00088 class ACE_TP_Token_Guard
00089 {
00090 public:
00091 
00092   /// Constructor that will grab the token for us
00093   ACE_TP_Token_Guard (ACE_Select_Reactor_Token &token);
00094 
00095   /// Destructor. This will release the token if it hasnt been
00096   /// released till this point
00097   ~ACE_TP_Token_Guard (void);
00098 
00099   /// Release the token ..
00100   void release_token (void);
00101 
00102   /// Returns whether the thread that created this object ownes the
00103   /// token or not.
00104   bool is_owner (void);
00105 
00106   /// A helper method that grabs the token for us, after which the
00107   /// thread that owns that can do some actual work.
00108   int acquire_read_token (ACE_Time_Value *max_wait_time = 0);
00109 
00110   /**
00111    * A helper method that grabs the token for us, after which the
00112    * thread that owns that can do some actual work. This differs from
00113    * acquire_read_token() as it uses acquire () to get the token instead of
00114    * acquire_read ()
00115    */
00116   int acquire_token (ACE_Time_Value *max_wait_time = 0);
00117 
00118 private:
00119 
00120   // Disallow default construction.
00121   ACE_TP_Token_Guard (void);
00122 
00123   // Disallow copying and assignment.
00124   ACE_TP_Token_Guard (const ACE_TP_Token_Guard &);
00125   ACE_TP_Token_Guard &operator= (const ACE_TP_Token_Guard &);
00126 
00127 private:
00128 
00129   /// The Select Reactor token.
00130   ACE_Select_Reactor_Token &token_;
00131 
00132   /// Flag that indicate whether the thread that created this object
00133   /// owns the token or not. A value of false indicates that this class
00134   /// hasnt got the token (and hence the thread) and a value of true
00135   /// vice-versa.
00136   bool owner_;
00137 
00138 };
00139 
00140 /**
00141  * @class ACE_TP_Reactor
00142  *
00143  * @brief Specialization of ACE_Select_Reactor to support thread-pool
00144  * based event dispatching.
00145  *
00146  * One of the shortcomings of the ACE_Select_Reactor is that it
00147  * does not support a thread pool-based event dispatching model,
00148  * similar to the one in ACE_WFMO_Reactor.  In ACE_Select_Reactor, only
00149  * thread can call handle_events() at any given time. ACE_TP_Reactor
00150  * removes this short-coming.
00151  *
00152  * ACE_TP_Reactor is a specialization of ACE_Select_Reactor to support
00153  * thread pool-based event dispatching. This reactor takes advantage
00154  * of the fact that events reported by @c select() are persistent if not
00155  * acted upon immediately.  It works by remembering the event handler
00156  * which was just activated, suspending it for further I/O activities,
00157  * releasing the internal lock (so that another thread can start waiting
00158  * in the event loop) and then dispatching the event's handler outside the
00159  * scope of the reactor lock. After the event handler has been dispatched
00160  * the event handler is resumed for further I/O activity.
00161  *
00162  * This reactor implementation is best suited for situations when the
00163  * callbacks to event handlers can take arbitrarily long and/or a number
00164  * of threads are available to run the event loop. Note that I/O-processing
00165  * callback code in event handlers (e.g. handle_input()) does not have to
00166  * be modified or made thread-safe for this reactor.  This is because
00167  * before an I/O event is dispatched to an event handler, the handler is
00168  * suspended; it is resumed by the reactor after the upcall completes.
00169  * Therefore, multiple I/O events will not be made to one event handler
00170  * multiple threads simultaneously. This suspend/resume protection does not
00171  * apply to either timers scheduled with the reactor or to notifications
00172  * requested via the reactor. When using timers and/or notifications you
00173  * must provide proper protection for your class in the context of multiple
00174  * threads.
00175  */
00176 class ACE_Export ACE_TP_Reactor : public ACE_Select_Reactor
00177 {
00178 public:
00179 
00180   /// Initialize ACE_TP_Reactor with the default size.
00181   ACE_TP_Reactor (ACE_Sig_Handler * = 0,
00182                   ACE_Timer_Queue * = 0,
00183                   int mask_signals = 1,
00184                   int s_queue = ACE_Select_Reactor_Token::FIFO);
00185 
00186   /**
00187    * Initialize the ACE_TP_Reactor to manage
00188    * @a max_number_of_handles.  If @a restart is non-0 then the
00189    * ACE_Reactor's @c handle_events() method will be restarted
00190    * automatically when @c EINTR occurs.  If @a sh or
00191    * @a tq are non-0 they are used as the signal handler and
00192    * timer queue, respectively.
00193    */
00194   ACE_TP_Reactor (size_t max_number_of_handles,
00195                   int restart = 0,
00196                   ACE_Sig_Handler *sh = 0,
00197                   ACE_Timer_Queue *tq = 0,
00198                   int mask_signals = 1,
00199                   int s_queue = ACE_Select_Reactor_Token::FIFO);
00200 
00201   /**
00202    * This event loop driver that blocks for @a max_wait_time before
00203    * returning.  It will return earlier if timer events, I/O events,
00204    * or signal events occur.  Note that @a max_wait_time can be 0, in
00205    * which case this method blocks indefinitely until events occur.
00206    *
00207    * @a max_wait_time is decremented to reflect how much time this call
00208    * took.  For instance, if a time value of 3 seconds is passed to
00209    * handle_events and an event occurs after 2 seconds,
00210    * @a max_wait_time will equal 1 second.  This can be used if an
00211    * application wishes to handle events for some fixed amount of
00212    * time.
00213    *
00214    * @return The total number of events that were dispatched; 0 if the
00215    * @a max_wait_time elapsed without dispatching any handlers, or -1
00216    * if an error occurs (check @c errno for more information).
00217    */
00218   virtual int handle_events (ACE_Time_Value *max_wait_time = 0);
00219 
00220   virtual int handle_events (ACE_Time_Value &max_wait_time);
00221 
00222   /// Does the reactor allow the application to resume the handle on
00223   /// its own ie. can it pass on the control of handle resumption to
00224   /// the application.  The TP reactor has can allow applications to
00225   /// resume handles.  So return a positive value.
00226   virtual int resumable_handler (void);
00227 
00228   /// Called from handle events
00229   static void no_op_sleep_hook (void *);
00230 
00231   /// The ACE_TP_Reactor implementation does not have a single owner thread.
00232   /// Attempts to set the owner explicitly are ignored. The reported owner
00233   /// thread is the current Leader in the pattern.
00234   virtual int owner (ACE_thread_t n_id, ACE_thread_t *o_id = 0);
00235 
00236   /// Return the thread ID of the current Leader.
00237   virtual int owner (ACE_thread_t *t_id);
00238 
00239   /// Declare the dynamic allocation hooks.
00240   ACE_ALLOC_HOOK_DECLARE;
00241 
00242 protected:
00243   // = Internal methods that do the actual work.
00244 
00245   /// Template method from the base class.
00246   virtual void clear_dispatch_mask (ACE_HANDLE handle,
00247                                     ACE_Reactor_Mask mask);
00248 
00249   /// Dispatch just 1 signal, timer, notification handlers
00250   int dispatch_i (ACE_Time_Value *max_wait_time,
00251                   ACE_TP_Token_Guard &guard);
00252 
00253   /// Get the event that needs dispatching. It could be either a
00254   /// signal, timer, notification handlers or return possibly 1 I/O
00255   /// handler for dispatching. In the most common use case, this would
00256   /// return 1 I/O handler for dispatching
00257   int get_event_for_dispatching (ACE_Time_Value *max_wait_time);
00258 
00259 #if 0
00260   // @Ciju
00261   // signal handling isn't in a production state yet.
00262   // Commenting it out for now.
00263 
00264   /// Method to handle signals
00265   /// @note It is just busted at this point in time.
00266   int handle_signals (int &event_count,
00267                       ACE_TP_Token_Guard &g);
00268 #endif // #if 0
00269 
00270   /// Handle timer events
00271   int handle_timer_events (int &event_count,
00272                            ACE_TP_Token_Guard &g);
00273 
00274   /// Handle notify events
00275   int handle_notify_events (int &event_count,
00276                             ACE_TP_Token_Guard &g);
00277 
00278   /// handle socket events
00279   int handle_socket_events (int &event_count,
00280                             ACE_TP_Token_Guard &g);
00281 
00282   /// This method shouldn't get called.
00283   virtual void notify_handle (ACE_HANDLE handle,
00284                               ACE_Reactor_Mask mask,
00285                               ACE_Handle_Set &,
00286                               ACE_Event_Handler *eh,
00287                               ACE_EH_PTMF callback);
00288 private:
00289 
00290   /// Get the handle of the notify pipe from the ready set if there is
00291   /// an event in the notify pipe.
00292   ACE_HANDLE get_notify_handle (void);
00293 
00294   /// Get socket event dispatch information.
00295   int get_socket_event_info (ACE_EH_Dispatch_Info &info);
00296 
00297   /// Notify the appropriate <callback> in the context of the <eh>
00298   /// associated with <handle> that a particular event has occurred.
00299   int dispatch_socket_event (ACE_EH_Dispatch_Info &dispatch_info);
00300 
00301   /// Clear the @a handle from the read_set
00302   void clear_handle_read_set (ACE_HANDLE handle);
00303 
00304   int post_process_socket_event (ACE_EH_Dispatch_Info &dispatch_info,int status);
00305 
00306 private:
00307   /// Deny access since member-wise won't work...
00308   ACE_TP_Reactor (const ACE_TP_Reactor &);
00309   ACE_TP_Reactor &operator = (const ACE_TP_Reactor &);
00310 };
00311 
00312 ACE_END_VERSIONED_NAMESPACE_DECL
00313 
00314 #if defined (__ACE_INLINE__)
00315 #include "ace/TP_Reactor.inl"
00316 #endif /* __ACE_INLINE__ */
00317 
00318 #include /**/ "ace/post.h"
00319 
00320 #endif /* ACE_TP_REACTOR_H */

Generated on Tue Feb 2 17:18:43 2010 for ACE by  doxygen 1.4.7