00001 // -*- C++ -*- 00002 00003 //============================================================================= 00004 /** 00005 * @file TP_Reactor.h 00006 * 00007 * TP_Reactor.h,v 4.51 2006/03/20 10:10:14 jwillemsen Exp 00008 * 00009 * The ACE_TP_Reactor (aka, Thread Pool Reactor) uses the 00010 * Leader/Followers pattern to demultiplex events among a pool of 00011 * threads. When using a thread pool reactor, an application 00012 * pre-spawns a _fixed_ number of threads. When these threads 00013 * invoke the ACE_TP_Reactor's <handle_events> method, one thread 00014 * will become the leader and wait for an event. The other 00015 * follower threads will queue up waiting for their turn to become 00016 * the leader. When an event occurs, the leader will pick a 00017 * follower to become the leader and go on to handle the event. 00018 * The consequence of using ACE_TP_Reactor is the amortization of 00019 * the costs used to creating threads. The context switching cost 00020 * will also reduce. More over, the total resources used by 00021 * threads are bounded because there are a fixed number of threads. 00022 * 00023 * @author Irfan Pyarali <irfan@cs.wustl.edu> 00024 * @author Nanbor Wang <nanbor@cs.wustl.edu> 00025 */ 00026 //============================================================================= 00027 00028 00029 #ifndef ACE_TP_REACTOR_H 00030 #define ACE_TP_REACTOR_H 00031 00032 #include /**/ "ace/pre.h" 00033 00034 #include "ace/Select_Reactor.h" 00035 #include "ace/Timer_Queue.h" /* Simple forward decl won't work... */ 00036 00037 #if !defined (ACE_LACKS_PRAGMA_ONCE) 00038 # pragma once 00039 #endif /* ACE_LACKS_PRAGMA_ONCE */ 00040 00041 ACE_BEGIN_VERSIONED_NAMESPACE_DECL 00042 00043 /** 00044 * @class ACE_EH_Dispatch_Info 00045 * 00046 * @brief This structure contains information of the activated event 00047 * handler. 00048 */ 00049 class ACE_EH_Dispatch_Info 00050 { 00051 public: 00052 ACE_EH_Dispatch_Info (void); 00053 00054 void set (ACE_HANDLE handle, 00055 ACE_Event_Handler *event_handler, 00056 ACE_Reactor_Mask mask, 00057 ACE_EH_PTMF callback); 00058 00059 bool dispatch (void) const; 00060 00061 ACE_HANDLE handle_; 00062 ACE_Event_Handler *event_handler_; 00063 ACE_Reactor_Mask mask_; 00064 ACE_EH_PTMF callback_; 00065 int resume_flag_; 00066 bool reference_counting_required_; 00067 00068 private: 00069 bool dispatch_; 00070 00071 // Disallow copying and assignment. 00072 ACE_EH_Dispatch_Info (const ACE_EH_Dispatch_Info &); 00073 ACE_EH_Dispatch_Info &operator= (const ACE_EH_Dispatch_Info &); 00074 }; 00075 00076 00077 /** 00078 * @class ACE_TP_Token_Guard 00079 * 00080 * @brief A helper class that helps grabbing, releasing and waiting 00081 * on tokens for a thread that tries calling handle_events (). 00082 * 00083 * In short, this class will be owned by one thread by creating on the 00084 * stack. This class gives the status of the ownership of the token 00085 * and manages the ownership 00086 */ 00087 00088 class ACE_TP_Token_Guard 00089 { 00090 public: 00091 00092 /// Constructor that will grab the token for us 00093 ACE_TP_Token_Guard (ACE_Select_Reactor_Token &token); 00094 00095 /// Destructor. This will release the token if it hasnt been 00096 /// released till this point 00097 ~ACE_TP_Token_Guard (void); 00098 00099 /// Release the token .. 00100 void release_token (void); 00101 00102 /// Returns whether the thread that created this object ownes the 00103 /// token or not. 00104 int is_owner (void); 00105 00106 /// A helper method that grabs the token for us, after which the 00107 /// thread that owns that can do some actual work. 00108 int acquire_read_token (ACE_Time_Value *max_wait_time = 0); 00109 00110 /** 00111 * A helper method that grabs the token for us, after which the 00112 * thread that owns that can do some actual work. This differs from 00113 * acquire_read_token() as it uses acquire () to get the token instead of 00114 * acquire_read () 00115 */ 00116 int acquire_token (ACE_Time_Value *max_wait_time = 0); 00117 00118 private: 00119 00120 // Disallow default construction. 00121 ACE_TP_Token_Guard (void); 00122 00123 // Disallow copying and assignment. 00124 ACE_TP_Token_Guard (const ACE_TP_Token_Guard &); 00125 ACE_TP_Token_Guard &operator= (const ACE_TP_Token_Guard &); 00126 00127 private: 00128 00129 /// The Select Reactor token. 00130 ACE_Select_Reactor_Token &token_; 00131 00132 /// Flag that indicate whether the thread that created this object 00133 /// owns the token or not. A value of 0 indicates that this class 00134 /// hasnt got the token (and hence the thread) and a value of 1 00135 /// vice-versa. 00136 int owner_; 00137 00138 }; 00139 00140 /** 00141 * @class ACE_TP_Reactor 00142 * 00143 * @brief Specialization of Select Reactor to support thread-pool 00144 * based event dispatching. 00145 * 00146 * One of the short comings of the Select_Reactor in ACE is that it 00147 * did not support a thread pool based event dispatching model, 00148 * similar to the one in WFMO_Reactor. In Select_Reactor, only thread 00149 * can be blocked in <handle_events> at any given time. 00150 * 00151 * A new Reactor has been added to ACE that removes this short-coming. 00152 * TP_Reactor is a specialization of Select Reactor to support 00153 * thread-pool based event dispatching. This Reactor takes advantage 00154 * of the fact that events reported by <select> are persistent if not 00155 * acted upon immediately. It works by remembering the event handler 00156 * that just got activated, releasing the internal lock (so that some 00157 * other thread can start waiting in the event loop) and then 00158 * dispatching the event handler outside the context of the Reactor 00159 * lock. After the event handler has been dispatched the event handler is 00160 * resumed again. Don't call remove_handler() from the handle_x methods, 00161 * instead return -1. 00162 * 00163 * This Reactor is best suited for situations when the callbacks to 00164 * event handlers can take arbitrarily long and/or a number of threads 00165 * are available to run the event loops. Note that callback code in 00166 * Event Handlers (e.g. Event_Handler::handle_input) does not have to 00167 * be modified or made thread-safe for this Reactor. This is because 00168 * an activated Event Handler is suspended in the Reactor before the 00169 * upcall is made and resumed after the upcall completes. Therefore, 00170 * one Event Handler cannot be called by multiple threads 00171 * simultaneously. 00172 */ 00173 class ACE_Export ACE_TP_Reactor : public ACE_Select_Reactor 00174 { 00175 public: 00176 00177 // = Initialization and termination methods. 00178 00179 /// Initialize ACE_TP_Reactor with the default size. 00180 ACE_TP_Reactor (ACE_Sig_Handler * = 0, 00181 ACE_Timer_Queue * = 0, 00182 int mask_signals = 1, 00183 int s_queue = ACE_Select_Reactor_Token::FIFO); 00184 00185 /** 00186 * Initialize the ACE_TP_Reactor to manage 00187 * @a max_number_of_handles. If @a restart is non-0 then the 00188 * ACE_Reactor's <handle_events> method will be restarted 00189 * automatically when <EINTR> occurs. If <signal_handler> or 00190 * <timer_queue> are non-0 they are used as the signal handler and 00191 * timer queue, respectively. 00192 */ 00193 ACE_TP_Reactor (size_t max_number_of_handles, 00194 int restart = 0, 00195 ACE_Sig_Handler *sh = 0, 00196 ACE_Timer_Queue *tq = 0, 00197 int mask_signals = 1, 00198 int s_queue = ACE_Select_Reactor_Token::FIFO); 00199 00200 // = Event loop drivers. 00201 00202 /** 00203 * This event loop driver that blocks for <max_wait_time> before 00204 * returning. It will return earlier if timer events, I/O events, 00205 * or signal events occur. Note that <max_wait_time> can be 0, in 00206 * which case this method blocks indefinitely until events occur. 00207 * 00208 * <max_wait_time> is decremented to reflect how much time this call 00209 * took. For instance, if a time value of 3 seconds is passed to 00210 * handle_events and an event occurs after 2 seconds, 00211 * <max_wait_time> will equal 1 second. This can be used if an 00212 * application wishes to handle events for some fixed amount of 00213 * time. 00214 * 00215 * Returns the total number of ACE_Event_Handlers that were 00216 * dispatched, 0 if the <max_wait_time> elapsed without dispatching 00217 * any handlers, or -1 if something goes wrong. 00218 */ 00219 virtual int handle_events (ACE_Time_Value *max_wait_time = 0); 00220 00221 virtual int handle_events (ACE_Time_Value &max_wait_time); 00222 00223 /* 00224 * @todo The following methods are not supported. Support for 00225 * signals is not available in the TP_Reactor. These methods will be 00226 * supported once signal handling is supported. 00227 */ 00228 virtual int register_handler (int signum, 00229 ACE_Event_Handler *new_sh, 00230 ACE_Sig_Action *new_disp = 0, 00231 ACE_Event_Handler **old_sh = 0, 00232 ACE_Sig_Action *old_disp = 0); 00233 00234 virtual int register_handler (const ACE_Sig_Set &sigset, 00235 ACE_Event_Handler *new_sh, 00236 ACE_Sig_Action *new_disp = 0); 00237 00238 /** 00239 * The following template methods have been declared here to avoid 00240 * some compilers complaining that we have hidden some of the other 00241 * virtual functions. We need to override functions with signal 00242 * handlers and return -1 since the TP_Reactor does not support 00243 * signals. The definition of the following functions is just a 00244 * side-effect. The actual definitions will just call the base class 00245 * method. For detailed documentation of these methods please see 00246 * Select_Reactor_T.h. 00247 */ 00248 //@{ 00249 00250 virtual int register_handler (ACE_Event_Handler *eh, 00251 ACE_Reactor_Mask mask); 00252 00253 virtual int register_handler (ACE_HANDLE handle, 00254 ACE_Event_Handler *eh, 00255 ACE_Reactor_Mask mask); 00256 00257 #if defined (ACE_WIN32) 00258 00259 00260 00261 virtual int register_handler (ACE_Event_Handler *event_handler, 00262 ACE_HANDLE event_handle = ACE_INVALID_HANDLE); 00263 00264 #endif /* ACE_WIN32 */ 00265 00266 virtual int register_handler (ACE_HANDLE event_handle, 00267 ACE_HANDLE io_handle, 00268 ACE_Event_Handler *event_handler, 00269 ACE_Reactor_Mask mask); 00270 00271 virtual int register_handler (const ACE_Handle_Set &handles, 00272 ACE_Event_Handler *eh, 00273 ACE_Reactor_Mask mask); 00274 00275 //@} 00276 00277 /// Does the reactor allow the application to resume the handle on 00278 /// its own ie. can it pass on the control of handle resumption to 00279 /// the application. The TP reactor has can allow applications to 00280 /// resume handles. So return a positive value. 00281 virtual int resumable_handler (void); 00282 00283 /// Called from handle events 00284 static void no_op_sleep_hook (void *); 00285 00286 // = Any thread can perform a <handle_events>, override the owner() 00287 // methods to avoid the overhead of setting the owner thread. 00288 00289 /// Set the new owner of the thread and return the old owner. 00290 virtual int owner (ACE_thread_t n_id, ACE_thread_t *o_id = 0); 00291 00292 /// Return the current owner of the thread. 00293 virtual int owner (ACE_thread_t *t_id); 00294 00295 /// Declare the dynamic allocation hooks. 00296 ACE_ALLOC_HOOK_DECLARE; 00297 00298 protected: 00299 // = Internal methods that do the actual work. 00300 00301 /// Template method from the base class. 00302 virtual void clear_dispatch_mask (ACE_HANDLE handle, 00303 ACE_Reactor_Mask mask); 00304 00305 /// Dispatch just 1 signal, timer, notification handlers 00306 int dispatch_i (ACE_Time_Value *max_wait_time, 00307 ACE_TP_Token_Guard &guard); 00308 00309 /// Get the event that needs dispatching. It could be either a 00310 /// signal, timer, notification handlers or return possibly 1 I/O 00311 /// handler for dispatching. In the most common use case, this would 00312 /// return 1 I/O handler for dispatching 00313 int get_event_for_dispatching (ACE_Time_Value *max_wait_time); 00314 00315 /// Method to handle signals 00316 /// @note It is just busted at this point in time. 00317 int handle_signals (int &event_count, 00318 ACE_TP_Token_Guard &g); 00319 00320 /// Handle timer events 00321 int handle_timer_events (int &event_count, 00322 ACE_TP_Token_Guard &g); 00323 00324 /// Handle notify events 00325 int handle_notify_events (int &event_count, 00326 ACE_TP_Token_Guard &g); 00327 00328 /// handle socket events 00329 int handle_socket_events (int &event_count, 00330 ACE_TP_Token_Guard &g); 00331 00332 /// This method shouldn't get called. 00333 virtual void notify_handle (ACE_HANDLE handle, 00334 ACE_Reactor_Mask mask, 00335 ACE_Handle_Set &, 00336 ACE_Event_Handler *eh, 00337 ACE_EH_PTMF callback); 00338 private: 00339 00340 /// Get the handle of the notify pipe from the ready set if there is 00341 /// an event in the notify pipe. 00342 ACE_HANDLE get_notify_handle (void); 00343 00344 /// Get socket event dispatch information. 00345 int get_socket_event_info (ACE_EH_Dispatch_Info &info); 00346 00347 /// Notify the appropriate <callback> in the context of the <eh> 00348 /// associated with <handle> that a particular event has occurred. 00349 int dispatch_socket_event (ACE_EH_Dispatch_Info &dispatch_info); 00350 00351 /// Clear the @a handle from the read_set 00352 void clear_handle_read_set (ACE_HANDLE handle); 00353 00354 int post_process_socket_event (ACE_EH_Dispatch_Info &dispatch_info,int status); 00355 00356 private: 00357 /// Deny access since member-wise won't work... 00358 ACE_TP_Reactor (const ACE_TP_Reactor &); 00359 ACE_TP_Reactor &operator = (const ACE_TP_Reactor &); 00360 }; 00361 00362 ACE_END_VERSIONED_NAMESPACE_DECL 00363 00364 #if defined (__ACE_INLINE__) 00365 #include "ace/TP_Reactor.inl" 00366 #endif /* __ACE_INLINE__ */ 00367 00368 #include /**/ "ace/post.h" 00369 00370 #endif /* ACE_TP_REACTOR_H */