TAO_CEC_Reactive_SupplierControl Class Reference

SupplierControl. More...

#include <CEC_Reactive_SupplierControl.h>

Inheritance diagram for TAO_CEC_Reactive_SupplierControl:

Inheritance graph
[legend]
Collaboration diagram for TAO_CEC_Reactive_SupplierControl:

Collaboration graph
[legend]
List of all members.

Public Member Functions

 TAO_CEC_Reactive_SupplierControl (const ACE_Time_Value &rate, const ACE_Time_Value &timeout, unsigned int retries, TAO_CEC_EventChannel *event_channel, CORBA::ORB_ptr orb)
virtual ~TAO_CEC_Reactive_SupplierControl (void)
 destructor...

void handle_timeout (const ACE_Time_Value &tv, const void *arg)
 Receive the timeout from the adapter.

virtual int activate (void)
virtual int shutdown (void)
virtual void supplier_not_exist (TAO_CEC_ProxyPushConsumer *proxy)
virtual void supplier_not_exist (TAO_CEC_ProxyPullConsumer *proxy)
virtual void system_exception (TAO_CEC_ProxyPullConsumer *proxy, CORBA::SystemException &)
 Some system exception was rasied while trying to push an event.

virtual bool need_to_disconnect (PortableServer::ServantBase *proxy)
virtual void successful_transmission (PortableServer::ServantBase *proxy)
 Allow others to inform us when a send or receive was successful.


Private Member Functions

void query_suppliers ()

Private Attributes

ACE_Time_Value rate_
 The polling rate.

ACE_Time_Value timeout_
 The polling timeout.

unsigned int retries_
 The number of retries per proxy until it is disconnected.

TAO_CEC_SupplierControl_Adapter adapter_
 The Adapter for the reactor events.

TAO_CEC_EventChannelevent_channel_
 The event channel.

CORBA::ORB_var orb_
 The ORB.

CORBA::PolicyCurrent_var policy_current_
 To control the timeout policy in the thread.

CORBA::PolicyList policy_list_
 Precomputed policy list to the set timeout.

ACE_Reactorreactor_
 The ORB reactor.

long timer_id_
 The timer id.


Detailed Description

SupplierControl.

Defines the interface for the supplier control strategy. This strategy handles misbehaving or failing suppliers. = MEMORY MANAGMENT = LOCKING = TODO

Definition at line 79 of file CEC_Reactive_SupplierControl.h.


Constructor & Destructor Documentation

TAO_BEGIN_VERSIONED_NAMESPACE_DECL TAO_CEC_Reactive_SupplierControl::TAO_CEC_Reactive_SupplierControl const ACE_Time_Value rate,
const ACE_Time_Value timeout,
unsigned int  retries,
TAO_CEC_EventChannel event_channel,
CORBA::ORB_ptr  orb
 

Constructor. It does not assume ownership of the parameter.

Definition at line 42 of file CEC_Reactive_SupplierControl.cpp.

References TAO_HAS_CORBA_MESSAGING.

00047   : rate_ (rate),
00048     timeout_ (timeout),
00049     retries_ (retries),
00050     adapter_ (this),
00051     event_channel_ (ec),
00052 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00053     typed_event_channel_ (0),
00054 #endif /* TAO_HAS_TYPED_EVENT_CHANNEL */
00055     orb_ (CORBA::ORB::_duplicate (orb))
00056 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00057    // Initialise timer_id_ to an invalid timer id, so that in case we don't
00058    // schedule a timer, we don't cancel a random timer at shutdown
00059    , timer_id_ (-1)
00060 #endif /* TAO_HAS_CORBA_MESSAGING */
00061 {
00062   this->reactor_ =
00063     this->orb_->orb_core ()->reactor ();
00064 }

TAO_CEC_Reactive_SupplierControl::~TAO_CEC_Reactive_SupplierControl void   )  [virtual]
 

destructor...

Definition at line 92 of file CEC_Reactive_SupplierControl.cpp.

