TAO_CEC_Reactive_ConsumerControl Class Reference

ConsumerControl.

#include <CEC_Reactive_ConsumerControl.h>

Inherits TAO_CEC_ConsumerControl.

Public Member Functions

 TAO_CEC_Reactive_ConsumerControl (const ACE_Time_Value &rate, const ACE_Time_Value &timeout, unsigned int retries, TAO_CEC_EventChannel *event_channel, CORBA::ORB_ptr orb)
virtual ~TAO_CEC_Reactive_ConsumerControl (void)
 Destructor.

void handle_timeout (const ACE_Time_Value &tv, const void *arg)
 Receive the timeout from the adapter.

virtual int activate (void)
virtual int shutdown (void)
virtual void consumer_not_exist (TAO_CEC_ProxyPushSupplier *proxy)
virtual void consumer_not_exist (TAO_CEC_ProxyPullSupplier *proxy)
virtual void system_exception (TAO_CEC_ProxyPushSupplier *proxy, CORBA::SystemException &)
 Some system exception was raised while trying to push an event.

virtual bool need_to_disconnect (PortableServer::ServantBase *proxy)
virtual void successful_transmission (PortableServer::ServantBase *proxy)
 Allow others to inform us when a send or receive was successful.


Private Member Functions

void query_consumers ()

Private Attributes

ACE_Time_Value rate_
 The polling rate.

ACE_Time_Value timeout_
 The polling timeout.

unsigned int retries_
 The number of retries per proxy until it is disconnected.

TAO_CEC_ConsumerControl_Adapter adapter_
 The Adapter for the reactor events.

TAO_CEC_EventChannel * event_channel_
 The event channel.

CORBA::ORB_var orb_
 The ORB.

CORBA::PolicyCurrent_var policy_current_
 To control the timeout policy in the thread.

CORBA::PolicyList policy_list_
 Precomputed policy list used to set the timeout.

ACE_Reactor * reactor_
 The ORB reactor.

long timer_id_
 The timer id.


Detailed Description

ConsumerControl.

Defines the interface for the consumer control strategy. This strategy handles misbehaving or failing consumers.

Definition at line 81 of file CEC_Reactive_ConsumerControl.h.


Constructor & Destructor Documentation

TAO_CEC_Reactive_ConsumerControl::TAO_CEC_Reactive_ConsumerControl (const ACE_Time_Value &rate,
const ACE_Time_Value &timeout,
unsigned int retries,
TAO_CEC_EventChannel *event_channel,
CORBA::ORB_ptr orb)

Constructor. It does not assume ownership of the event_channel parameter.

Definition at line 41 of file CEC_Reactive_ConsumerControl.cpp.

References TAO_HAS_CORBA_MESSAGING.

00046   : rate_ (rate),
00047     timeout_ (timeout),
00048     retries_ (retries),
00049     adapter_ (this),
00050     event_channel_ (ec),
00051 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00052     typed_event_channel_ (0),
00053 #endif /* TAO_HAS_TYPED_EVENT_CHANNEL */
00054     orb_ (CORBA::ORB::_duplicate (orb))
00055 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00056    // Initialise timer_id_ to an invalid timer id, so that in case we don't
00057    // schedule a timer, we don't cancel a random timer at shutdown
00058    , timer_id_ (-1)
00059 #endif /* TAO_HAS_CORBA_MESSAGING */
00060 {
00061   this->reactor_ =
00062     this->orb_->orb_core ()->reactor ();
00063 }

TAO_CEC_Reactive_ConsumerControl::~TAO_CEC_Reactive_ConsumerControl (void) [virtual]
 

Destructor.

Definition at line 91 of file CEC_Reactive_ConsumerControl.cpp.

00092 {
00093 }


Member Function Documentation

int TAO_CEC_Reactive_ConsumerControl::activate (void) [virtual]
 

Activate any internal threads or timers used to poll the state of the consumers.

Reimplemented from TAO_CEC_ConsumerControl.

Definition at line 257 of file CEC_Reactive_ConsumerControl.cpp.

References CORBA::PolicyCurrent::_narrow(), ACE_CATCHANY, ACE_ENDTRY, ACE_ENV_ARG_PARAMETER, ACE_TRY_CHECK, ACE_TRY_NEW_ENV, TAO::unbounded_object_reference_sequence< Policy, Policy_var >::length(), rate_, and ACE_Reactor::schedule_timer().

00258 {
00259 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00260   ACE_TRY_NEW_ENV
00261     {
00262       // Get the PolicyCurrent object
00263       CORBA::Object_var tmp =
00264         this->orb_->resolve_initial_references ("PolicyCurrent"
00265                                                 ACE_ENV_ARG_PARAMETER);
00266       ACE_TRY_CHECK;
00267 
00268       this->policy_current_ =
00269         CORBA::PolicyCurrent::_narrow (tmp.in ()
00270                                        ACE_ENV_ARG_PARAMETER);
00271       ACE_TRY_CHECK;
00272 
00273       // Pre-compute the policy list used to set the right timeout
00274       // value...
00275       // We need to convert the relative timeout into units of 100 nanoseconds.
00276       TimeBase::TimeT timeout;
00277       ORBSVCS_Time::Time_Value_to_TimeT (timeout,
00278                                          this->timeout_);
00279       CORBA::Any any;
00280       any <<= timeout;
00281 
00282       this->policy_list_.length (1);
00283       this->policy_list_[0] =
00284         this->orb_->create_policy (
00285                Messaging::RELATIVE_RT_TIMEOUT_POLICY_TYPE,
00286                any
00287                ACE_ENV_ARG_PARAMETER);
00288       ACE_TRY_CHECK;
00289 
00290       // Only schedule the timer, when the rate is not zero
00291       if (this->rate_ != ACE_Time_Value::zero)
00292       {
00293         // Schedule the timer after these policies have been set, because
00294         // handle_timeout uses them; if it were scheduled first, the channel
00295         // could crash when the timeout expires before initialization is ready.
00296         timer_id_ = this->reactor_->schedule_timer (&this->adapter_,
00297                                                     0,
00298                                                     this->rate_,
00299                                                     this->rate_);
00300         if (timer_id_ == -1)
00301          return -1;
00302       }
00303     }
00304   ACE_CATCHANY
00305     {
00306       return -1;
00307     }
00308   ACE_ENDTRY;
00309 #endif /* TAO_HAS_CORBA_MESSAGING */
00310 
00311   return 0;
00312 }
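
The timeout conversion mentioned in the listing above can be illustrated in isolation. This is only a minimal sketch of the arithmetic, assuming a time value made of whole seconds plus microseconds; `TimeValueSketch` and `to_100ns_ticks` are hypothetical names and are not the implementation of ORBSVCS_Time::Time_Value_to_TimeT.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical stand-in for a time value: whole seconds plus microseconds.
struct TimeValueSketch {
  std::int64_t sec;
  std::int64_t usec;
};

// Convert a relative timeout to ticks of 100 nanoseconds.
// 1 second = 10,000,000 ticks and 1 microsecond = 10 ticks.
std::uint64_t to_100ns_ticks(const TimeValueSketch& tv) {
  // This sketch only handles non-negative relative timeouts.
  assert(tv.sec >= 0 && tv.usec >= 0);
  return static_cast<std::uint64_t>(tv.sec) * 10000000ULL +
         static_cast<std::uint64_t>(tv.usec) * 10ULL;
}
```

Under these assumptions a half-second timeout corresponds to 5,000,000 ticks, which is the kind of value that would be inserted into the Any passed to create_policy().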

void TAO_CEC_Reactive_ConsumerControl::consumer_not_exist (TAO_CEC_ProxyPullSupplier *proxy) [virtual]
 

Invoked by helper classes when they detect that a consumer no longer exists (i.e. _non_existent() returns true and/or the CORBA::OBJECT_NOT_EXIST exception has been raised).

Reimplemented from TAO_CEC_ConsumerControl.

Definition at line 352 of file CEC_Reactive_ConsumerControl.cpp.

References ACE_ANY_EXCEPTION, ACE_CATCHANY, ACE_ENDTRY, ACE_ENV_SINGLE_ARG_PARAMETER, ACE_PRINT_EXCEPTION, ACE_TEXT(), ACE_TRY, and ACE_TRY_CHECK.

00355 {
00356   ACE_TRY
00357     {
00358       proxy->disconnect_pull_supplier (ACE_ENV_SINGLE_ARG_PARAMETER);
00359       ACE_TRY_CHECK;
00360     }
00361   ACE_CATCHANY
00362     {
00363       ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
00364                            ACE_TEXT ("Reactive_ConsumerControl::consumer_not_exist"));
00365       // Ignore all exceptions..
00366     }
00367   ACE_ENDTRY;
00368 }

void TAO_CEC_Reactive_ConsumerControl::consumer_not_exist (TAO_CEC_ProxyPushSupplier *proxy) [virtual]
 

When pushing an event to the consumer, a CORBA::OBJECT_NOT_EXIST exception was raised. The only interpretation is that the object has been destroyed. The strategy has to, at the very least, reclaim all the resources attached to that object.

Reimplemented from TAO_CEC_ConsumerControl.

Definition at line 327 of file CEC_Reactive_ConsumerControl.cpp.

References ACE_ANY_EXCEPTION, ACE_CATCHANY, ACE_DEBUG, ACE_ENDTRY, ACE_ENV_SINGLE_ARG_PARAMETER, ACE_PRINT_EXCEPTION, ACE_TEXT(), ACE_TRY, ACE_TRY_CHECK, LM_DEBUG, and TAO_debug_level.

00330 {
00331   ACE_TRY
00332     {
00333       proxy->disconnect_push_supplier (ACE_ENV_SINGLE_ARG_PARAMETER);
00334       ACE_TRY_CHECK;
00335 
00336       if (TAO_debug_level >= 10)
00337         {
00338           ACE_DEBUG ((LM_DEBUG,
00339                       ACE_TEXT ("ProxyPushSupplier disconnected due to consumer_not_exist\n")));
00340         }
00341     }
00342   ACE_CATCHANY
00343     {
00344       ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
00345                            ACE_TEXT ("Reactive_ConsumerControl::consumer_not_exist"));
00346       // Ignore all exceptions..
00347     }
00348   ACE_ENDTRY;
00349 }

void TAO_CEC_Reactive_ConsumerControl::handle_timeout (const ACE_Time_Value &tv,
const void *arg)
 

Receive the timeout from the adapter.

Definition at line 207 of file CEC_Reactive_ConsumerControl.cpp.

References ACE_CATCHANY, ACE_ENDTRY, ACE_ENV_ARG_PARAMETER, ACE_ENV_SINGLE_ARG_PARAMETER, ACE_TRY_CHECK, ACE_TRY_CHECK_EX, ACE_TRY_EX, ACE_TRY_NEW_ENV, and query_consumers().

Referenced by TAO_CEC_ConsumerControl_Adapter::handle_timeout().

00210 {
00211   ACE_TRY_NEW_ENV
00212     {
00213       // Query the state of the Current object *before* we initiate
00214       // the iteration...
00215       CORBA::PolicyTypeSeq types;
00216       CORBA::PolicyList_var policies =
00217         this->policy_current_->get_policy_overrides (types
00218                                                      ACE_ENV_ARG_PARAMETER);
00219       ACE_TRY_CHECK;
00220 
00221       // Change the timeout
00222       this->policy_current_->set_policy_overrides (this->policy_list_,
00223                                                    CORBA::ADD_OVERRIDE
00224                                                    ACE_ENV_ARG_PARAMETER);
00225       ACE_TRY_CHECK;
00226 
00227       ACE_TRY_EX (query)
00228         {
00229           // Query the state of the consumers...
00230           this->query_consumers (ACE_ENV_SINGLE_ARG_PARAMETER);
00231           ACE_TRY_CHECK_EX (query);
00232         }
00233       ACE_CATCHANY
00234         {
00235           // Ignore all exceptions
00236         }
00237       ACE_ENDTRY;
00238 
00239       this->policy_current_->set_policy_overrides (policies.in (),
00240                                                    CORBA::SET_OVERRIDE
00241                                                    ACE_ENV_ARG_PARAMETER);
00242       ACE_TRY_CHECK;
00243       for (CORBA::ULong i = 0; i != policies->length (); ++i)
00244         {
00245           policies[i]->destroy (ACE_ENV_SINGLE_ARG_PARAMETER);
00246           ACE_TRY_CHECK;
00247         }
00248     }
00249   ACE_CATCHANY
00250     {
00251       // Ignore all exceptions
00252     }
00253   ACE_ENDTRY;
00254 }
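
The listing above follows a save, override, restore sequence around the polling call, and swallows any exception raised while polling. The sketch below shows the same idea with a scope guard instead of explicit calls, which is a different technique from the one the source uses. `PolicySketch`, `ScopedOverride`, `poll_with_timeout` and `example_poll` are hypothetical names, and a vector of strings stands in for the per-thread policy overrides.

```cpp
#include <cassert>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for the policy overrides of the current thread.
using PolicySketch = std::vector<std::string>;

// Saves the current overrides, adds extra ones, and restores the saved
// state when the guard leaves scope.
class ScopedOverride {
public:
  ScopedOverride(PolicySketch& current, const PolicySketch& extra)
    : current_(current), saved_(current) {
    current_.insert(current_.end(), extra.begin(), extra.end());
  }
  ~ScopedOverride() { current_ = std::move(saved_); }
  ScopedOverride(const ScopedOverride&) = delete;
  ScopedOverride& operator=(const ScopedOverride&) = delete;

private:
  PolicySketch& current_;
  PolicySketch saved_;
};

// Runs the polling callback with the timeout override in place and
// ignores every exception it throws, as the listing does.
template <typename Fn>
void poll_with_timeout(PolicySketch& current,
                       const PolicySketch& timeout_policy,
                       Fn poll) {
  ScopedOverride guard(current, timeout_policy);
  try {
    poll();
  } catch (...) {
    // Ignore all exceptions so the overrides are always restored.
  }
}

// Usage: the override is visible during the poll and gone afterwards.
inline void example_poll() {
  PolicySketch current{"base"};
  PolicySketch seen;
  poll_with_timeout(current, PolicySketch{"timeout"},
                    [&] { seen = current; });
  assert(seen.size() == 2);
  assert(current.size() == 1);
}
```

The guard keeps the restore step on every exit path, which is the property the nested try block in the listing is there to provide.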

bool TAO_CEC_Reactive_ConsumerControl::need_to_disconnect (PortableServer::ServantBase *proxy) [virtual]
 

Do we need to disconnect this supplier? The parameter type for proxy is PortableServer::ServantBase* because this method is used for both TAO_CEC_ProxyPushSupplier and TAO_CEC_ProxyPullSupplier objects.

Reimplemented from TAO_CEC_ConsumerControl.

Definition at line 129 of file CEC_Reactive_ConsumerControl.cpp.

References retries_.

Referenced by system_exception().

00131 {
00132   bool disconnect = true;
00133 
00134 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00135   if (this->typed_event_channel_)
00136     {
00137       // Typed EC
00138       TAO_CEC_TypedEventChannel::ServantRetryMap::ENTRY* entry = 0;
00139       if (this->typed_event_channel_->
00140           get_servant_retry_map ().find (proxy, entry) == 0)
00141         {
00142           ++entry->int_id_;
00143           if (entry->int_id_ <= this->retries_)
00144             {
00145               disconnect = false;
00146             }
00147         }
00148     }
00149   else
00150     {
00151 #endif /* TAO_HAS_TYPED_EVENT_CHANNEL */
00152 
00153   // Un-typed EC
00154   TAO_CEC_EventChannel::ServantRetryMap::ENTRY* entry = 0;
00155   if (this->event_channel_->
00156       get_servant_retry_map ().find (proxy, entry) == 0)
00157     {
00158       ++entry->int_id_;
00159       if (entry->int_id_ <= this->retries_)
00160         {
00161           disconnect = false;
00162         }
00163     }
00164 
00165 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00166     }
00167 #endif /* TAO_HAS_TYPED_EVENT_CHANNEL */
00168 
00169   return disconnect;
00170 }
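
The retry bookkeeping shared by need_to_disconnect() and successful_transmission() can be sketched without the event channel. This is a minimal illustration, assuming a plain hash map keyed by proxy address in place of the servant retry map; `RetryTrackerSketch`, `add` and `example_retries` are hypothetical names.

```cpp
#include <cassert>
#include <unordered_map>

// Hypothetical stand-in for the retry map kept by the event channel.
class RetryTrackerSketch {
public:
  explicit RetryTrackerSketch(unsigned int retries) : retries_(retries) {}

  // Register a proxy with a failure count of zero.
  void add(const void* proxy) { counts_[proxy] = 0; }

  // Count one failure. The proxy is kept while the count does not
  // exceed the retry limit; unknown proxies are always disconnected.
  bool need_to_disconnect(const void* proxy) {
    auto it = counts_.find(proxy);
    if (it == counts_.end()) return true;
    ++it->second;
    return it->second > retries_;
  }

  // A successful send or receive resets the failure count.
  void successful_transmission(const void* proxy) {
    auto it = counts_.find(proxy);
    if (it != counts_.end()) it->second = 0;
  }

private:
  unsigned int retries_;
  std::unordered_map<const void*, unsigned int> counts_;
};

// Usage: with one retry allowed, the second consecutive failure
// triggers a disconnect.
inline void example_retries() {
  RetryTrackerSketch tracker(1);
  int proxy = 0;
  tracker.add(&proxy);
  assert(!tracker.need_to_disconnect(&proxy));
  assert(tracker.need_to_disconnect(&proxy));
}
```

Because the count is incremented before the comparison, a limit of N retries tolerates N consecutive failures and disconnects on failure N + 1.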

void TAO_CEC_Reactive_ConsumerControl::query_consumers () [private]
 

Check whether the consumers still exist. It is a helper method for handle_timeout() that isolates the exceptions.

Definition at line 96 of file CEC_Reactive_ConsumerControl.cpp.

References ACE_CHECK, ACE_ENV_ARG_PARAMETER, TAO_CEC_EventChannel::consumer_admin(), and TAO_CEC_ConsumerAdmin::for_each().

Referenced by handle_timeout().

00098 {
00099   TAO_CEC_Ping_Push_Consumer push_worker (this);
00100 
00101 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00102   if (this->typed_event_channel_)
00103     {
00104       // Typed EC
00105       this->typed_event_channel_->typed_consumer_admin ()->for_each (&push_worker
00106                                                                      ACE_ENV_ARG_PARAMETER);
00107       ACE_CHECK;
00108     }
00109   else
00110     {
00111 #endif /* TAO_HAS_TYPED_EVENT_CHANNEL */
00112 
00113   // Un-typed EC
00114   this->event_channel_->consumer_admin ()->for_each (&push_worker
00115                                                      ACE_ENV_ARG_PARAMETER);
00116   ACE_CHECK;
00117 
00118   TAO_CEC_Ping_Pull_Consumer pull_worker (this);
00119   this->event_channel_->consumer_admin ()->for_each (&pull_worker
00120                                                      ACE_ENV_ARG_PARAMETER);
00121   ACE_CHECK;
00122 
00123 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00124     }
00125 #endif /* TAO_HAS_TYPED_EVENT_CHANNEL */
00126 }

int TAO_CEC_Reactive_ConsumerControl::shutdown (void) [virtual]
 

Reimplemented from TAO_CEC_ConsumerControl.

Definition at line 315 of file CEC_Reactive_ConsumerControl.cpp.

References ACE_Reactor::cancel_timer(), and ACE_Event_Handler::reactor().

00316 {
00317   int r = 0;
00318 
00319 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00320   r = this->reactor_->cancel_timer (timer_id_);
00321 #endif /* TAO_HAS_CORBA_MESSAGING */
00322   this->adapter_.reactor (0);
00323   return r;
00324 }

void TAO_CEC_Reactive_ConsumerControl::successful_transmission (PortableServer::ServantBase *proxy) [virtual]
 

Allow others to inform us when a send or receive was successful.

Reimplemented from TAO_CEC_ConsumerControl.

Definition at line 173 of file CEC_Reactive_ConsumerControl.cpp.

00175 {
00176 
00177 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00178   if (this->typed_event_channel_)
00179     {
00180       // Typed EC
00181       TAO_CEC_TypedEventChannel::ServantRetryMap::ENTRY* entry = 0;
00182       if (this->typed_event_channel_->
00183           get_servant_retry_map ().find (proxy, entry) == 0)
00184         {
00185           entry->int_id_ = 0;
00186         }
00187     }
00188   else
00189     {
00190 #endif /* TAO_HAS_TYPED_EVENT_CHANNEL */
00191 
00192   // Un-typed EC
00193   TAO_CEC_EventChannel::ServantRetryMap::ENTRY* entry = 0;
00194   if (this->event_channel_->
00195       get_servant_retry_map ().find (proxy, entry) == 0)
00196     {
00197       entry->int_id_ = 0;
00198     }
00199 
00200 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00201     }
00202 #endif /* TAO_HAS_TYPED_EVENT_CHANNEL */
00203 
00204 }

void TAO_CEC_Reactive_ConsumerControl::system_exception (TAO_CEC_ProxyPushSupplier *proxy,
CORBA::SystemException &) [virtual]
 

Some system exception was raised while trying to push an event.

Reimplemented from TAO_CEC_ConsumerControl.

Definition at line 371 of file CEC_Reactive_ConsumerControl.cpp.

References ACE_CATCHANY, ACE_DEBUG, ACE_ENDTRY, ACE_ENV_SINGLE_ARG_PARAMETER, ACE_TEXT(), ACE_TRY, ACE_TRY_CHECK, LM_DEBUG, need_to_disconnect(), and TAO_debug_level.

00375 {
00376   ACE_TRY
00377     {
00378       if (this->need_to_disconnect (proxy))
00379         {
00380           proxy->disconnect_push_supplier (ACE_ENV_SINGLE_ARG_PARAMETER);
00381           ACE_TRY_CHECK;
00382 
00383           if (TAO_debug_level >= 10)
00384             {
00385               ACE_DEBUG ((LM_DEBUG,
00386                       ACE_TEXT ("ProxyPushSupplier disconnected due to consumer_not_exist\n")));
00387             }
00388         }
00389     }
00390   ACE_CATCHANY
00391     {
00392       // Ignore all exceptions..
00393     }
00394   ACE_ENDTRY;
00395 }


Member Data Documentation

TAO_CEC_ConsumerControl_Adapter TAO_CEC_Reactive_ConsumerControl::adapter_ [private]
 

The Adapter for the reactor events.

Definition at line 147 of file CEC_Reactive_ConsumerControl.h.

TAO_CEC_EventChannel* TAO_CEC_Reactive_ConsumerControl::event_channel_ [private]
 

The event channel.

Definition at line 150 of file CEC_Reactive_ConsumerControl.h.

CORBA::ORB_var TAO_CEC_Reactive_ConsumerControl::orb_ [private]
 

The ORB.

Definition at line 158 of file CEC_Reactive_ConsumerControl.h.

CORBA::PolicyCurrent_var TAO_CEC_Reactive_ConsumerControl::policy_current_ [private]
 

To control the timeout policy in the thread.

Definition at line 161 of file CEC_Reactive_ConsumerControl.h.

CORBA::PolicyList TAO_CEC_Reactive_ConsumerControl::policy_list_ [private]
 

Precomputed policy list used to set the timeout.

Definition at line 164 of file CEC_Reactive_ConsumerControl.h.

ACE_Time_Value TAO_CEC_Reactive_ConsumerControl::rate_ [private]
 

The polling rate.

Definition at line 138 of file CEC_Reactive_ConsumerControl.h.

Referenced by activate().

ACE_Reactor* TAO_CEC_Reactive_ConsumerControl::reactor_ [private]
 

The ORB reactor.

Definition at line 167 of file CEC_Reactive_ConsumerControl.h.

unsigned int TAO_CEC_Reactive_ConsumerControl::retries_ [private]
 

The number of retries per proxy until it is disconnected.

Definition at line 144 of file CEC_Reactive_ConsumerControl.h.

Referenced by need_to_disconnect().

ACE_Time_Value TAO_CEC_Reactive_ConsumerControl::timeout_ [private]
 

The polling timeout.

Definition at line 141 of file CEC_Reactive_ConsumerControl.h.

long TAO_CEC_Reactive_ConsumerControl::timer_id_ [private]
 

The timer id.

Definition at line 171 of file CEC_Reactive_ConsumerControl.h.


The documentation for this class was generated from the following files:
CEC_Reactive_ConsumerControl.h
CEC_Reactive_ConsumerControl.cpp

Generated on Thu Nov 9 13:19:46 2006 for TAO_CosEvent by doxygen 1.3.6