Thread_Pool.cpp

Go to the documentation of this file.
00001 #include "tao/RTCORBA/Thread_Pool.h"
00002 
00003 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00004 
00005 ACE_RCSID (RTCORBA,
00006            Thread_Pool,
00007            "Thread_Pool.cpp,v 1.26 2006/03/10 07:19:16 jtc Exp")
00008 
00009 #if ! defined (__ACE_INLINE__)
00010 #include "tao/RTCORBA/Thread_Pool.inl"
00011 #endif /* __ACE_INLINE__ */
00012 
00013 #include "tao/Exception.h"
00014 #include "tao/ORB_Core.h"
00015 #include "tao/ORB_Core_TSS_Resources.h"
00016 #include "tao/ORB.h"
00017 #include "tao/Acceptor_Registry.h"
00018 #include "tao/Transport_Cache_Manager.h"
00019 #include "tao/debug.h"
00020 #include "tao/RTCORBA/Priority_Mapping_Manager.h"
00021 #include "tao/LF_Follower.h"
00022 #include "tao/Leader_Follower.h"
00023 #include "ace/Auto_Ptr.h"
00024 
00025 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00026 
00027 TAO_RT_New_Leader_Generator::TAO_RT_New_Leader_Generator (
00028   TAO_Thread_Lane &lane)
00029   : lane_ (lane)
00030 {
00031 }
00032 
00033 void
00034 TAO_RT_New_Leader_Generator::no_leaders_available (void)
00035 {
00036   // Request a new dynamic thread from the Thread Lane
00037   this->lane_.new_dynamic_thread ();
00038 }
00039 
00040 TAO_Thread_Pool_Threads::TAO_Thread_Pool_Threads (TAO_Thread_Lane &lane)
00041   : ACE_Task_Base (lane.pool ().manager ().orb_core ().thr_mgr ()),
00042     lane_ (lane)
00043 {
00044 }
00045 
00046 int
00047 TAO_Thread_Pool_Threads::svc (void)
00048 {
00049   TAO_ORB_Core &orb_core =
00050     this->lane ().pool ().manager ().orb_core ();
00051 
00052   if (orb_core.has_shutdown ())
00053     return 0;
00054 
00055   // Set TSS resources for this thread.
00056   TAO_Thread_Pool_Threads::set_tss_resources (orb_core,
00057                                               this->lane_);
00058 
00059   ACE_TRY_NEW_ENV
00060     {
00061       // Do the work
00062       this->run (orb_core);
00063     }
00064   ACE_CATCHANY
00065     {
00066       // No point propagating this exception.  Print it out.
00067       ACE_ERROR ((LM_ERROR,
00068                   "orb->run() raised exception for thread %t\n"));
00069 
00070       ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
00071                            "");
00072     }
00073   ACE_ENDTRY;
00074 
00075   return 0;
00076 }
00077 
00078 int
00079 TAO_Thread_Pool_Threads::run (TAO_ORB_Core &orb_core ACE_ENV_ARG_PARAMETER)
00080 {
00081   CORBA::ORB_ptr orb = orb_core.orb ();
00082 
00083   // Run the ORB.
00084   orb->run (ACE_ENV_SINGLE_ARG_PARAMETER);
00085   ACE_CHECK_RETURN (-1);
00086 
00087   return 0;
00088 }
00089 
00090 void
00091 TAO_Thread_Pool_Threads::set_tss_resources (TAO_ORB_Core &orb_core,
00092                                             TAO_Thread_Lane &thread_lane)
00093 {
00094   /// Get the ORB_Core's TSS resources.
00095   TAO_ORB_Core_TSS_Resources &tss =
00096     *orb_core.get_tss_resources ();
00097 
00098   /// Set the lane attribute in TSS.
00099   tss.lane_ = &thread_lane;
00100 }
00101 
00102 TAO_Dynamic_Thread_Pool_Threads::TAO_Dynamic_Thread_Pool_Threads (TAO_Thread_Lane &lane)
00103   : TAO_Thread_Pool_Threads (lane)
00104 {
00105 }
00106 
00107 int
00108 TAO_Dynamic_Thread_Pool_Threads::run (TAO_ORB_Core &orb_core ACE_ENV_ARG_PARAMETER)
00109 {
00110   CORBA::ORB_ptr orb = orb_core.orb ();
00111 
00112   if (this->lane_.dynamic_thread_idle_timeout () == ACE_Time_Value::zero)
00113     {
00114       // No timeout specified, run the ORB until it shutdowns
00115       orb->run (ACE_ENV_SINGLE_ARG_PARAMETER);
00116       ACE_TRY_CHECK;
00117     }
00118   else
00119     {
00120       // A timeout is specified, run the ORB in an idle loop, if we
00121       // don't handle any operations for the given timeout we just
00122       // exit the loop and this thread ends itself.
00123       ACE_Time_Value tv (this->lane_.dynamic_thread_idle_timeout ());
00124       while (!orb_core.has_shutdown () && orb->work_pending (tv))
00125         {
00126           orb->perform_work ();
00127           tv = this->lane_.dynamic_thread_idle_timeout ();
00128         }
00129 
00130       if (TAO_debug_level > 7)
00131         ACE_DEBUG ((LM_DEBUG,
00132                     ACE_TEXT ("TAO Process %P Pool %d Lane %d Thread %t\n")
00133                     ACE_TEXT ("Current number of dynamic threads left = %d; ")
00134                     ACE_TEXT ("RTCorba worker thread is ending!\n"),
00135                     this->lane_.pool ().id (),
00136                     this->lane_.id (),
00137                     this->thr_count () - 1));
00138     }
00139 
00140   return 0;
00141 }
00142 
00143 TAO_Thread_Lane::TAO_Thread_Lane (TAO_Thread_Pool &pool,
00144                                   CORBA::ULong id,
00145                                   CORBA::Short lane_priority,
00146                                   CORBA::ULong static_threads,
00147                                   CORBA::ULong dynamic_threads,
00148                                   ACE_Time_Value const &dynamic_thread_idle_timeout
00149                                   ACE_ENV_ARG_DECL_NOT_USED)
00150   : pool_ (pool),
00151     id_ (id),
00152     lane_priority_ (lane_priority),
00153     shutdown_ (false),
00154     static_threads_number_ (static_threads),
00155     dynamic_threads_number_ (dynamic_threads),
00156     static_threads_ (*this),
00157     dynamic_threads_ (*this),
00158     new_thread_generator_ (*this),
00159     resources_ (pool.manager ().orb_core (),
00160                 &new_thread_generator_),
00161     native_priority_ (TAO_INVALID_PRIORITY),
00162     dynamic_thread_idle_timeout_ (dynamic_thread_idle_timeout)
00163 {
00164 }
00165 
00166 bool
00167 TAO_Thread_Lane::new_dynamic_thread (void)
00168 {
00169   // Note that we are checking this condition below without the lock
00170   // held.
00171   if (this->dynamic_threads_.thr_count () >= this->dynamic_threads_number_)
00172     return false;
00173 
00174   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX,
00175                     mon,
00176                     this->lock_,
00177                     false);
00178 
00179   TAO_Thread_Pool_Manager &manager =
00180     this->pool_.manager ();
00181 
00182   if (!manager.orb_core ().has_shutdown () && !this->shutdown_&&
00183       this->dynamic_threads_.thr_count () < this->dynamic_threads_number_)
00184     {
00185       if (TAO_debug_level > 0)
00186         ACE_DEBUG ((LM_DEBUG,
00187                     ACE_TEXT ("TAO Process %P Pool %d Lane %d Thread %t\n")
00188                     ACE_TEXT ("Current number of dynamic threads = %d; ")
00189                     ACE_TEXT ("static threads = %d; max dynamic threads = %d\n")
00190                     ACE_TEXT ("No leaders available; creating new leader!\n"),
00191                     this->pool_.id (),
00192                     this->id_,
00193                     this->dynamic_threads_.thr_count (),
00194                     this->static_threads_number_,
00195                     this->dynamic_threads_number_));
00196 
00197       int result =
00198         this->create_threads_i (this->dynamic_threads_,
00199                                 1,
00200                                 THR_BOUND | THR_DETACHED);
00201 
00202       if (result != 0)
00203         ACE_ERROR_RETURN ((LM_ERROR,
00204                           ACE_TEXT ("Pool %d Lane %d Thread %t: ")
00205                           ACE_TEXT ("cannot create dynamic thread\n"),
00206                           this->pool_.id (),
00207                           this->id_),
00208                           false);
00209     }
00210 
00211   return true;
00212 }
00213 
00214 void
00215 TAO_Thread_Lane::shutting_down (void)
00216 {
00217   ACE_GUARD (ACE_SYNCH_MUTEX,
00218              mon,
00219              this->lock_);
00220 
00221   // We are shutting down, this way we are not creating any more new dynamic
00222   // threads
00223   this->shutdown_ = true;
00224 }
00225 
00226 void
00227 TAO_Thread_Lane::validate_and_map_priority (ACE_ENV_SINGLE_ARG_DECL)
00228 {
00229   // Make sure that static_threads_number_ is not zero.
00230   if (this->static_threads_number_ == 0)
00231     ACE_THROW (CORBA::BAD_PARAM ());
00232 
00233   // Check that the priority is in bounds.
00234   if (this->lane_priority_ < RTCORBA::minPriority
00235            // The line below will always be false unless the value of
00236            // RTCORBA::maxPriority, which is now assigned the value of
00237            // 32767, is changed in RTCORBA.pidl.
00238 //      || this->lane_priority_ > RTCORBA::maxPriority
00239      )
00240     {
00241       ACE_THROW (CORBA::BAD_PARAM ());
00242     }
00243 
00244   CORBA::ORB_ptr orb =
00245     this->pool_.manager ().orb_core ().orb ();
00246 
00247   // Get the priority mapping manager.
00248   CORBA::Object_var obj =
00249     orb->resolve_initial_references (TAO_OBJID_PRIORITYMAPPINGMANAGER
00250                                      ACE_ENV_ARG_PARAMETER);
00251   ACE_CHECK;
00252 
00253   TAO_Priority_Mapping_Manager_var mapping_manager =
00254     TAO_Priority_Mapping_Manager::_narrow (obj.in ()
00255                                            ACE_ENV_ARG_PARAMETER);
00256   ACE_CHECK;
00257 
00258   RTCORBA::PriorityMapping *pm =
00259     mapping_manager.in ()->mapping ();
00260 
00261   // Map CORBA priority to native priority.
00262   CORBA::Boolean result =
00263     pm->to_native (this->lane_priority_,
00264                    this->native_priority_);
00265 
00266   if (!result)
00267     ACE_THROW (CORBA::DATA_CONVERSION ());
00268 
00269   if (TAO_debug_level > 3)
00270     {
00271       ACE_DEBUG ((LM_DEBUG,
00272                   ACE_TEXT ("TAO (%P|%t) - creating thread at ")
00273                   ACE_TEXT ("(corba:native) priority %d:%d\n"),
00274                   this->lane_priority_,
00275                   this->native_priority_));
00276     }
00277 }
00278 
00279 void
00280 TAO_Thread_Lane::open (ACE_ENV_SINGLE_ARG_DECL)
00281 {
00282   // Validate and map priority.
00283   this->validate_and_map_priority (ACE_ENV_SINGLE_ARG_PARAMETER);
00284   ACE_CHECK;
00285 
00286   // Create a string with the pool:thread id.
00287   char pool_lane_id[10];
00288   ACE_OS::sprintf (pool_lane_id,
00289                    "%d:%d",
00290                    this->pool ().id (),
00291                    this->id ());
00292 
00293   TAO_ORB_Parameters *params =
00294     this->pool ().manager ().orb_core ().orb_params ();
00295 
00296   TAO_EndpointSet endpoint_set;
00297   bool ignore_address;
00298 
00299   // Get the endpoints for this lane.
00300   params->get_endpoint_set (pool_lane_id,
00301                             endpoint_set);
00302 
00303   if (endpoint_set.is_empty ())
00304     {
00305       // If endpoints are not specified for this lane, use the
00306       // endpoints specified for the default lane but ignore their
00307       // addresses.
00308       params->get_endpoint_set (TAO_DEFAULT_LANE,
00309                                 endpoint_set);
00310 
00311       ignore_address = true;
00312     }
00313   else
00314     {
00315       // If endpoints are specified for this lane, use them with thier
00316       // addresses.
00317       ignore_address = false;
00318     }
00319 
00320   // Open the acceptor registry.
00321   int result = 0;
00322   result =
00323     this->resources_.open_acceptor_registry (endpoint_set,
00324                                              ignore_address
00325                                              ACE_ENV_ARG_PARAMETER);
00326   ACE_CHECK;
00327 
00328   if (result == -1)
00329     ACE_THROW (CORBA::INTERNAL (
00330                  CORBA::SystemException::_tao_minor_code (
00331                    TAO_ACCEPTOR_REGISTRY_OPEN_LOCATION_CODE,
00332                    0),
00333                  CORBA::COMPLETED_NO));
00334 }
00335 
00336 TAO_Thread_Lane::~TAO_Thread_Lane (void)
00337 {
00338 }
00339 
00340 void
00341 TAO_Thread_Lane::finalize (void)
00342 {
00343   // Finalize resources.
00344   this->resources_.finalize ();
00345 }
00346 
00347 void
00348 TAO_Thread_Lane::shutdown_reactor (void)
00349 {
00350   this->resources_.shutdown_reactor ();
00351 }
00352 
00353 void
00354 TAO_Thread_Lane::wait (void)
00355 {
00356   this->static_threads_.wait ();
00357   this->dynamic_threads_.wait ();
00358 }
00359 
00360 int
00361 TAO_Thread_Lane::is_collocated (const TAO_MProfile &mprofile)
00362 {
00363   return this->resources_.is_collocated (mprofile);
00364 }
00365 
00366 CORBA::ULong
00367 TAO_Thread_Lane::current_threads (void) const
00368 {
00369   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX,
00370                     mon,
00371                     this->lock_,
00372                     0);
00373 
00374   return (this->static_threads_.thr_count () +
00375          this->dynamic_threads_.thr_count ());
00376 }
00377 
00378 
00379 int
00380 TAO_Thread_Lane::create_static_threads (void)
00381 {
00382   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX,
00383                     mon,
00384                     this->lock_,
00385                     0);
00386 
00387   // Create static threads.
00388   return this->create_threads_i (this->static_threads_,
00389                                  this->static_threads_number_,
00390                                  THR_NEW_LWP | THR_JOINABLE);
00391 }
00392 
00393 int
00394 TAO_Thread_Lane::create_dynamic_threads (CORBA::ULong number_of_threads)
00395 {
00396   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX,
00397                     mon,
00398                     this->lock_,
00399                     0);
00400 
00401   return this->create_threads_i (this->dynamic_threads_,
00402                                  number_of_threads,
00403                                  THR_BOUND | THR_DETACHED);
00404 }
00405 
00406 int
00407 TAO_Thread_Lane::create_threads_i (TAO_Thread_Pool_Threads &thread_pool,
00408                                    CORBA::ULong number_of_threads,
00409                                    long thread_flags)
00410 {
00411   // Overwritten parameters.
00412   int force_active = 1;
00413 
00414   // Default parameters.
00415   int default_grp_id = -1;
00416   ACE_Task_Base *default_task = 0;
00417   ACE_hthread_t *default_thread_handles = 0;
00418   void **default_stack = 0;
00419 
00420   // Setting stack size.
00421   size_t *stack_size_array = 0;
00422   ACE_NEW_RETURN (stack_size_array,
00423                   size_t[number_of_threads],
00424                   -1);
00425   size_t index;
00426   for (index = 0;
00427        index != number_of_threads;
00428        ++index)
00429     stack_size_array[index] =
00430       this->pool ().stack_size ();
00431 
00432   // Make sure the dynamically created stack size array is properly
00433   // deleted.
00434   ACE_Auto_Basic_Array_Ptr<size_t> auto_stack_size_array (stack_size_array);
00435 
00436   TAO_ORB_Core &orb_core =
00437     this->pool ().manager ().orb_core ();
00438 
00439   long flags =
00440     thread_flags |
00441     orb_core.orb_params ()->thread_creation_flags ();
00442 
00443   // Activate the threads.
00444   int result =
00445     thread_pool.activate (flags,
00446                           number_of_threads,
00447                           force_active,
00448                           this->native_priority_,
00449                           default_grp_id,
00450                           default_task,
00451                           default_thread_handles,
00452                           default_stack,
00453                           stack_size_array);
00454 
00455   if (result != 0)
00456     return result;
00457 
00458   return result;
00459 }
00460 
00461 TAO_Thread_Pool::TAO_Thread_Pool (TAO_Thread_Pool_Manager &manager,
00462                                   CORBA::ULong id,
00463                                   CORBA::ULong stack_size,
00464                                   CORBA::ULong static_threads,
00465                                   CORBA::ULong dynamic_threads,
00466                                   CORBA::Short default_priority,
00467                                   CORBA::Boolean allow_request_buffering,
00468                                   CORBA::ULong max_buffered_requests,
00469                                   CORBA::ULong max_request_buffer_size,
00470                                   ACE_Time_Value const &dynamic_thread_idle_timeout
00471                                   ACE_ENV_ARG_DECL)
00472   : manager_ (manager),
00473     id_ (id),
00474     stack_size_ (stack_size),
00475     allow_borrowing_ (0),
00476     allow_request_buffering_ (allow_request_buffering),
00477     max_buffered_requests_ (max_buffered_requests),
00478     max_request_buffer_size_ (max_request_buffer_size),
00479     dynamic_thread_idle_timeout_ (dynamic_thread_idle_timeout),
00480     lanes_ (0),
00481     number_of_lanes_ (1),
00482     with_lanes_ (false)
00483 {
00484   // No support for buffering.
00485   if (allow_request_buffering)
00486     ACE_THROW (CORBA::NO_IMPLEMENT ());
00487 
00488   // Create one lane.
00489   this->lanes_ = new TAO_Thread_Lane *[this->number_of_lanes_];
00490   this->lanes_[0] =
00491     new TAO_Thread_Lane (*this,
00492                          0,
00493                          default_priority,
00494                          static_threads,
00495                          dynamic_threads,
00496                          dynamic_thread_idle_timeout
00497                          ACE_ENV_ARG_PARAMETER);
00498 }
00499 
00500 TAO_Thread_Pool::TAO_Thread_Pool (TAO_Thread_Pool_Manager &manager,
00501                                   CORBA::ULong id,
00502                                   CORBA::ULong stack_size,
00503                                   const RTCORBA::ThreadpoolLanes &lanes,
00504                                   CORBA::Boolean allow_borrowing,
00505                                   CORBA::Boolean allow_request_buffering,
00506                                   CORBA::ULong max_buffered_requests,
00507                                   CORBA::ULong max_request_buffer_size,
00508                                   ACE_Time_Value const &dynamic_thread_idle_timeout
00509                                   ACE_ENV_ARG_DECL)
00510   : manager_ (manager),
00511     id_ (id),
00512     stack_size_ (stack_size),
00513     allow_borrowing_ (allow_borrowing),
00514     allow_request_buffering_ (allow_request_buffering),
00515     max_buffered_requests_ (max_buffered_requests),
00516     max_request_buffer_size_ (max_request_buffer_size),
00517     dynamic_thread_idle_timeout_ (dynamic_thread_idle_timeout),
00518     lanes_ (0),
00519     number_of_lanes_ (lanes.length ()),
00520     with_lanes_ (true)
00521 {
00522   // No support for buffering or borrowing.
00523   if (allow_borrowing ||
00524       allow_request_buffering)
00525     ACE_THROW (CORBA::NO_IMPLEMENT ());
00526 
00527   // Create multiple lane.
00528   this->lanes_ = new TAO_Thread_Lane *[this->number_of_lanes_];
00529   for (CORBA::ULong i = 0;
00530        i != this->number_of_lanes_;
00531        ++i)
00532     this->lanes_[i] =
00533       new TAO_Thread_Lane (*this,
00534                            i,
00535                            lanes[i].lane_priority,
00536                            lanes[i].static_threads,
00537                            lanes[i].dynamic_threads,
00538                            dynamic_thread_idle_timeout
00539                            ACE_ENV_ARG_PARAMETER);
00540 }
00541 
00542 void
00543 TAO_Thread_Pool::open (ACE_ENV_SINGLE_ARG_DECL)
00544 {
00545   // Open all the lanes.
00546   for (CORBA::ULong i = 0;
00547        i != this->number_of_lanes_;
00548        ++i)
00549     {
00550       this->lanes_[i]->open (ACE_ENV_SINGLE_ARG_PARAMETER);
00551       ACE_CHECK;
00552     }
00553 }
00554 
00555 TAO_Thread_Pool::~TAO_Thread_Pool (void)
00556 {
00557   // Delete all the lanes.
00558   for (CORBA::ULong i = 0;
00559        i != this->number_of_lanes_;
00560        ++i)
00561     delete this->lanes_[i];
00562 
00563   delete[] this->lanes_;
00564 }
00565 
00566 void
00567 TAO_Thread_Pool::finalize (void)
00568 {
00569   // Finalize all the lanes.
00570   for (CORBA::ULong i = 0;
00571        i != this->number_of_lanes_;
00572        ++i)
00573     this->lanes_[i]->finalize ();
00574 }
00575 
00576 void
00577 TAO_Thread_Pool::shutdown_reactor (void)
00578 {
00579   // Finalize all the lanes.
00580   for (CORBA::ULong i = 0;
00581        i != this->number_of_lanes_;
00582        ++i)
00583     this->lanes_[i]->shutdown_reactor ();
00584 }
00585 
00586 void
00587 TAO_Thread_Pool::shutting_down (void)
00588 {
00589   // Finalize all the lanes.
00590   for (CORBA::ULong i = 0;
00591        i != this->number_of_lanes_;
00592        ++i)
00593     this->lanes_[i]->shutting_down ();
00594 }
00595 
00596 
00597 void
00598 TAO_Thread_Pool::wait (void)
00599 {
00600   // Finalize all the lanes.
00601   for (CORBA::ULong i = 0;
00602        i != this->number_of_lanes_;
00603        ++i)
00604     this->lanes_[i]->wait ();
00605 }
00606 
00607 int
00608 TAO_Thread_Pool::is_collocated (const TAO_MProfile &mprofile)
00609 {
00610   // Finalize all the lanes.
00611   for (CORBA::ULong i = 0;
00612        i != this->number_of_lanes_;
00613        ++i)
00614     {
00615       int result =
00616         this->lanes_[i]->is_collocated (mprofile);
00617 
00618       if (result)
00619         return result;
00620     }
00621 
00622   return 0;
00623 }
00624 
00625 int
00626 TAO_Thread_Pool::create_static_threads (void)
00627 {
00628   for (CORBA::ULong i = 0;
00629        i != this->number_of_lanes_;
00630        ++i)
00631     {
00632       // Ask each lane to create its set of static threads.
00633       int result = this->lanes_[i]->create_static_threads ();
00634 
00635       // Return on failure.
00636       if (result != 0)
00637         return result;
00638     }
00639 
00640   // Success.
00641   return 0;
00642 }
00643 
// Helper used by the TAO_Thread_Pool_Manager methods below: guards
// this->lock_ through ACE_GUARD_THROW_EX with a guard object named
// `mon`, raising CORBA::INTERNAL (minor code built from
// TAO_GUARD_FAILURE, completion status COMPLETED_NO) on failure.
// The expansion already ends in ';', so the ';' written at each use
// site is just an empty statement.
#define TAO_THREAD_POOL_MANAGER_GUARD \
  ACE_GUARD_THROW_EX ( \
    ACE_SYNCH_MUTEX, \
    mon, \
    this->lock_, \
    CORBA::INTERNAL ( \
      CORBA::SystemException::_tao_minor_code ( \
        TAO_GUARD_FAILURE, \
        0), \
      CORBA::COMPLETED_NO));