00093 {
00094 }


Member Function Documentation

int TAO_CEC_Reactive_SupplierControl::activate void   )  [virtual]
 

Activate any internal threads or timers used to poll the state of the suppliers

Reimplemented from TAO_CEC_SupplierControl.

Definition at line 258 of file CEC_Reactive_SupplierControl.cpp.

References CORBA::PolicyCurrent::_narrow(), ACE_CATCHANY, ACE_ENDTRY, ACE_ENV_ARG_PARAMETER, ACE_TRY_CHECK, ACE_TRY_NEW_ENV, TAO::unbounded_object_reference_sequence< Policy, Policy_var >::length(), and ACE_Reactor::schedule_timer().

00259 {
00260 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00261   ACE_TRY_NEW_ENV
00262     {
00263       // Get the PolicyCurrent object
00264       CORBA::Object_var tmp =
00265         this->orb_->resolve_initial_references ("PolicyCurrent"
00266                                                 ACE_ENV_ARG_PARAMETER);
00267       ACE_TRY_CHECK;
00268 
00269       this->policy_current_ =
00270         CORBA::PolicyCurrent::_narrow (tmp.in ()
00271                                        ACE_ENV_ARG_PARAMETER);
00272       ACE_TRY_CHECK;
00273 
00274       // Pre-compute the policy list to the set the right timeout
00275       // value...
00276       // We need to convert the relative timeout into 100's of nano seconds.
00277       TimeBase::TimeT timeout;
00278       ORBSVCS_Time::Time_Value_to_TimeT (timeout,
00279                                          this->timeout_);
00280       CORBA::Any any;
00281       any <<= timeout;
00282 
00283       this->policy_list_.length (1);
00284       this->policy_list_[0] =
00285         this->orb_->create_policy (
00286                Messaging::RELATIVE_RT_TIMEOUT_POLICY_TYPE,
00287                any
00288                ACE_ENV_ARG_PARAMETER);
00289       ACE_TRY_CHECK;
00290 
00291       // Only schedule the timer, when the rate is not zero
00292       if (this->rate_ != ACE_Time_Value::zero)
00293       {
00294         // Schedule the timer after these policies has been set, because the
00295         // handle_timeout uses these policies, if done in front, the channel
00296         // can crash when the timeout expires before initiazation is ready.
00297         timer_id_ = this->reactor_->schedule_timer (&this->adapter_,
00298                                                     0,
00299                                                     this->rate_,
00300                                                     this->rate_);
00301         if (timer_id_ == -1)
00302           return -1;
00303       }
00304     }
00305   ACE_CATCHANY
00306     {
00307       return -1;
00308     }
00309   ACE_ENDTRY;
00310 #endif /* TAO_HAS_CORBA_MESSAGING */
00311 
00312   return 0;
00313 }

void TAO_CEC_Reactive_SupplierControl::handle_timeout const ACE_Time_Value tv,
const void *  arg
 

Receive the timeout from the adapter.

Definition at line 208 of file CEC_Reactive_SupplierControl.cpp.

References ACE_CATCHANY, ACE_ENDTRY, ACE_ENV_ARG_PARAMETER, ACE_ENV_SINGLE_ARG_PARAMETER, ACE_TRY_CHECK, ACE_TRY_CHECK_EX, ACE_TRY_EX, ACE_TRY_NEW_ENV, and query_suppliers().

Referenced by TAO_CEC_SupplierControl_Adapter::handle_timeout().

