#include <CEC_Reactive_SupplierControl.h>
Inheritance diagram for TAO_CEC_Reactive_SupplierControl:


Public Member Functions | |
| TAO_CEC_Reactive_SupplierControl (const ACE_Time_Value &rate, const ACE_Time_Value &timeout, unsigned int retries, TAO_CEC_EventChannel *event_channel, CORBA::ORB_ptr orb) | |
| virtual | ~TAO_CEC_Reactive_SupplierControl (void) |
| destructor... | |
| void | handle_timeout (const ACE_Time_Value &tv, const void *arg) |
| Receive the timeout from the adapter. | |
| virtual int | activate (void) |
| virtual int | shutdown (void) |
| virtual void | supplier_not_exist (TAO_CEC_ProxyPushConsumer *proxy) |
| virtual void | supplier_not_exist (TAO_CEC_ProxyPullConsumer *proxy) |
| virtual void | system_exception (TAO_CEC_ProxyPullConsumer *proxy, CORBA::SystemException &) |
| Some system exception was raised while trying to push an event. | |
| virtual bool | need_to_disconnect (PortableServer::ServantBase *proxy) |
| virtual void | successful_transmission (PortableServer::ServantBase *proxy) |
| Allow others to inform us when a send or receive was successful. | |
Private Member Functions | |
| void | query_suppliers () |
Private Attributes | |
| ACE_Time_Value | rate_ |
| The polling rate. | |
| ACE_Time_Value | timeout_ |
| The polling timeout. | |
| unsigned int | retries_ |
| The number of retries per proxy until it is disconnected. | |
| TAO_CEC_SupplierControl_Adapter | adapter_ |
| The Adapter for the reactor events. | |
| TAO_CEC_EventChannel * | event_channel_ |
| The event channel. | |
| CORBA::ORB_var | orb_ |
| The ORB. | |
| CORBA::PolicyCurrent_var | policy_current_ |
| To control the timeout policy in the thread. | |
| CORBA::PolicyList | policy_list_ |
| Precomputed policy list used to set the timeout. | |
| ACE_Reactor * | reactor_ |
| The ORB reactor. | |
| long | timer_id_ |
| The timer id. | |
Defines the interface for the supplier control strategy. This strategy handles misbehaving or failing suppliers. = MEMORY MANAGEMENT = LOCKING = TODO
Definition at line 79 of file CEC_Reactive_SupplierControl.h.
|
||||||||||||||||||||||||
|
Constructor. It does not assume ownership of the parameter. Definition at line 42 of file CEC_Reactive_SupplierControl.cpp. References TAO_HAS_CORBA_MESSAGING.
00047 : rate_ (rate), 00048 timeout_ (timeout), 00049 retries_ (retries), 00050 adapter_ (this), 00051 event_channel_ (ec), 00052 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL) 00053 typed_event_channel_ (0), 00054 #endif /* TAO_HAS_TYPED_EVENT_CHANNEL */ 00055 orb_ (CORBA::ORB::_duplicate (orb)) 00056 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0 00057 // Initialise timer_id_ to an invalid timer id, so that in case we don't 00058 // schedule a timer, we don't cancel a random timer at shutdown 00059 , timer_id_ (-1) 00060 #endif /* TAO_HAS_CORBA_MESSAGING */ 00061 { 00062 this->reactor_ = 00063 this->orb_->orb_core ()->reactor (); 00064 } |
|
|
destructor...
Definition at line 92 of file CEC_Reactive_SupplierControl.cpp.
00093 {
00094 }
|
|
|
Activate any internal threads or timers used to poll the state of the suppliers. Reimplemented from TAO_CEC_SupplierControl. Definition at line 258 of file CEC_Reactive_SupplierControl.cpp. References CORBA::PolicyCurrent::_narrow(), ACE_CATCHANY, ACE_ENDTRY, ACE_ENV_ARG_PARAMETER, ACE_TRY_CHECK, ACE_TRY_NEW_ENV, TAO::unbounded_object_reference_sequence< Policy, Policy_var >::length(), and ACE_Reactor::schedule_timer().
00259 {
00260 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00261 ACE_TRY_NEW_ENV
00262 {
00263 // Get the PolicyCurrent object
00264 CORBA::Object_var tmp =
00265 this->orb_->resolve_initial_references ("PolicyCurrent"
00266 ACE_ENV_ARG_PARAMETER);
00267 ACE_TRY_CHECK;
00268
00269 this->policy_current_ =
00270 CORBA::PolicyCurrent::_narrow (tmp.in ()
00271 ACE_ENV_ARG_PARAMETER);
00272 ACE_TRY_CHECK;
00273
00274 // Pre-compute the policy list to the set the right timeout
00275 // value...
00276 // We need to convert the relative timeout into 100's of nano seconds.
00277 TimeBase::TimeT timeout;
00278 ORBSVCS_Time::Time_Value_to_TimeT (timeout,
00279 this->timeout_);
00280 CORBA::Any any;
00281 any <<= timeout;
00282
00283 this->policy_list_.length (1);
00284 this->policy_list_[0] =
00285 this->orb_->create_policy (
00286 Messaging::RELATIVE_RT_TIMEOUT_POLICY_TYPE,
00287 any
00288 ACE_ENV_ARG_PARAMETER);
00289 ACE_TRY_CHECK;
00290
00291 // Only schedule the timer, when the rate is not zero
00292 if (this->rate_ != ACE_Time_Value::zero)
00293 {
00294 // Schedule the timer after these policies has been set, because the
00295 // handle_timeout uses these policies, if done in front, the channel
00296 // can crash when the timeout expires before initiazation is ready.
00297 timer_id_ = this->reactor_->schedule_timer (&this->adapter_,
00298 0,
00299 this->rate_,
00300 this->rate_);
00301 if (timer_id_ == -1)
00302 return -1;
00303 }
00304 }
00305 ACE_CATCHANY
00306 {
00307 return -1;
00308 }
00309 ACE_ENDTRY;
00310 #endif /* TAO_HAS_CORBA_MESSAGING */
00311
00312 return 0;
00313 }
|
|
||||||||||||
|
Receive the timeout from the adapter.
Definition at line 208 of file CEC_Reactive_SupplierControl.cpp. References ACE_CATCHANY, ACE_ENDTRY, ACE_ENV_ARG_PARAMETER, ACE_ENV_SINGLE_ARG_PARAMETER, ACE_TRY_CHECK, ACE_TRY_CHECK_EX, ACE_TRY_EX, ACE_TRY_NEW_ENV, and query_suppliers(). Referenced by TAO_CEC_SupplierControl_Adapter::handle_timeout().
00211 {
00212 ACE_TRY_NEW_ENV
00213 {
00214 // Query the state of the Current object *before* we initiate
00215 // the iteration...
00216 CORBA::PolicyTypeSeq types;
00217 CORBA::PolicyList_var policies =
00218 this->policy_current_->get_policy_overrides (types
00219 ACE_ENV_ARG_PARAMETER);
00220 ACE_TRY_CHECK;
00221
00222 // Change the timeout
00223 this->policy_current_->set_policy_overrides (this->policy_list_,
00224 CORBA::ADD_OVERRIDE
00225 ACE_ENV_ARG_PARAMETER);
00226 ACE_TRY_CHECK;
00227
00228 ACE_TRY_EX (query)
00229 {
00230 // Query the state of the suppliers...
00231 this->query_suppliers (ACE_ENV_SINGLE_ARG_PARAMETER);
00232 ACE_TRY_CHECK_EX (query);
00233 }
00234 ACE_CATCHANY
00235 {
00236 // Ignore all exceptions
00237 }
00238 ACE_ENDTRY;
00239
00240 this->policy_current_->set_policy_overrides (policies.in (),
00241 CORBA::SET_OVERRIDE
00242 ACE_ENV_ARG_PARAMETER);
00243 ACE_TRY_CHECK;
00244 for (CORBA::ULong i = 0; i != policies->length (); ++i)
00245 {
00246 policies[i]->destroy (ACE_ENV_SINGLE_ARG_PARAMETER);
00247 ACE_TRY_CHECK;
00248 }
00249 }
00250 ACE_CATCHANY
00251 {
00252 // Ignore all exceptions
00253 }
00254 ACE_ENDTRY;
00255 }
|
|
|
Do we need to disconnect this supplier? The parameter type for proxy is PortableServer::ServantBase* because this method is used for both TAO_CEC_ProxyPushSupplier and TAO_CEC_ProxyPullSupplier proxies. Reimplemented from TAO_CEC_SupplierControl. Definition at line 131 of file CEC_Reactive_SupplierControl.cpp. Referenced by system_exception().
00133 {
00134 bool disconnect = true;
00135
00136 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00137 if (this->typed_event_channel_)
00138 {
00139 // Typed EC
00140 TAO_CEC_TypedEventChannel::ServantRetryMap::ENTRY* entry = 0;
00141 if (this->typed_event_channel_->
00142 get_servant_retry_map ().find (proxy, entry) == 0)
00143 {
00144 ++entry->int_id_;
00145 if (entry->int_id_ <= this->retries_)
00146 {
00147 disconnect = false;
00148 }
00149 }
00150 }
00151 else
00152 {
00153 #endif /* TAO_HAS_TYPED_EVENT_CHANNEL */
00154
00155 // Un-typed EC
00156 TAO_CEC_EventChannel::ServantRetryMap::ENTRY* entry = 0;
00157 if (this->event_channel_->
00158 get_servant_retry_map ().find (proxy, entry) == 0)
00159 {
00160 ++entry->int_id_;
00161 if (entry->int_id_ <= this->retries_)
00162 {
00163 disconnect = false;
00164 }
00165 }
00166
00167 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00168 }
00169 #endif /* TAO_HAS_TYPED_EVENT_CHANNEL */
00170
00171 return disconnect;
00172 }
|
|
|
Check if the suppliers still exist. It is a helper method for handle_timeout() to isolate the exceptions. Definition at line 97 of file CEC_Reactive_SupplierControl.cpp. References ACE_CHECK, ACE_ENV_ARG_PARAMETER, TAO_CEC_SupplierAdmin::for_each(), and TAO_CEC_EventChannel::supplier_admin(). Referenced by handle_timeout().
00099 {
00100 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00101 if (this->typed_event_channel_)
00102 {
00103 // Typed EC
00104 TAO_CEC_Ping_Typed_Push_Supplier push_worker (this);
00105
00106 this->typed_event_channel_->typed_supplier_admin ()->for_each (&push_worker
00107 ACE_ENV_ARG_PARAMETER);
00108 ACE_CHECK;
00109 }
00110 else
00111 {
00112 #endif /* TAO_HAS_TYPED_EVENT_CHANNEL */
00113
00114 // Un-typed EC
00115 TAO_CEC_Ping_Push_Supplier push_worker (this);
00116 this->event_channel_->supplier_admin ()->for_each (&push_worker
00117 ACE_ENV_ARG_PARAMETER);
00118 ACE_CHECK;
00119
00120 TAO_CEC_Ping_Pull_Supplier pull_worker (this);
00121 this->event_channel_->supplier_admin ()->for_each (&pull_worker
00122 ACE_ENV_ARG_PARAMETER);
00123 ACE_CHECK;
00124
00125 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00126 }
00127 #endif /* TAO_HAS_TYPED_EVENT_CHANNEL */
00128 }
|
|
|
Reimplemented from TAO_CEC_SupplierControl. Definition at line 316 of file CEC_Reactive_SupplierControl.cpp. References ACE_Reactor::cancel_timer(), and ACE_Event_Handler::reactor().
00317 {
00318 int r = 0;
00319
00320 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00321 r = this->reactor_->cancel_timer (timer_id_);
00322 #endif /* TAO_HAS_CORBA_MESSAGING */
00323 this->adapter_.reactor (0);
00324 return r;
00325 }
|
|
|
Allow others to inform us when a send or receive was successful.
Reimplemented from TAO_CEC_SupplierControl. Definition at line 175 of file CEC_Reactive_SupplierControl.cpp.
00177 {
00178 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00179 if (this->typed_event_channel_)
00180 {
00181 // Typed EC
00182 TAO_CEC_TypedEventChannel::ServantRetryMap::ENTRY* entry = 0;
00183 if (this->typed_event_channel_->
00184 get_servant_retry_map ().find (proxy, entry) == 0)
00185 {
00186 entry->int_id_ = 0;
00187 }
00188 }
00189 else
00190 {
00191 #endif /* TAO_HAS_TYPED_EVENT_CHANNEL */
00192
00193 // Un-typed EC
00194 TAO_CEC_EventChannel::ServantRetryMap::ENTRY* entry = 0;
00195 if (this->event_channel_->
00196 get_servant_retry_map ().find (proxy, entry) == 0)
00197 {
00198 entry->int_id_ = 0;
00199 }
00200
00201 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00202 }
00203 #endif /* TAO_HAS_TYPED_EVENT_CHANNEL */
00204
00205 }
|
|
|
Invoked by helper classes when they detect that a supplier does not exist (i.e. _non_existent() returns true and/or the CORBA::OBJECT_NOT_EXIST exception has been raised). Reimplemented from TAO_CEC_SupplierControl. Definition at line 364 of file CEC_Reactive_SupplierControl.cpp. References ACE_CATCHANY, ACE_ENDTRY, ACE_ENV_SINGLE_ARG_PARAMETER, ACE_TRY, and ACE_TRY_CHECK.
00367 {
00368 ACE_TRY
00369 {
00370 proxy->disconnect_pull_consumer (ACE_ENV_SINGLE_ARG_PARAMETER);
00371 ACE_TRY_CHECK;
00372 }
00373 ACE_CATCHANY
00374 {
00375 // Ignore all exceptions..
00376 }
00377 ACE_ENDTRY;
00378 }
|
|
|
Invoked by helper classes when they detect that a supplier does not exist (i.e. _non_existent() returns true and/or the CORBA::OBJECT_NOT_EXIST exception has been raised). Reimplemented from TAO_CEC_SupplierControl. Definition at line 328 of file CEC_Reactive_SupplierControl.cpp. References ACE_CATCHANY, ACE_ENDTRY, ACE_ENV_SINGLE_ARG_PARAMETER, ACE_TRY, and ACE_TRY_CHECK.
00331 {
00332 ACE_TRY
00333 {
00334 proxy->disconnect_push_consumer (ACE_ENV_SINGLE_ARG_PARAMETER);
00335 ACE_TRY_CHECK;
00336 }
00337 ACE_CATCHANY
00338 {
00339 // Ignore all exceptions..
00340 }
00341 ACE_ENDTRY;
00342 }
|
|
||||||||||||
|
Some system exception was raised while trying to push an event.
Reimplemented from TAO_CEC_SupplierControl. Definition at line 381 of file CEC_Reactive_SupplierControl.cpp. References ACE_CATCHANY, ACE_ENDTRY, ACE_ENV_SINGLE_ARG_PARAMETER, ACE_TRY, ACE_TRY_CHECK, and need_to_disconnect().
00385 {
00386 ACE_TRY
00387 {
00388 if (this->need_to_disconnect (proxy))
00389 {
00390 proxy->disconnect_pull_consumer (ACE_ENV_SINGLE_ARG_PARAMETER);
00391 ACE_TRY_CHECK;
00392 }
00393 }
00394 ACE_CATCHANY
00395 {
00396 // Ignore all exceptions..
00397 }
00398 ACE_ENDTRY;
00399 }
|
|
|
The Adapter for the reactor events.
Definition at line 149 of file CEC_Reactive_SupplierControl.h. |
|
|
The event channel.
Definition at line 152 of file CEC_Reactive_SupplierControl.h. |
|
|
The ORB.
Definition at line 160 of file CEC_Reactive_SupplierControl.h. |
|
|
To control the timeout policy in the thread.
Definition at line 163 of file CEC_Reactive_SupplierControl.h. |
|
|
Precomputed policy list used to set the timeout.
Definition at line 166 of file CEC_Reactive_SupplierControl.h. |
|
|
The polling rate.
Definition at line 140 of file CEC_Reactive_SupplierControl.h. |
|
|
The ORB reactor.
Definition at line 169 of file CEC_Reactive_SupplierControl.h. |
|
|
The number of retries per proxy until it is disconnected.
Definition at line 146 of file CEC_Reactive_SupplierControl.h. |
|
|
The polling timeout.
Definition at line 143 of file CEC_Reactive_SupplierControl.h. |
|
|
The timer id.
Definition at line 173 of file CEC_Reactive_SupplierControl.h. |
1.3.6