Thread_Pool.cpp

Go to the documentation of this file.
00001 #include "tao/RTCORBA/Thread_Pool.h"
00002 
00003 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00004 
00005 ACE_RCSID (RTCORBA,
00006            Thread_Pool,
00007            "$Id: Thread_Pool.cpp 81455 2008-04-28 10:26:37Z johnnyw $")
00008 
00009 #if ! defined (__ACE_INLINE__)
00010 #include "tao/RTCORBA/Thread_Pool.inl"
00011 #endif /* __ACE_INLINE__ */
00012 
00013 #include "tao/Exception.h"
00014 #include "tao/ORB_Core.h"
00015 #include "tao/ORB_Core_TSS_Resources.h"
00016 #include "tao/ORB.h"
00017 #include "tao/Acceptor_Registry.h"
00018 #include "tao/Transport_Cache_Manager.h"
00019 #include "tao/debug.h"
00020 #include "tao/RTCORBA/Priority_Mapping_Manager.h"
00021 #include "tao/LF_Follower.h"
00022 #include "tao/Leader_Follower.h"
00023 #include "ace/Auto_Ptr.h"
00024 
00025 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00026 
00027 TAO_RT_New_Leader_Generator::TAO_RT_New_Leader_Generator (
00028   TAO_Thread_Lane &lane)
00029   : lane_ (lane)
00030 {
00031 }
00032 
00033 void
00034 TAO_RT_New_Leader_Generator::no_leaders_available (void)
00035 {
00036   // Request a new dynamic thread from the Thread Lane
00037   this->lane_.new_dynamic_thread ();
00038 }
00039 
00040 TAO_Thread_Pool_Threads::TAO_Thread_Pool_Threads (TAO_Thread_Lane &lane)
00041   : ACE_Task_Base (lane.pool ().manager ().orb_core ().thr_mgr ()),
00042     lane_ (lane)
00043 {
00044 }
00045 
00046 int
00047 TAO_Thread_Pool_Threads::svc (void)
00048 {
00049   TAO_ORB_Core &orb_core =
00050     this->lane ().pool ().manager ().orb_core ();
00051 
00052   if (orb_core.has_shutdown ())
00053     return 0;
00054 
00055   // Set TSS resources for this thread.
00056   TAO_Thread_Pool_Threads::set_tss_resources (orb_core, this->lane_);
00057 
00058   try
00059     {
00060       // Do the work
00061       this->run (orb_core);
00062     }
00063   catch (const ::CORBA::Exception& ex)
00064     {
00065       // No point propagating this exception.  Print it out.
00066       ACE_ERROR ((LM_ERROR,
00067                   "orb->run() raised exception for thread %t\n"));
00068 
00069       ex._tao_print_exception ("");
00070     }
00071 
00072   return 0;
00073 }
00074 
00075 int
00076 TAO_Thread_Pool_Threads::run (TAO_ORB_Core &orb_core)
00077 {
00078   CORBA::ORB_ptr orb = orb_core.orb ();
00079 
00080   // Run the ORB.
00081   orb->run ();
00082 
00083   return 0;
00084 }
00085 
00086 void
00087 TAO_Thread_Pool_Threads::set_tss_resources (TAO_ORB_Core &orb_core,
00088                                             TAO_Thread_Lane &thread_lane)
00089 {
00090   /// Get the ORB_Core's TSS resources.
00091   TAO_ORB_Core_TSS_Resources &tss =
00092     *orb_core.get_tss_resources ();
00093 
00094   /// Set the lane attribute in TSS.
00095   tss.lane_ = &thread_lane;
00096 }
00097 
00098 TAO_Dynamic_Thread_Pool_Threads::TAO_Dynamic_Thread_Pool_Threads (TAO_Thread_Lane &lane)
00099   : TAO_Thread_Pool_Threads (lane)
00100 {
00101 }
00102 
00103 int
00104 TAO_Dynamic_Thread_Pool_Threads::run (TAO_ORB_Core &orb_core)
00105 {
00106   CORBA::ORB_ptr orb = orb_core.orb ();
00107 
00108   switch (this->lane_.lifespan ())
00109   {
00110     case TAO_RT_ORBInitializer::TAO_RTCORBA_DT_FIXED :
00111       {
00112         ACE_Time_Value tv_run (this->lane_.dynamic_thread_time ());
00113         orb->run (tv_run);
00114       }
00115       break;
00116     case TAO_RT_ORBInitializer::TAO_RTCORBA_DT_IDLE :
00117       {
00118         // A timeout is specified, run the ORB in an idle loop, if we
00119         // don't handle any operations for the given timeout we just
00120         // exit the loop and this thread ends itself.
00121         ACE_Time_Value tv (this->lane_.dynamic_thread_time ());
00122         while (!orb_core.has_shutdown () && orb->work_pending (tv))
00123           {
00124             // Run the ORB for the specified timeout, this prevents looping
00125             // between work_pending/handle_events
00126             tv = this->lane_.dynamic_thread_time ();
00127             orb->run (tv);
00128             // Reset the idle timeout
00129             tv = this->lane_.dynamic_thread_time ();
00130           }
00131       }
00132       break;
00133     case TAO_RT_ORBInitializer::TAO_RTCORBA_DT_INFINITIVE :
00134       {
00135         // No timeout specified, run the ORB until it shutdowns
00136         orb->run ();
00137       }
00138       break;
00139   }
00140 
00141   if (TAO_debug_level > 7)
00142     {
00143       ACE_DEBUG ((LM_DEBUG,
00144                   ACE_TEXT ("TAO Process %P Pool %d Lane %d Thread %t\n")
00145                   ACE_TEXT ("Current number of dynamic threads left = %d; ")
00146                   ACE_TEXT ("RTCorba worker thread is ending!\n"),
00147                   this->lane_.pool ().id (),
00148                   this->lane_.id (),
00149                   this->thr_count () - 1));
00150     }
00151 
00152   return 0;
00153 }
00154 
00155 TAO_Thread_Lane::TAO_Thread_Lane (TAO_Thread_Pool &pool,
00156                                   CORBA::ULong id,
00157                                   CORBA::Short lane_priority,
00158                                   CORBA::ULong static_threads,
00159                                   CORBA::ULong dynamic_threads,
00160                                   TAO_RT_ORBInitializer::TAO_RTCORBA_DT_LifeSpan lifespan,
00161                                   ACE_Time_Value const &dynamic_thread_time)
00162   : pool_ (pool),
00163     id_ (id),
00164     lane_priority_ (lane_priority),
00165     shutdown_ (false),
00166     static_threads_number_ (static_threads),
00167     dynamic_threads_number_ (dynamic_threads),
00168     static_threads_ (*this),
00169     dynamic_threads_ (*this),
00170     new_thread_generator_ (*this),
00171     resources_ (pool.manager ().orb_core (),
00172                 &new_thread_generator_),
00173     native_priority_ (TAO_INVALID_PRIORITY),
00174     lifespan_ (lifespan),
00175     dynamic_thread_time_ (dynamic_thread_time)
00176 {
00177 }
00178 
00179 bool
00180 TAO_Thread_Lane::new_dynamic_thread (void)
00181 {
00182   // Note that we are checking this condition below without the lock
00183   // held.
00184   if (this->dynamic_threads_.thr_count () >= this->dynamic_threads_number_)
00185     return false;
00186 
00187   ACE_GUARD_RETURN (TAO_SYNCH_MUTEX,
00188                     mon,
00189                     this->lock_,
00190                     false);
00191 
00192   TAO_Thread_Pool_Manager &manager = this->pool_.manager ();
00193 
00194   if (!manager.orb_core ().has_shutdown () && !this->shutdown_&&
00195       this->dynamic_threads_.thr_count () < this->dynamic_threads_number_)
00196     {
00197       if (TAO_debug_level > 0)
00198         ACE_DEBUG ((LM_DEBUG,
00199                     ACE_TEXT ("TAO Process %P Pool %d Lane %d Thread %t\n")
00200                     ACE_TEXT ("Current number of dynamic threads = %d; ")
00201                     ACE_TEXT ("static threads = %d; max dynamic threads = %d\n")
00202                     ACE_TEXT ("No leaders available; creating new leader!\n"),
00203                     this->pool_.id (),
00204                     this->id_,
00205                     this->dynamic_threads_.thr_count (),
00206                     this->static_threads_number_,
00207                     this->dynamic_threads_number_));
00208 
00209       int result =
00210         this->create_threads_i (this->dynamic_threads_,
00211                                 1,
00212                                 THR_BOUND | THR_DETACHED);
00213 
00214       if (result != 0)
00215         ACE_ERROR_RETURN ((LM_ERROR,
00216                           ACE_TEXT ("Pool %d Lane %d Thread %t: ")
00217                           ACE_TEXT ("cannot create dynamic thread\n"),
00218                           this->pool_.id (),
00219                           this->id_),
00220                           false);
00221     }
00222 
00223   return true;
00224 }
00225 
00226 void
00227 TAO_Thread_Lane::shutting_down (void)
00228 {
00229   ACE_GUARD (TAO_SYNCH_MUTEX,
00230              mon,
00231              this->lock_);
00232 
00233   // We are shutting down, this way we are not creating any more new dynamic
00234   // threads
00235   this->shutdown_ = true;
00236 }
00237 
00238 void
00239 TAO_Thread_Lane::validate_and_map_priority (void)
00240 {
00241   // Make sure that static_threads_number_ is not zero.
00242   if (this->static_threads_number_ == 0)
00243     throw ::CORBA::BAD_PARAM ();
00244 
00245   // Check that the priority is in bounds.
00246   if (this->lane_priority_ < RTCORBA::minPriority
00247            // The line below will always be false unless the value of
00248            // RTCORBA::maxPriority, which is now assigned the value of
00249            // 32767, is changed in RTCORBA.pidl.
00250 //      || this->lane_priority_ > RTCORBA::maxPriority
00251      )
00252     {
00253       throw ::CORBA::BAD_PARAM ();
00254     }
00255 
00256   CORBA::ORB_ptr orb = this->pool_.manager ().orb_core ().orb ();
00257 
00258   // Get the priority mapping manager.
00259   CORBA::Object_var obj =
00260     orb->resolve_initial_references (TAO_OBJID_PRIORITYMAPPINGMANAGER);
00261 
00262   TAO_Priority_Mapping_Manager_var mapping_manager =
00263     TAO_Priority_Mapping_Manager::_narrow (obj.in ());
00264 
00265   RTCORBA::PriorityMapping *pm = mapping_manager.in ()->mapping ();
00266 
00267   // Map CORBA priority to native priority.
00268   CORBA::Boolean const result =
00269     pm->to_native (this->lane_priority_, this->native_priority_);
00270 
00271   if (!result)
00272     throw ::CORBA::DATA_CONVERSION ();
00273 
00274   if (TAO_debug_level > 3)
00275     {
00276       ACE_DEBUG ((LM_DEBUG,
00277                   ACE_TEXT ("TAO (%P|%t) - creating thread at ")
00278                   ACE_TEXT ("(corba:native) priority %d:%d\n"),
00279                   this->lane_priority_,
00280                   this->native_priority_));
00281     }
00282 }
00283 
00284 void
00285 TAO_Thread_Lane::open (void)
00286 {
00287   // Validate and map priority.
00288   this->validate_and_map_priority ();
00289 
00290   char pool_lane_id[10];
00291   TAO_ORB_Parameters *params =
00292     this->pool ().manager ().orb_core ().orb_params ();
00293   TAO_EndpointSet endpoint_set;
00294 
00295   // Create a string just *:* which means all pools all thread id's
00296   ACE_OS::sprintf (pool_lane_id,
00297                    "*:*");
00298 
00299   // Get the endpoints for all
00300   params->get_endpoint_set (pool_lane_id, endpoint_set);
00301 
00302   // Create a string with pool:* which means all lanes for this pool
00303   ACE_OS::sprintf (pool_lane_id,
00304                    "%d:*",
00305                    this->pool ().id ());
00306 
00307   // Get the endpoints for this pool.
00308   params->get_endpoint_set (pool_lane_id, endpoint_set);
00309 
00310   // Create a string with *:lane which means a lan of all pools
00311   ACE_OS::sprintf (pool_lane_id,
00312                    "*:%d",
00313                    this->id ());
00314 
00315   // Get the endpoints for this lane.
00316   params->get_endpoint_set (pool_lane_id, endpoint_set);
00317 
00318   // Create a string with the pool:thread id.
00319   ACE_OS::sprintf (pool_lane_id,
00320                    "%d:%d",
00321                    this->pool ().id (),
00322                    this->id ());
00323 
00324   // Get the endpoints for this lane.
00325   params->get_endpoint_set (pool_lane_id, endpoint_set);
00326 
00327   bool ignore_address = false;
00328 
00329   if (endpoint_set.is_empty ())
00330     {
00331       // If endpoints are not specified for this lane, use the
00332       // endpoints specified for the default lane but ignore their
00333       // addresses.
00334       params->get_endpoint_set (TAO_DEFAULT_LANE, endpoint_set);
00335 
00336       ignore_address = true;
00337     }
00338   else
00339     {
00340       // If endpoints are specified for this lane, use them with their
00341       // addresses.
00342       ignore_address = false;
00343     }
00344 
00345   // Open the acceptor registry.
00346   int const result =
00347     this->resources_.open_acceptor_registry (endpoint_set, ignore_address);
00348 
00349   if (result == -1)
00350     throw ::CORBA::INTERNAL (
00351                  CORBA::SystemException::_tao_minor_code (
00352                    TAO_ACCEPTOR_REGISTRY_OPEN_LOCATION_CODE,
00353                    0),
00354                  CORBA::COMPLETED_NO);
00355 }
00356 
00357 TAO_Thread_Lane::~TAO_Thread_Lane (void)
00358 {
00359 }
00360 
00361 void
00362 TAO_Thread_Lane::finalize (void)
00363 {
00364   // Finalize resources.
00365   this->resources_.finalize ();
00366 }
00367 
00368 void
00369 TAO_Thread_Lane::shutdown_reactor (void)
00370 {
00371   this->resources_.shutdown_reactor ();
00372 }
00373 
00374 void
00375 TAO_Thread_Lane::wait (void)
00376 {
00377   this->static_threads_.wait ();
00378   this->dynamic_threads_.wait ();
00379 }
00380 
00381 int
00382 TAO_Thread_Lane::is_collocated (const TAO_MProfile &mprofile)
00383 {
00384   return this->resources_.is_collocated (mprofile);
00385 }
00386 
00387 CORBA::ULong
00388 TAO_Thread_Lane::current_threads (void) const
00389 {
00390   ACE_GUARD_RETURN (TAO_SYNCH_MUTEX,
00391                     mon,
00392                     this->lock_,
00393                     0);
00394 
00395   return (this->static_threads_.thr_count () +
00396          this->dynamic_threads_.thr_count ());
00397 }
00398 
00399 
00400 int
00401 TAO_Thread_Lane::create_static_threads (void)
00402 {
00403   ACE_GUARD_RETURN (TAO_SYNCH_MUTEX,
00404                     mon,
00405                     this->lock_,
00406                     0);
00407 
00408   // Create static threads.
00409   return this->create_threads_i (this->static_threads_,
00410                                  this->static_threads_number_,
00411                                  THR_NEW_LWP | THR_JOINABLE);
00412 }
00413 
00414 int
00415 TAO_Thread_Lane::create_dynamic_threads (CORBA::ULong number_of_threads)
00416 {
00417   ACE_GUARD_RETURN (TAO_SYNCH_MUTEX,
00418                     mon,
00419                     this->lock_,
00420                     0);
00421 
00422   return this->create_threads_i (this->dynamic_threads_,
00423                                  number_of_threads,
00424                                  THR_BOUND | THR_DETACHED);
00425 }
00426 
00427 int
00428 TAO_Thread_Lane::create_threads_i (TAO_Thread_Pool_Threads &thread_pool,
00429                                    CORBA::ULong number_of_threads,
00430                                    long thread_flags)
00431 {
00432   // Overwritten parameters.
00433   int force_active = 1;
00434 
00435   // Default parameters.
00436   int default_grp_id = -1;
00437   ACE_Task_Base *default_task = 0;
00438   ACE_hthread_t *default_thread_handles = 0;
00439   void **default_stack = 0;
00440 
00441   // Setting stack size.
00442   size_t *stack_size_array = 0;
00443   ACE_NEW_RETURN (stack_size_array,
00444                   size_t[number_of_threads],
00445                   -1);
00446   size_t index;
00447   for (index = 0;
00448        index != number_of_threads;
00449        ++index)
00450     stack_size_array[index] =
00451       this->pool ().stack_size ();
00452 
00453   // Make sure the dynamically created stack size array is properly
00454   // deleted.
00455   ACE_Auto_Basic_Array_Ptr<size_t> auto_stack_size_array (stack_size_array);
00456 
00457   TAO_ORB_Core &orb_core =
00458     this->pool ().manager ().orb_core ();
00459 
00460   long flags =
00461     thread_flags |
00462     orb_core.orb_params ()->thread_creation_flags ();
00463 
00464   // Activate the threads.
00465   int result =
00466     thread_pool.activate (flags,
00467                           number_of_threads,
00468                           force_active,
00469                           this->native_priority_,
00470                           default_grp_id,
00471                           default_task,
00472                           default_thread_handles,
00473                           default_stack,
00474                           stack_size_array);
00475 
00476   if (result != 0)
00477     return result;
00478 
00479   return result;
00480 }
00481 
00482 TAO_Thread_Pool::TAO_Thread_Pool (TAO_Thread_Pool_Manager &manager,
00483                                   CORBA::ULong id,
00484                                   CORBA::ULong stack_size,
00485                                   CORBA::ULong static_threads,
00486                                   CORBA::ULong dynamic_threads,
00487                                   CORBA::Short default_priority,
00488                                   CORBA::Boolean allow_request_buffering,
00489                                   CORBA::ULong max_buffered_requests,
00490                                   CORBA::ULong max_request_buffer_size,
00491                                   TAO_RT_ORBInitializer::TAO_RTCORBA_DT_LifeSpan lifespan,
00492                                   ACE_Time_Value const &dynamic_thread_time)
00493   : manager_ (manager),
00494     id_ (id),
00495     stack_size_ (stack_size),
00496     allow_borrowing_ (0),
00497     allow_request_buffering_ (allow_request_buffering),
00498     max_buffered_requests_ (max_buffered_requests),
00499     max_request_buffer_size_ (max_request_buffer_size),
00500     lifespan_ (lifespan),
00501     dynamic_thread_time_ (dynamic_thread_time),
00502     lanes_ (0),
00503     number_of_lanes_ (1),
00504     with_lanes_ (false)
00505 {
00506   // No support for buffering.
00507   if (allow_request_buffering)
00508     throw ::CORBA::NO_IMPLEMENT ();
00509 
00510   // Create one lane.
00511   ACE_NEW (this->lanes_,
00512            TAO_Thread_Lane *[this->number_of_lanes_]);
00513   ACE_NEW (this->lanes_[0],
00514            TAO_Thread_Lane (*this,
00515                             0,
00516                             default_priority,
00517                             static_threads,
00518                             dynamic_threads,
00519                             lifespan,
00520                             dynamic_thread_time
00521                            ));
00522 }
00523 
00524 TAO_Thread_Pool::TAO_Thread_Pool (TAO_Thread_Pool_Manager &manager,
00525                                   CORBA::ULong id,
00526                                   CORBA::ULong stack_size,
00527                                   const RTCORBA::ThreadpoolLanes &lanes,
00528                                   CORBA::Boolean allow_borrowing,
00529                                   CORBA::Boolean allow_request_buffering,
00530                                   CORBA::ULong max_buffered_requests,
00531                                   CORBA::ULong max_request_buffer_size,
00532                                   TAO_RT_ORBInitializer::TAO_RTCORBA_DT_LifeSpan lifespan,
00533                                   ACE_Time_Value const &dynamic_thread_time)
00534   : manager_ (manager),
00535     id_ (id),
00536     stack_size_ (stack_size),
00537     allow_borrowing_ (allow_borrowing),
00538     allow_request_buffering_ (allow_request_buffering),
00539     max_buffered_requests_ (max_buffered_requests),
00540     max_request_buffer_size_ (max_request_buffer_size),
00541     lifespan_ (lifespan),
00542     dynamic_thread_time_ (dynamic_thread_time),
00543     lanes_ (0),
00544     number_of_lanes_ (lanes.length ()),
00545     with_lanes_ (true)
00546 {
00547   // No support for buffering or borrowing.
00548   if (allow_borrowing ||
00549       allow_request_buffering)
00550     throw ::CORBA::NO_IMPLEMENT ();
00551 
00552   // Create multiple lane.
00553   ACE_NEW (this->lanes_,
00554            TAO_Thread_Lane *[this->number_of_lanes_]);
00555   for (CORBA::ULong i = 0;
00556        i != this->number_of_lanes_;
00557        ++i)
00558     ACE_NEW (this->lanes_[i],
00559              TAO_Thread_Lane (*this,
00560                               i,
00561                               lanes[i].lane_priority,
00562                               lanes[i].static_threads,
00563                               lanes[i].dynamic_threads,
00564                               lifespan,
00565                               dynamic_thread_time
00566                              ));
00567 }
00568 
00569 void
00570 TAO_Thread_Pool::open (void)
00571 {
00572   // Open all the lanes.
00573   for (CORBA::ULong i = 0;
00574        i != this->number_of_lanes_;
00575        ++i)
00576     {
00577       this->lanes_[i]->open ();
00578     }
00579 }
00580 
00581 TAO_Thread_Pool::~TAO_Thread_Pool (void)
00582 {
00583   // Delete all the lanes.
00584   for (CORBA::ULong i = 0;
00585        i != this->number_of_lanes_;
00586        ++i)
00587     delete this->lanes_[i];
00588 
00589   delete[] this->lanes_;
00590 }
00591 
00592 void
00593 TAO_Thread_Pool::finalize (void)
00594 {
00595   // Finalize all the lanes.
00596   for (CORBA::ULong i = 0;
00597        i != this->number_of_lanes_;
00598        ++i)
00599     this->lanes_[i]->finalize ();
00600 }
00601 
00602 void
00603 TAO_Thread_Pool::shutdown_reactor (void)
00604 {
00605   // Finalize all the lanes.
00606   for (CORBA::ULong i = 0;
00607        i != this->number_of_lanes_;
00608        ++i)
00609     this->lanes_[i]->shutdown_reactor ();
00610 }
00611 
00612 void
00613 TAO_Thread_Pool::shutting_down (void)
00614 {
00615   // Finalize all the lanes.
00616   for (CORBA::ULong i = 0;
00617        i != this->number_of_lanes_;
00618        ++i)
00619     this->lanes_[i]->shutting_down ();
00620 }
00621 
00622 
00623 void
00624 TAO_Thread_Pool::wait (void)
00625 {
00626   // Finalize all the lanes.
00627   for (CORBA::ULong i = 0;
00628        i != this->number_of_lanes_;
00629        ++i)
00630     this->lanes_[i]->wait ();
00631 }
00632 
00633 int
00634 TAO_Thread_Pool::is_collocated (const TAO_MProfile &mprofile)
00635 {
00636   // Finalize all the lanes.
00637   for (CORBA::ULong i = 0;
00638        i != this->number_of_lanes_;
00639        ++i)
00640     {
00641       int result =
00642         this->lanes_[i]->is_collocated (mprofile);
00643 
00644       if (result)
00645         return result;
00646     }
00647 
00648   return 0;
00649 }
00650 
00651 int
00652 TAO_Thread_Pool::create_static_threads (void)
00653 {
00654   for (CORBA::ULong i = 0;
00655        i != this->number_of_lanes_;
00656        ++i)
00657     {
00658       // Ask each lane to create its set of static threads.
00659       int const result = this->lanes_[i]->create_static_threads ();
00660 
00661       // Return on failure.
00662       if (result != 0)
00663         return result;
00664     }
00665 
00666   // Success.
00667   return 0;
00668 }
00669 
00670 #define TAO_THREAD_POOL_MANAGER_GUARD \
00671   ACE_GUARD_THROW_EX ( \
00672     TAO_SYNCH_MUTEX, \
00673     mon, \
00674     this->lock_, \
00675     CORBA::INTERNAL ( \
00676       CORBA::SystemException::_tao_minor_code ( \
00677         TAO_GUARD_FAILURE, \
00678         0), \
00679       CORBA::COMPLETED_NO));
00680 
00681 TAO_Thread_Pool_Manager::TAO_Thread_Pool_Manager (TAO_ORB_Core &orb_core)
00682   : orb_core_ (orb_core),
00683     thread_pools_ (),
00684     thread_pool_id_counter_ (1),
00685     lock_ ()
00686 {
00687 }
00688 
00689 TAO_Thread_Pool_Manager::~TAO_Thread_Pool_Manager (void)
00690 {
00691   // Delete all the pools.
00692   for (THREAD_POOLS::ITERATOR iterator = this->thread_pools_.begin ();
00693        iterator != this->thread_pools_.end ();
00694        ++iterator)
00695     delete (*iterator).int_id_;
00696 }
00697 
00698 void
00699 TAO_Thread_Pool_Manager::finalize (void)
00700 {
00701   // Finalize all the pools.
00702   for (THREAD_POOLS::ITERATOR iterator = this->thread_pools_.begin ();
00703        iterator != this->thread_pools_.end ();
00704        ++iterator)
00705     (*iterator).int_id_->finalize ();
00706 }
00707 
00708 void
00709 TAO_Thread_Pool_Manager::shutdown_reactor (void)
00710 {
00711   // Finalize all the pools.
00712   for (THREAD_POOLS::ITERATOR iterator = this->thread_pools_.begin ();
00713        iterator != this->thread_pools_.end ();
00714        ++iterator)
00715     (*iterator).int_id_->shutdown_reactor ();
00716 }
00717 
00718 void
00719 TAO_Thread_Pool_Manager::wait (void)
00720 {
00721   // Finalize all the pools.
00722   for (THREAD_POOLS::ITERATOR iterator = this->thread_pools_.begin ();
00723        iterator != this->thread_pools_.end ();
00724        ++iterator)
00725     (*iterator).int_id_->wait ();
00726 }
00727 
00728 int
00729 TAO_Thread_Pool_Manager::is_collocated (const TAO_MProfile &mprofile)
00730 {
00731   // Finalize all the pools.
00732   for (THREAD_POOLS::ITERATOR iterator = this->thread_pools_.begin ();
00733        iterator != this->thread_pools_.end ();
00734        ++iterator)
00735     {
00736       int const result = (*iterator).int_id_->is_collocated (mprofile);
00737 
00738       if (result)
00739         return result;
00740     }
00741 
00742   return 0;
00743 }
00744 
00745 RTCORBA::ThreadpoolId
00746 TAO_Thread_Pool_Manager::create_threadpool (CORBA::ULong stacksize,
00747                                             CORBA::ULong static_threads,
00748                                             CORBA::ULong dynamic_threads,
00749                                             RTCORBA::Priority default_priority,
00750                                             CORBA::Boolean allow_request_buffering,
00751                                             CORBA::ULong max_buffered_requests,
00752                                             CORBA::ULong max_request_buffer_size,
00753                                             TAO_RT_ORBInitializer::TAO_RTCORBA_DT_LifeSpan lifespan,
00754                                             ACE_Time_Value const &dynamic_thread_time)
00755 {
00756   TAO_THREAD_POOL_MANAGER_GUARD;
00757 
00758   return this->create_threadpool_i (stacksize,
00759                                     static_threads,
00760                                     dynamic_threads,
00761                                     default_priority,
00762                                     allow_request_buffering,
00763                                     max_buffered_requests,
00764                                     max_request_buffer_size,
00765                                     lifespan,
00766                                     dynamic_thread_time);
00767 }
00768 
00769 RTCORBA::ThreadpoolId
00770 TAO_Thread_Pool_Manager::create_threadpool_with_lanes (CORBA::ULong stacksize,
00771                                                        const RTCORBA::ThreadpoolLanes & lanes,
00772                                                        CORBA::Boolean allow_borrowing,
00773                                                        CORBA::Boolean allow_request_buffering,
00774                                                        CORBA::ULong max_buffered_requests,
00775                                                        CORBA::ULong max_request_buffer_size,
00776                                                        TAO_RT_ORBInitializer::TAO_RTCORBA_DT_LifeSpan lifespan,
00777                                                        ACE_Time_Value const &dynamic_thread_time)
00778 {
00779   TAO_THREAD_POOL_MANAGER_GUARD;
00780 
00781   return this->create_threadpool_with_lanes_i (stacksize,
00782                                                lanes,
00783                                                allow_borrowing,
00784                                                allow_request_buffering,
00785                                                max_buffered_requests,
00786                                                max_request_buffer_size,
00787                                                lifespan,
00788                                                dynamic_thread_time);
00789 }
00790 
00791 void
00792 TAO_Thread_Pool_Manager::destroy_threadpool (RTCORBA::ThreadpoolId threadpool)
00793 {
00794   TAO_Thread_Pool *tao_thread_pool = 0;
00795 
00796   // The guard is just for the map, don't do a wait inside the guard, because
00797   // during the wait other threads can try to access the thread pool manager
00798   // also, this can be one of the threads we are waiting for, which then
00799   // results in a deadlock
00800   {
00801     TAO_THREAD_POOL_MANAGER_GUARD;
00802 
00803     // Unbind the thread pool from the map.
00804     int const result = this->thread_pools_.unbind (threadpool, tao_thread_pool);
00805 
00806     // If the thread pool is not found in our map.
00807     if (result != 0)
00808       throw RTCORBA::RTORB::InvalidThreadpool ();
00809   }
00810 
00811   // Mark the thread pool that we are shutting down.
00812   tao_thread_pool->shutting_down ();
00813 
00814   // Shutdown reactor.
00815   tao_thread_pool->shutdown_reactor ();
00816 
00817   // Wait for the threads.
00818   tao_thread_pool->wait ();
00819 
00820   // Finalize resources.
00821   tao_thread_pool->finalize ();
00822 
00823   // Delete the thread pool.
00824   delete tao_thread_pool;
00825 
00826 }
00827 
00828 RTCORBA::ThreadpoolId
00829 TAO_Thread_Pool_Manager::create_threadpool_i (CORBA::ULong stacksize,
00830                                               CORBA::ULong static_threads,
00831                                               CORBA::ULong dynamic_threads,
00832                                               RTCORBA::Priority default_priority,
00833                                               CORBA::Boolean allow_request_buffering,
00834                                               CORBA::ULong max_buffered_requests,
00835                                               CORBA::ULong max_request_buffer_size,
00836                                               TAO_RT_ORBInitializer::TAO_RTCORBA_DT_LifeSpan lifespan,
00837                                               ACE_Time_Value const &dynamic_thread_time)
00838 {
00839   // Create the thread pool.
00840   TAO_Thread_Pool *thread_pool = 0;
00841 
00842   ACE_NEW_THROW_EX (thread_pool,
00843                     TAO_Thread_Pool (*this,
00844                                      this->thread_pool_id_counter_,
00845                                      stacksize,
00846                                      static_threads,
00847                                      dynamic_threads,
00848                                      default_priority,
00849                                      allow_request_buffering,
00850                                      max_buffered_requests,
00851                                      max_request_buffer_size,
00852                                      lifespan,
00853                                      dynamic_thread_time
00854                                     ),
00855                     CORBA::NO_MEMORY ());
00856 
00857   return this->create_threadpool_helper (thread_pool);
00858 }
00859 
00860 RTCORBA::ThreadpoolId
00861 TAO_Thread_Pool_Manager::create_threadpool_with_lanes_i (CORBA::ULong stacksize,
00862                                                          const RTCORBA::ThreadpoolLanes &lanes,
00863                                                          CORBA::Boolean allow_borrowing,
00864                                                          CORBA::Boolean allow_request_buffering,
00865                                                          CORBA::ULong max_buffered_requests,
00866                                                          CORBA::ULong max_request_buffer_size,
00867                                                          TAO_RT_ORBInitializer::TAO_RTCORBA_DT_LifeSpan lifespan,
00868                                                          ACE_Time_Value const &dynamic_thread_time)
00869 {
00870   // Create the thread pool.
00871   TAO_Thread_Pool *thread_pool = 0;
00872 
00873   ACE_NEW_THROW_EX (thread_pool,
00874                     TAO_Thread_Pool (*this,
00875                                      this->thread_pool_id_counter_,
00876                                      stacksize,
00877                                      lanes,
00878                                      allow_borrowing,
00879                                      allow_request_buffering,
00880                                      max_buffered_requests,
00881                                      max_request_buffer_size,
00882                                      lifespan,
00883                                      dynamic_thread_time
00884                                     ),
00885                     CORBA::NO_MEMORY ());
00886 
00887   return this->create_threadpool_helper (thread_pool);
00888 }
00889 
00890 RTCORBA::ThreadpoolId
00891 TAO_Thread_Pool_Manager::create_threadpool_helper (TAO_Thread_Pool *thread_pool)
00892 {
00893   // Make sure of safe deletion in case of errors.
00894   auto_ptr<TAO_Thread_Pool> safe_thread_pool (thread_pool);
00895 
00896   // Open the pool.
00897   thread_pool->open ();
00898 
00899   // Create the static threads.
00900   int result = thread_pool->create_static_threads ();
00901 
00902   // Throw exception in case of errors.
00903   if (result != 0)
00904     {
00905       // Finalize thread pool related resources.
00906       thread_pool->finalize ();
00907 
00908       throw ::CORBA::INTERNAL (
00909         CORBA::SystemException::_tao_minor_code (
00910           TAO_RTCORBA_THREAD_CREATION_LOCATION_CODE,
00911           errno),
00912         CORBA::COMPLETED_NO);
00913     }
00914 
00915   // Bind thread to internal table.
00916   result =
00917     this->thread_pools_.bind (this->thread_pool_id_counter_,
00918                               thread_pool);
00919 
00920   // Throw exceptin in case of errors.
00921   if (result != 0)
00922     throw ::CORBA::INTERNAL ();
00923 
00924   //
00925   // Success.
00926   //
00927 
00928   // No need to delete thread pool.
00929   safe_thread_pool.release ();
00930 
00931   // Return current counter and perform post-increment.
00932   return this->thread_pool_id_counter_++;
00933 }
00934 
00935 TAO_Thread_Pool *
00936 TAO_Thread_Pool_Manager::get_threadpool (RTCORBA::ThreadpoolId thread_pool_id )
00937 {
00938   TAO_THREAD_POOL_MANAGER_GUARD;
00939 
00940   TAO_Thread_Pool *thread_pool = 0;
00941   int const result = thread_pools_.find (thread_pool_id, thread_pool);
00942 
00943   ACE_UNUSED_ARG (result);
00944 
00945   return thread_pool;
00946 }
00947 
00948 TAO_ORB_Core &
00949 TAO_Thread_Pool_Manager::orb_core (void) const
00950 {
00951   return this->orb_core_;
00952 }
00953 
00954 TAO_END_VERSIONED_NAMESPACE_DECL
00955 
00956 #endif /* TAO_HAS_CORBA_MESSAGING && TAO_HAS_CORBA_MESSAGING != 0 */

Generated on Tue Feb 2 17:42:49 2010 for TAO_RTCORBA by  doxygen 1.4.7