#include <CEC_Reactive_Pulling_Strategy.h>
Inheritance diagram for TAO_CEC_Reactive_Pulling_Strategy:
Public Member Functions | |
TAO_CEC_Reactive_Pulling_Strategy (const ACE_Time_Value &rate, const ACE_Time_Value &relative_timeout, TAO_CEC_EventChannel *event_channel, CORBA::ORB_ptr orb) | |
void | handle_timeout (const ACE_Time_Value &tv, const void *arg) |
Receive the timeout from the adapter. | |
virtual void | activate (void) |
virtual void | shutdown (void) |
Private Attributes | |
TAO_CEC_Pulling_Strategy_Adapter | adapter_ |
The Adapter for the reactor events. | |
ACE_Time_Value | rate_ |
The polling rate. | |
ACE_Time_Value | relative_timeout_ |
The relative timeout. | |
TAO_CEC_EventChannel * | event_channel_ |
The event channel. | |
CORBA::ORB_var | orb_ |
The ORB. | |
CORBA::PolicyCurrent_var | policy_current_ |
To control the timeout policy in the thread. | |
CORBA::PolicyList | policy_list_ |
Precomputed policy list used to set the timeout. | |
ACE_Reactor * | reactor_ |
The ORB reactor. | |
long | timer_id_ |
The timer id. |
The events are dispatched in FIFO ordering, using the invoking thread to push the event to the consumer.
Definition at line 75 of file CEC_Reactive_Pulling_Strategy.h.
|
The scheduler is used to find the range of priorities and similar info. Definition at line 29 of file CEC_Reactive_Pulling_Strategy.cpp. References TAO_HAS_CORBA_MESSAGING.
00033 : adapter_ (this), 00034 rate_ (rate), 00035 relative_timeout_ (relative_timeout), 00036 event_channel_ (event_channel), 00037 orb_ (CORBA::ORB::_duplicate (orb)) 00038 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0 00039 // Initialise timer_id_ to an invalid timer id, so that in case we don't 00040 // schedule a timer, we don't cancel a random timer at shutdown 00041 , timer_id_ (-1) 00042 #endif /* TAO_HAS_CORBA_MESSAGING */ 00043 { 00044 this->reactor_ = 00045 this->orb_->orb_core ()->reactor (); 00046 } |
|
Initialize all the data structures, activate any internal threads, etc. Implements TAO_CEC_Pulling_Strategy. Definition at line 102 of file CEC_Reactive_Pulling_Strategy.cpp. References CORBA::PolicyCurrent::_narrow(), ACE_CATCHANY, ACE_ENDTRY, ACE_ENV_ARG_PARAMETER, ACE_TRY_CHECK, ACE_TRY_NEW_ENV, TAO::unbounded_object_reference_sequence< Policy, Policy_var >::length(), and ACE_Reactor::schedule_timer().
00103 { 00104 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0 00105 timer_id_ = this->reactor_->schedule_timer (&this->adapter_, 00106 0, 00107 this->rate_, 00108 this->rate_); 00109 if (timer_id_ == -1) 00110 return; 00111 00112 ACE_TRY_NEW_ENV 00113 { 00114 // Get the PolicyCurrent object 00115 CORBA::Object_var tmp = 00116 this->orb_->resolve_initial_references ("PolicyCurrent" 00117 ACE_ENV_ARG_PARAMETER); 00118 ACE_TRY_CHECK; 00119 00120 this->policy_current_ = 00121 CORBA::PolicyCurrent::_narrow (tmp.in () 00122 ACE_ENV_ARG_PARAMETER); 00123 ACE_TRY_CHECK; 00124 00125 // Pre-compute the policy list to the set the right timeout 00126 // value... 00127 // We need to convert the relative timeout into 100's of nano seconds. 00128 TimeBase::TimeT timeout; 00129 ORBSVCS_Time::Time_Value_to_TimeT (timeout, 00130 this->relative_timeout_); 00131 CORBA::Any any; 00132 any <<= timeout; 00133 00134 this->policy_list_.length (1); 00135 this->policy_list_[0] = 00136 this->orb_->create_policy ( 00137 Messaging::RELATIVE_RT_TIMEOUT_POLICY_TYPE, 00138 any 00139 ACE_ENV_ARG_PARAMETER); 00140 ACE_TRY_CHECK; 00141 } 00142 ACE_CATCHANY 00143 { 00144 } 00145 ACE_ENDTRY; 00146 #endif /* TAO_HAS_CORBA_MESSAGING */ 00147 } |
|
Receive the timeout from the adapter.
Definition at line 49 of file CEC_Reactive_Pulling_Strategy.cpp. References ACE_CATCHANY, ACE_ENDTRY, ACE_ENV_ARG_PARAMETER, ACE_ENV_SINGLE_ARG_PARAMETER, ACE_TRY_CHECK, ACE_TRY_CHECK_EX, ACE_TRY_EX, ACE_TRY_NEW_ENV, TAO_CEC_SupplierAdmin::for_each(), TAO_CEC_EventChannel::supplier_admin(), and TAO_CEC_EventChannel::supplier_control(). Referenced by TAO_CEC_Pulling_Strategy_Adapter::handle_timeout().
00052 { 00053 ACE_TRY_NEW_ENV 00054 { 00055 // Query the state of the Current object *before* we initiate 00056 // the iteration... 00057 CORBA::PolicyTypeSeq types; 00058 CORBA::PolicyList_var policies = 00059 this->policy_current_->get_policy_overrides (types 00060 ACE_ENV_ARG_PARAMETER); 00061 ACE_TRY_CHECK; 00062 00063 // Change the timeout 00064 this->policy_current_->set_policy_overrides (this->policy_list_, 00065 CORBA::ADD_OVERRIDE 00066 ACE_ENV_ARG_PARAMETER); 00067 ACE_TRY_CHECK; 00068 00069 ACE_TRY_EX (query) 00070 { 00071 TAO_CEC_Pull_Event worker (this->event_channel_->consumer_admin (), 00072 this->event_channel_->supplier_control ()); 00073 00074 this->event_channel_->supplier_admin ()->for_each (&worker 00075 ACE_ENV_ARG_PARAMETER); 00076 ACE_TRY_CHECK_EX (query); 00077 } 00078 ACE_CATCHANY 00079 { 00080 // Ignore all exceptions 00081 } 00082 ACE_ENDTRY; 00083 00084 this->policy_current_->set_policy_overrides (policies.in (), 00085 CORBA::SET_OVERRIDE 00086 ACE_ENV_ARG_PARAMETER); 00087 ACE_TRY_CHECK; 00088 for (CORBA::ULong i = 0; i != policies->length (); ++i) 00089 { 00090 policies[i]->destroy (ACE_ENV_SINGLE_ARG_PARAMETER); 00091 ACE_TRY_CHECK; 00092 } 00093 } 00094 ACE_CATCHANY 00095 { 00096 // Ignore all exceptions 00097 } 00098 ACE_ENDTRY; 00099 } |
|
Deactivate any internal threads and clean up internal data structures; it should only return once the threads have finished their jobs. Implements TAO_CEC_Pulling_Strategy. Definition at line 150 of file CEC_Reactive_Pulling_Strategy.cpp. References ACE_Reactor::cancel_timer() and ACE_Event_Handler::reactor().
00151 { 00152 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0 00153 this->reactor_->cancel_timer (timer_id_); 00154 #endif /* TAO_HAS_CORBA_MESSAGING */ 00155 this->adapter_.reactor (0); 00156 } |
|
The Adapter for the reactor events.
Definition at line 95 of file CEC_Reactive_Pulling_Strategy.h. |
|
The event channel.
Definition at line 104 of file CEC_Reactive_Pulling_Strategy.h. |
|
The ORB.
Definition at line 107 of file CEC_Reactive_Pulling_Strategy.h. |
|
To control the timeout policy in the thread.
Definition at line 110 of file CEC_Reactive_Pulling_Strategy.h. |
|
Precomputed policy list used to set the timeout.
Definition at line 113 of file CEC_Reactive_Pulling_Strategy.h. |
|
The polling rate.
Definition at line 98 of file CEC_Reactive_Pulling_Strategy.h. |
|
The ORB reactor.
Definition at line 116 of file CEC_Reactive_Pulling_Strategy.h. |
|
The relative timeout.
Definition at line 101 of file CEC_Reactive_Pulling_Strategy.h. |
|
The timer id.
Definition at line 120 of file CEC_Reactive_Pulling_Strategy.h. |