#include <CEC_Reactive_SupplierControl.h>
Inheritance diagram for TAO_CEC_Reactive_SupplierControl:
Public Member Functions | |
TAO_CEC_Reactive_SupplierControl (const ACE_Time_Value &rate, const ACE_Time_Value &timeout, unsigned int retries, TAO_CEC_EventChannel *event_channel, CORBA::ORB_ptr orb) | |
virtual | ~TAO_CEC_Reactive_SupplierControl (void) |
destructor... | |
void | handle_timeout (const ACE_Time_Value &tv, const void *arg) |
Receive the timeout from the adapter. | |
virtual int | activate (void) |
virtual int | shutdown (void) |
virtual void | supplier_not_exist (TAO_CEC_ProxyPushConsumer *proxy) |
virtual void | supplier_not_exist (TAO_CEC_ProxyPullConsumer *proxy) |
virtual void | system_exception (TAO_CEC_ProxyPullConsumer *proxy, CORBA::SystemException &) |
Some system exception was raised while trying to push an event. | |
virtual bool | need_to_disconnect (PortableServer::ServantBase *proxy) |
virtual void | successful_transmission (PortableServer::ServantBase *proxy) |
Allow others to inform us when a send or receive was successful. | |
Private Member Functions | |
void | query_suppliers (void) |
Private Attributes | |
ACE_Time_Value | rate_ |
The polling rate. | |
ACE_Time_Value | timeout_ |
The polling timeout. | |
unsigned int | retries_ |
The number of retries per proxy until it is disconnected. | |
TAO_CEC_SupplierControl_Adapter | adapter_ |
The Adapter for the reactor events. | |
TAO_CEC_EventChannel * | event_channel_ |
The event channel. | |
CORBA::ORB_var | orb_ |
The ORB. | |
CORBA::PolicyCurrent_var | policy_current_ |
To control the timeout policy in the thread. | |
CORBA::PolicyList | policy_list_ |
Precomputed policy list used to set the timeout. | |
ACE_Reactor * | reactor_ |
The ORB reactor. | |
long | timer_id_ |
The timer id. |
Defines the interface for the supplier control strategy. This strategy handles misbehaving or failing suppliers. = MEMORY MANAGEMENT = LOCKING = TODO
Definition at line 79 of file CEC_Reactive_SupplierControl.h.
TAO_BEGIN_VERSIONED_NAMESPACE_DECL TAO_CEC_Reactive_SupplierControl::TAO_CEC_Reactive_SupplierControl | ( | const ACE_Time_Value & | rate, | |
const ACE_Time_Value & | timeout, | |||
unsigned int | retries, | |||
TAO_CEC_EventChannel * | event_channel, | |||
CORBA::ORB_ptr | orb | |||
) |
Constructor. It does not assume ownership of the <event_channel> parameter.
Definition at line 42 of file CEC_Reactive_SupplierControl.cpp.
References orb_.
00047 : rate_ (rate), 00048 timeout_ (timeout), 00049 retries_ (retries), 00050 adapter_ (this), 00051 event_channel_ (ec), 00052 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL) 00053 typed_event_channel_ (0), 00054 #endif /* TAO_HAS_TYPED_EVENT_CHANNEL */ 00055 orb_ (CORBA::ORB::_duplicate (orb)) 00056 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0 00057 // Initialise timer_id_ to an invalid timer id, so that in case we don't 00058 // schedule a timer, we don't cancel a random timer at shutdown 00059 , timer_id_ (-1) 00060 #endif /* TAO_HAS_CORBA_MESSAGING */ 00061 { 00062 this->reactor_ = 00063 this->orb_->orb_core ()->reactor (); 00064 }
TAO_CEC_Reactive_SupplierControl::~TAO_CEC_Reactive_SupplierControl | ( | void | ) | [virtual] |
int TAO_CEC_Reactive_SupplierControl::activate | ( | void | ) | [virtual] |
Activate any internal threads or timers used to poll the state of the suppliers.
Reimplemented from TAO_CEC_SupplierControl.
Definition at line 241 of file CEC_Reactive_SupplierControl.cpp.
References TAO_Pseudo_Var_T< T >::in(), orb_, policy_current_, policy_list_, reactor_, ACE_Reactor::schedule_timer(), timer_id_, and ACE_Time_Value::zero.
00242 { 00243 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0 00244 try 00245 { 00246 // Get the PolicyCurrent object 00247 CORBA::Object_var tmp = 00248 this->orb_->resolve_initial_references ("PolicyCurrent"); 00249 00250 this->policy_current_ = 00251 CORBA::PolicyCurrent::_narrow (tmp.in ()); 00252 00253 // Pre-compute the policy list to the set the right timeout 00254 // value... 00255 // We need to convert the relative timeout into 100's of nano seconds. 00256 TimeBase::TimeT timeout; 00257 ORBSVCS_Time::Time_Value_to_TimeT (timeout, 00258 this->timeout_); 00259 CORBA::Any any; 00260 any <<= timeout; 00261 00262 this->policy_list_.length (1); 00263 this->policy_list_[0] = 00264 this->orb_->create_policy ( 00265 Messaging::RELATIVE_RT_TIMEOUT_POLICY_TYPE, 00266 any); 00267 00268 // Only schedule the timer, when the rate is not zero 00269 if (this->rate_ != ACE_Time_Value::zero) 00270 { 00271 // Schedule the timer after these policies has been set, because the 00272 // handle_timeout uses these policies, if done in front, the channel 00273 // can crash when the timeout expires before initiazation is ready. 00274 timer_id_ = this->reactor_->schedule_timer (&this->adapter_, 00275 0, 00276 this->rate_, 00277 this->rate_); 00278 if (timer_id_ == -1) 00279 return -1; 00280 } 00281 } 00282 catch (const CORBA::Exception&) 00283 { 00284 return -1; 00285 } 00286 #endif /* TAO_HAS_CORBA_MESSAGING */ 00287 00288 return 0; 00289 }
void TAO_CEC_Reactive_SupplierControl::handle_timeout | ( | const ACE_Time_Value & | tv, | |
const void * | arg | |||
) |
Receive the timeout from the adapter.
Definition at line 201 of file CEC_Reactive_SupplierControl.cpp.
References policy_current_, and query_suppliers().
Referenced by TAO_CEC_SupplierControl_Adapter::handle_timeout().
00204 { 00205 try 00206 { 00207 // Query the state of the Current object *before* we initiate 00208 // the iteration... 00209 CORBA::PolicyTypeSeq types; 00210 CORBA::PolicyList_var policies = 00211 this->policy_current_->get_policy_overrides (types); 00212 00213 // Change the timeout 00214 this->policy_current_->set_policy_overrides (this->policy_list_, 00215 CORBA::ADD_OVERRIDE); 00216 00217 try 00218 { 00219 // Query the state of the suppliers... 00220 this->query_suppliers (); 00221 } 00222 catch (const CORBA::Exception&) 00223 { 00224 // Ignore all exceptions 00225 } 00226 00227 this->policy_current_->set_policy_overrides (policies.in (), 00228 CORBA::SET_OVERRIDE); 00229 for (CORBA::ULong i = 0; i != policies->length (); ++i) 00230 { 00231 policies[i]->destroy (); 00232 } 00233 } 00234 catch (const CORBA::Exception&) 00235 { 00236 // Ignore all exceptions 00237 } 00238 }
bool TAO_CEC_Reactive_SupplierControl::need_to_disconnect | ( | PortableServer::ServantBase * | proxy | ) | [virtual] |
Do we need to disconnect this supplier? The parameter type for proxy is PortableServer::ServantBase* because this method is used for both TAO_CEC_ProxyPushSupplier and TAO_CEC_ProxyPullSupplier objects.
Reimplemented from TAO_CEC_SupplierControl.
Definition at line 124 of file CEC_Reactive_SupplierControl.cpp.
00126 { 00127 bool disconnect = true; 00128 00129 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL) 00130 if (this->typed_event_channel_) 00131 { 00132 // Typed EC 00133 TAO_CEC_TypedEventChannel::ServantRetryMap::ENTRY* entry = 0; 00134 if (this->typed_event_channel_-> 00135 get_servant_retry_map ().find (proxy, entry) == 0) 00136 { 00137 ++entry->int_id_; 00138 if (entry->int_id_ <= this->retries_) 00139 { 00140 disconnect = false; 00141 } 00142 } 00143 } 00144 else 00145 { 00146 #endif /* TAO_HAS_TYPED_EVENT_CHANNEL */ 00147 00148 // Un-typed EC 00149 TAO_CEC_EventChannel::ServantRetryMap::ENTRY* entry = 0; 00150 if (this->event_channel_-> 00151 get_servant_retry_map ().find (proxy, entry) == 0) 00152 { 00153 ++entry->int_id_; 00154 if (entry->int_id_ <= this->retries_) 00155 { 00156 disconnect = false; 00157 } 00158 } 00159 00160 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL) 00161 } 00162 #endif /* TAO_HAS_TYPED_EVENT_CHANNEL */ 00163 00164 return disconnect; 00165 }
void TAO_CEC_Reactive_SupplierControl::query_suppliers | ( | void | ) | [private] |
Check whether the suppliers still exist. It is a helper method for handle_timeout() to isolate the exceptions.
Definition at line 97 of file CEC_Reactive_SupplierControl.cpp.
References event_channel_, TAO_CEC_SupplierAdmin::for_each(), and TAO_CEC_EventChannel::supplier_admin().
Referenced by handle_timeout().
00098 { 00099 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL) 00100 if (this->typed_event_channel_) 00101 { 00102 // Typed EC 00103 TAO_CEC_Ping_Typed_Push_Supplier push_worker (this); 00104 00105 this->typed_event_channel_->typed_supplier_admin ()->for_each (&push_worker); 00106 } 00107 else 00108 { 00109 #endif /* TAO_HAS_TYPED_EVENT_CHANNEL */ 00110 00111 // Un-typed EC 00112 TAO_CEC_Ping_Push_Supplier push_worker (this); 00113 this->event_channel_->supplier_admin ()->for_each (&push_worker); 00114 00115 TAO_CEC_Ping_Pull_Supplier pull_worker (this); 00116 this->event_channel_->supplier_admin ()->for_each (&pull_worker); 00117 00118 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL) 00119 } 00120 #endif /* TAO_HAS_TYPED_EVENT_CHANNEL */ 00121 }
int TAO_CEC_Reactive_SupplierControl::shutdown | ( | void | ) | [virtual] |
Reimplemented from TAO_CEC_SupplierControl.
Definition at line 292 of file CEC_Reactive_SupplierControl.cpp.
References adapter_, ACE_Reactor::cancel_timer(), ACE_Event_Handler::reactor(), reactor_, and timer_id_.
00293 { 00294 int r = 0; 00295 00296 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0 00297 r = this->reactor_->cancel_timer (timer_id_); 00298 #endif /* TAO_HAS_CORBA_MESSAGING */ 00299 this->adapter_.reactor (0); 00300 return r; 00301 }
void TAO_CEC_Reactive_SupplierControl::successful_transmission | ( | PortableServer::ServantBase * | proxy | ) | [virtual] |
Allow others to inform us when a send or receive was successful.
Reimplemented from TAO_CEC_SupplierControl.
Definition at line 168 of file CEC_Reactive_SupplierControl.cpp.
00170 { 00171 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL) 00172 if (this->typed_event_channel_) 00173 { 00174 // Typed EC 00175 TAO_CEC_TypedEventChannel::ServantRetryMap::ENTRY* entry = 0; 00176 if (this->typed_event_channel_-> 00177 get_servant_retry_map ().find (proxy, entry) == 0) 00178 { 00179 entry->int_id_ = 0; 00180 } 00181 } 00182 else 00183 { 00184 #endif /* TAO_HAS_TYPED_EVENT_CHANNEL */ 00185 00186 // Un-typed EC 00187 TAO_CEC_EventChannel::ServantRetryMap::ENTRY* entry = 0; 00188 if (this->event_channel_-> 00189 get_servant_retry_map ().find (proxy, entry) == 0) 00190 { 00191 entry->int_id_ = 0; 00192 } 00193 00194 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL) 00195 } 00196 #endif /* TAO_HAS_TYPED_EVENT_CHANNEL */ 00197 00198 }
void TAO_CEC_Reactive_SupplierControl::supplier_not_exist | ( | TAO_CEC_ProxyPullConsumer * | proxy | ) | [virtual] |
Invoked by helper classes when they detect that a supplier does not exist (i.e. _non_existent() returns true and/or the CORBA::OBJECT_NOT_EXIST exception has been raised).
Reimplemented from TAO_CEC_SupplierControl.
Definition at line 334 of file CEC_Reactive_SupplierControl.cpp.
References TAO_CEC_ProxyPullConsumer::disconnect_pull_consumer().
00336 { 00337 try 00338 { 00339 proxy->disconnect_pull_consumer (); 00340 } 00341 catch (const CORBA::Exception&) 00342 { 00343 // Ignore all exceptions.. 00344 } 00345 }
void TAO_CEC_Reactive_SupplierControl::supplier_not_exist | ( | TAO_CEC_ProxyPushConsumer * | proxy | ) | [virtual] |
Invoked by helper classes when they detect that a supplier does not exist (i.e. _non_existent() returns true and/or the CORBA::OBJECT_NOT_EXIST exception has been raised).
Reimplemented from TAO_CEC_SupplierControl.
Definition at line 304 of file CEC_Reactive_SupplierControl.cpp.
References TAO_CEC_ProxyPushConsumer::disconnect_push_consumer().
00306 { 00307 try 00308 { 00309 proxy->disconnect_push_consumer (); 00310 } 00311 catch (const CORBA::Exception&) 00312 { 00313 // Ignore all exceptions.. 00314 } 00315 }
void TAO_CEC_Reactive_SupplierControl::system_exception | ( | TAO_CEC_ProxyPullConsumer * | proxy, | |
CORBA::SystemException & | ||||
) | [virtual] |
Some system exception was raised while trying to push an event.
Reimplemented from TAO_CEC_SupplierControl.
Definition at line 348 of file CEC_Reactive_SupplierControl.cpp.
References TAO_CEC_ProxyPullConsumer::disconnect_pull_consumer().
00351 { 00352 try 00353 { 00354 if (this->need_to_disconnect (proxy)) 00355 { 00356 proxy->disconnect_pull_consumer (); 00357 } 00358 } 00359 catch (const CORBA::Exception&) 00360 { 00361 // Ignore all exceptions.. 00362 } 00363 }
The Adapter for the reactor events.
Definition at line 145 of file CEC_Reactive_SupplierControl.h.
Referenced by shutdown().
The event channel.
Definition at line 148 of file CEC_Reactive_SupplierControl.h.
Referenced by query_suppliers().
CORBA::PolicyCurrent_var TAO_CEC_Reactive_SupplierControl::policy_current_ [private] |
To control the timeout policy in the thread.
Definition at line 159 of file CEC_Reactive_SupplierControl.h.
Referenced by activate(), and handle_timeout().
CORBA::PolicyList TAO_CEC_Reactive_SupplierControl::policy_list_ [private] |
Precomputed policy list used to set the timeout.
Definition at line 162 of file CEC_Reactive_SupplierControl.h.
Referenced by activate().
The ORB reactor.
Definition at line 165 of file CEC_Reactive_SupplierControl.h.
Referenced by activate(), and shutdown().
unsigned int TAO_CEC_Reactive_SupplierControl::retries_ [private] |
The number of retries per proxy until it is disconnected.
Definition at line 142 of file CEC_Reactive_SupplierControl.h.
long TAO_CEC_Reactive_SupplierControl::timer_id_ [private] |
The timer id.
Definition at line 169 of file CEC_Reactive_SupplierControl.h.
Referenced by activate(), and shutdown().