#include <ThreadPool_Task.h>
Inheritance diagram for TAO_Notify_ThreadPool_Task:
Public Member Functions | |
TAO_Notify_ThreadPool_Task (void) | |
Constructor. | |
virtual | ~TAO_Notify_ThreadPool_Task () |
Destructor. | |
virtual int | init (int argc, ACE_TCHAR **argv) |
Call the base class init. | |
virtual int | close (u_long flags) |
Release reference to myself. | |
void | init (const NotifyExt::ThreadPoolParams &tp_params, const TAO_Notify_AdminProperties::Ptr &admin_properties) |
Activate the threadpool. | |
virtual void | execute (TAO_Notify_Method_Request &method_request) |
Queue the request. | |
virtual void | shutdown () |
Shutdown task. | |
virtual void | update_qos_properties (const TAO_Notify_QoSProperties &qos_properties) |
Update QoS Properties. | |
virtual TAO_Notify_Timer * | timer (void) |
The object used by clients to register timers. | |
Protected Member Functions | |
virtual int | svc (void) |
Task svc. | |
Private Member Functions | |
virtual void | release (void) |
Release. | |
Private Attributes | |
ACE_Auto_Ptr< TAO_Notify_Buffering_Strategy > | buffering_strategy_ |
The buffering strategy to use. | |
bool | shutdown_ |
Shutdown flag; set by shutdown() and checked by execute() and svc(). | |
TAO_Notify_Timer_Queue::Ptr | timer_ |
The Queue based timer. | |
Friends | |
class | TAO_Notify_Method_Request_Shutdown |
Definition at line 41 of file ThreadPool_Task.h.
|
Constructor.
Definition at line 18 of file ThreadPool_Task.cpp.
00019 : shutdown_ (false) 00020 { 00021 } |
|
Destructor.
Definition at line 23 of file ThreadPool_Task.cpp.
00024 { 00025 } |
|
Release reference to myself.
Reimplemented from ACE_Task_Base. Definition at line 187 of file ThreadPool_Task.cpp. References TAO_Notify_Refcountable::_decr_refcnt().
00188 { 00189 // _incr_refcnt() for each spawned thread in init() 00190 this->_decr_refcnt(); 00191 return 0; 00192 } |
|
Queue the request.
Implements TAO_Notify_Worker_Task. Definition at line 102 of file ThreadPool_Task.cpp. References ACE_CHECK, ACE_DEBUG, ACE_ENV_SINGLE_ARG_PARAMETER, buffering_strategy_, LM_DEBUG, and TAO_debug_level.
00103 { 00104 if (!shutdown_) 00105 { 00106 TAO_Notify_Method_Request_Queueable* request_copy = method_request.copy (ACE_ENV_SINGLE_ARG_PARAMETER); 00107 ACE_CHECK; 00108 00109 if (this->buffering_strategy_->enqueue (request_copy) == -1) 00110 { 00111 if (TAO_debug_level > 0) 00112 ACE_DEBUG ((LM_DEBUG, "NS_ThreadPool_Task (%P|%t) - " 00113 "failed to enqueue\n")); 00114 } 00115 } 00116 } |
|
Activate the threadpool.
Definition at line 40 of file ThreadPool_Task.cpp. References TAO_Notify_Refcountable::_decr_refcnt(), TAO_Notify_Refcountable::_incr_refcnt(), ACE_ASSERT, ACE_CHECK, ACE_DEBUG, ACE_NEW_THROW_EX, ACE_TEXT(), ACE_THR_PRI_OTHER_DEF, ACE_THROW, buffering_strategy_, EPERM, TAO_Singleton< TYPE, ACE_LOCK >::instance(), ACE_OS::last_error(), LM_DEBUG, LM_ERROR, TAO_Notify_AdminProperties::Ptr, ACE_Auto_Basic_Ptr< X >::reset(), and TAO_debug_level.
00042 { 00043 ACE_ASSERT (this->timer_.get() == 0); 00044 00045 TAO_Notify_Timer_Queue* timer = 0; 00046 ACE_NEW_THROW_EX (timer, 00047 TAO_Notify_Timer_Queue (), 00048 CORBA::NO_MEMORY ()); 00049 ACE_CHECK; 00050 this->timer_.reset (timer); 00051 00052 00053 TAO_Notify_Buffering_Strategy* buffering_strategy = 0; 00054 ACE_NEW_THROW_EX (buffering_strategy, 00055 TAO_Notify_Buffering_Strategy (*msg_queue (), admin_properties), 00056 CORBA::NO_MEMORY ()); 00057 this->buffering_strategy_.reset (buffering_strategy); 00058 ACE_CHECK; 00059 00060 long flags = THR_NEW_LWP | THR_DETACHED; 00061 CORBA::ORB_var orb = 00062 TAO_Notify_PROPERTIES::instance()->orb (); 00063 00064 flags |= 00065 orb->orb_core ()->orb_params ()->thread_creation_flags (); 00066 00067 // Guards the thread for auto-deletion; paired with close. 00068 // This is done in the originating thread before the spawn to 00069 // avoid any race conditions. 00070 for ( CORBA::ULong i = 0; i < tp_params.static_threads; ++i ) 00071 { 00072 this->_incr_refcnt(); 00073 } 00074 00075 // Become an active object. 00076 if (this->ACE_Task <ACE_NULL_SYNCH>::activate (flags, 00077 tp_params.static_threads, 00078 0, 00079 ACE_THR_PRI_OTHER_DEF) == -1) 00080 { 00081 // Undo the ref counts on error 00082 for ( CORBA::ULong i = 0; i < tp_params.static_threads; ++i ) 00083 { 00084 this->_decr_refcnt(); 00085 } 00086 00087 if (TAO_debug_level > 0) 00088 { 00089 if (ACE_OS::last_error () == EPERM) 00090 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Insufficient privilege.\n"))); 00091 else 00092 ACE_DEBUG ((LM_ERROR, 00093 ACE_TEXT ("(%t) task activation at priority %d failed\n") 00094 ACE_TEXT ("exiting!\n%a"), 00095 tp_params.default_priority)); 00096 } 00097 ACE_THROW (CORBA::BAD_PARAM ()); 00098 } 00099 } |
|
Call the base class init.
Definition at line 28 of file ThreadPool_Task.cpp. References ACE_Shared_Object::init(). Referenced by TAO_Notify_Builder::apply_thread_pool_concurrency().
00029 { 00030 return this->ACE_Task<ACE_NULL_SYNCH>::init (argc, argv); 00031 } |
|
Release.
Implements TAO_Notify_Refcountable. Definition at line 181 of file ThreadPool_Task.cpp.
00182 { 00183 delete this; 00184 } |
|
Shutdown task.
Implements TAO_Notify_Worker_Task. Definition at line 168 of file ThreadPool_Task.cpp. References buffering_strategy_.
00169 { 00170 if (this->shutdown_) 00171 { 00172 return; 00173 } 00174 00175 this->shutdown_ = true; 00176 00177 this->buffering_strategy_->shutdown (); 00178 } |
|
Task svc.
Reimplemented from ACE_Task_Base. Definition at line 119 of file ThreadPool_Task.cpp. References ACE_ANY_EXCEPTION, ACE_CATCHANY, ACE_DEBUG, ACE_ENDTRY, ACE_ENV_SINGLE_ARG_PARAMETER, ACE_PRINT_EXCEPTION, ACE_TRY_CHECK, ACE_TRY_NEW_ENV, buffering_strategy_, ETIME, TAO_Notify_Method_Request::execute(), LM_DEBUG, ACE_Message_Block::release(), and TAO_debug_level.
00120 { 00121 TAO_Notify_Method_Request_Queueable* method_request; 00122 00123 while (!shutdown_) 00124 { 00125 ACE_TRY_NEW_ENV 00126 { 00127 ACE_Time_Value* dequeue_blocking_time = 0; 00128 ACE_Time_Value earliest_time; 00129 00130 if (!this->timer_->impl().is_empty ()) 00131 { 00132 earliest_time = this->timer_->impl().earliest_time (); 00133 dequeue_blocking_time = &earliest_time; 00134 } 00135 00136 // Dequeue 1 item 00137 int result = buffering_strategy_->dequeue (method_request, dequeue_blocking_time); 00138 00139 if (result > 0) 00140 { 00141 method_request->execute (ACE_ENV_SINGLE_ARG_PARAMETER); 00142 ACE_TRY_CHECK; 00143 00144 ACE_Message_Block::release (method_request); 00145 } 00146 else if (errno == ETIME) 00147 { 00148 this->timer_->impl ().expire (); 00149 } 00150 else 00151 { 00152 if (TAO_debug_level > 0) 00153 ACE_DEBUG ((LM_DEBUG, "ThreadPool_Task dequeue failed\n")); 00154 } 00155 } 00156 ACE_CATCHANY 00157 { 00158 ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, 00159 "ThreadPool_Task (%P|%t) exception in method request\n"); 00160 } 00161 ACE_ENDTRY; 00162 } /* while */ 00163 00164 return 0; 00165 } |
|
The object used by clients to register timers.
Implements TAO_Notify_Worker_Task. Definition at line 34 of file ThreadPool_Task.cpp.
00035 { 00036 return this->timer_.get(); 00037 } |
|
Update QoS Properties.
Reimplemented from TAO_Notify_Worker_Task. Definition at line 195 of file ThreadPool_Task.cpp. References buffering_strategy_.
00196 { 00197 this->buffering_strategy_->update_qos_properties (qos_properties); 00198 } |
|
Definition at line 45 of file ThreadPool_Task.h. |
|
The buffering strategy to use.
Definition at line 84 of file ThreadPool_Task.h. Referenced by execute(), init(), shutdown(), svc(), and update_qos_properties(). |
|
Shutdown flag; set by shutdown() and checked by execute() and svc().
Definition at line 87 of file ThreadPool_Task.h. |
|
The Queue based timer.
Definition at line 90 of file ThreadPool_Task.h. |