Go to the documentation of this file.00001
00002
00003 #include "tao/CSD_ThreadPool/CSD_TP_Task.h"
00004 #include "tao/CSD_ThreadPool/CSD_TP_Request.h"
00005 #include "tao/CSD_ThreadPool/CSD_TP_Dispatchable_Visitor.h"
00006 #include "tao/CSD_ThreadPool/CSD_TP_Cancel_Visitor.h"
00007
00008 ACE_RCSID (CSD_ThreadPool,
00009 TP_Task,
00010 "$Id: CSD_TP_Task.cpp 80443 2008-01-16 19:23:21Z johnnyw $")
00011
00012 #if !defined (__ACE_INLINE__)
00013 # include "tao/CSD_ThreadPool/CSD_TP_Task.inl"
00014 #endif
00015
00016 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00017
00018 TAO::CSD::TP_Task::~TP_Task()
00019 {
00020 }
00021
00022
00023 bool
00024 TAO::CSD::TP_Task::add_request(TP_Request* request)
00025 {
00026 GuardType guard(this->lock_);
00027
00028 if (!this->accepting_requests_)
00029 {
00030 ACE_DEBUG((LM_DEBUG,"(%P|%t) TP_Task::add_request() - "
00031 "not accepting requests\n"));
00032 return false;
00033 }
00034
00035
00036
00037
00038
00039
00040
00041 request->prepare_for_queue();
00042
00043 this->queue_.put(request);
00044
00045 this->work_available_.signal();
00046
00047 return true;
00048 }
00049
00050
00051 int
00052 TAO::CSD::TP_Task::open(void* num_threads_ptr)
00053 {
00054 Thread_Counter num = 1;
00055
00056 if (num_threads_ptr != 0)
00057 {
00058 Thread_Counter* tmp = static_cast<Thread_Counter*> (num_threads_ptr);
00059
00060 if (tmp == 0)
00061 {
00062
00063 ACE_ERROR_RETURN((LM_ERROR,
00064 ACE_TEXT ("(%P|%t) TP_Task failed to open. ")
00065 ACE_TEXT ("Invalid argument type passed to open().\n")),
00066 -1);
00067
00068 }
00069
00070 num = *tmp;
00071 }
00072
00073
00074 if (num < 1)
00075 {
00076 ACE_ERROR_RETURN((LM_ERROR,
00077 ACE_TEXT ("(%P|%t) TP_Task failed to open. ")
00078 ACE_TEXT ("num_threads (%u) is less-than 1.\n"),
00079 num),
00080 -1);
00081 }
00082
00083
00084 if (num > MAX_THREADPOOL_TASK_WORKER_THREADS)
00085 {
00086 ACE_ERROR_RETURN((LM_ERROR,
00087 ACE_TEXT ("(%P|%t) TP_Task failed to open. ")
00088 ACE_TEXT ("num_threads (%u) is too large. Max is %d.\n"),
00089 num, MAX_THREADPOOL_TASK_WORKER_THREADS),
00090 -1);
00091 }
00092
00093
00094 GuardType guard(this->lock_);
00095
00096
00097
00098 if (this->opened_)
00099 {
00100
00101 ACE_ERROR_RETURN((LM_ERROR,
00102 ACE_TEXT ("(%P|%t) TP_Task failed to open. ")
00103 ACE_TEXT ("Task has previously been open()'ed.\n")),
00104 -1);
00105
00106 }
00107
00108
00109 if (this->activate(THR_NEW_LWP | THR_JOINABLE, num) != 0)
00110 {
00111
00112
00113 ACE_ERROR_RETURN((LM_ERROR,
00114 ACE_TEXT ("(%P|%t) TP_Task failed to activate ")
00115 ACE_TEXT ("(%d) worker threads.\n"),
00116 num),
00117 -1);
00118 }
00119
00120
00121 this->opened_ = true;
00122
00123
00124 while (this->num_threads_ != num)
00125 {
00126 this->active_workers_.wait();
00127 }
00128
00129
00130 this->accepting_requests_ = true;
00131
00132 return 0;
00133 }
00134
00135
00136 int
00137 TAO::CSD::TP_Task::svc()
00138 {
00139
00140
00141 {
00142 GuardType guard(this->lock_);
00143
00144
00145 ACE_thread_t thr_id = ACE_OS::thr_self ();
00146 this->activated_threads_.push_back(thr_id);
00147 ++this->num_threads_;
00148 this->active_workers_.signal();
00149 }
00150
00151
00152
00153 TP_Dispatchable_Visitor dispatchable_visitor;
00154
00155
00156 while (1)
00157 {
00158 TP_Request_Handle request;
00159
00160
00161 {
00162
00163 GuardType guard(this->lock_);
00164
00165
00166 while (request.is_nil())
00167 {
00168 if (this->shutdown_initiated_)
00169 {
00170
00171 return 0;
00172 }
00173
00174 if (this->deferred_shutdown_initiated_)
00175 {
00176 this->deferred_shutdown_initiated_ = false;
00177 return 0;
00178 }
00179
00180
00181 if (!this->queue_.is_empty())
00182 {
00183
00184
00185
00186
00187
00188 this->queue_.accept_visitor(dispatchable_visitor);
00189
00190
00191
00192
00193
00194 request = dispatchable_visitor.request();
00195 }
00196
00197
00198
00199 if (request.is_nil())
00200 {
00201
00202
00203 this->work_available_.wait();
00204 }
00205 }
00206
00207
00208
00209
00210
00211
00212 }
00213
00214
00215 request->dispatch();
00216
00217
00218
00219
00220
00221 {
00222 GuardType guard(this->lock_);
00223 request->mark_as_ready();
00224 this->work_available_.signal();
00225 }
00226
00227
00228
00229
00230 dispatchable_visitor.reset();
00231
00232
00233
00234
00235 }
00236 }
00237
00238
00239 int
00240 TAO::CSD::TP_Task::close(u_long flag)
00241 {
00242 GuardType guard(this->lock_);
00243
00244 if (flag == 0)
00245 {
00246
00247 --this->num_threads_;
00248 this->active_workers_.signal();
00249 }
00250 else
00251 {
00252
00253
00254
00255 if (!this->opened_)
00256 {
00257 return 0;
00258 }
00259
00260
00261 this->shutdown_initiated_ = true;
00262
00263
00264 this->accepting_requests_ = false;
00265
00266
00267 this->work_available_.broadcast();
00268
00269 bool calling_thread_in_tp = false;
00270
00271 ACE_thread_t my_thr_id = ACE_OS::thr_self ();
00272
00273
00274
00275 size_t const size = this->activated_threads_.size ();
00276
00277 for (size_t i = 0; i < size; i ++)
00278 {
00279 if (this->activated_threads_[i] == my_thr_id)
00280 {
00281 calling_thread_in_tp = true;
00282 this->deferred_shutdown_initiated_ = true;
00283 break;
00284 }
00285 }
00286
00287
00288 size_t target_num_threads = calling_thread_in_tp ? 1 : 0;
00289 while (this->num_threads_ != target_num_threads)
00290 {
00291 this->active_workers_.wait();
00292 }
00293
00294
00295 TP_Cancel_Visitor cancel_visitor;
00296 this->queue_.accept_visitor(cancel_visitor);
00297
00298 this->opened_ = false;
00299 this->shutdown_initiated_ = false;
00300 }
00301
00302 return 0;
00303 }
00304
00305
00306
00307 void
00308 TAO::CSD::TP_Task::cancel_servant (PortableServer::Servant servant)
00309 {
00310 GuardType guard(this->lock_);
00311
00312
00313 TP_Cancel_Visitor cancel_visitor(servant);
00314 this->queue_.accept_visitor(cancel_visitor);
00315 }
00316
00317 TAO_END_VERSIONED_NAMESPACE_DECL