Go to the documentation of this file.00001
00002
00003 #include "orbsvcs/Notify/ThreadPool_Task.h"
00004
00005 ACE_RCSID(Notify, TAO_Notify_ThreadPool_Task, "$Id: ThreadPool_Task.cpp 84685 2009-03-02 22:49:17Z mesnier_p $")
00006
00007 #include "orbsvcs/Notify/Properties.h"
00008 #include "orbsvcs/Notify/Timer_Queue.h"
00009 #include "orbsvcs/Notify/Buffering_Strategy.h"
00010
00011 #include "tao/debug.h"
00012 #include "tao/ORB_Core.h"
00013
00014 #include "ace/OS_NS_errno.h"
00015
00016 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00017
00018 TAO_Notify_ThreadPool_Task::TAO_Notify_ThreadPool_Task (void)
00019 : shutdown_ (false)
00020 {
00021 }
00022
00023 TAO_Notify_ThreadPool_Task::~TAO_Notify_ThreadPool_Task ()
00024 {
00025 }
00026
00027 int
00028 TAO_Notify_ThreadPool_Task::init (int argc, ACE_TCHAR **argv)
00029 {
00030 return this->ACE_Task<ACE_NULL_SYNCH>::init (argc, argv);
00031 }
00032
00033 TAO_Notify_Timer*
00034 TAO_Notify_ThreadPool_Task::timer (void)
00035 {
00036 return this->timer_.get();
00037 }
00038
00039 TAO_Notify_Buffering_Strategy*
00040 TAO_Notify_ThreadPool_Task::buffering_strategy (void)
00041 {
00042 return this->buffering_strategy_.get ();
00043 }
00044
00045 void
00046 TAO_Notify_ThreadPool_Task::init (const NotifyExt::ThreadPoolParams& tp_params,
00047 const TAO_Notify_AdminProperties::Ptr& admin_properties)
00048 {
00049 ACE_ASSERT (this->timer_.get() == 0);
00050
00051 TAO_Notify_Timer_Queue* timer = 0;
00052 ACE_NEW_THROW_EX (timer,
00053 TAO_Notify_Timer_Queue (),
00054 CORBA::NO_MEMORY ());
00055 this->timer_.reset (timer);
00056
00057 TAO_Notify_Buffering_Strategy* buffering_strategy = 0;
00058 ACE_NEW_THROW_EX (buffering_strategy,
00059 TAO_Notify_Buffering_Strategy (*msg_queue (), admin_properties),
00060 CORBA::NO_MEMORY ());
00061 this->buffering_strategy_.reset (buffering_strategy);
00062
00063 long flags = THR_NEW_LWP | THR_DETACHED;
00064 CORBA::ORB_var orb =
00065 TAO_Notify_PROPERTIES::instance()->orb ();
00066
00067 flags |=
00068 orb->orb_core ()->orb_params ()->thread_creation_flags ();
00069
00070
00071
00072
00073 for ( CORBA::ULong i = 0; i < tp_params.static_threads; ++i )
00074 {
00075 this->_incr_refcnt();
00076 }
00077
00078
00079 if (this->ACE_Task <ACE_NULL_SYNCH>::activate (flags,
00080 tp_params.static_threads,
00081 0,
00082 ACE_THR_PRI_OTHER_DEF) == -1)
00083 {
00084
00085 for ( CORBA::ULong i = 0; i < tp_params.static_threads; ++i )
00086 {
00087 this->_decr_refcnt();
00088 }
00089
00090 if (ACE_OS::last_error () == EPERM)
00091 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Insufficient privilege.\n")));
00092 else if (ACE_OS::last_error () == EAGAIN)
00093 {
00094 ACE_DEBUG ((LM_DEBUG,
00095 ACE_TEXT ("(%P|%t) task activation at priority %d failed %p\n"),
00096 tp_params.default_priority, "activate"));
00097 throw CORBA::NO_RESOURCES ();
00098 }
00099
00100 throw CORBA::BAD_PARAM ();
00101 }
00102 }
00103
00104 void
00105 TAO_Notify_ThreadPool_Task::execute (TAO_Notify_Method_Request& method_request)
00106 {
00107 if (!shutdown_)
00108 {
00109 TAO_Notify_Method_Request_Queueable* request_copy = method_request.copy ();
00110
00111 if (this->buffering_strategy_->enqueue (request_copy) == -1)
00112 {
00113 if (TAO_debug_level > 0)
00114 ACE_DEBUG ((LM_DEBUG, "NS_ThreadPool_Task (%P|%t) - "
00115 "failed to enqueue\n"));
00116 }
00117 }
00118 }
00119
00120 int
00121 TAO_Notify_ThreadPool_Task::svc (void)
00122 {
00123 TAO_Notify_Method_Request_Queueable* method_request = 0;
00124
00125 while (!shutdown_)
00126 {
00127 try
00128 {
00129 ACE_Time_Value* dequeue_blocking_time = 0;
00130 ACE_Time_Value earliest_time;
00131
00132 if (!this->timer_->impl().is_empty ())
00133 {
00134 earliest_time = this->timer_->impl().earliest_time ();
00135 dequeue_blocking_time = &earliest_time;
00136 }
00137
00138
00139 int const result = buffering_strategy_->dequeue (method_request, dequeue_blocking_time);
00140
00141 if (result > 0)
00142 {
00143 method_request->execute ();
00144
00145 ACE_Message_Block::release (method_request);
00146 }
00147 else if (errno == ETIME)
00148 {
00149 this->timer_->impl ().expire ();
00150 }
00151 else
00152 {
00153 if (TAO_debug_level > 0)
00154 ACE_DEBUG ((LM_DEBUG, "(%P|%t)ThreadPool_Task dequeue failed\n"));
00155 }
00156 }
00157 catch (const CORBA::Exception& ex)
00158 {
00159 ex._tao_print_exception (
00160 "ThreadPool_Task (%P|%t) exception in method request\n");
00161 }
00162 }
00163
00164 return 0;
00165 }
00166
00167 void
00168 TAO_Notify_ThreadPool_Task::shutdown (void)
00169 {
00170 if (this->shutdown_)
00171 {
00172 return;
00173 }
00174
00175 this->shutdown_ = true;
00176
00177 this->buffering_strategy_->shutdown ();
00178 }
00179
00180 void
00181 TAO_Notify_ThreadPool_Task::release (void)
00182 {
00183 delete this;
00184 }
00185
00186 int
00187 TAO_Notify_ThreadPool_Task::close (u_long)
00188 {
00189
00190 this->_decr_refcnt();
00191 return 0;
00192 }
00193
00194 void
00195 TAO_Notify_ThreadPool_Task::update_qos_properties (const TAO_Notify_QoSProperties& qos_properties)
00196 {
00197 this->buffering_strategy_->update_qos_properties (qos_properties);
00198 }
00199
00200 TAO_END_VERSIONED_NAMESPACE_DECL