ThreadPool_Task.cpp

Go to the documentation of this file.
00001 // ThreadPool_Task.cpp,v 1.28 2006/03/14 06:14:34 jtc Exp
00002 
00003 #include "orbsvcs/Notify/ThreadPool_Task.h"
00004 
00005 ACE_RCSID(Notify, TAO_Notify_ThreadPool_Task, "ThreadPool_Task.cpp,v 1.28 2006/03/14 06:14:34 jtc Exp")
00006 
00007 #include "orbsvcs/Notify/Properties.h"
00008 #include "orbsvcs/Notify/Timer_Queue.h"
00009 #include "orbsvcs/Notify/Buffering_Strategy.h"
00010 
00011 #include "tao/debug.h"
00012 #include "tao/ORB_Core.h"
00013 
00014 #include "ace/OS_NS_errno.h"
00015 
00016 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00017 
00018 TAO_Notify_ThreadPool_Task::TAO_Notify_ThreadPool_Task (void)
00019 : shutdown_ (false)
00020 {
00021 }
00022 
00023 TAO_Notify_ThreadPool_Task::~TAO_Notify_ThreadPool_Task ()
00024 {
00025 }
00026 
00027 int
00028 TAO_Notify_ThreadPool_Task::init (int argc, ACE_TCHAR **argv)
00029 {
00030   return this->ACE_Task<ACE_NULL_SYNCH>::init (argc, argv);
00031 }
00032 
00033 TAO_Notify_Timer*
00034 TAO_Notify_ThreadPool_Task::timer (void)
00035 {
00036   return this->timer_.get();
00037 }
00038 
00039 void
00040 TAO_Notify_ThreadPool_Task::init (const NotifyExt::ThreadPoolParams& tp_params,
00041                                   const TAO_Notify_AdminProperties::Ptr& admin_properties  ACE_ENV_ARG_DECL)
00042 {
00043   ACE_ASSERT (this->timer_.get() == 0);
00044 
00045   TAO_Notify_Timer_Queue* timer = 0;
00046   ACE_NEW_THROW_EX (timer,
00047     TAO_Notify_Timer_Queue (),
00048     CORBA::NO_MEMORY ());
00049   ACE_CHECK;
00050   this->timer_.reset (timer);
00051 
00052 
00053   TAO_Notify_Buffering_Strategy* buffering_strategy = 0;
00054   ACE_NEW_THROW_EX (buffering_strategy,
00055     TAO_Notify_Buffering_Strategy (*msg_queue (), admin_properties),
00056     CORBA::NO_MEMORY ());
00057   this->buffering_strategy_.reset (buffering_strategy);
00058   ACE_CHECK;
00059 
00060   long flags = THR_NEW_LWP | THR_DETACHED;
00061   CORBA::ORB_var orb =
00062     TAO_Notify_PROPERTIES::instance()->orb ();
00063 
00064   flags |=
00065     orb->orb_core ()->orb_params ()->thread_creation_flags ();
00066 
00067   // Guards the thread for auto-deletion; paired with close.
00068   // This is done in the originating thread before the spawn to
00069   // avoid any race conditions.
00070   for ( CORBA::ULong i = 0; i < tp_params.static_threads; ++i )
00071   {
00072     this->_incr_refcnt();
00073   }
00074 
00075   // Become an active object.
00076   if (this->ACE_Task <ACE_NULL_SYNCH>::activate (flags,
00077     tp_params.static_threads,
00078     0,
00079     ACE_THR_PRI_OTHER_DEF) == -1)
00080   {
00081     // Undo the ref counts on error
00082     for ( CORBA::ULong i = 0; i < tp_params.static_threads; ++i )
00083     {
00084       this->_decr_refcnt();
00085     }
00086 
00087     if (TAO_debug_level > 0)
00088     {
00089       if (ACE_OS::last_error () == EPERM)
00090         ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Insufficient privilege.\n")));
00091       else
00092         ACE_DEBUG ((LM_ERROR,
00093         ACE_TEXT ("(%t) task activation at priority %d failed\n")
00094         ACE_TEXT ("exiting!\n%a"),
00095         tp_params.default_priority));
00096     }
00097     ACE_THROW (CORBA::BAD_PARAM ());
00098   }
00099 }
00100 
00101 void
00102 TAO_Notify_ThreadPool_Task::execute (TAO_Notify_Method_Request& method_request ACE_ENV_ARG_DECL)
00103 {
00104   if (!shutdown_)
00105   {
00106     TAO_Notify_Method_Request_Queueable* request_copy = method_request.copy (ACE_ENV_SINGLE_ARG_PARAMETER);
00107     ACE_CHECK;
00108 
00109     if (this->buffering_strategy_->enqueue (request_copy) == -1)
00110     {
00111       if (TAO_debug_level > 0)
00112         ACE_DEBUG ((LM_DEBUG, "NS_ThreadPool_Task (%P|%t) - "
00113         "failed to enqueue\n"));
00114     }
00115   }
00116 }
00117 
00118 int
00119 TAO_Notify_ThreadPool_Task::svc (void)
00120 {
00121   TAO_Notify_Method_Request_Queueable* method_request;
00122 
00123   while (!shutdown_)
00124   {
00125     ACE_TRY_NEW_ENV
00126     {
00127       ACE_Time_Value* dequeue_blocking_time = 0;
00128       ACE_Time_Value earliest_time;
00129 
00130       if (!this->timer_->impl().is_empty ())
00131       {
00132         earliest_time = this->timer_->impl().earliest_time ();
00133         dequeue_blocking_time = &earliest_time;
00134       }
00135 
00136       // Dequeue 1 item
00137       int result = buffering_strategy_->dequeue (method_request, dequeue_blocking_time);
00138 
00139       if (result > 0)
00140       {
00141         method_request->execute (ACE_ENV_SINGLE_ARG_PARAMETER);
00142         ACE_TRY_CHECK;
00143 
00144         ACE_Message_Block::release (method_request);
00145       }
00146       else if (errno == ETIME)
00147       {
00148         this->timer_->impl ().expire ();
00149       }
00150       else
00151       {
00152         if (TAO_debug_level > 0)
00153           ACE_DEBUG ((LM_DEBUG, "ThreadPool_Task dequeue failed\n"));
00154       }
00155     }
00156     ACE_CATCHANY
00157     {
00158       ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
00159         "ThreadPool_Task (%P|%t) exception in method request\n");
00160     }
00161     ACE_ENDTRY;
00162   } /* while */
00163 
00164   return 0;
00165 }
00166 
00167 void
00168 TAO_Notify_ThreadPool_Task::shutdown (void)
00169 {
00170   if (this->shutdown_)
00171   {
00172     return;
00173   }
00174 
00175   this->shutdown_ = true;
00176 
00177   this->buffering_strategy_->shutdown ();
00178 }
00179 
00180 void
00181 TAO_Notify_ThreadPool_Task::release (void)
00182 {
00183   delete this;
00184 }
00185 
00186 int
00187 TAO_Notify_ThreadPool_Task::close (u_long)
00188 {
00189   // _incr_refcnt() for each spawned thread in init()
00190   this->_decr_refcnt();
00191   return 0;
00192 }
00193 
00194 void
00195 TAO_Notify_ThreadPool_Task::update_qos_properties (const TAO_Notify_QoSProperties& qos_properties)
00196 {
00197   this->buffering_strategy_->update_qos_properties (qos_properties);
00198 }
00199 
00200 TAO_END_VERSIONED_NAMESPACE_DECL

Generated on Thu Nov 9 13:24:17 2006 for TAO_CosNotification by doxygen 1.3.6