00211 {
00212   ACE_TRY_NEW_ENV
00213     {
00214       // Query the state of the Current object *before* we initiate
00215       // the iteration...
00216       CORBA::PolicyTypeSeq types;
00217       CORBA::PolicyList_var policies =
00218         this->policy_current_->get_policy_overrides (types
00219                                                      ACE_ENV_ARG_PARAMETER);
00220       ACE_TRY_CHECK;
00221 
00222       // Change the timeout
00223       this->policy_current_->set_policy_overrides (this->policy_list_,
00224                                                    CORBA::ADD_OVERRIDE
00225                                                    ACE_ENV_ARG_PARAMETER);
00226       ACE_TRY_CHECK;
00227 
00228       ACE_TRY_EX (query)
00229         {
00230           // Query the state of the suppliers...
00231           this->query_suppliers (ACE_ENV_SINGLE_ARG_PARAMETER);
00232           ACE_TRY_CHECK_EX (query);
00233         }
00234       ACE_CATCHANY
00235         {
00236           // Ignore all exceptions
00237         }
00238       ACE_ENDTRY;
00239 
00240       this->policy_current_->set_policy_overrides (policies.in (),
00241                                                    CORBA::SET_OVERRIDE
00242                                                    ACE_ENV_ARG_PARAMETER);
00243       ACE_TRY_CHECK;
00244       for (CORBA::ULong i = 0; i != policies->length (); ++i)
00245         {
00246           policies[i]->destroy (ACE_ENV_SINGLE_ARG_PARAMETER);
00247           ACE_TRY_CHECK;
00248         }
00249     }
00250   ACE_CATCHANY
00251     {
00252       // Ignore all exceptions
00253     }
00254   ACE_ENDTRY;
00255 }

bool TAO_CEC_Reactive_SupplierControl::need_to_disconnect PortableServer::ServantBase proxy  )  [virtual]
 

Do we need to disconnect this supplier? The parameter type for proxy is PortableServer::ServantBase* due to the fact that this method will be used for TAO_CEC_ProxyPushSupplier's and TAO_CEC_ProxyPullSupplier's.

Reimplemented from TAO_CEC_SupplierControl.

Definition at line 131 of file CEC_Reactive_SupplierControl.cpp.

Referenced by system_exception().

00133 {
00134   bool disconnect = true;
00135 
00136 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00137   if (this->typed_event_channel_)
00138     {
00139       // Typed EC
00140       TAO_CEC_TypedEventChannel::ServantRetryMap::ENTRY* entry = 0;
00141       if (this->typed_event_channel_->
00142           get_servant_retry_map ().find (proxy, entry) == 0)
00143         {
00144           ++entry->int_id_;
00145           if (entry->int_id_ <= this->retries_)
00146             {
00147               disconnect = false;
00148             }
00149         }
00150     }
00151   else
00152     {
00153 #endif /* TAO_HAS_TYPED_EVENT_CHANNEL */
00154 
00155   // Un-typed EC
00156   TAO_CEC_EventChannel::ServantRetryMap::ENTRY* entry = 0;
00157   if (this->event_channel_->
00158       get_servant_retry_map ().find (proxy, entry) == 0)
00159     {
00160       ++entry->int_id_;
00161       if (entry->int_id_ <= this->retries_)
00162         {
00163           disconnect = false;
00164         }
00165     }
00166 
00167 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00168     }
00169 #endif /* TAO_HAS_TYPED_EVENT_CHANNEL */
00170 
00171   return disconnect;
00172 }

void TAO_CEC_Reactive_SupplierControl::query_suppliers  )  [private]
 

Check if the suppliers still exists. It is a helper method for handle_timeout() to isolate the exceptions.

Definition at line 97 of file CEC_Reactive_SupplierControl.cpp.

References ACE_CHECK, ACE_ENV_ARG_PARAMETER, TAO_CEC_SupplierAdmin::for_each(), and TAO_CEC_EventChannel::supplier_admin().

Referenced by handle_timeout().

