#include <CEC_Reactive_SupplierControl.h>
Inheritance diagram for TAO_CEC_Reactive_SupplierControl:
Public Member Functions | |
TAO_CEC_Reactive_SupplierControl (const ACE_Time_Value &rate, const ACE_Time_Value &timeout, unsigned int retries, TAO_CEC_EventChannel *event_channel, CORBA::ORB_ptr orb) | |
virtual | ~TAO_CEC_Reactive_SupplierControl (void) |
destructor... | |
void | handle_timeout (const ACE_Time_Value &tv, const void *arg) |
Receive the timeout from the adapter. | |
virtual int | activate (void) |
virtual int | shutdown (void) |
virtual void | supplier_not_exist (TAO_CEC_ProxyPushConsumer *proxy) |
virtual void | supplier_not_exist (TAO_CEC_ProxyPullConsumer *proxy) |
virtual void | system_exception (TAO_CEC_ProxyPullConsumer *proxy, CORBA::SystemException &) |
Some system exception was raised while trying to push an event. | |
virtual bool | need_to_disconnect (PortableServer::ServantBase *proxy) |
virtual void | successful_transmission (PortableServer::ServantBase *proxy) |
Allow others to inform us when a send or receive was successful. | |
Private Member Functions | |
void | query_suppliers () |
Private Attributes | |
ACE_Time_Value | rate_ |
The polling rate. | |
ACE_Time_Value | timeout_ |
The polling timeout. | |
unsigned int | retries_ |
The number of retries per proxy until it is disconnected. | |
TAO_CEC_SupplierControl_Adapter | adapter_ |
The Adapter for the reactor events. | |
TAO_CEC_EventChannel * | event_channel_ |
The event channel. | |
CORBA::ORB_var | orb_ |
The ORB. | |
CORBA::PolicyCurrent_var | policy_current_ |
To control the timeout policy in the thread. | |
CORBA::PolicyList | policy_list_ |
Precomputed policy list to set the timeout. | |
ACE_Reactor * | reactor_ |
The ORB reactor. | |
long | timer_id_ |
The timer id. |
Defines the interface for the supplier control strategy. This strategy handles misbehaving or failing suppliers. = MEMORY MANAGEMENT = LOCKING = TODO
Definition at line 79 of file CEC_Reactive_SupplierControl.h.
|
Constructor. It does not assume ownership of the parameter. Definition at line 42 of file CEC_Reactive_SupplierControl.cpp. References TAO_HAS_CORBA_MESSAGING.
00047 : rate_ (rate), 00048 timeout_ (timeout), 00049 retries_ (retries), 00050 adapter_ (this), 00051 event_channel_ (ec), 00052 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL) 00053 typed_event_channel_ (0), 00054 #endif /* TAO_HAS_TYPED_EVENT_CHANNEL */ 00055 orb_ (CORBA::ORB::_duplicate (orb)) 00056 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0 00057 // Initialise timer_id_ to an invalid timer id, so that in case we don't 00058 // schedule a timer, we don't cancel a random timer at shutdown 00059 , timer_id_ (-1) 00060 #endif /* TAO_HAS_CORBA_MESSAGING */ 00061 { 00062 this->reactor_ = 00063 this->orb_->orb_core ()->reactor (); 00064 } |
|
destructor...
Definition at line 92 of file CEC_Reactive_SupplierControl.cpp.
00093 { 00094 } |
|
Activate any internal threads or timers used to poll the state of the suppliers. Reimplemented from TAO_CEC_SupplierControl. Definition at line 258 of file CEC_Reactive_SupplierControl.cpp. References CORBA::PolicyCurrent::_narrow(), ACE_CATCHANY, ACE_ENDTRY, ACE_ENV_ARG_PARAMETER, ACE_TRY_CHECK, ACE_TRY_NEW_ENV, TAO::unbounded_object_reference_sequence< Policy, Policy_var >::length(), and ACE_Reactor::schedule_timer().
00259 { 00260 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0 00261 ACE_TRY_NEW_ENV 00262 { 00263 // Get the PolicyCurrent object 00264 CORBA::Object_var tmp = 00265 this->orb_->resolve_initial_references ("PolicyCurrent" 00266 ACE_ENV_ARG_PARAMETER); 00267 ACE_TRY_CHECK; 00268 00269 this->policy_current_ = 00270 CORBA::PolicyCurrent::_narrow (tmp.in () 00271 ACE_ENV_ARG_PARAMETER); 00272 ACE_TRY_CHECK; 00273 00274 // Pre-compute the policy list to the set the right timeout 00275 // value... 00276 // We need to convert the relative timeout into 100's of nano seconds. 00277 TimeBase::TimeT timeout; 00278 ORBSVCS_Time::Time_Value_to_TimeT (timeout, 00279 this->timeout_); 00280 CORBA::Any any; 00281 any <<= timeout; 00282 00283 this->policy_list_.length (1); 00284 this->policy_list_[0] = 00285 this->orb_->create_policy ( 00286 Messaging::RELATIVE_RT_TIMEOUT_POLICY_TYPE, 00287 any 00288 ACE_ENV_ARG_PARAMETER); 00289 ACE_TRY_CHECK; 00290 00291 // Only schedule the timer, when the rate is not zero 00292 if (this->rate_ != ACE_Time_Value::zero) 00293 { 00294 // Schedule the timer after these policies has been set, because the 00295 // handle_timeout uses these policies, if done in front, the channel 00296 // can crash when the timeout expires before initiazation is ready. 00297 timer_id_ = this->reactor_->schedule_timer (&this->adapter_, 00298 0, 00299 this->rate_, 00300 this->rate_); 00301 if (timer_id_ == -1) 00302 return -1; 00303 } 00304 } 00305 ACE_CATCHANY 00306 { 00307 return -1; 00308 } 00309 ACE_ENDTRY; 00310 #endif /* TAO_HAS_CORBA_MESSAGING */ 00311 00312 return 0; 00313 } |
|
Receive the timeout from the adapter.
Definition at line 208 of file CEC_Reactive_SupplierControl.cpp. References ACE_CATCHANY, ACE_ENDTRY, ACE_ENV_ARG_PARAMETER, ACE_ENV_SINGLE_ARG_PARAMETER, ACE_TRY_CHECK, ACE_TRY_CHECK_EX, ACE_TRY_EX, ACE_TRY_NEW_ENV, and query_suppliers(). Referenced by TAO_CEC_SupplierControl_Adapter::handle_timeout().
00211 { 00212 ACE_TRY_NEW_ENV 00213 { 00214 // Query the state of the Current object *before* we initiate 00215 // the iteration... 00216 CORBA::PolicyTypeSeq types; 00217 CORBA::PolicyList_var policies = 00218 this->policy_current_->get_policy_overrides (types 00219 ACE_ENV_ARG_PARAMETER); 00220 ACE_TRY_CHECK; 00221 00222 // Change the timeout 00223 this->policy_current_->set_policy_overrides (this->policy_list_, 00224 CORBA::ADD_OVERRIDE 00225 ACE_ENV_ARG_PARAMETER); 00226 ACE_TRY_CHECK; 00227 00228 ACE_TRY_EX (query) 00229 { 00230 // Query the state of the suppliers... 00231 this->query_suppliers (ACE_ENV_SINGLE_ARG_PARAMETER); 00232 ACE_TRY_CHECK_EX (query); 00233 } 00234 ACE_CATCHANY 00235 { 00236 // Ignore all exceptions 00237 } 00238 ACE_ENDTRY; 00239 00240 this->policy_current_->set_policy_overrides (policies.in (), 00241 CORBA::SET_OVERRIDE 00242 ACE_ENV_ARG_PARAMETER); 00243 ACE_TRY_CHECK; 00244 for (CORBA::ULong i = 0; i != policies->length (); ++i) 00245 { 00246 policies[i]->destroy (ACE_ENV_SINGLE_ARG_PARAMETER); 00247 ACE_TRY_CHECK; 00248 } 00249 } 00250 ACE_CATCHANY 00251 { 00252 // Ignore all exceptions 00253 } 00254 ACE_ENDTRY; 00255 } |
|
Do we need to disconnect this supplier? The parameter type for proxy is PortableServer::ServantBase* due to the fact that this method will be used for TAO_CEC_ProxyPushSupplier's and TAO_CEC_ProxyPullSupplier's. Reimplemented from TAO_CEC_SupplierControl. Definition at line 131 of file CEC_Reactive_SupplierControl.cpp. Referenced by system_exception().
00133 { 00134 bool disconnect = true; 00135 00136 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL) 00137 if (this->typed_event_channel_) 00138 { 00139 // Typed EC 00140 TAO_CEC_TypedEventChannel::ServantRetryMap::ENTRY* entry = 0; 00141 if (this->typed_event_channel_-> 00142 get_servant_retry_map ().find (proxy, entry) == 0) 00143 { 00144 ++entry->int_id_; 00145 if (entry->int_id_ <= this->retries_) 00146 { 00147 disconnect = false; 00148 } 00149 } 00150 } 00151 else 00152 { 00153 #endif /* TAO_HAS_TYPED_EVENT_CHANNEL */ 00154 00155 // Un-typed EC 00156 TAO_CEC_EventChannel::ServantRetryMap::ENTRY* entry = 0; 00157 if (this->event_channel_-> 00158 get_servant_retry_map ().find (proxy, entry) == 0) 00159 { 00160 ++entry->int_id_; 00161 if (entry->int_id_ <= this->retries_) 00162 { 00163 disconnect = false; 00164 } 00165 } 00166 00167 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL) 00168 } 00169 #endif /* TAO_HAS_TYPED_EVENT_CHANNEL */ 00170 00171 return disconnect; 00172 } |
|
Check if the suppliers still exist. It is a helper method for handle_timeout() to isolate the exceptions. Definition at line 97 of file CEC_Reactive_SupplierControl.cpp. References ACE_CHECK, ACE_ENV_ARG_PARAMETER, TAO_CEC_SupplierAdmin::for_each(), and TAO_CEC_EventChannel::supplier_admin(). Referenced by handle_timeout().
00099 { 00100 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL) 00101 if (this->typed_event_channel_) 00102 { 00103 // Typed EC 00104 TAO_CEC_Ping_Typed_Push_Supplier push_worker (this); 00105 00106 this->typed_event_channel_->typed_supplier_admin ()->for_each (&push_worker 00107 ACE_ENV_ARG_PARAMETER); 00108 ACE_CHECK; 00109 } 00110 else 00111 { 00112 #endif /* TAO_HAS_TYPED_EVENT_CHANNEL */ 00113 00114 // Un-typed EC 00115 TAO_CEC_Ping_Push_Supplier push_worker (this); 00116 this->event_channel_->supplier_admin ()->for_each (&push_worker 00117 ACE_ENV_ARG_PARAMETER); 00118 ACE_CHECK; 00119 00120 TAO_CEC_Ping_Pull_Supplier pull_worker (this); 00121 this->event_channel_->supplier_admin ()->for_each (&pull_worker 00122 ACE_ENV_ARG_PARAMETER); 00123 ACE_CHECK; 00124 00125 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL) 00126 } 00127 #endif /* TAO_HAS_TYPED_EVENT_CHANNEL */ 00128 } |
|
Reimplemented from TAO_CEC_SupplierControl. Definition at line 316 of file CEC_Reactive_SupplierControl.cpp. References ACE_Reactor::cancel_timer(), and ACE_Event_Handler::reactor().
00317 { 00318 int r = 0; 00319 00320 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0 00321 r = this->reactor_->cancel_timer (timer_id_); 00322 #endif /* TAO_HAS_CORBA_MESSAGING */ 00323 this->adapter_.reactor (0); 00324 return r; 00325 } |
|
Allow others to inform us when a send or receive was successful.
Reimplemented from TAO_CEC_SupplierControl. Definition at line 175 of file CEC_Reactive_SupplierControl.cpp.
00177 { 00178 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL) 00179 if (this->typed_event_channel_) 00180 { 00181 // Typed EC 00182 TAO_CEC_TypedEventChannel::ServantRetryMap::ENTRY* entry = 0; 00183 if (this->typed_event_channel_-> 00184 get_servant_retry_map ().find (proxy, entry) == 0) 00185 { 00186 entry->int_id_ = 0; 00187 } 00188 } 00189 else 00190 { 00191 #endif /* TAO_HAS_TYPED_EVENT_CHANNEL */ 00192 00193 // Un-typed EC 00194 TAO_CEC_EventChannel::ServantRetryMap::ENTRY* entry = 0; 00195 if (this->event_channel_-> 00196 get_servant_retry_map ().find (proxy, entry) == 0) 00197 { 00198 entry->int_id_ = 0; 00199 } 00200 00201 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL) 00202 } 00203 #endif /* TAO_HAS_TYPED_EVENT_CHANNEL */ 00204 00205 } |
|
Invoked by helper classes when they detect that a supplier does not exist (i.e. _non_existent() returns true and/or the CORBA::OBJECT_NOT_EXIST exception has been raised). Reimplemented from TAO_CEC_SupplierControl. Definition at line 364 of file CEC_Reactive_SupplierControl.cpp. References ACE_CATCHANY, ACE_ENDTRY, ACE_ENV_SINGLE_ARG_PARAMETER, ACE_TRY, and ACE_TRY_CHECK.
00367 { 00368 ACE_TRY 00369 { 00370 proxy->disconnect_pull_consumer (ACE_ENV_SINGLE_ARG_PARAMETER); 00371 ACE_TRY_CHECK; 00372 } 00373 ACE_CATCHANY 00374 { 00375 // Ignore all exceptions.. 00376 } 00377 ACE_ENDTRY; 00378 } |
|
Invoked by helper classes when they detect that a supplier does not exist (i.e. _non_existent() returns true and/or the CORBA::OBJECT_NOT_EXIST exception has been raised). Reimplemented from TAO_CEC_SupplierControl. Definition at line 328 of file CEC_Reactive_SupplierControl.cpp. References ACE_CATCHANY, ACE_ENDTRY, ACE_ENV_SINGLE_ARG_PARAMETER, ACE_TRY, and ACE_TRY_CHECK.
00331 { 00332 ACE_TRY 00333 { 00334 proxy->disconnect_push_consumer (ACE_ENV_SINGLE_ARG_PARAMETER); 00335 ACE_TRY_CHECK; 00336 } 00337 ACE_CATCHANY 00338 { 00339 // Ignore all exceptions.. 00340 } 00341 ACE_ENDTRY; 00342 } |
|
Some system exception was raised while trying to push an event.
Reimplemented from TAO_CEC_SupplierControl. Definition at line 381 of file CEC_Reactive_SupplierControl.cpp. References ACE_CATCHANY, ACE_ENDTRY, ACE_ENV_SINGLE_ARG_PARAMETER, ACE_TRY, ACE_TRY_CHECK, and need_to_disconnect().
00385 { 00386 ACE_TRY 00387 { 00388 if (this->need_to_disconnect (proxy)) 00389 { 00390 proxy->disconnect_pull_consumer (ACE_ENV_SINGLE_ARG_PARAMETER); 00391 ACE_TRY_CHECK; 00392 } 00393 } 00394 ACE_CATCHANY 00395 { 00396 // Ignore all exceptions.. 00397 } 00398 ACE_ENDTRY; 00399 } |
|
The Adapter for the reactor events.
Definition at line 149 of file CEC_Reactive_SupplierControl.h. |
|
The event channel.
Definition at line 152 of file CEC_Reactive_SupplierControl.h. |
|
The ORB.
Definition at line 160 of file CEC_Reactive_SupplierControl.h. |
|
To control the timeout policy in the thread.
Definition at line 163 of file CEC_Reactive_SupplierControl.h. |
|
Precomputed policy list to set the timeout.
Definition at line 166 of file CEC_Reactive_SupplierControl.h. |
|
The polling rate.
Definition at line 140 of file CEC_Reactive_SupplierControl.h. |
|
The ORB reactor.
Definition at line 169 of file CEC_Reactive_SupplierControl.h. |
|
The number of retries per proxy until it is disconnected.
Definition at line 146 of file CEC_Reactive_SupplierControl.h. |
|
The polling timeout.
Definition at line 143 of file CEC_Reactive_SupplierControl.h. |
|
The timer id.
Definition at line 173 of file CEC_Reactive_SupplierControl.h. |