00654 
00655 TAO_Thread_Pool_Manager::TAO_Thread_Pool_Manager (TAO_ORB_Core &orb_core)
00656   : orb_core_ (orb_core),
00657     thread_pools_ (),
00658     thread_pool_id_counter_ (1),
00659     lock_ ()
00660 {
00661 }
00662 
00663 TAO_Thread_Pool_Manager::~TAO_Thread_Pool_Manager (void)
00664 {
00665   // Delete all the pools.
00666   for (THREAD_POOLS::ITERATOR iterator = this->thread_pools_.begin ();
00667        iterator != this->thread_pools_.end ();
00668        ++iterator)
00669     delete (*iterator).int_id_;
00670 }
00671 
00672 void
00673 TAO_Thread_Pool_Manager::finalize (void)
00674 {
00675   // Finalize all the pools.
00676   for (THREAD_POOLS::ITERATOR iterator = this->thread_pools_.begin ();
00677        iterator != this->thread_pools_.end ();
00678        ++iterator)
00679     (*iterator).int_id_->finalize ();
00680 }
00681 
00682 void
00683 TAO_Thread_Pool_Manager::shutdown_reactor (void)
00684 {
00685   // Finalize all the pools.
00686   for (THREAD_POOLS::ITERATOR iterator = this->thread_pools_.begin ();
00687        iterator != this->thread_pools_.end ();
00688        ++iterator)
00689     (*iterator).int_id_->shutdown_reactor ();
00690 }
00691 
00692 void
00693 TAO_Thread_Pool_Manager::wait (void)
00694 {
00695   // Finalize all the pools.
00696   for (THREAD_POOLS::ITERATOR iterator = this->thread_pools_.begin ();
00697        iterator != this->thread_pools_.end ();
00698        ++iterator)
00699     (*iterator).int_id_->wait ();
00700 }
00701 
00702 int
00703 TAO_Thread_Pool_Manager::is_collocated (const TAO_MProfile &mprofile)
00704 {
00705   // Finalize all the pools.
00706   for (THREAD_POOLS::ITERATOR iterator = this->thread_pools_.begin ();
00707        iterator != this->thread_pools_.end ();
00708        ++iterator)
00709     {
00710       int result =
00711         (*iterator).int_id_->is_collocated (mprofile);
00712 
00713       if (result)
00714         return result;
00715     }
00716 
00717   return 0;
00718 }
00719 
00720 RTCORBA::ThreadpoolId
00721 TAO_Thread_Pool_Manager::create_threadpool (CORBA::ULong stacksize,
00722                                             CORBA::ULong static_threads,
00723                                             CORBA::ULong dynamic_threads,
00724                                             RTCORBA::Priority default_priority,
00725                                             CORBA::Boolean allow_request_buffering,
00726                                             CORBA::ULong max_buffered_requests,
00727                                             CORBA::ULong max_request_buffer_size,
00728                                             ACE_Time_Value const &dynamic_thread_idle_timeout
00729                                             ACE_ENV_ARG_DECL)
00730   ACE_THROW_SPEC ((CORBA::SystemException))
00731 {
00732   TAO_THREAD_POOL_MANAGER_GUARD;
00733   ACE_CHECK_RETURN (0);
00734 
00735   return this->create_threadpool_i (stacksize,
00736                                     static_threads,
00737                                     dynamic_threads,
00738                                     default_priority,
00739                                     allow_request_buffering,
00740                                     max_buffered_requests,
00741                                     max_request_buffer_size,
00742                                     dynamic_thread_idle_timeout
00743                                     ACE_ENV_ARG_PARAMETER);
00744 }
00745 
00746 RTCORBA::ThreadpoolId
00747 TAO_Thread_Pool_Manager::create_threadpool_with_lanes (CORBA::ULong stacksize,
00748                                                        const RTCORBA::ThreadpoolLanes & lanes,
00749                                                        CORBA::Boolean allow_borrowing,
00750                                                        CORBA::Boolean allow_request_buffering,
00751                                                        CORBA::ULong max_buffered_requests,
00752                                                        CORBA::ULong max_request_buffer_size,
00753                                                        ACE_Time_Value const &dynamic_thread_idle_timeout
00754                                                        ACE_ENV_ARG_DECL)
00755   ACE_THROW_SPEC ((CORBA::SystemException))
00756 {
00757   TAO_THREAD_POOL_MANAGER_GUARD;
00758   ACE_CHECK_RETURN (0);
00759 
00760   return this->create_threadpool_with_lanes_i (stacksize,
00761                                                lanes,
00762                                                allow_borrowing,
00763                                                allow_request_buffering,
00764                                                max_buffered_requests,
00765                                                max_request_buffer_size,
00766                                                dynamic_thread_idle_timeout
00767                                                ACE_ENV_ARG_PARAMETER);
00768 }
00769 
00770 void
00771 TAO_Thread_Pool_Manager::destroy_threadpool (RTCORBA::ThreadpoolId threadpool
00772                                              ACE_ENV_ARG_DECL)
00773   ACE_THROW_SPEC ((CORBA::SystemException,
00774                    RTCORBA::RTORB::InvalidThreadpool))
00775 {
00776   TAO_Thread_Pool *tao_thread_pool = 0;
00777 
00778   // The guard is just for the map, don't do a wait inside the guard, because
00779   // during the wait other threads can try to access the thread pool manager
00780   // also, this can be one of the threads we are waiting for, which then
00781   // results in a deadlock
00782   {
00783     TAO_THREAD_POOL_MANAGER_GUARD;
00784     ACE_CHECK;
00785 
00786     // Unbind the thread pool from the map.
00787     int result =
00788       this->thread_pools_.unbind (threadpool,
00789                                   tao_thread_pool);
00790 
00791     // If the thread pool is not found in our map.
00792     if (result != 0)
00793       ACE_THROW (RTCORBA::RTORB::InvalidThreadpool ());
00794   }
00795 
00796   // Mark the thread pool that we are shutting down.
00797   tao_thread_pool->shutting_down ();
00798 
00799   // Shutdown reactor.
00800   tao_thread_pool->shutdown_reactor ();
00801 
00802   // Wait for the threads.
00803   tao_thread_pool->wait ();
00804 
00805   // Finalize resources.
00806   tao_thread_pool->finalize ();
00807 
00808   // Delete the thread pool.
00809   delete tao_thread_pool;
00810 
00811 }
00812 
00813 RTCORBA::ThreadpoolId
00814 TAO_Thread_Pool_Manager::create_threadpool_i (CORBA::ULong stacksize,
00815                                               CORBA::ULong static_threads,
00816                                               CORBA::ULong dynamic_threads,
00817                                               RTCORBA::Priority default_priority,
00818                                               CORBA::Boolean allow_request_buffering,
00819                                               CORBA::ULong max_buffered_requests,
00820                                               CORBA::ULong max_request_buffer_size,
00821                                               ACE_Time_Value const &dynamic_thread_idle_timeout
00822                                               ACE_ENV_ARG_DECL)
00823   ACE_THROW_SPEC ((CORBA::SystemException))
00824 {
00825   // Create the thread pool.
00826   TAO_Thread_Pool *thread_pool = 0;
00827 
00828   ACE_NEW_THROW_EX (thread_pool,
00829                     TAO_Thread_Pool (*this,
00830                                      this->thread_pool_id_counter_,
00831                                      stacksize,
00832                                      static_threads,
00833                                      dynamic_threads,
00834                                      default_priority,
00835                                      allow_request_buffering,
00836                                      max_buffered_requests,
00837                                      max_request_buffer_size,
00838                                      dynamic_thread_idle_timeout
00839                                      ACE_ENV_ARG_PARAMETER),
00840                     CORBA::NO_MEMORY ());
00841   ACE_CHECK_RETURN (0);
00842 
00843   return this->create_threadpool_helper (thread_pool
00844                                          ACE_ENV_ARG_PARAMETER);
00845 }
00846 
00847 RTCORBA::ThreadpoolId
00848 TAO_Thread_Pool_Manager::create_threadpool_with_lanes_i (CORBA::ULong stacksize,
00849                                                          const RTCORBA::ThreadpoolLanes &lanes,
00850                                                          CORBA::Boolean allow_borrowing,
00851                                                          CORBA::Boolean allow_request_buffering,
00852                                                          CORBA::ULong max_buffered_requests,
00853                                                          CORBA::ULong max_request_buffer_size,
00854                                                          ACE_Time_Value const &dynamic_thread_idle_timeout
00855                                                          ACE_ENV_ARG_DECL)
00856   ACE_THROW_SPEC ((CORBA::SystemException))
00857 {
00858   // Create the thread pool.
00859   TAO_Thread_Pool *thread_pool = 0;
00860 
00861   ACE_NEW_THROW_EX (thread_pool,
00862                     TAO_Thread_Pool (*this,
00863                                      this->thread_pool_id_counter_,
00864                                      stacksize,
00865                                      lanes,
00866                                      allow_borrowing,
00867                                      allow_request_buffering,
00868                                      max_buffered_requests,
00869                                      max_request_buffer_size,
00870                                      dynamic_thread_idle_timeout
00871                                      ACE_ENV_ARG_PARAMETER),
00872                     CORBA::NO_MEMORY ());
00873   ACE_CHECK_RETURN (0);
00874 
00875   return this->create_threadpool_helper (thread_pool
00876                                          ACE_ENV_ARG_PARAMETER);
00877 }
00878 
00879 RTCORBA::ThreadpoolId
00880 TAO_Thread_Pool_Manager::create_threadpool_helper (TAO_Thread_Pool *thread_pool
00881                                                    ACE_ENV_ARG_DECL)
00882   ACE_THROW_SPEC ((CORBA::SystemException))
00883 {
00884   // Make sure of safe deletion in case of errors.
00885   auto_ptr<TAO_Thread_Pool> safe_thread_pool (thread_pool);
00886 
00887   // Open the pool.
00888   thread_pool->open (ACE_ENV_SINGLE_ARG_PARAMETER);
00889   ACE_CHECK_RETURN (0);
00890 
00891   // Create the static threads.
00892   int result =
00893     thread_pool->create_static_threads ();
00894 
00895   // Throw exception in case of errors.
00896   if (result != 0)
00897     {
00898       // Finalize thread pool related resources.
00899       thread_pool->finalize ();
00900 
00901       ACE_THROW_RETURN (
00902         CORBA::INTERNAL (
00903           CORBA::SystemException::_tao_minor_code (
00904             TAO_RTCORBA_THREAD_CREATION_LOCATION_CODE,
00905             errno),
00906           CORBA::COMPLETED_NO),
00907         result);
00908     }
00909 
00910   // Bind thread to internal table.
00911   result =
00912     this->thread_pools_.bind (this->thread_pool_id_counter_,
00913                               thread_pool);
00914 
00915   // Throw exceptin in case of errors.
00916   if (result != 0)
00917     ACE_THROW_RETURN (CORBA::INTERNAL (),
00918                       result);
00919 
00920   //
00921   // Success.
00922   //
00923 
00924   // No need to delete thread pool.
00925   safe_thread_pool.release ();
00926 
00927   // Return current counter and perform post-increment.
00928   return this->thread_pool_id_counter_++;
00929 }
00930 
00931 TAO_Thread_Pool *
00932 TAO_Thread_Pool_Manager::get_threadpool (RTCORBA::ThreadpoolId thread_pool_id ACE_ENV_ARG_DECL)
00933 {
00934   TAO_THREAD_POOL_MANAGER_GUARD;
00935   ACE_CHECK_RETURN (0);
00936 
00937   TAO_Thread_Pool *thread_pool = 0;
00938   int result =
00939     thread_pools_.find (thread_pool_id,
00940                         thread_pool);
00941 
00942   ACE_UNUSED_ARG (result);
00943 
00944   return thread_pool;
00945 }
00946 
00947 TAO_ORB_Core &
00948 TAO_Thread_Pool_Manager::orb_core (void) const
00949 {
00950   return this->orb_core_;
00951 }
00952 
00953 TAO_END_VERSIONED_NAMESPACE_DECL
00954 
00955 #endif /* TAO_HAS_CORBA_MESSAGING && TAO_HAS_CORBA_MESSAGING != 0 */

Generated on Thu Nov 9 12:58:07 2006 for TAO_RTCORBA by doxygen 1.3.6