00099 {
00100 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00101   if (this->typed_event_channel_)
00102     {
00103       // Typed EC
00104       TAO_CEC_Ping_Typed_Push_Supplier push_worker (this);
00105 
00106       this->typed_event_channel_->typed_supplier_admin ()->for_each (&push_worker
00107                                                                      ACE_ENV_ARG_PARAMETER);
00108       ACE_CHECK;
00109     }
00110   else
00111     {
00112 #endif /* TAO_HAS_TYPED_EVENT_CHANNEL */
00113 
00114   // Un-typed EC
00115   TAO_CEC_Ping_Push_Supplier push_worker (this);
00116   this->event_channel_->supplier_admin ()->for_each (&push_worker
00117                                                      ACE_ENV_ARG_PARAMETER);
00118   ACE_CHECK;
00119 
00120   TAO_CEC_Ping_Pull_Supplier pull_worker (this);
00121   this->event_channel_->supplier_admin ()->for_each (&pull_worker
00122                                                      ACE_ENV_ARG_PARAMETER);
00123   ACE_CHECK;
00124 
00125 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00126     }
00127 #endif /* TAO_HAS_TYPED_EVENT_CHANNEL */
00128 }

int TAO_CEC_Reactive_SupplierControl::shutdown void   )  [virtual]
 

Reimplemented from TAO_CEC_SupplierControl.

Definition at line 316 of file CEC_Reactive_SupplierControl.cpp.

References ACE_Reactor::cancel_timer(), and ACE_Event_Handler::reactor().

00317 {
00318   int r = 0;
00319 
00320 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00321   r = this->reactor_->cancel_timer (timer_id_);
00322 #endif /* TAO_HAS_CORBA_MESSAGING */
00323   this->adapter_.reactor (0);
00324   return r;
00325 }

void TAO_CEC_Reactive_SupplierControl::successful_transmission PortableServer::ServantBase proxy  )  [virtual]
 

Allow others to inform us when a send or receive was successful.

Reimplemented from TAO_CEC_SupplierControl.

Definition at line 175 of file CEC_Reactive_SupplierControl.cpp.

00177 {
00178 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00179   if (this->typed_event_channel_)
00180     {
00181       // Typed EC
00182       TAO_CEC_TypedEventChannel::ServantRetryMap::ENTRY* entry = 0;
00183       if (this->typed_event_channel_->
00184           get_servant_retry_map ().find (proxy, entry) == 0)
00185         {
00186           entry->int_id_ = 0;
00187         }
00188     }
00189   else
00190     {
00191 #endif /* TAO_HAS_TYPED_EVENT_CHANNEL */
00192 
00193   // Un-typed EC
00194   TAO_CEC_EventChannel::ServantRetryMap::ENTRY* entry = 0;
00195   if (this->event_channel_->
00196       get_servant_retry_map ().find (proxy, entry) == 0)
00197     {
00198       entry->int_id_ = 0;
00199     }
00200 
00201 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00202     }
00203 #endif /* TAO_HAS_TYPED_EVENT_CHANNEL */
00204 
00205 }

void TAO_CEC_Reactive_SupplierControl::supplier_not_exist TAO_CEC_ProxyPullConsumer proxy  )  [virtual]
 

Invoked by helper classes when they detect that a supplier does not exists (i.e. _non_existent() returns true and/or the CORBA::OBJECT_NOT_EXIST exception has been raised).

Reimplemented from TAO_CEC_SupplierControl.

Definition at line 364 of file CEC_Reactive_SupplierControl.cpp.

References ACE_CATCHANY, ACE_ENDTRY, ACE_ENV_SINGLE_ARG_PARAMETER, ACE_TRY, and ACE_TRY_CHECK.

00367 {
00368   ACE_TRY
00369     {
00370       proxy->disconnect_pull_consumer (ACE_ENV_SINGLE_ARG_PARAMETER);
00371       ACE_TRY_CHECK;
00372     }
00373   ACE_CATCHANY
00374     {
00375       // Ignore all exceptions..
00376     }
00377   ACE_ENDTRY;
00378 }

void TAO_CEC_Reactive_SupplierControl::supplier_not_exist TAO_CEC_ProxyPushConsumer proxy  )  [virtual]
 

Invoked by helper classes when they detect that a supplier does not exists (i.e. _non_existent() returns true and/or the CORBA::OBJECT_NOT_EXIST exception has been raised).

