TAO_EC_Reactive_ConsumerControl Class Reference

ConsumerControl.

#include <EC_Reactive_ConsumerControl.h>

Public Member Functions

 TAO_EC_Reactive_ConsumerControl (const ACE_Time_Value &rate, const ACE_Time_Value &timeout, TAO_EC_Event_Channel_Base *event_channel, CORBA::ORB_ptr orb)
virtual ~TAO_EC_Reactive_ConsumerControl (void)
 Destructor.

void handle_timeout (const ACE_Time_Value &tv, const void *arg)
 Receive the timeout from the adapter.

virtual int activate (void)
virtual int shutdown (void)
virtual void consumer_not_exist (TAO_EC_ProxyPushSupplier *proxy)
virtual void system_exception (TAO_EC_ProxyPushSupplier *proxy, CORBA::SystemException &)

Private Member Functions

void query_consumers ()

Private Attributes

ACE_Time_Value rate_
 The polling rate.

ACE_Time_Value timeout_
 The polling timeout.

TAO_EC_ConsumerControl_Adapter adapter_
 The Adapter for the reactor events.

TAO_EC_Event_Channel_Base * event_channel_
 The event channel.

CORBA::ORB_var orb_
 The ORB.

CORBA::PolicyCurrent_var policy_current_
 To control the timeout policy in the thread.

CORBA::PolicyList policy_list_
 Precomputed policy list used to set the timeout.

ACE_Reactor * reactor_
 The ORB reactor.

long timer_id_
 The timer id.


Detailed Description

ConsumerControl.

Defines the interface for the consumer control strategy. This strategy handles misbehaving or failing consumers.

Definition at line 71 of file EC_Reactive_ConsumerControl.h.


Constructor & Destructor Documentation

TAO_EC_Reactive_ConsumerControl::TAO_EC_Reactive_ConsumerControl (const ACE_Time_Value & rate, const ACE_Time_Value & timeout, TAO_EC_Event_Channel_Base * event_channel, CORBA::ORB_ptr orb)
 

Constructor. It does not assume ownership of the event_channel parameter.

Definition at line 23 of file EC_Reactive_ConsumerControl.cpp.

References TAO_HAS_CORBA_MESSAGING.

  : rate_ (rate),
    timeout_ (timeout),
    adapter_ (this),
    event_channel_ (ec),
    orb_ (CORBA::ORB::_duplicate (orb))
#if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
    , timer_id_ (-1)
#endif /* TAO_HAS_CORBA_MESSAGING != 0*/
{
  this->reactor_ =
    this->orb_->orb_core ()->reactor ();
}

TAO_EC_Reactive_ConsumerControl::~TAO_EC_Reactive_ConsumerControl (void) [virtual]
 

Destructor.

Definition at line 40 of file EC_Reactive_ConsumerControl.cpp.

{
}


Member Function Documentation

int TAO_EC_Reactive_ConsumerControl::activate (void) [virtual]
 

Activate any internal threads or timers used to poll the state of the consumers.

Reimplemented from TAO_EC_ConsumerControl.

Definition at line 105 of file EC_Reactive_ConsumerControl.cpp.

References CORBA::PolicyCurrent::_narrow(), ACE_CATCHANY, ACE_ENDTRY, ACE_ENV_ARG_PARAMETER, ACE_TRY_CHECK, ACE_TRY_NEW_ENV, TAO::unbounded_object_reference_sequence< Policy, Policy_var >::length(), rate_, ACE_Reactor::schedule_timer(), and ACE_Time_Value::usec().

{
#if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
  ACE_TRY_NEW_ENV
    {
      // Get the PolicyCurrent object
      CORBA::Object_var tmp =
        this->orb_->resolve_initial_references ("PolicyCurrent"
                                                 ACE_ENV_ARG_PARAMETER);
      ACE_TRY_CHECK;

      this->policy_current_ =
        CORBA::PolicyCurrent::_narrow (tmp.in ()
                                        ACE_ENV_ARG_PARAMETER);
      ACE_TRY_CHECK;

      // Timeout for polling state (default = 10 msec).
      // TimeBase::TimeT counts 100 ns units, hence microseconds * 10.
      TimeBase::TimeT timeout = timeout_.usec() * 10;
      CORBA::Any any;
      any <<= timeout;

      this->policy_list_.length (1);
      this->policy_list_[0] =
        this->orb_->create_policy (
               Messaging::RELATIVE_RT_TIMEOUT_POLICY_TYPE,
               any
                ACE_ENV_ARG_PARAMETER);
      ACE_TRY_CHECK;

      // Only schedule the timer when the rate is not zero
      if (this->rate_ != ACE_Time_Value::zero)
      {
        // Schedule the timer after these policies have been set, because
        // handle_timeout uses them. If the timer is scheduled first, the
        // channel can crash when the timeout expires before
        // initialization is complete.
        timer_id_ = this->reactor_->schedule_timer (&this->adapter_,
                                                    0,
                                                    this->rate_,
                                                    this->rate_);
        if (timer_id_ == -1)
          return -1;
      }
    }
  ACE_CATCHANY
    {
      return -1;
    }
  ACE_ENDTRY;
#endif /* TAO_HAS_CORBA_MESSAGING */

  return 0;
}
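
The factor of 10 in the listing above comes from the unit of TimeBase::TimeT, which counts 100-nanosecond intervals, so one microsecond is ten units. The following is a minimal standalone sketch of that conversion using std::chrono. The names TimeT and to_timet_units are hypothetical stand-ins, not TAO or ACE API. Note that the listing reads only the microsecond component through usec(), whereas this sketch converts a whole duration, which is an assumption about the intended behaviour.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>

// Hypothetical stand-in for TimeBase::TimeT: a count of 100 ns units.
using TimeT = std::uint64_t;

// Convert a non-negative duration to 100 ns units
// (1 microsecond == 10 units).
inline TimeT to_timet_units (std::chrono::microseconds d)
{
  assert (d.count () >= 0);
  return static_cast<TimeT> (d.count ()) * 10u;
}
```

Under these assumptions, a 10 msec polling timeout is passed as to_timet_units (std::chrono::milliseconds (10)), which is 100000 units.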

void TAO_EC_Reactive_ConsumerControl::consumer_not_exist (TAO_EC_ProxyPushSupplier * proxy) [virtual]
 

Called when pushing an event to the consumer raised a CORBA::OBJECT_NOT_EXIST exception. The only interpretation is that the object has been destroyed, so the strategy has to, at the very least, reclaim all the resources attached to that object.

Reimplemented from TAO_EC_ConsumerControl.

Definition at line 171 of file EC_Reactive_ConsumerControl.cpp.

References ACE_ANY_EXCEPTION, ACE_CATCHANY, ACE_ENDTRY, ACE_ENV_SINGLE_ARG_PARAMETER, ACE_PRINT_EXCEPTION, ACE_TRY, and ACE_TRY_CHECK.

{
  ACE_TRY
    {
      //ACE_DEBUG ((LM_DEBUG,
      //            "EC_Reactive_ConsumerControl(%P|%t) - "
      //            "Consumer %x does not exist\n", long(proxy)));
      proxy->disconnect_push_supplier (ACE_ENV_SINGLE_ARG_PARAMETER);
      ACE_TRY_CHECK;
    }
  ACE_CATCHANY
    {
      ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
                           "Reactive_ConsumerControl::consumer_not_exist");
      // Ignore all exceptions..
    }
  ACE_ENDTRY;
}

void TAO_EC_Reactive_ConsumerControl::handle_timeout (const ACE_Time_Value & tv, const void * arg)
 

Receive the timeout from the adapter.

Definition at line 55 of file EC_Reactive_ConsumerControl.cpp.

References ACE_CATCHANY, ACE_ENDTRY, ACE_ENV_ARG_PARAMETER, ACE_ENV_SINGLE_ARG_PARAMETER, ACE_TRY_CHECK, ACE_TRY_NEW_ENV, and query_consumers().

Referenced by TAO_EC_ConsumerControl_Adapter::handle_timeout().

{
  // NOTE, setting RELATIVE_RT_TIMEOUT_POLICY for the duration of
  // query_consumers () below has greater impact than desired.  For
  // example, while we are pinging consumers here, a nested upcall,
  // which requires making remote calls, may come into the ORB.  Those
  // remote calls will be carried out with the
  // RELATIVE_RT_TIMEOUT_POLICY set here in effect.

  // @@ TODO: should use Guard to set and reset policies.
  ACE_TRY_NEW_ENV
    {
      // Query the state of the Current object *before* we initiate
      // the iteration...
      CORBA::PolicyTypeSeq types;
      CORBA::PolicyList_var policies =
        this->policy_current_->get_policy_overrides (types
                                                      ACE_ENV_ARG_PARAMETER);
      ACE_TRY_CHECK;

      // Change the timeout
      this->policy_current_->set_policy_overrides (this->policy_list_,
                                                   CORBA::ADD_OVERRIDE
                                                    ACE_ENV_ARG_PARAMETER);
      ACE_TRY_CHECK;

      // Query the state of the consumers...
      this->query_consumers (ACE_ENV_SINGLE_ARG_PARAMETER);
      ACE_TRY_CHECK;

      // Restore the overrides that were in effect before the poll
      this->policy_current_->set_policy_overrides (policies.in (),
                                                   CORBA::SET_OVERRIDE
                                                    ACE_ENV_ARG_PARAMETER);
      ACE_TRY_CHECK;
      for (CORBA::ULong i = 0; i != policies->length (); ++i)
        {
          policies[i]->destroy (ACE_ENV_SINGLE_ARG_PARAMETER);
          ACE_TRY_CHECK;
        }
    }
  ACE_CATCHANY
    {
      // Ignore all exceptions
    }
  ACE_ENDTRY;
}
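
The TODO in the listing above suggests using a guard to set and reset the policies. As written, an exception raised between the two set_policy_overrides calls skips the restore step. The following is a minimal sketch of such a guard, not the TAO implementation: FakePolicyCurrent and PolicyOverrideGuard are hypothetical stand-ins, policies are modelled as plain strings, and the boolean flag only mimics the ADD_OVERRIDE and SET_OVERRIDE modes.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical stand-in for CORBA::PolicyCurrent; policies are strings.
struct FakePolicyCurrent
{
  std::vector<std::string> overrides;

  std::vector<std::string> get_policy_overrides () const { return overrides; }

  // add == true mimics ADD_OVERRIDE, add == false mimics SET_OVERRIDE.
  void set_policy_overrides (const std::vector<std::string> &p, bool add)
  {
    if (!add)
      overrides.clear ();
    overrides.insert (overrides.end (), p.begin (), p.end ());
  }
};

// Saves the current overrides, adds temporary ones, and restores the
// saved set when the guard goes out of scope.
class PolicyOverrideGuard
{
public:
  PolicyOverrideGuard (FakePolicyCurrent &current,
                       const std::vector<std::string> &temporary)
    : current_ (current),
      saved_ (current.get_policy_overrides ())
  {
    current_.set_policy_overrides (temporary, true);
  }

  ~PolicyOverrideGuard ()
  {
    // Runs on normal exit and during stack unwinding alike.
    // A destructor must not throw, so failures are swallowed here.
    try
      {
        current_.set_policy_overrides (saved_, false);
      }
    catch (...)
      {
      }
  }

  PolicyOverrideGuard (const PolicyOverrideGuard &) = delete;
  PolicyOverrideGuard &operator= (const PolicyOverrideGuard &) = delete;

private:
  FakePolicyCurrent &current_;
  std::vector<std::string> saved_;
};
```

With a guard of this shape, the polling step would sit inside a scope that owns the guard, and the restore would no longer depend on every intermediate call succeeding.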

void TAO_EC_Reactive_ConsumerControl::query_consumers () [private]
 

Check if the consumers still exist. It is a helper method for handle_timeout() to isolate the exceptions.

Definition at line 45 of file EC_Reactive_ConsumerControl.cpp.

References ACE_CHECK, ACE_ENV_ARG_PARAMETER, and TAO_EC_Event_Channel_Base::for_each_consumer().

Referenced by handle_timeout().

{
  TAO_EC_Ping_Consumer worker (this);
  this->event_channel_->for_each_consumer (&worker
                                           ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;
}
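
The listing above hands a worker object to the event channel, which applies it to each consumer. The following standalone sketch illustrates that worker pattern under simplifying assumptions. Proxy, Worker, Channel, Control, PingWorker and poll_once are hypothetical stand-ins, not the TAO classes, and a boolean flag replaces the remote ping.

```cpp
#include <cassert>
#include <vector>

// Hypothetical stand-ins; none of these are the TAO classes.
struct Proxy
{
  bool consumer_alive;  // what a remote ping would report
  bool connected;       // whether the channel still serves this proxy
};

struct Worker
{
  virtual ~Worker () = default;
  virtual void work (Proxy &proxy) = 0;
};

struct Channel
{
  std::vector<Proxy> proxies;

  // Apply the worker to every proxy that is still connected.
  void for_each_consumer (Worker &worker)
  {
    for (Proxy &p : proxies)
      if (p.connected)
        worker.work (p);
  }
};

struct Control
{
  int reclaimed = 0;

  // Mirrors the role of consumer_not_exist(): drop the dead consumer.
  void consumer_not_exist (Proxy &proxy)
  {
    proxy.connected = false;
    ++reclaimed;
  }
};

struct PingWorker : Worker
{
  explicit PingWorker (Control &c) : control (c) {}

  void work (Proxy &proxy) override
  {
    if (!proxy.consumer_alive)
      control.consumer_not_exist (proxy);
  }

  Control &control;
};

// One polling pass; returns the total number of consumers reclaimed so far.
inline int poll_once (Channel &channel, Control &control)
{
  PingWorker worker (control);
  channel.for_each_consumer (worker);
  return control.reclaimed;
}
```

Keeping the iteration in the channel and the decision in the worker means the control strategy never needs to know how the channel stores its proxies.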

int TAO_EC_Reactive_ConsumerControl::shutdown (void) [virtual]
 

Reimplemented from TAO_EC_ConsumerControl.

Definition at line 159 of file EC_Reactive_ConsumerControl.cpp.

References ACE_Reactor::cancel_timer(), and ACE_Event_Handler::reactor().

{
  int r = 0;

#if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
  r = this->reactor_->cancel_timer (timer_id_);
#endif /* TAO_HAS_CORBA_MESSAGING */
  this->adapter_.reactor (0);
  return r;
}

void TAO_EC_Reactive_ConsumerControl::system_exception (TAO_EC_ProxyPushSupplier * proxy, CORBA::SystemException &) [virtual]
 

Some system exception was raised while trying to contact the consumer.

Reimplemented from TAO_EC_ConsumerControl.

Definition at line 193 of file EC_Reactive_ConsumerControl.cpp.

References ACE_CATCHANY, ACE_ENDTRY, ACE_ENV_SINGLE_ARG_PARAMETER, ACE_TRY, ACE_TRY_CHECK, and TAO_EC_ProxyPushSupplier::disconnect_push_supplier().

{
  ACE_TRY
    {
      // The current implementation is very strict, and kicks out a
      // client on the first system exception. We may
      // want to be more lenient in the future, for example,
      // this is TAO's minor code for a failed connection.
      //
      // if (CORBA::TRANSIENT::_narrow (&exception) != 0
      //     && exception->minor () == 0x54410085)
      //   return;

      // Anything else is serious, including timeouts...
      proxy->disconnect_push_supplier (ACE_ENV_SINGLE_ARG_PARAMETER);
      ACE_TRY_CHECK;
    }
  ACE_CATCHANY
    {
      // Ignore all exceptions..
    }
  ACE_ENDTRY;
}
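
The commented-out check in the listing above hints at a more lenient policy that tolerates a failed connection and disconnects the consumer on anything else. The following is a minimal sketch of that decision as a standalone predicate. ExceptionKind, SystemExceptionInfo and should_disconnect are hypothetical names introduced for illustration; only the minor code value is taken from the source comment.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical, simplified view of a CORBA system exception.
enum class ExceptionKind { Transient, Timeout, Other };

struct SystemExceptionInfo
{
  ExceptionKind kind;
  std::uint32_t minor;
};

// Minor code quoted in the source comment for a failed connection.
constexpr std::uint32_t failed_connection_minor = 0x54410085u;

// Returns true when the consumer should be disconnected. A TRANSIENT
// exception carrying the failed-connection minor code is tolerated;
// everything else, including timeouts, is treated as serious.
inline bool should_disconnect (const SystemExceptionInfo &ex)
{
  const bool failed_connection =
    ex.kind == ExceptionKind::Transient
    && ex.minor == failed_connection_minor;
  return !failed_connection;
}
```

The documented implementation does not apply this filter; it disconnects the consumer on the first system exception.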


Member Data Documentation

TAO_EC_ConsumerControl_Adapter TAO_EC_Reactive_ConsumerControl::adapter_ [private]
 

The Adapter for the reactor events.

Definition at line 110 of file EC_Reactive_ConsumerControl.h.

TAO_EC_Event_Channel_Base* TAO_EC_Reactive_ConsumerControl::event_channel_ [private]
 

The event channel.

Definition at line 113 of file EC_Reactive_ConsumerControl.h.

CORBA::ORB_var TAO_EC_Reactive_ConsumerControl::orb_ [private]
 

The ORB.

Definition at line 116 of file EC_Reactive_ConsumerControl.h.

CORBA::PolicyCurrent_var TAO_EC_Reactive_ConsumerControl::policy_current_ [private]
 

To control the timeout policy in the thread.

Definition at line 119 of file EC_Reactive_ConsumerControl.h.

CORBA::PolicyList TAO_EC_Reactive_ConsumerControl::policy_list_ [private]
 

Precomputed policy list to the set timeout.

Definition at line 122 of file EC_Reactive_ConsumerControl.h.

ACE_Time_Value TAO_EC_Reactive_ConsumerControl::rate_ [private]
 

The polling rate.

Definition at line 104 of file EC_Reactive_ConsumerControl.h.

Referenced by activate().

ACE_Reactor* TAO_EC_Reactive_ConsumerControl::reactor_ [private]
 

The ORB reactor.

Definition at line 125 of file EC_Reactive_ConsumerControl.h.

ACE_Time_Value TAO_EC_Reactive_ConsumerControl::timeout_ [private]
 

The polling timeout.

Definition at line 107 of file EC_Reactive_ConsumerControl.h.

long TAO_EC_Reactive_ConsumerControl::timer_id_ [private]
 

The timer id.

Definition at line 129 of file EC_Reactive_ConsumerControl.h.


The documentation for this class was generated from the following files:
EC_Reactive_ConsumerControl.h
EC_Reactive_ConsumerControl.cpp
Generated on Thu Nov 9 13:15:27 2006 for TAO_RTEvent by doxygen 1.3.6