#include <CEC_Reactive_ConsumerControl.h>
Inheritance diagram for TAO_CEC_Reactive_ConsumerControl:
Public Member Functions | |
TAO_CEC_Reactive_ConsumerControl (const ACE_Time_Value &rate, const ACE_Time_Value &timeout, unsigned int retries, TAO_CEC_EventChannel *event_channel, CORBA::ORB_ptr orb) | |
virtual | ~TAO_CEC_Reactive_ConsumerControl (void) |
destructor... | |
void | handle_timeout (const ACE_Time_Value &tv, const void *arg) |
Receive the timeout from the adapter. | |
virtual int | activate (void) |
virtual int | shutdown (void) |
virtual void | consumer_not_exist (TAO_CEC_ProxyPushSupplier *proxy) |
virtual void | consumer_not_exist (TAO_CEC_ProxyPullSupplier *proxy) |
virtual void | system_exception (TAO_CEC_ProxyPushSupplier *proxy, CORBA::SystemException &) |
Some system exception was rasied while trying to push an event. | |
virtual bool | need_to_disconnect (PortableServer::ServantBase *proxy) |
virtual void | successful_transmission (PortableServer::ServantBase *proxy) |
Allow others to inform us when a send or receive was successful. | |
Private Member Functions | |
void | query_consumers (void) |
Private Attributes | |
ACE_Time_Value | rate_ |
The polling rate. | |
ACE_Time_Value | timeout_ |
The polling timeout. | |
unsigned int | retries_ |
The number of retries per proxy until it is disconnected. | |
TAO_CEC_ConsumerControl_Adapter | adapter_ |
The Adapter for the reactor events. | |
TAO_CEC_EventChannel * | event_channel_ |
The event channel. | |
CORBA::ORB_var | orb_ |
The ORB. | |
CORBA::PolicyCurrent_var | policy_current_ |
To control the timeout policy in the thread. | |
CORBA::PolicyList | policy_list_ |
Precomputed policy list to the set timeout. | |
ACE_Reactor * | reactor_ |
The ORB reactor. | |
long | timer_id_ |
The timer id. |
Defines the interface for the consumer control strategy. This strategy handles misbehaving or failing consumers. = MEMORY MANAGMENT = LOCKING = TODO
Definition at line 81 of file CEC_Reactive_ConsumerControl.h.
TAO_BEGIN_VERSIONED_NAMESPACE_DECL TAO_CEC_Reactive_ConsumerControl::TAO_CEC_Reactive_ConsumerControl | ( | const ACE_Time_Value & | rate, | |
const ACE_Time_Value & | timeout, | |||
unsigned int | retries, | |||
TAO_CEC_EventChannel * | event_channel, | |||
CORBA::ORB_ptr | orb | |||
) |
Constructor. It does not assume ownership of the <event_channel> parameter.
Definition at line 41 of file CEC_Reactive_ConsumerControl.cpp.
References orb_.
00046 : rate_ (rate), 00047 timeout_ (timeout), 00048 retries_ (retries), 00049 adapter_ (this), 00050 event_channel_ (ec), 00051 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL) 00052 typed_event_channel_ (0), 00053 #endif /* TAO_HAS_TYPED_EVENT_CHANNEL */ 00054 orb_ (CORBA::ORB::_duplicate (orb)) 00055 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0 00056 // Initialise timer_id_ to an invalid timer id, so that in case we don't 00057 // schedule a timer, we don't cancel a random timer at shutdown 00058 , timer_id_ (-1) 00059 #endif /* TAO_HAS_CORBA_MESSAGING */ 00060 { 00061 this->reactor_ = 00062 this->orb_->orb_core ()->reactor (); 00063 }
TAO_CEC_Reactive_ConsumerControl::~TAO_CEC_Reactive_ConsumerControl | ( | void | ) | [virtual] |
int TAO_CEC_Reactive_ConsumerControl::activate | ( | void | ) | [virtual] |
Activate any internal threads or timers used to poll the state of the consumers
Reimplemented from TAO_CEC_ConsumerControl.
Definition at line 240 of file CEC_Reactive_ConsumerControl.cpp.
References TAO_Pseudo_Var_T< T >::in(), orb_, policy_current_, policy_list_, reactor_, ACE_Reactor::schedule_timer(), timer_id_, and ACE_Time_Value::zero.
00241 { 00242 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0 00243 try 00244 { 00245 // Get the PolicyCurrent object 00246 CORBA::Object_var tmp = 00247 this->orb_->resolve_initial_references ("PolicyCurrent"); 00248 00249 this->policy_current_ = 00250 CORBA::PolicyCurrent::_narrow (tmp.in ()); 00251 00252 // Pre-compute the policy list to the set the right timeout 00253 // value... 00254 // We need to convert the relative timeout into 100's of nano seconds. 00255 TimeBase::TimeT timeout; 00256 ORBSVCS_Time::Time_Value_to_TimeT (timeout, 00257 this->timeout_); 00258 CORBA::Any any; 00259 any <<= timeout; 00260 00261 this->policy_list_.length (1); 00262 this->policy_list_[0] = 00263 this->orb_->create_policy ( 00264 Messaging::RELATIVE_RT_TIMEOUT_POLICY_TYPE, 00265 any); 00266 00267 // Only schedule the timer, when the rate is not zero 00268 if (this->rate_ != ACE_Time_Value::zero) 00269 { 00270 // Schedule the timer after these policies has been set, because the 00271 // handle_timeout uses these policies, if done in front, the channel 00272 // can crash when the timeout expires before initiazation is ready. 00273 timer_id_ = this->reactor_->schedule_timer (&this->adapter_, 00274 0, 00275 this->rate_, 00276 this->rate_); 00277 if (timer_id_ == -1) 00278 return -1; 00279 } 00280 } 00281 catch (const CORBA::Exception&) 00282 { 00283 return -1; 00284 } 00285 #endif /* TAO_HAS_CORBA_MESSAGING */ 00286 00287 return 0; 00288 }
void TAO_CEC_Reactive_ConsumerControl::consumer_not_exist | ( | TAO_CEC_ProxyPullSupplier * | proxy | ) | [virtual] |
Invoked by helper classes when they detect that a consumer no longer exists (i.e. _non_existent() returns true and/or the CORBA::OBJECT_NOT_EXIST exception has been raised).
Reimplemented from TAO_CEC_ConsumerControl.
Definition at line 325 of file CEC_Reactive_ConsumerControl.cpp.
References CORBA::Exception::_tao_print_exception(), ACE_TEXT(), and TAO_CEC_ProxyPullSupplier::disconnect_pull_supplier().
00327 { 00328 try 00329 { 00330 proxy->disconnect_pull_supplier (); 00331 } 00332 catch (const CORBA::Exception& ex) 00333 { 00334 ex._tao_print_exception ( 00335 ACE_TEXT ("Reactive_ConsumerControl::consumer_not_exist")); 00336 // Ignore all exceptions.. 00337 } 00338 }
void TAO_CEC_Reactive_ConsumerControl::consumer_not_exist | ( | TAO_CEC_ProxyPushSupplier * | proxy | ) | [virtual] |
When pushing an event to the consumer a CORBA::OBJECT_NOT_EXIST exception was raised. The only interpretation is that the object has been destroyed. The strategy has to (at the very least), reclaim all the resources attached to that object.
Reimplemented from TAO_CEC_ConsumerControl.
Definition at line 303 of file CEC_Reactive_ConsumerControl.cpp.
References CORBA::Exception::_tao_print_exception(), ACE_DEBUG, ACE_TEXT(), TAO_CEC_ProxyPushSupplier::disconnect_push_supplier(), LM_DEBUG, and TAO_debug_level.
00305 { 00306 try 00307 { 00308 proxy->disconnect_push_supplier (); 00309 00310 if (TAO_debug_level >= 10) 00311 { 00312 ACE_DEBUG ((LM_DEBUG, 00313 ACE_TEXT ("ProxyPushSupplier disconnected due to consumer_not_exist\n"))); 00314 } 00315 } 00316 catch (const CORBA::Exception& ex) 00317 { 00318 ex._tao_print_exception ( 00319 ACE_TEXT ("Reactive_ConsumerControl::consumer_not_exist")); 00320 // Ignore all exceptions.. 00321 } 00322 }
void TAO_CEC_Reactive_ConsumerControl::handle_timeout | ( | const ACE_Time_Value & | tv, | |
const void * | arg | |||
) |
Receive the timeout from the adapter.
Definition at line 200 of file CEC_Reactive_ConsumerControl.cpp.
References policy_current_, and query_consumers().
Referenced by TAO_CEC_ConsumerControl_Adapter::handle_timeout().
00203 { 00204 try 00205 { 00206 // Query the state of the Current object *before* we initiate 00207 // the iteration... 00208 CORBA::PolicyTypeSeq types; 00209 CORBA::PolicyList_var policies = 00210 this->policy_current_->get_policy_overrides (types); 00211 00212 // Change the timeout 00213 this->policy_current_->set_policy_overrides (this->policy_list_, 00214 CORBA::ADD_OVERRIDE); 00215 00216 try 00217 { 00218 // Query the state of the consumers... 00219 this->query_consumers (); 00220 } 00221 catch (const CORBA::Exception&) 00222 { 00223 // Ignore all exceptions 00224 } 00225 00226 this->policy_current_->set_policy_overrides (policies.in (), 00227 CORBA::SET_OVERRIDE); 00228 for (CORBA::ULong i = 0; i != policies->length (); ++i) 00229 { 00230 policies[i]->destroy (); 00231 } 00232 } 00233 catch (const CORBA::Exception&) 00234 { 00235 // Ignore all exceptions 00236 } 00237 }
bool TAO_CEC_Reactive_ConsumerControl::need_to_disconnect | ( | PortableServer::ServantBase * | proxy | ) | [virtual] |
Do we need to disconnect this supplier? The parameter type for proxy is PortableServer::ServantBase* due to the fact that this method will be used for TAO_CEC_ProxyPushSupplier's and TAO_CEC_ProxyPullSupplier's.
Reimplemented from TAO_CEC_ConsumerControl.
Definition at line 122 of file CEC_Reactive_ConsumerControl.cpp.
00124 { 00125 bool disconnect = true; 00126 00127 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL) 00128 if (this->typed_event_channel_) 00129 { 00130 // Typed EC 00131 TAO_CEC_TypedEventChannel::ServantRetryMap::ENTRY* entry = 0; 00132 if (this->typed_event_channel_-> 00133 get_servant_retry_map ().find (proxy, entry) == 0) 00134 { 00135 ++entry->int_id_; 00136 if (entry->int_id_ <= this->retries_) 00137 { 00138 disconnect = false; 00139 } 00140 } 00141 } 00142 else 00143 { 00144 #endif /* TAO_HAS_TYPED_EVENT_CHANNEL */ 00145 00146 // Un-typed EC 00147 TAO_CEC_EventChannel::ServantRetryMap::ENTRY* entry = 0; 00148 if (this->event_channel_-> 00149 get_servant_retry_map ().find (proxy, entry) == 0) 00150 { 00151 ++entry->int_id_; 00152 if (entry->int_id_ <= this->retries_) 00153 { 00154 disconnect = false; 00155 } 00156 } 00157 00158 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL) 00159 } 00160 #endif /* TAO_HAS_TYPED_EVENT_CHANNEL */ 00161 00162 return disconnect; 00163 }
void TAO_CEC_Reactive_ConsumerControl::query_consumers | ( | void | ) | [private] |
Check if the consumers still exists. It is a helper method for handle_timeout() to isolate the exceptions.
Definition at line 96 of file CEC_Reactive_ConsumerControl.cpp.
References TAO_CEC_EventChannel::consumer_admin(), event_channel_, and TAO_CEC_ConsumerAdmin::for_each().
Referenced by handle_timeout().
00097 { 00098 TAO_CEC_Ping_Push_Consumer push_worker (this); 00099 00100 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL) 00101 if (this->typed_event_channel_) 00102 { 00103 // Typed EC 00104 this->typed_event_channel_->typed_consumer_admin ()->for_each (&push_worker); 00105 } 00106 else 00107 { 00108 #endif /* TAO_HAS_TYPED_EVENT_CHANNEL */ 00109 00110 // Un-typed EC 00111 this->event_channel_->consumer_admin ()->for_each (&push_worker); 00112 00113 TAO_CEC_Ping_Pull_Consumer pull_worker (this); 00114 this->event_channel_->consumer_admin ()->for_each (&pull_worker); 00115 00116 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL) 00117 } 00118 #endif /* TAO_HAS_TYPED_EVENT_CHANNEL */ 00119 }
int TAO_CEC_Reactive_ConsumerControl::shutdown | ( | void | ) | [virtual] |
Reimplemented from TAO_CEC_ConsumerControl.
Definition at line 291 of file CEC_Reactive_ConsumerControl.cpp.
References adapter_, ACE_Reactor::cancel_timer(), ACE_Event_Handler::reactor(), reactor_, and timer_id_.
00292 { 00293 int r = 0; 00294 00295 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0 00296 r = this->reactor_->cancel_timer (timer_id_); 00297 #endif /* TAO_HAS_CORBA_MESSAGING */ 00298 this->adapter_.reactor (0); 00299 return r; 00300 }
void TAO_CEC_Reactive_ConsumerControl::successful_transmission | ( | PortableServer::ServantBase * | proxy | ) | [virtual] |
Allow others to inform us when a send or receive was successful.
Reimplemented from TAO_CEC_ConsumerControl.
Definition at line 166 of file CEC_Reactive_ConsumerControl.cpp.
00168 { 00169 00170 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL) 00171 if (this->typed_event_channel_) 00172 { 00173 // Typed EC 00174 TAO_CEC_TypedEventChannel::ServantRetryMap::ENTRY* entry = 0; 00175 if (this->typed_event_channel_-> 00176 get_servant_retry_map ().find (proxy, entry) == 0) 00177 { 00178 entry->int_id_ = 0; 00179 } 00180 } 00181 else 00182 { 00183 #endif /* TAO_HAS_TYPED_EVENT_CHANNEL */ 00184 00185 // Un-typed EC 00186 TAO_CEC_EventChannel::ServantRetryMap::ENTRY* entry = 0; 00187 if (this->event_channel_-> 00188 get_servant_retry_map ().find (proxy, entry) == 0) 00189 { 00190 entry->int_id_ = 0; 00191 } 00192 00193 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL) 00194 } 00195 #endif /* TAO_HAS_TYPED_EVENT_CHANNEL */ 00196 00197 }
void TAO_CEC_Reactive_ConsumerControl::system_exception | ( | TAO_CEC_ProxyPushSupplier * | proxy, | |
CORBA::SystemException & | ||||
) | [virtual] |
Some system exception was rasied while trying to push an event.
Reimplemented from TAO_CEC_ConsumerControl.
Definition at line 341 of file CEC_Reactive_ConsumerControl.cpp.
References ACE_DEBUG, ACE_TEXT(), TAO_CEC_ProxyPushSupplier::disconnect_push_supplier(), LM_DEBUG, and TAO_debug_level.
00344 { 00345 try 00346 { 00347 if (this->need_to_disconnect (proxy)) 00348 { 00349 proxy->disconnect_push_supplier (); 00350 00351 if (TAO_debug_level >= 10) 00352 { 00353 ACE_DEBUG ((LM_DEBUG, 00354 ACE_TEXT ("ProxyPushSupplier disconnected due to consumer_not_exist\n"))); 00355 } 00356 } 00357 } 00358 catch (const CORBA::Exception&) 00359 { 00360 // Ignore all exceptions.. 00361 } 00362 }
The Adapter for the reactor events.
Definition at line 144 of file CEC_Reactive_ConsumerControl.h.
Referenced by shutdown().
The event channel.
Definition at line 147 of file CEC_Reactive_ConsumerControl.h.
Referenced by query_consumers().
CORBA::PolicyCurrent_var TAO_CEC_Reactive_ConsumerControl::policy_current_ [private] |
To control the timeout policy in the thread.
Definition at line 158 of file CEC_Reactive_ConsumerControl.h.
Referenced by activate(), and handle_timeout().
CORBA::PolicyList TAO_CEC_Reactive_ConsumerControl::policy_list_ [private] |
Precomputed policy list to the set timeout.
Definition at line 161 of file CEC_Reactive_ConsumerControl.h.
Referenced by activate().
The ORB reactor.
Definition at line 164 of file CEC_Reactive_ConsumerControl.h.
Referenced by activate(), and shutdown().
unsigned int TAO_CEC_Reactive_ConsumerControl::retries_ [private] |
The number of retries per proxy until it is disconnected.
Definition at line 141 of file CEC_Reactive_ConsumerControl.h.
long TAO_CEC_Reactive_ConsumerControl::timer_id_ [private] |
The timer id.
Definition at line 168 of file CEC_Reactive_ConsumerControl.h.
Referenced by activate(), and shutdown().