TAO_CEC_Reactive_ConsumerControl Class Reference

ConsumerControl.

#include <CEC_Reactive_ConsumerControl.h>

Public Member Functions

 TAO_CEC_Reactive_ConsumerControl (const ACE_Time_Value &rate, const ACE_Time_Value &timeout, unsigned int retries, TAO_CEC_EventChannel *event_channel, CORBA::ORB_ptr orb)
virtual ~TAO_CEC_Reactive_ConsumerControl (void)
 Destructor.
void handle_timeout (const ACE_Time_Value &tv, const void *arg)
 Receive the timeout from the adapter.
virtual int activate (void)
virtual int shutdown (void)
virtual void consumer_not_exist (TAO_CEC_ProxyPushSupplier *proxy)
virtual void consumer_not_exist (TAO_CEC_ProxyPullSupplier *proxy)
virtual void system_exception (TAO_CEC_ProxyPushSupplier *proxy, CORBA::SystemException &)
 Some system exception was raised while trying to push an event.
virtual bool need_to_disconnect (PortableServer::ServantBase *proxy)
virtual void successful_transmission (PortableServer::ServantBase *proxy)
 Allow others to inform us when a send or receive was successful.

Private Member Functions

void query_consumers (void)

Private Attributes

ACE_Time_Value rate_
 The polling rate.
ACE_Time_Value timeout_
 The polling timeout.
unsigned int retries_
 The number of retries per proxy until it is disconnected.
TAO_CEC_ConsumerControl_Adapter adapter_
 The Adapter for the reactor events.
TAO_CEC_EventChannel * event_channel_
 The event channel.
CORBA::ORB_var orb_
 The ORB.
CORBA::PolicyCurrent_var policy_current_
 To control the timeout policy in the thread.
CORBA::PolicyList policy_list_
 Precomputed policy list used to set the timeout.
ACE_Reactor * reactor_
 The ORB reactor.
long timer_id_
 The timer id.

Detailed Description

ConsumerControl.

Defines the interface for the consumer control strategy. This strategy handles misbehaving or failing consumers.

Definition at line 81 of file CEC_Reactive_ConsumerControl.h.


Constructor & Destructor Documentation

TAO_CEC_Reactive_ConsumerControl::TAO_CEC_Reactive_ConsumerControl ( const ACE_Time_Value & rate,
const ACE_Time_Value & timeout,
unsigned int  retries,
TAO_CEC_EventChannel * event_channel,
CORBA::ORB_ptr  orb 
)

Constructor. It does not assume ownership of the <event_channel> parameter.

Definition at line 41 of file CEC_Reactive_ConsumerControl.cpp.

References orb_.

00046   : rate_ (rate),
00047     timeout_ (timeout),
00048     retries_ (retries),
00049     adapter_ (this),
00050     event_channel_ (ec),
00051 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00052     typed_event_channel_ (0),
00053 #endif /* TAO_HAS_TYPED_EVENT_CHANNEL */
00054     orb_ (CORBA::ORB::_duplicate (orb))
00055 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00056    // Initialise timer_id_ to an invalid timer id, so that in case we don't
00057    // schedule a timer, we don't cancel a random timer at shutdown
00058    , timer_id_ (-1)
00059 #endif /* TAO_HAS_CORBA_MESSAGING */
00060 {
00061   this->reactor_ =
00062     this->orb_->orb_core ()->reactor ();
00063 }

TAO_CEC_Reactive_ConsumerControl::~TAO_CEC_Reactive_ConsumerControl ( void   )  [virtual]

Destructor.

Definition at line 91 of file CEC_Reactive_ConsumerControl.cpp.

00092 {
00093 }


Member Function Documentation

int TAO_CEC_Reactive_ConsumerControl::activate ( void   )  [virtual]

Activate any internal threads or timers used to poll the state of the consumers.

Reimplemented from TAO_CEC_ConsumerControl.

Definition at line 240 of file CEC_Reactive_ConsumerControl.cpp.

References TAO_Pseudo_Var_T< T >::in(), orb_, policy_current_, policy_list_, reactor_, ACE_Reactor::schedule_timer(), timer_id_, and ACE_Time_Value::zero.

00241 {
00242 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00243   try
00244     {
00245       // Get the PolicyCurrent object
00246       CORBA::Object_var tmp =
00247         this->orb_->resolve_initial_references ("PolicyCurrent");
00248 
00249       this->policy_current_ =
00250         CORBA::PolicyCurrent::_narrow (tmp.in ());
00251 
00252       // Pre-compute the policy list to set the right timeout
00253       // value...
00254       // We need to convert the relative timeout into 100's of nano seconds.
00255       TimeBase::TimeT timeout;
00256       ORBSVCS_Time::Time_Value_to_TimeT (timeout,
00257                                          this->timeout_);
00258       CORBA::Any any;
00259       any <<= timeout;
00260 
00261       this->policy_list_.length (1);
00262       this->policy_list_[0] =
00263         this->orb_->create_policy (
00264                Messaging::RELATIVE_RT_TIMEOUT_POLICY_TYPE,
00265                any);
00266 
00267       // Only schedule the timer, when the rate is not zero
00268       if (this->rate_ != ACE_Time_Value::zero)
00269       {
00270         // Schedule the timer after these policies have been set, because
00271         // handle_timeout uses these policies. If it is scheduled first, the channel
00272         // can crash when the timeout expires before initialization is complete.
00273         timer_id_ = this->reactor_->schedule_timer (&this->adapter_,
00274                                                     0,
00275                                                     this->rate_,
00276                                                     this->rate_);
00277         if (timer_id_ == -1)
00278          return -1;
00279       }
00280     }
00281   catch (const CORBA::Exception&)
00282     {
00283       return -1;
00284     }
00285 #endif /* TAO_HAS_CORBA_MESSAGING */
00286 
00287   return 0;
00288 }
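
The comment in activate() notes that the relative timeout must be expressed in units of 100 nanoseconds before it is placed in the policy. The arithmetic can be sketched as follows. The names `TimeT100ns` and `to_100ns_units` are hypothetical stand-ins introduced for illustration; this is not the ORBSVCS_Time implementation, and it takes plain seconds and microseconds rather than an ACE_Time_Value.

```cpp
#include <cassert>
#include <cstdint>

// Stand-in for TimeBase::TimeT: an unsigned count of 100 ns units.
using TimeT100ns = std::uint64_t;

// Hypothetical helper (not the ORBSVCS_Time implementation): converts a
// relative timeout given as seconds plus microseconds into 100 ns units.
inline TimeT100ns to_100ns_units (std::uint64_t sec, std::uint64_t usec)
{
  assert (usec < 1000000u);   // the microsecond part must be normalised
  return sec * 10000000u      // 1 s  = 10,000,000 units of 100 ns
         + usec * 10u;        // 1 us = 10 units of 100 ns
}
```

For instance, a timeout of 1.5 seconds, `to_100ns_units (1, 500000)`, comes out as 15,000,000 units.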

void TAO_CEC_Reactive_ConsumerControl::consumer_not_exist ( TAO_CEC_ProxyPullSupplier * proxy  )  [virtual]

Invoked by helper classes when they detect that a consumer no longer exists (i.e. _non_existent() returns true and/or the CORBA::OBJECT_NOT_EXIST exception has been raised).

Reimplemented from TAO_CEC_ConsumerControl.

Definition at line 325 of file CEC_Reactive_ConsumerControl.cpp.

References CORBA::Exception::_tao_print_exception(), ACE_TEXT(), and TAO_CEC_ProxyPullSupplier::disconnect_pull_supplier().

00327 {
00328   try
00329     {
00330       proxy->disconnect_pull_supplier ();
00331     }
00332   catch (const CORBA::Exception& ex)
00333     {
00334       ex._tao_print_exception (
00335         ACE_TEXT ("Reactive_ConsumerControl::consumer_not_exist"));
00336       // Ignore all exceptions..
00337     }
00338 }

void TAO_CEC_Reactive_ConsumerControl::consumer_not_exist ( TAO_CEC_ProxyPushSupplier * proxy  )  [virtual]

When pushing an event to the consumer a CORBA::OBJECT_NOT_EXIST exception was raised. The only interpretation is that the object has been destroyed. The strategy has to, at the very least, reclaim all the resources attached to that object.

Reimplemented from TAO_CEC_ConsumerControl.

Definition at line 303 of file CEC_Reactive_ConsumerControl.cpp.

References CORBA::Exception::_tao_print_exception(), ACE_DEBUG, ACE_TEXT(), TAO_CEC_ProxyPushSupplier::disconnect_push_supplier(), LM_DEBUG, and TAO_debug_level.

00305 {
00306   try
00307     {
00308       proxy->disconnect_push_supplier ();
00309 
00310       if (TAO_debug_level >= 10)
00311         {
00312           ACE_DEBUG ((LM_DEBUG,
00313                       ACE_TEXT ("ProxyPushSupplier disconnected due to consumer_not_exist\n")));
00314         }
00315     }
00316   catch (const CORBA::Exception& ex)
00317     {
00318       ex._tao_print_exception (
00319         ACE_TEXT ("Reactive_ConsumerControl::consumer_not_exist"));
00320       // Ignore all exceptions..
00321     }
00322 }

void TAO_CEC_Reactive_ConsumerControl::handle_timeout ( const ACE_Time_Value & tv,
const void *  arg 
)

Receive the timeout from the adapter.

Definition at line 200 of file CEC_Reactive_ConsumerControl.cpp.

References policy_current_, and query_consumers().

Referenced by TAO_CEC_ConsumerControl_Adapter::handle_timeout().

00203 {
00204   try
00205     {
00206       // Query the state of the Current object *before* we initiate
00207       // the iteration...
00208       CORBA::PolicyTypeSeq types;
00209       CORBA::PolicyList_var policies =
00210         this->policy_current_->get_policy_overrides (types);
00211 
00212       // Change the timeout
00213       this->policy_current_->set_policy_overrides (this->policy_list_,
00214                                                    CORBA::ADD_OVERRIDE);
00215 
00216       try
00217         {
00218           // Query the state of the consumers...
00219           this->query_consumers ();
00220         }
00221       catch (const CORBA::Exception&)
00222         {
00223           // Ignore all exceptions
00224         }
00225 
00226       this->policy_current_->set_policy_overrides (policies.in (),
00227                                                    CORBA::SET_OVERRIDE);
00228       for (CORBA::ULong i = 0; i != policies->length (); ++i)
00229         {
00230           policies[i]->destroy ();
00231         }
00232     }
00233   catch (const CORBA::Exception&)
00234     {
00235       // Ignore all exceptions
00236     }
00237 }
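
The listing above follows a save, override, restore sequence: the current policy overrides are read first, the timeout policy is added, the consumers are queried with exceptions swallowed, and the saved overrides are put back. A minimal sketch of that sequence, assuming a stand-in `FakePolicyCurrent` that holds override names as strings (both it and `with_temporary_overrides` are hypothetical and not part of TAO):

```cpp
#include <cassert>
#include <exception>
#include <string>
#include <vector>

// Stand-in for the thread's PolicyCurrent: just a list of override names.
struct FakePolicyCurrent
{
  std::vector<std::string> overrides;
};

// Runs `work` with `temporary` added to the current overrides, then puts
// the previously saved overrides back, even if `work` throws.
template <typename Work>
void with_temporary_overrides (FakePolicyCurrent &current,
                               const std::vector<std::string> &temporary,
                               Work work)
{
  assert (!temporary.empty ());  // the sketch expects at least one override

  // 1. Save the overrides that were in effect before the poll.
  const std::vector<std::string> saved = current.overrides;

  // 2. Add the temporary overrides on top of the existing ones.
  current.overrides.insert (current.overrides.end (),
                            temporary.begin (), temporary.end ());
  try
    {
      // 3. Do the work that needs the temporary overrides.
      work ();
    }
  catch (const std::exception &)
    {
      // Swallow errors so the restore step below always runs.
    }

  // 4. Restore exactly what was saved in step 1.
  current.overrides = saved;
}
```

Catching inside the helper mirrors the inner try block of handle_timeout(): a failure while polling must not leave the shortened timeout installed on the calling thread.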

bool TAO_CEC_Reactive_ConsumerControl::need_to_disconnect ( PortableServer::ServantBase * proxy  )  [virtual]

Do we need to disconnect this supplier? The parameter type for proxy is PortableServer::ServantBase* because this method is used for both TAO_CEC_ProxyPushSupplier and TAO_CEC_ProxyPullSupplier objects.

Reimplemented from TAO_CEC_ConsumerControl.

Definition at line 122 of file CEC_Reactive_ConsumerControl.cpp.

00124 {
00125   bool disconnect = true;
00126 
00127 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00128   if (this->typed_event_channel_)
00129     {
00130       // Typed EC
00131       TAO_CEC_TypedEventChannel::ServantRetryMap::ENTRY* entry = 0;
00132       if (this->typed_event_channel_->
00133           get_servant_retry_map ().find (proxy, entry) == 0)
00134         {
00135           ++entry->int_id_;
00136           if (entry->int_id_ <= this->retries_)
00137             {
00138               disconnect = false;
00139             }
00140         }
00141     }
00142   else
00143     {
00144 #endif /* TAO_HAS_TYPED_EVENT_CHANNEL */
00145 
00146   // Un-typed EC
00147   TAO_CEC_EventChannel::ServantRetryMap::ENTRY* entry = 0;
00148   if (this->event_channel_->
00149       get_servant_retry_map ().find (proxy, entry) == 0)
00150     {
00151       ++entry->int_id_;
00152       if (entry->int_id_ <= this->retries_)
00153         {
00154           disconnect = false;
00155         }
00156     }
00157 
00158 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00159     }
00160 #endif /* TAO_HAS_TYPED_EVENT_CHANNEL */
00161 
00162   return disconnect;
00163 }
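
The decision made above can be sketched without the event channel types: each call counts one more failure for the proxy, the proxy is kept while the count is at most the retry limit, and a proxy that is missing from the map is disconnected by default. In this sketch `RetryMap` is a hypothetical stand-in for the channel's servant retry map and the free function only borrows the member function's name.

```cpp
#include <cassert>
#include <map>

// Stand-in for the servant retry map: proxy address -> failure count.
using RetryMap = std::map<const void *, unsigned int>;

// Records one more failure for `proxy` and reports whether the proxy
// should now be disconnected.
inline bool need_to_disconnect (RetryMap &retry_map,
                                const void *proxy,
                                unsigned int retries)
{
  assert (proxy != nullptr);

  RetryMap::iterator entry = retry_map.find (proxy);
  if (entry == retry_map.end ())
    return true;                  // unknown proxy: disconnect by default

  ++entry->second;                // count this failure
  return entry->second > retries; // keep it while count <= retries
}
```

With a retry limit of 2, the first two failures of a registered proxy return false and the third returns true.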

void TAO_CEC_Reactive_ConsumerControl::query_consumers ( void   )  [private]

Check if the consumers still exist. It is a helper method for handle_timeout() that isolates the exceptions.

Definition at line 96 of file CEC_Reactive_ConsumerControl.cpp.

References TAO_CEC_EventChannel::consumer_admin(), event_channel_, and TAO_CEC_ConsumerAdmin::for_each().

Referenced by handle_timeout().

00097 {
00098   TAO_CEC_Ping_Push_Consumer push_worker (this);
00099 
00100 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00101   if (this->typed_event_channel_)
00102     {
00103       // Typed EC
00104       this->typed_event_channel_->typed_consumer_admin ()->for_each (&push_worker);
00105     }
00106   else
00107     {
00108 #endif /* TAO_HAS_TYPED_EVENT_CHANNEL */
00109 
00110   // Un-typed EC
00111   this->event_channel_->consumer_admin ()->for_each (&push_worker);
00112 
00113   TAO_CEC_Ping_Pull_Consumer pull_worker (this);
00114   this->event_channel_->consumer_admin ()->for_each (&pull_worker);
00115 
00116 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00117     }
00118 #endif /* TAO_HAS_TYPED_EVENT_CHANNEL */
00119 }
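
query_consumers() hands a worker object to the consumer admin, which applies it to every connected proxy. The shape of that for_each/worker arrangement can be sketched as below. `Proxy`, `Worker`, `Admin` and `PingWorker` are hypothetical stand-ins; the real ping workers contact the remote consumer, which is reduced here to reading an `alive` flag.

```cpp
#include <cassert>
#include <vector>

// Hypothetical proxy: `alive` stands in for the result of pinging the
// remote consumer.
struct Proxy
{
  bool alive;
  bool disconnected;
};

// Worker interface applied to each proxy by the admin.
struct Worker
{
  virtual ~Worker () {}
  virtual void work (Proxy &proxy) = 0;
};

// Stand-in for the consumer admin: owns the proxies and iterates them.
struct Admin
{
  std::vector<Proxy> proxies;

  void for_each (Worker *worker)
  {
    assert (worker != nullptr);
    for (Proxy &proxy : proxies)
      worker->work (proxy);
  }
};

// Ping worker: disconnects every proxy whose consumer no longer answers.
struct PingWorker : Worker
{
  unsigned int pinged = 0;

  void work (Proxy &proxy) override
  {
    ++pinged;
    if (!proxy.alive)
      proxy.disconnected = true;
  }
};
```

Keeping the iteration in the admin and the policy in the worker is what lets the listing reuse the same for_each call for the push worker and the pull worker.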

int TAO_CEC_Reactive_ConsumerControl::shutdown ( void   )  [virtual]

Reimplemented from TAO_CEC_ConsumerControl.

Definition at line 291 of file CEC_Reactive_ConsumerControl.cpp.

References adapter_, ACE_Reactor::cancel_timer(), ACE_Event_Handler::reactor(), reactor_, and timer_id_.

00292 {
00293   int r = 0;
00294 
00295 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00296   r = this->reactor_->cancel_timer (timer_id_);
00297 #endif /* TAO_HAS_CORBA_MESSAGING */
00298   this->adapter_.reactor (0);
00299   return r;
00300 }
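
The constructor initialises timer_id_ to -1 so that shutdown() does not cancel an unrelated timer when activate() never scheduled one. The sketch below illustrates why a sentinel that can never be a live id is safe. `FakeTimerQueue` is a hypothetical stand-in for the reactor's timer queue, and its return convention (1 when a timer was cancelled, 0 when the id is unknown) is an assumption of the sketch.

```cpp
#include <cassert>
#include <cstddef>
#include <set>

// Stand-in for a reactor timer queue that hands out non-negative ids.
class FakeTimerQueue
{
public:
  // Registers a timer and returns its id.
  long schedule ()
  {
    const long id = next_id_++;
    assert (id >= 0);             // ids are never negative, so -1 is free
    active_.insert (id);
    return id;
  }

  // Cancels the timer with this id. Returns 1 if a timer was cancelled
  // and 0 if the id is unknown, which is always the case for -1.
  int cancel (long id)
  {
    return active_.erase (id) == 1 ? 1 : 0;
  }

  // Number of timers still scheduled.
  std::size_t active () const { return active_.size (); }

private:
  long next_id_ = 0;
  std::set<long> active_;
};
```

Had the member been left uninitialised, or set to 0, a shutdown without a prior successful activate could hit a timer that belongs to another handler.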

void TAO_CEC_Reactive_ConsumerControl::successful_transmission ( PortableServer::ServantBase * proxy  )  [virtual]

Allow others to inform us when a send or receive was successful.

Reimplemented from TAO_CEC_ConsumerControl.

Definition at line 166 of file CEC_Reactive_ConsumerControl.cpp.

00168 {
00169 
00170 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00171   if (this->typed_event_channel_)
00172     {
00173       // Typed EC
00174       TAO_CEC_TypedEventChannel::ServantRetryMap::ENTRY* entry = 0;
00175       if (this->typed_event_channel_->
00176           get_servant_retry_map ().find (proxy, entry) == 0)
00177         {
00178           entry->int_id_ = 0;
00179         }
00180     }
00181   else
00182     {
00183 #endif /* TAO_HAS_TYPED_EVENT_CHANNEL */
00184 
00185   // Un-typed EC
00186   TAO_CEC_EventChannel::ServantRetryMap::ENTRY* entry = 0;
00187   if (this->event_channel_->
00188       get_servant_retry_map ().find (proxy, entry) == 0)
00189     {
00190       entry->int_id_ = 0;
00191     }
00192 
00193 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00194     }
00195 #endif /* TAO_HAS_TYPED_EVENT_CHANNEL */
00196 
00197 }
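
Because a successful transmission resets the retry counter to zero, the retry limit applies to consecutive failures rather than to the total number of failures. A minimal sketch of that interplay, assuming a hypothetical `ProxyState` that keeps the counter directly instead of in the servant retry map (`on_failure` and `on_success` are illustrative names as well):

```cpp
#include <cassert>

// Hypothetical per-proxy state for the sketch.
struct ProxyState
{
  unsigned int failures = 0;  // consecutive failures seen so far
  bool connected = true;
};

// One failed transmission: disconnect once the limit is exceeded.
inline void on_failure (ProxyState &proxy, unsigned int retries)
{
  assert (proxy.connected);   // failures are only reported while connected
  if (++proxy.failures > retries)
    proxy.connected = false;
}

// One successful transmission: forget all earlier failures.
inline void on_success (ProxyState &proxy)
{
  proxy.failures = 0;
}
```

With a limit of 2, the sequence failure, failure, success, failure, failure leaves the proxy connected; only a third failure in a row disconnects it.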

void TAO_CEC_Reactive_ConsumerControl::system_exception ( TAO_CEC_ProxyPushSupplier * proxy,
CORBA::SystemException & 
) [virtual]

Some system exception was raised while trying to push an event.

Reimplemented from TAO_CEC_ConsumerControl.

Definition at line 341 of file CEC_Reactive_ConsumerControl.cpp.

References ACE_DEBUG, ACE_TEXT(), TAO_CEC_ProxyPushSupplier::disconnect_push_supplier(), LM_DEBUG, and TAO_debug_level.

00344 {
00345   try
00346     {
00347       if (this->need_to_disconnect (proxy))
00348         {
00349           proxy->disconnect_push_supplier ();
00350 
00351           if (TAO_debug_level >= 10)
00352             {
00353               ACE_DEBUG ((LM_DEBUG,
00354                       ACE_TEXT ("ProxyPushSupplier disconnected due to consumer_not_exist\n")));
00355             }
00356         }
00357     }
00358   catch (const CORBA::Exception&)
00359     {
00360       // Ignore all exceptions..
00361     }
00362 }


Member Data Documentation

TAO_CEC_ConsumerControl_Adapter TAO_CEC_Reactive_ConsumerControl::adapter_ [private]

The Adapter for the reactor events.

Definition at line 144 of file CEC_Reactive_ConsumerControl.h.

Referenced by shutdown().

TAO_CEC_EventChannel* TAO_CEC_Reactive_ConsumerControl::event_channel_ [private]

The event channel.

Definition at line 147 of file CEC_Reactive_ConsumerControl.h.

Referenced by query_consumers().

CORBA::ORB_var TAO_CEC_Reactive_ConsumerControl::orb_ [private]

The ORB.

Definition at line 155 of file CEC_Reactive_ConsumerControl.h.

Referenced by activate().

CORBA::PolicyCurrent_var TAO_CEC_Reactive_ConsumerControl::policy_current_ [private]

To control the timeout policy in the thread.

Definition at line 158 of file CEC_Reactive_ConsumerControl.h.

Referenced by activate(), and handle_timeout().

CORBA::PolicyList TAO_CEC_Reactive_ConsumerControl::policy_list_ [private]

Precomputed policy list used to set the timeout.

Definition at line 161 of file CEC_Reactive_ConsumerControl.h.

Referenced by activate().

ACE_Time_Value TAO_CEC_Reactive_ConsumerControl::rate_ [private]

The polling rate.

Definition at line 135 of file CEC_Reactive_ConsumerControl.h.

ACE_Reactor* TAO_CEC_Reactive_ConsumerControl::reactor_ [private]

The ORB reactor.

Definition at line 164 of file CEC_Reactive_ConsumerControl.h.

Referenced by activate(), and shutdown().

unsigned int TAO_CEC_Reactive_ConsumerControl::retries_ [private]

The number of retries per proxy until it is disconnected.

Definition at line 141 of file CEC_Reactive_ConsumerControl.h.

ACE_Time_Value TAO_CEC_Reactive_ConsumerControl::timeout_ [private]

The polling timeout.

Definition at line 138 of file CEC_Reactive_ConsumerControl.h.

long TAO_CEC_Reactive_ConsumerControl::timer_id_ [private]

The timer id.

Definition at line 168 of file CEC_Reactive_ConsumerControl.h.

Referenced by activate(), and shutdown().


The documentation for this class was generated from the following files: CEC_Reactive_ConsumerControl.h and CEC_Reactive_ConsumerControl.cpp.
Generated on Tue Feb 2 17:45:01 2010 for TAO_CosEvent by  doxygen 1.4.7