#include <CEC_Reactive_ConsumerControl.h>
Inheritance diagram for TAO_CEC_Reactive_ConsumerControl:
Public Member Functions | |
TAO_CEC_Reactive_ConsumerControl (const ACE_Time_Value &rate, const ACE_Time_Value &timeout, unsigned int retries, TAO_CEC_EventChannel *event_channel, CORBA::ORB_ptr orb) | |
virtual | ~TAO_CEC_Reactive_ConsumerControl (void) |
Destructor. | |
void | handle_timeout (const ACE_Time_Value &tv, const void *arg) |
Receive the timeout from the adapter. | |
virtual int | activate (void) |
virtual int | shutdown (void) |
virtual void | consumer_not_exist (TAO_CEC_ProxyPushSupplier *proxy) |
virtual void | consumer_not_exist (TAO_CEC_ProxyPullSupplier *proxy) |
virtual void | system_exception (TAO_CEC_ProxyPushSupplier *proxy, CORBA::SystemException &) |
Some system exception was raised while trying to push an event. | |
virtual bool | need_to_disconnect (PortableServer::ServantBase *proxy) |
virtual void | successful_transmission (PortableServer::ServantBase *proxy) |
Allow others to inform us when a send or receive was successful. | |
Private Member Functions | |
void | query_consumers () |
Private Attributes | |
ACE_Time_Value | rate_ |
The polling rate. | |
ACE_Time_Value | timeout_ |
The polling timeout. | |
unsigned int | retries_ |
The number of retries per proxy until it is disconnected. | |
TAO_CEC_ConsumerControl_Adapter | adapter_ |
The Adapter for the reactor events. | |
TAO_CEC_EventChannel * | event_channel_ |
The event channel. | |
CORBA::ORB_var | orb_ |
The ORB. | |
CORBA::PolicyCurrent_var | policy_current_ |
To control the timeout policy in the thread. | |
CORBA::PolicyList | policy_list_ |
Precomputed policy list used to set the timeout. | |
ACE_Reactor * | reactor_ |
The ORB reactor. | |
long | timer_id_ |
The timer id. |
Defines the interface for the consumer control strategy. This strategy handles misbehaving or failing consumers. = MEMORY MANAGEMENT = LOCKING = TODO
Definition at line 81 of file CEC_Reactive_ConsumerControl.h.
|
Constructor. It does not assume ownership of the parameter. Definition at line 41 of file CEC_Reactive_ConsumerControl.cpp. References TAO_HAS_CORBA_MESSAGING.
00046 : rate_ (rate), 00047 timeout_ (timeout), 00048 retries_ (retries), 00049 adapter_ (this), 00050 event_channel_ (ec), 00051 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL) 00052 typed_event_channel_ (0), 00053 #endif /* TAO_HAS_TYPED_EVENT_CHANNEL */ 00054 orb_ (CORBA::ORB::_duplicate (orb)) 00055 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0 00056 // Initialise timer_id_ to an invalid timer id, so that in case we don't 00057 // schedule a timer, we don't cancel a random timer at shutdown 00058 , timer_id_ (-1) 00059 #endif /* TAO_HAS_CORBA_MESSAGING */ 00060 { 00061 this->reactor_ = 00062 this->orb_->orb_core ()->reactor (); 00063 } |
|
Destructor.
Definition at line 91 of file CEC_Reactive_ConsumerControl.cpp.
00092 { 00093 } |
|
Activate any internal threads or timers used to poll the state of the consumers. Reimplemented from TAO_CEC_ConsumerControl. Definition at line 257 of file CEC_Reactive_ConsumerControl.cpp. References CORBA::PolicyCurrent::_narrow(), ACE_CATCHANY, ACE_ENDTRY, ACE_ENV_ARG_PARAMETER, ACE_TRY_CHECK, ACE_TRY_NEW_ENV, TAO::unbounded_object_reference_sequence< Policy, Policy_var >::length(), rate_, and ACE_Reactor::schedule_timer().
00258 { 00259 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0 00260 ACE_TRY_NEW_ENV 00261 { 00262 // Get the PolicyCurrent object 00263 CORBA::Object_var tmp = 00264 this->orb_->resolve_initial_references ("PolicyCurrent" 00265 ACE_ENV_ARG_PARAMETER); 00266 ACE_TRY_CHECK; 00267 00268 this->policy_current_ = 00269 CORBA::PolicyCurrent::_narrow (tmp.in () 00270 ACE_ENV_ARG_PARAMETER); 00271 ACE_TRY_CHECK; 00272 00273 // Pre-compute the policy list to the set the right timeout 00274 // value... 00275 // We need to convert the relative timeout into 100's of nano seconds. 00276 TimeBase::TimeT timeout; 00277 ORBSVCS_Time::Time_Value_to_TimeT (timeout, 00278 this->timeout_); 00279 CORBA::Any any; 00280 any <<= timeout; 00281 00282 this->policy_list_.length (1); 00283 this->policy_list_[0] = 00284 this->orb_->create_policy ( 00285 Messaging::RELATIVE_RT_TIMEOUT_POLICY_TYPE, 00286 any 00287 ACE_ENV_ARG_PARAMETER); 00288 ACE_TRY_CHECK; 00289 00290 // Only schedule the timer, when the rate is not zero 00291 if (this->rate_ != ACE_Time_Value::zero) 00292 { 00293 // Schedule the timer after these policies has been set, because the 00294 // handle_timeout uses these policies, if done in front, the channel 00295 // can crash when the timeout expires before initiazation is ready. 00296 timer_id_ = this->reactor_->schedule_timer (&this->adapter_, 00297 0, 00298 this->rate_, 00299 this->rate_); 00300 if (timer_id_ == -1) 00301 return -1; 00302 } 00303 } 00304 ACE_CATCHANY 00305 { 00306 return -1; 00307 } 00308 ACE_ENDTRY; 00309 #endif /* TAO_HAS_CORBA_MESSAGING */ 00310 00311 return 0; 00312 } |
|
Invoked by helper classes when they detect that a consumer no longer exists (i.e. _non_existent() returns true and/or the CORBA::OBJECT_NOT_EXIST exception has been raised). Reimplemented from TAO_CEC_ConsumerControl. Definition at line 352 of file CEC_Reactive_ConsumerControl.cpp. References ACE_ANY_EXCEPTION, ACE_CATCHANY, ACE_ENDTRY, ACE_ENV_SINGLE_ARG_PARAMETER, ACE_PRINT_EXCEPTION, ACE_TEXT(), ACE_TRY, and ACE_TRY_CHECK.
00355 { 00356 ACE_TRY 00357 { 00358 proxy->disconnect_pull_supplier (ACE_ENV_SINGLE_ARG_PARAMETER); 00359 ACE_TRY_CHECK; 00360 } 00361 ACE_CATCHANY 00362 { 00363 ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, 00364 ACE_TEXT ("Reactive_ConsumerControl::consumer_not_exist")); 00365 // Ignore all exceptions.. 00366 } 00367 ACE_ENDTRY; 00368 } |
|
When pushing an event to the consumer, a CORBA::OBJECT_NOT_EXIST exception was raised. The only interpretation is that the object has been destroyed. The strategy has to, at the very least, reclaim all the resources attached to that object. Reimplemented from TAO_CEC_ConsumerControl. Definition at line 327 of file CEC_Reactive_ConsumerControl.cpp. References ACE_ANY_EXCEPTION, ACE_CATCHANY, ACE_DEBUG, ACE_ENDTRY, ACE_ENV_SINGLE_ARG_PARAMETER, ACE_PRINT_EXCEPTION, ACE_TEXT(), ACE_TRY, ACE_TRY_CHECK, LM_DEBUG, and TAO_debug_level.
00330 { 00331 ACE_TRY 00332 { 00333 proxy->disconnect_push_supplier (ACE_ENV_SINGLE_ARG_PARAMETER); 00334 ACE_TRY_CHECK; 00335 00336 if (TAO_debug_level >= 10) 00337 { 00338 ACE_DEBUG ((LM_DEBUG, 00339 ACE_TEXT ("ProxyPushSupplier disconnected due to consumer_not_exist\n"))); 00340 } 00341 } 00342 ACE_CATCHANY 00343 { 00344 ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, 00345 ACE_TEXT ("Reactive_ConsumerControl::consumer_not_exist")); 00346 // Ignore all exceptions.. 00347 } 00348 ACE_ENDTRY; 00349 } |
|
Receive the timeout from the adapter.
Definition at line 207 of file CEC_Reactive_ConsumerControl.cpp. References ACE_CATCHANY, ACE_ENDTRY, ACE_ENV_ARG_PARAMETER, ACE_ENV_SINGLE_ARG_PARAMETER, ACE_TRY_CHECK, ACE_TRY_CHECK_EX, ACE_TRY_EX, ACE_TRY_NEW_ENV, and query_consumers(). Referenced by TAO_CEC_ConsumerControl_Adapter::handle_timeout().
00210 { 00211 ACE_TRY_NEW_ENV 00212 { 00213 // Query the state of the Current object *before* we initiate 00214 // the iteration... 00215 CORBA::PolicyTypeSeq types; 00216 CORBA::PolicyList_var policies = 00217 this->policy_current_->get_policy_overrides (types 00218 ACE_ENV_ARG_PARAMETER); 00219 ACE_TRY_CHECK; 00220 00221 // Change the timeout 00222 this->policy_current_->set_policy_overrides (this->policy_list_, 00223 CORBA::ADD_OVERRIDE 00224 ACE_ENV_ARG_PARAMETER); 00225 ACE_TRY_CHECK; 00226 00227 ACE_TRY_EX (query) 00228 { 00229 // Query the state of the consumers... 00230 this->query_consumers (ACE_ENV_SINGLE_ARG_PARAMETER); 00231 ACE_TRY_CHECK_EX (query); 00232 } 00233 ACE_CATCHANY 00234 { 00235 // Ignore all exceptions 00236 } 00237 ACE_ENDTRY; 00238 00239 this->policy_current_->set_policy_overrides (policies.in (), 00240 CORBA::SET_OVERRIDE 00241 ACE_ENV_ARG_PARAMETER); 00242 ACE_TRY_CHECK; 00243 for (CORBA::ULong i = 0; i != policies->length (); ++i) 00244 { 00245 policies[i]->destroy (ACE_ENV_SINGLE_ARG_PARAMETER); 00246 ACE_TRY_CHECK; 00247 } 00248 } 00249 ACE_CATCHANY 00250 { 00251 // Ignore all exceptions 00252 } 00253 ACE_ENDTRY; 00254 } |
|
Do we need to disconnect this supplier? The parameter type for proxy is PortableServer::ServantBase* because this method is used for both TAO_CEC_ProxyPushSupplier and TAO_CEC_ProxyPullSupplier objects. Reimplemented from TAO_CEC_ConsumerControl. Definition at line 129 of file CEC_Reactive_ConsumerControl.cpp. References retries_. Referenced by system_exception().
00131 { 00132 bool disconnect = true; 00133 00134 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL) 00135 if (this->typed_event_channel_) 00136 { 00137 // Typed EC 00138 TAO_CEC_TypedEventChannel::ServantRetryMap::ENTRY* entry = 0; 00139 if (this->typed_event_channel_-> 00140 get_servant_retry_map ().find (proxy, entry) == 0) 00141 { 00142 ++entry->int_id_; 00143 if (entry->int_id_ <= this->retries_) 00144 { 00145 disconnect = false; 00146 } 00147 } 00148 } 00149 else 00150 { 00151 #endif /* TAO_HAS_TYPED_EVENT_CHANNEL */ 00152 00153 // Un-typed EC 00154 TAO_CEC_EventChannel::ServantRetryMap::ENTRY* entry = 0; 00155 if (this->event_channel_-> 00156 get_servant_retry_map ().find (proxy, entry) == 0) 00157 { 00158 ++entry->int_id_; 00159 if (entry->int_id_ <= this->retries_) 00160 { 00161 disconnect = false; 00162 } 00163 } 00164 00165 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL) 00166 } 00167 #endif /* TAO_HAS_TYPED_EVENT_CHANNEL */ 00168 00169 return disconnect; 00170 } |
|
Check whether the consumers still exist. It is a helper method for handle_timeout() to isolate the exceptions. Definition at line 96 of file CEC_Reactive_ConsumerControl.cpp. References ACE_CHECK, ACE_ENV_ARG_PARAMETER, TAO_CEC_EventChannel::consumer_admin(), and TAO_CEC_ConsumerAdmin::for_each(). Referenced by handle_timeout().
00098 { 00099 TAO_CEC_Ping_Push_Consumer push_worker (this); 00100 00101 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL) 00102 if (this->typed_event_channel_) 00103 { 00104 // Typed EC 00105 this->typed_event_channel_->typed_consumer_admin ()->for_each (&push_worker 00106 ACE_ENV_ARG_PARAMETER); 00107 ACE_CHECK; 00108 } 00109 else 00110 { 00111 #endif /* TAO_HAS_TYPED_EVENT_CHANNEL */ 00112 00113 // Un-typed EC 00114 this->event_channel_->consumer_admin ()->for_each (&push_worker 00115 ACE_ENV_ARG_PARAMETER); 00116 ACE_CHECK; 00117 00118 TAO_CEC_Ping_Pull_Consumer pull_worker (this); 00119 this->event_channel_->consumer_admin ()->for_each (&pull_worker 00120 ACE_ENV_ARG_PARAMETER); 00121 ACE_CHECK; 00122 00123 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL) 00124 } 00125 #endif /* TAO_HAS_TYPED_EVENT_CHANNEL */ 00126 } |
|
Reimplemented from TAO_CEC_ConsumerControl. Definition at line 315 of file CEC_Reactive_ConsumerControl.cpp. References ACE_Reactor::cancel_timer(), and ACE_Event_Handler::reactor().
00316 { 00317 int r = 0; 00318 00319 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0 00320 r = this->reactor_->cancel_timer (timer_id_); 00321 #endif /* TAO_HAS_CORBA_MESSAGING */ 00322 this->adapter_.reactor (0); 00323 return r; 00324 } |
|
Allow others to inform us when a send or receive was successful.
Reimplemented from TAO_CEC_ConsumerControl. Definition at line 173 of file CEC_Reactive_ConsumerControl.cpp.
00175 { 00176 00177 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL) 00178 if (this->typed_event_channel_) 00179 { 00180 // Typed EC 00181 TAO_CEC_TypedEventChannel::ServantRetryMap::ENTRY* entry = 0; 00182 if (this->typed_event_channel_-> 00183 get_servant_retry_map ().find (proxy, entry) == 0) 00184 { 00185 entry->int_id_ = 0; 00186 } 00187 } 00188 else 00189 { 00190 #endif /* TAO_HAS_TYPED_EVENT_CHANNEL */ 00191 00192 // Un-typed EC 00193 TAO_CEC_EventChannel::ServantRetryMap::ENTRY* entry = 0; 00194 if (this->event_channel_-> 00195 get_servant_retry_map ().find (proxy, entry) == 0) 00196 { 00197 entry->int_id_ = 0; 00198 } 00199 00200 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL) 00201 } 00202 #endif /* TAO_HAS_TYPED_EVENT_CHANNEL */ 00203 00204 } |
|
Some system exception was raised while trying to push an event.
Reimplemented from TAO_CEC_ConsumerControl. Definition at line 371 of file CEC_Reactive_ConsumerControl.cpp. References ACE_CATCHANY, ACE_DEBUG, ACE_ENDTRY, ACE_ENV_SINGLE_ARG_PARAMETER, ACE_TEXT(), ACE_TRY, ACE_TRY_CHECK, LM_DEBUG, need_to_disconnect(), and TAO_debug_level.
00375 { 00376 ACE_TRY 00377 { 00378 if (this->need_to_disconnect (proxy)) 00379 { 00380 proxy->disconnect_push_supplier (ACE_ENV_SINGLE_ARG_PARAMETER); 00381 ACE_TRY_CHECK; 00382 00383 if (TAO_debug_level >= 10) 00384 { 00385 ACE_DEBUG ((LM_DEBUG, 00386 ACE_TEXT ("ProxyPushSupplier disconnected due to consumer_not_exist\n"))); 00387 } 00388 } 00389 } 00390 ACE_CATCHANY 00391 { 00392 // Ignore all exceptions.. 00393 } 00394 ACE_ENDTRY; 00395 } |
|
The Adapter for the reactor events.
Definition at line 147 of file CEC_Reactive_ConsumerControl.h. |
|
The event channel.
Definition at line 150 of file CEC_Reactive_ConsumerControl.h. |
|
The ORB.
Definition at line 158 of file CEC_Reactive_ConsumerControl.h. |
|
To control the timeout policy in the thread.
Definition at line 161 of file CEC_Reactive_ConsumerControl.h. |
|
Precomputed policy list used to set the timeout.
Definition at line 164 of file CEC_Reactive_ConsumerControl.h. |
|
The polling rate.
Definition at line 138 of file CEC_Reactive_ConsumerControl.h. Referenced by activate(). |
|
The ORB reactor.
Definition at line 167 of file CEC_Reactive_ConsumerControl.h. |
|
The number of retries per proxy until it is disconnected.
Definition at line 144 of file CEC_Reactive_ConsumerControl.h. Referenced by need_to_disconnect(). |
|
The polling timeout.
Definition at line 141 of file CEC_Reactive_ConsumerControl.h. |
|
The timer id.
Definition at line 171 of file CEC_Reactive_ConsumerControl.h. |