Reimplemented from TAO_CEC_SupplierControl.

Definition at line 328 of file CEC_Reactive_SupplierControl.cpp.

References ACE_CATCHANY, ACE_ENDTRY, ACE_ENV_SINGLE_ARG_PARAMETER, ACE_TRY, and ACE_TRY_CHECK.

00331 {
00332   ACE_TRY
00333     {
00334       proxy->disconnect_push_consumer (ACE_ENV_SINGLE_ARG_PARAMETER);
00335       ACE_TRY_CHECK;
00336     }
00337   ACE_CATCHANY
00338     {
00339       // Ignore all exceptions..
00340     }
00341   ACE_ENDTRY;
00342 }

void TAO_CEC_Reactive_SupplierControl::system_exception TAO_CEC_ProxyPullConsumer proxy,
CORBA::SystemException
[virtual]
 

Some system exception was rasied while trying to push an event.

Reimplemented from TAO_CEC_SupplierControl.

Definition at line 381 of file CEC_Reactive_SupplierControl.cpp.

References ACE_CATCHANY, ACE_ENDTRY, ACE_ENV_SINGLE_ARG_PARAMETER, ACE_TRY, ACE_TRY_CHECK, and need_to_disconnect().

00385 {
00386   ACE_TRY
00387     {
00388       if (this->need_to_disconnect (proxy))
00389         {
00390           proxy->disconnect_pull_consumer (ACE_ENV_SINGLE_ARG_PARAMETER);
00391           ACE_TRY_CHECK;
00392         }
00393     }
00394   ACE_CATCHANY
00395     {
00396       // Ignore all exceptions..
00397     }
00398   ACE_ENDTRY;
00399 }


Member Data Documentation

TAO_CEC_SupplierControl_Adapter TAO_CEC_Reactive_SupplierControl::adapter_ [private]
 

The Adapter for the reactor events.

Definition at line 149 of file CEC_Reactive_SupplierControl.h.

TAO_CEC_EventChannel* TAO_CEC_Reactive_SupplierControl::event_channel_ [private]
 

The event channel.

Definition at line 152 of file CEC_Reactive_SupplierControl.h.

CORBA::ORB_var TAO_CEC_Reactive_SupplierControl::orb_ [private]
 

The ORB.

Definition at line 160 of file CEC_Reactive_SupplierControl.h.

CORBA::PolicyCurrent_var TAO_CEC_Reactive_SupplierControl::policy_current_ [private]
 

To control the timeout policy in the thread.

Definition at line 163 of file CEC_Reactive_SupplierControl.h.

CORBA::PolicyList TAO_CEC_Reactive_SupplierControl::policy_list_ [private]
 

Precomputed policy list to the set timeout.

Definition at line 166 of file CEC_Reactive_SupplierControl.h.

ACE_Time_Value TAO_CEC_Reactive_SupplierControl::rate_ [private]
 

The polling rate.

Definition at line 140 of file CEC_Reactive_SupplierControl.h.

ACE_Reactor* TAO_CEC_Reactive_SupplierControl::reactor_ [private]
 

The ORB reactor.

Definition at line 169 of file CEC_Reactive_SupplierControl.h.

unsigned int TAO_CEC_Reactive_SupplierControl::retries_ [private]
 

The number of retries per proxy until it is disconnected.

Definition at line 146 of file CEC_Reactive_SupplierControl.h.

ACE_Time_Value TAO_CEC_Reactive_SupplierControl::timeout_ [private]
 

The polling timeout.

Definition at line 143 of file CEC_Reactive_SupplierControl.h.

long TAO_CEC_Reactive_SupplierControl::timer_id_ [private]
 

The timer id.

Definition at line 173 of file CEC_Reactive_SupplierControl.h.


The documentation for this class was generated from the following files:
Generated on Thu Nov 9 13:19:49 2006 for TAO_CosEvent by doxygen 1.3.6