00001 // -*- C++ -*- 00002 00003 //============================================================================= 00004 /** 00005 * @file TP_Reactor.h 00006 * 00007 * $Id: TP_Reactor.h 80826 2008-03-04 14:51:23Z wotte $ 00008 * 00009 * The ACE_TP_Reactor (aka, Thread Pool Reactor) uses the 00010 * Leader/Followers pattern to demultiplex events among a pool of 00011 * threads. When using a thread pool reactor, an application 00012 * pre-spawns a fixed number of threads. When these threads 00013 * invoke the ACE_TP_Reactor's handle_events() method, one thread 00014 * will become the leader and wait for an event. The other 00015 * follower threads will queue up waiting for their turn to become 00016 * the leader. When an event occurs, the leader will pick a 00017 * follower to become the leader and go on to handle the event. 00018 * The consequence of using ACE_TP_Reactor is the amortization of 00019 * the costs used to create threads. The context switching cost 00020 * will also reduce. Moreover, the total resources used by 00021 * threads are bounded because there are a fixed number of threads. 00022 * 00023 * @author Irfan Pyarali <irfan@cs.wustl.edu> 00024 * @author Nanbor Wang <nanbor@cs.wustl.edu> 00025 */ 00026 //============================================================================= 00027 00028 00029 #ifndef ACE_TP_REACTOR_H 00030 #define ACE_TP_REACTOR_H 00031 00032 #include /**/ "ace/pre.h" 00033 00034 #include "ace/Select_Reactor.h" 00035 #include "ace/Timer_Queue.h" /* Simple forward decl won't work... */ 00036 00037 #if !defined (ACE_LACKS_PRAGMA_ONCE) 00038 # pragma once 00039 #endif /* ACE_LACKS_PRAGMA_ONCE */ 00040 00041 ACE_BEGIN_VERSIONED_NAMESPACE_DECL 00042 00043 /** 00044 * @class ACE_EH_Dispatch_Info 00045 * 00046 * @brief This structure contains information of the activated event 00047 * handler. 00048 */ 00049 class ACE_EH_Dispatch_Info 00050 { 00051 public: 00052 ACE_EH_Dispatch_Info (void); 00053 00054 void set (ACE_HANDLE handle, 00055 ACE_Event_Handler *event_handler, 00056 ACE_Reactor_Mask mask, 00057 ACE_EH_PTMF callback); 00058 00059 bool dispatch (void) const; 00060 00061 ACE_HANDLE handle_; 00062 ACE_Event_Handler *event_handler_; 00063 ACE_Reactor_Mask mask_; 00064 ACE_EH_PTMF callback_; 00065 int resume_flag_; 00066 bool reference_counting_required_; 00067 00068 private: 00069 bool dispatch_; 00070 00071 // Disallow copying and assignment. 00072 ACE_EH_Dispatch_Info (const ACE_EH_Dispatch_Info &); 00073 ACE_EH_Dispatch_Info &operator= (const ACE_EH_Dispatch_Info &); 00074 }; 00075 00076 00077 /** 00078 * @class ACE_TP_Token_Guard 00079 * 00080 * @brief A helper class that helps grabbing, releasing and waiting 00081 * on tokens for a thread that tries calling handle_events (). 00082 * 00083 * In short, this class will be owned by one thread by creating on the 00084 * stack. This class gives the status of the ownership of the token 00085 * and manages the ownership 00086 */ 00087 00088 class ACE_TP_Token_Guard 00089 { 00090 public: 00091 00092 /// Constructor that will grab the token for us 00093 ACE_TP_Token_Guard (ACE_Select_Reactor_Token &token); 00094 00095 /// Destructor. This will release the token if it hasnt been 00096 /// released till this point 00097 ~ACE_TP_Token_Guard (void); 00098 00099 /// Release the token .. 00100 void release_token (void); 00101 00102 /// Returns whether the thread that created this object ownes the 00103 /// token or not. 00104 bool is_owner (void); 00105 00106 /// A helper method that grabs the token for us, after which the 00107 /// thread that owns that can do some actual work. 00108 int acquire_read_token (ACE_Time_Value *max_wait_time = 0); 00109 00110 /** 00111 * A helper method that grabs the token for us, after which the 00112 * thread that owns that can do some actual work. This differs from 00113 * acquire_read_token() as it uses acquire () to get the token instead of 00114 * acquire_read () 00115 */ 00116 int acquire_token (ACE_Time_Value *max_wait_time = 0); 00117 00118 private: 00119 00120 // Disallow default construction. 00121 ACE_TP_Token_Guard (void); 00122 00123 // Disallow copying and assignment. 00124 ACE_TP_Token_Guard (const ACE_TP_Token_Guard &); 00125 ACE_TP_Token_Guard &operator= (const ACE_TP_Token_Guard &); 00126 00127 private: 00128 00129 /// The Select Reactor token. 00130 ACE_Select_Reactor_Token &token_; 00131 00132 /// Flag that indicate whether the thread that created this object 00133 /// owns the token or not. A value of false indicates that this class 00134 /// hasnt got the token (and hence the thread) and a value of true 00135 /// vice-versa. 00136 bool owner_; 00137 00138 }; 00139 00140 /** 00141 * @class ACE_TP_Reactor 00142 * 00143 * @brief Specialization of ACE_Select_Reactor to support thread-pool 00144 * based event dispatching. 00145 * 00146 * One of the shortcomings of the ACE_Select_Reactor is that it 00147 * does not support a thread pool-based event dispatching model, 00148 * similar to the one in ACE_WFMO_Reactor. In ACE_Select_Reactor, only 00149 * thread can call handle_events() at any given time. ACE_TP_Reactor 00150 * removes this short-coming. 00151 * 00152 * ACE_TP_Reactor is a specialization of ACE_Select_Reactor to support 00153 * thread pool-based event dispatching. This reactor takes advantage 00154 * of the fact that events reported by @c select() are persistent if not 00155 * acted upon immediately. It works by remembering the event handler 00156 * which was just activated, suspending it for further I/O activities, 00157 * releasing the internal lock (so that another thread can start waiting 00158 * in the event loop) and then dispatching the event's handler outside the 00159 * scope of the reactor lock. After the event handler has been dispatched 00160 * the event handler is resumed for further I/O activity. 00161 * 00162 * This reactor implementation is best suited for situations when the 00163 * callbacks to event handlers can take arbitrarily long and/or a number 00164 * of threads are available to run the event loop. Note that I/O-processing 00165 * callback code in event handlers (e.g. handle_input()) does not have to 00166 * be modified or made thread-safe for this reactor. This is because 00167 * before an I/O event is dispatched to an event handler, the handler is 00168 * suspended; it is resumed by the reactor after the upcall completes. 00169 * Therefore, multiple I/O events will not be made to one event handler 00170 * multiple threads simultaneously. This suspend/resume protection does not 00171 * apply to either timers scheduled with the reactor or to notifications 00172 * requested via the reactor. When using timers and/or notifications you 00173 * must provide proper protection for your class in the context of multiple 00174 * threads. 00175 */ 00176 class ACE_Export ACE_TP_Reactor : public ACE_Select_Reactor 00177 { 00178 public: 00179 00180 /// Initialize ACE_TP_Reactor with the default size. 00181 ACE_TP_Reactor (ACE_Sig_Handler * = 0, 00182 ACE_Timer_Queue * = 0, 00183 int mask_signals = 1, 00184 int s_queue = ACE_Select_Reactor_Token::FIFO); 00185 00186 /** 00187 * Initialize the ACE_TP_Reactor to manage 00188 * @a max_number_of_handles. If @a restart is non-0 then the 00189 * ACE_Reactor's @c handle_events() method will be restarted 00190 * automatically when @c EINTR occurs. If @a sh or 00191 * @a tq are non-0 they are used as the signal handler and 00192 * timer queue, respectively. 00193 */ 00194 ACE_TP_Reactor (size_t max_number_of_handles, 00195 int restart = 0, 00196 ACE_Sig_Handler *sh = 0, 00197 ACE_Timer_Queue *tq = 0, 00198 int mask_signals = 1, 00199 int s_queue = ACE_Select_Reactor_Token::FIFO); 00200 00201 /** 00202 * This event loop driver that blocks for @a max_wait_time before 00203 * returning. It will return earlier if timer events, I/O events, 00204 * or signal events occur. Note that @a max_wait_time can be 0, in 00205 * which case this method blocks indefinitely until events occur. 00206 * 00207 * @a max_wait_time is decremented to reflect how much time this call 00208 * took. For instance, if a time value of 3 seconds is passed to 00209 * handle_events and an event occurs after 2 seconds, 00210 * @a max_wait_time will equal 1 second. This can be used if an 00211 * application wishes to handle events for some fixed amount of 00212 * time. 00213 * 00214 * @return The total number of events that were dispatched; 0 if the 00215 * @a max_wait_time elapsed without dispatching any handlers, or -1 00216 * if an error occurs (check @c errno for more information). 00217 */ 00218 virtual int handle_events (ACE_Time_Value *max_wait_time = 0); 00219 00220 virtual int handle_events (ACE_Time_Value &max_wait_time); 00221 00222 /// Does the reactor allow the application to resume the handle on 00223 /// its own ie. can it pass on the control of handle resumption to 00224 /// the application. The TP reactor has can allow applications to 00225 /// resume handles. So return a positive value. 00226 virtual int resumable_handler (void); 00227 00228 /// Called from handle events 00229 static void no_op_sleep_hook (void *); 00230 00231 /// The ACE_TP_Reactor implementation does not have a single owner thread. 00232 /// Attempts to set the owner explicitly are ignored. The reported owner 00233 /// thread is the current Leader in the pattern. 00234 virtual int owner (ACE_thread_t n_id, ACE_thread_t *o_id = 0); 00235 00236 /// Return the thread ID of the current Leader. 00237 virtual int owner (ACE_thread_t *t_id); 00238 00239 /// Declare the dynamic allocation hooks. 00240 ACE_ALLOC_HOOK_DECLARE; 00241 00242 protected: 00243 // = Internal methods that do the actual work. 00244 00245 /// Template method from the base class. 00246 virtual void clear_dispatch_mask (ACE_HANDLE handle, 00247 ACE_Reactor_Mask mask); 00248 00249 /// Dispatch just 1 signal, timer, notification handlers 00250 int dispatch_i (ACE_Time_Value *max_wait_time, 00251 ACE_TP_Token_Guard &guard); 00252 00253 /// Get the event that needs dispatching. It could be either a 00254 /// signal, timer, notification handlers or return possibly 1 I/O 00255 /// handler for dispatching. In the most common use case, this would 00256 /// return 1 I/O handler for dispatching 00257 int get_event_for_dispatching (ACE_Time_Value *max_wait_time); 00258 00259 #if 0 00260 // @Ciju 00261 // signal handling isn't in a production state yet. 00262 // Commenting it out for now. 00263 00264 /// Method to handle signals 00265 /// @note It is just busted at this point in time. 00266 int handle_signals (int &event_count, 00267 ACE_TP_Token_Guard &g); 00268 #endif // #if 0 00269 00270 /// Handle timer events 00271 int handle_timer_events (int &event_count, 00272 ACE_TP_Token_Guard &g); 00273 00274 /// Handle notify events 00275 int handle_notify_events (int &event_count, 00276 ACE_TP_Token_Guard &g); 00277 00278 /// handle socket events 00279 int handle_socket_events (int &event_count, 00280 ACE_TP_Token_Guard &g); 00281 00282 /// This method shouldn't get called. 00283 virtual void notify_handle (ACE_HANDLE handle, 00284 ACE_Reactor_Mask mask, 00285 ACE_Handle_Set &, 00286 ACE_Event_Handler *eh, 00287 ACE_EH_PTMF callback); 00288 private: 00289 00290 /// Get the handle of the notify pipe from the ready set if there is 00291 /// an event in the notify pipe. 00292 ACE_HANDLE get_notify_handle (void); 00293 00294 /// Get socket event dispatch information. 00295 int get_socket_event_info (ACE_EH_Dispatch_Info &info); 00296 00297 /// Notify the appropriate <callback> in the context of the <eh> 00298 /// associated with <handle> that a particular event has occurred. 00299 int dispatch_socket_event (ACE_EH_Dispatch_Info &dispatch_info); 00300 00301 /// Clear the @a handle from the read_set 00302 void clear_handle_read_set (ACE_HANDLE handle); 00303 00304 int post_process_socket_event (ACE_EH_Dispatch_Info &dispatch_info,int status); 00305 00306 private: 00307 /// Deny access since member-wise won't work... 00308 ACE_TP_Reactor (const ACE_TP_Reactor &); 00309 ACE_TP_Reactor &operator = (const ACE_TP_Reactor &); 00310 }; 00311 00312 ACE_END_VERSIONED_NAMESPACE_DECL 00313 00314 #if defined (__ACE_INLINE__) 00315 #include "ace/TP_Reactor.inl" 00316 #endif /* __ACE_INLINE__ */ 00317 00318 #include /**/ "ace/post.h" 00319 00320 #endif /* ACE_TP_REACTOR_H */