00001 // -*- C++ -*- 00002 00003 //============================================================================= 00004 /** 00005 * @file CSD_TP_Task.h 00006 * 00007 * $Id: CSD_TP_Task.h 80443 2008-01-16 19:23:21Z johnnyw $ 00008 * 00009 * @author Tim Bradley <bradley_t@ociweb.com> 00010 */ 00011 //============================================================================= 00012 00013 #ifndef TAO_CSD_TP_TASK_H 00014 #define TAO_CSD_TP_TASK_H 00015 00016 #include /**/ "ace/pre.h" 00017 00018 #include "tao/CSD_ThreadPool/CSD_TP_Export.h" 00019 00020 #include "tao/CSD_ThreadPool/CSD_TP_Queue.h" 00021 #include "tao/PortableServer/PortableServer.h" 00022 #include "tao/Condition.h" 00023 00024 #if !defined (ACE_LACKS_PRAGMA_ONCE) 00025 # pragma once 00026 #endif /* ACE_LACKS_PRAGMA_ONCE */ 00027 00028 #include "ace/Task.h" 00029 #include "ace/Synch.h" 00030 #include "ace/Containers_T.h" 00031 #include "ace/Vector_T.h" 00032 00033 TAO_BEGIN_VERSIONED_NAMESPACE_DECL 00034 00035 namespace TAO 00036 { 00037 namespace CSD 00038 { 00039 /// Typedef for the number of threads. 00040 typedef unsigned long Thread_Counter; 00041 00042 /** 00043 * @class TP_Task 00044 * 00045 * @brief Active Object managing a queue of request objects. 00046 * 00047 * There are two types of "users" of a TP_Task object: 00048 * 00049 * 1) The TP_Strategy object that "owns" this task object. 00050 * 2) The worker threads that "run" this task object as an 00051 * "active object". 00052 * 00053 * The TP_Strategy object that "owns" this task object dictates 00054 * when the worker threads are activated and when they are shutdown. It 00055 * also injects requests into this task's queue via calls to the 00056 * add_request() method. It is also the TP_Strategy object that 00057 * dictates the number of worker threads to be activated via a call to 00058 * the set_num_threads() method. 00059 * 00060 * The active object pattern is implemented via the use of the 00061 * the ACE_Task_Base base class, and each worker thread will 00062 * invoke this task's svc() method, and when the svc() returns, the 00063 * worker thread will invoke this task's close() method (with the 00064 * flag argument equal to 0). 00065 * 00066 * @note I just wanted to document an idea... When the pool consists 00067 * of only one worker thread, we could care less about checking 00068 * if target servant objects are busy or not. The simple fact 00069 * that only one thread will be dispatching all requests means 00070 * that servant objects will never be busy when the thread 00071 * tests to see if a request is "ready_for_dispatch()". I'm 00072 * just wondering if this knowledge can be applied to the 00073 * implementation such that the "pool with one worker thread" case 00074 * performs more efficiently. This is STP vs SSTP. 00075 * 00076 */ 00077 class TAO_CSD_TP_Export TP_Task : public ACE_Task_Base 00078 { 00079 public: 00080 00081 /// Default Constructor. 00082 TP_Task(); 00083 00084 /// Virtual Destructor. 00085 virtual ~TP_Task(); 00086 00087 /// Put a request object on to the request queue. 00088 /// Returns true if successful, false otherwise (it has been "rejected"). 00089 bool add_request(TP_Request* request); 00090 00091 /// Activate the worker threads 00092 virtual int open(void* num_threads_ptr = 0); 00093 00094 /// The "mainline" executed by each worker thread. 00095 virtual int svc(); 00096 00097 /// Multi-purpose: argument value is used to differentiate purpose. 00098 /// 00099 /// 0) Invoked by each worker thread after its invocation of the 00100 /// svc() method has completed (ie, returned). 00101 /// 1) Invoked by the strategy object to shutdown all worker threads. 00102 virtual int close(u_long flag = 0); 00103 00104 /// Cancel all requests that are targeted for the provided servant. 00105 void cancel_servant (PortableServer::Servant servant); 00106 00107 private: 00108 00109 typedef TAO_SYNCH_MUTEX LockType; 00110 typedef ACE_Guard<LockType> GuardType; 00111 typedef TAO_Condition<LockType> ConditionType; 00112 00113 00114 /// Lock to protect the "state" (all of the data members) of this object. 00115 LockType lock_; 00116 00117 /// Condition used to signal worker threads that they may be able to 00118 /// find a request in the queue_ that needs to be dispatched to a 00119 /// servant that is currently "not busy". 00120 /// This condition will be signal()'ed each time a new request is 00121 /// added to the queue_, and also when a servant has become "not busy". 00122 ConditionType work_available_; 00123 00124 /// This condition will be signal()'ed each time the num_threads_ 00125 /// data member has its value changed. This is used to keep the 00126 /// close(1) invocation (ie, a shutdown request) blocked until all 00127 /// of the worker threads have stopped running. 00128 ConditionType active_workers_; 00129 00130 /// Flag used to indicate when this task will (or will not) accept 00131 /// requests via the the add_request() method. 00132 bool accepting_requests_; 00133 00134 /// Flag used to initiate a shutdown request to all worker threads. 00135 bool shutdown_initiated_; 00136 00137 /// Complete shutdown needed to be deferred because the thread calling 00138 /// close(1) was also one of the ThreadPool threads 00139 bool deferred_shutdown_initiated_; 00140 00141 /// Flag used to avoid multiple open() calls. 00142 bool opened_; 00143 00144 /// The number of currently active worker threads. 00145 Thread_Counter num_threads_; 00146 00147 /// The queue of pending servant requests (a.k.a. the "request queue"). 00148 TP_Queue queue_; 00149 00150 typedef ACE_Vector <ACE_thread_t> Thread_Ids; 00151 00152 /// The list of ids for the threads launched by this task. 00153 Thread_Ids activated_threads_; 00154 00155 enum { MAX_THREADPOOL_TASK_WORKER_THREADS = 50 }; 00156 }; 00157 00158 } 00159 } 00160 00161 TAO_END_VERSIONED_NAMESPACE_DECL 00162 00163 #if defined (__ACE_INLINE__) 00164 # include "tao/CSD_ThreadPool/CSD_TP_Task.inl" 00165 #endif /* __ACE_INLINE__ */ 00166 00167 #include /**/ "ace/post.h" 00168 00169 #endif /* TAO_CSD_TP_TASK_H */