TAO_EC_Reactive_ConsumerControl Class Reference

ConsumerControl. More...

#include <EC_Reactive_ConsumerControl.h>

Inheritance diagram for TAO_EC_Reactive_ConsumerControl:

Inheritance graph
[legend]
Collaboration diagram for TAO_EC_Reactive_ConsumerControl:

Collaboration graph
[legend]
List of all members.

Public Member Functions

 TAO_EC_Reactive_ConsumerControl (const ACE_Time_Value &rate, const ACE_Time_Value &timeout, TAO_EC_Event_Channel_Base *event_channel, CORBA::ORB_ptr orb)
virtual ~TAO_EC_Reactive_ConsumerControl (void)
 Destructor.

void handle_timeout (const ACE_Time_Value &tv, const void *arg)
 Receive the timeout from the adapter.

virtual int activate (void)
virtual int shutdown (void)
virtual void consumer_not_exist (TAO_EC_ProxyPushSupplier *proxy)
virtual void system_exception (TAO_EC_ProxyPushSupplier *proxy, CORBA::SystemException &)

Private Member Functions

void query_consumers (void)

Private Attributes

ACE_Time_Value rate_
 The polling rate.

ACE_Time_Value timeout_
 The polling timeout.

TAO_EC_ConsumerControl_Adapter adapter_
 The Adapter for the reactor events.

TAO_EC_Event_Channel_Baseevent_channel_
 The event channel.

CORBA::ORB_var orb_
 The ORB.

CORBA::PolicyCurrent_var policy_current_
 To control the timeout policy in the thread.

CORBA::PolicyList policy_list_
 Precomputed policy list to the set timeout.

ACE_Reactorreactor_
 The ORB reactor.

long timer_id_
 The timer id.


Detailed Description

ConsumerControl.

Defines the interface for the consumer control strategy. This strategy handles misbehaving or failing consumers.

Definition at line 71 of file EC_Reactive_ConsumerControl.h.


Constructor & Destructor Documentation

TAO_BEGIN_VERSIONED_NAMESPACE_DECL TAO_EC_Reactive_ConsumerControl::TAO_EC_Reactive_ConsumerControl const ACE_Time_Value rate,
const ACE_Time_Value timeout,
TAO_EC_Event_Channel_Base event_channel,
CORBA::ORB_ptr  orb
 

Constructor. It does not assume ownership of the event_channel parameter.

Definition at line 23 of file EC_Reactive_ConsumerControl.cpp.

References TAO_HAS_CORBA_MESSAGING.

00027   : rate_ (rate),
00028     timeout_ (timeout),
00029     adapter_ (this),
00030     event_channel_ (ec),
00031     orb_ (CORBA::ORB::_duplicate (orb))
00032 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00033     , timer_id_ (-1)
00034 #endif /* TAO_HAS_CORBA_MESSAGING != 0*/
00035 {
00036   this->reactor_ =
00037     this->orb_->orb_core ()->reactor ();
00038 }

TAO_EC_Reactive_ConsumerControl::~TAO_EC_Reactive_ConsumerControl void   )  [virtual]
 

Destructor.

Definition at line 40 of file EC_Reactive_ConsumerControl.cpp.

00041 {
00042 }


Member Function Documentation

int TAO_EC_Reactive_ConsumerControl::activate void   )  [virtual]
 

Activate any internal threads or timers used to poll the state of the consumers

Reimplemented from TAO_EC_ConsumerControl.

Definition at line 93 of file EC_Reactive_ConsumerControl.cpp.

References rate_, ACE_Reactor::schedule_timer(), and ACE_Time_Value::usec().

00094 {
00095 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00096   try
00097     {
00098       // Get the PolicyCurrent object
00099       CORBA::Object_var tmp =
00100         this->orb_->resolve_initial_references ("PolicyCurrent");
00101 
00102       this->policy_current_ =
00103         CORBA::PolicyCurrent::_narrow (tmp.in ());
00104 
00105       // Timeout for polling state (default = 10 msec)
00106       TimeBase::TimeT timeout = timeout_.usec() * 10;
00107       CORBA::Any any;
00108       any <<= timeout;
00109 
00110       this->policy_list_.length (1);
00111       this->policy_list_[0] =
00112         this->orb_->create_policy (
00113                Messaging::RELATIVE_RT_TIMEOUT_POLICY_TYPE,
00114                any);
00115 
00116       // Only schedule the timer, when the rate is not zero
00117       if (this->rate_ != ACE_Time_Value::zero)
00118       {
00119         // Schedule the timer after these policies has been set, because the
00120         // handle_timeout uses these policies, if done in front, the channel
00121         // can crash when the timeout expires before initiazation is ready.
00122         timer_id_ = this->reactor_->schedule_timer (&this->adapter_,
00123                                                     0,
00124                                                     this->rate_,
00125                                                     this->rate_);
00126         if (timer_id_ == -1)
00127           return -1;
00128       }
00129     }
00130   catch (const CORBA::Exception&)
00131     {
00132       return -1;
00133     }
00134 #endif /* TAO_HAS_CORBA_MESSAGING */
00135 
00136   return 0;
00137 }

void TAO_EC_Reactive_ConsumerControl::consumer_not_exist TAO_EC_ProxyPushSupplier proxy  )  [virtual]
 

When pushing an event to the consumer a CORBA::OBJECT_NOT_EXIST exception was raised. The only interpretation is that the object has been destroyed. The strategy has to (at the very least), reclaim all the resources attached to that object.

Reimplemented from TAO_EC_ConsumerControl.

Definition at line 152 of file EC_Reactive_ConsumerControl.cpp.

References TAO_EC_ProxyPushSupplier::disconnect_push_supplier().

00154 {
00155   try
00156     {
00157       //ACE_DEBUG ((LM_DEBUG,
00158       //            "EC_Reactive_ConsumerControl(%P|%t) - "
00159       //            "Consumer %x does not exists\n", long(proxy)));
00160       proxy->disconnect_push_supplier ();
00161     }
00162   catch (const CORBA::Exception& ex)
00163     {
00164       ex._tao_print_exception ("Reactive_ConsumerControl::consumer_not_exist");
00165       // Ignore all exceptions..
00166     }
00167 }

void TAO_EC_Reactive_ConsumerControl::handle_timeout const ACE_Time_Value tv,
const void *  arg
 

Receive the timeout from the adapter.

Definition at line 52 of file EC_Reactive_ConsumerControl.cpp.

References query_consumers().

Referenced by TAO_EC_ConsumerControl_Adapter::handle_timeout().

00055 {
00056   // NOTE, setting RELATIVE_RT_TIMEOUT_POLICY for the duration of
00057   // query_consumers () below has greater impact than desired.  For
00058   // example, while we are pinging consumers here, a nested upcall,
00059   // which requires making remote calls may come into the ORB.  Those
00060   // remote calls will be carried out with with
00061   // RELATIVE_RT_TIMEOUT_POLICY set here in effect.
00062 
00063   // @@ TODO: should use Guard to set and reset policies.
00064   try
00065     {
00066       // Query the state of the Current object *before* we initiate
00067       // the iteration...
00068       CORBA::PolicyTypeSeq types;
00069       CORBA::PolicyList_var policies =
00070         this->policy_current_->get_policy_overrides (types);
00071 
00072       // Change the timeout
00073       this->policy_current_->set_policy_overrides (this->policy_list_,
00074                                                    CORBA::ADD_OVERRIDE);
00075 
00076       // Query the state of the consumers...
00077       this->query_consumers ();
00078 
00079       this->policy_current_->set_policy_overrides (policies.in (),
00080                                                    CORBA::SET_OVERRIDE);
00081       for (CORBA::ULong i = 0; i != policies->length (); ++i)
00082         {
00083           policies[i]->destroy ();
00084         }
00085     }
00086   catch (const CORBA::Exception&)
00087     {
00088       // Ignore all exceptions
00089     }
00090 }

void TAO_EC_Reactive_ConsumerControl::query_consumers void   )  [private]
 

Check if the consumers still exists. It is a helper method for handle_timeout() to isolate the exceptions.

Definition at line 45 of file EC_Reactive_ConsumerControl.cpp.

References TAO_EC_Event_Channel_Base::for_each_consumer().

Referenced by handle_timeout().

00046 {
00047   TAO_EC_Ping_Consumer worker (this);
00048   this->event_channel_->for_each_consumer (&worker);
00049 }

int TAO_EC_Reactive_ConsumerControl::shutdown void   )  [virtual]
 

Reimplemented from TAO_EC_ConsumerControl.

Definition at line 140 of file EC_Reactive_ConsumerControl.cpp.

References ACE_Reactor::cancel_timer(), and ACE_Event_Handler::reactor().

00141 {
00142   int r = 0;
00143 
00144 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00145   r = this->reactor_->cancel_timer (timer_id_);
00146 #endif /* TAO_HAS_CORBA_MESSAGING */
00147   this->adapter_.reactor (0);
00148   return r;
00149 }

void TAO_EC_Reactive_ConsumerControl::system_exception TAO_EC_ProxyPushSupplier proxy,
CORBA::SystemException
[virtual]
 

Some system exception was raised while trying to contact the consumer

Reimplemented from TAO_EC_ConsumerControl.

Definition at line 170 of file EC_Reactive_ConsumerControl.cpp.

References TAO_EC_ProxyPushSupplier::disconnect_push_supplier().

00173 {
00174   try
00175     {
00176       // The current implementation is very strict, and kicks out a
00177       // client on the first system exception. We may
00178       // want to be more lenient in the future, for example,
00179       // this is TAO's minor code for a failed connection.
00180       //
00181       // if (CORBA::TRANSIENT::_narrow (&exception) != 0
00182       //     && exception->minor () == 0x54410085)
00183       //   return;
00184 
00185       // Anything else is serious, including timeouts...
00186       proxy->disconnect_push_supplier ();
00187     }
00188   catch (const CORBA::Exception&)
00189     {
00190       // Ignore all exceptions..
00191     }
00192 }


Member Data Documentation

TAO_EC_ConsumerControl_Adapter TAO_EC_Reactive_ConsumerControl::adapter_ [private]
 

The Adapter for the reactor events.

Definition at line 108 of file EC_Reactive_ConsumerControl.h.

TAO_EC_Event_Channel_Base* TAO_EC_Reactive_ConsumerControl::event_channel_ [private]
 

The event channel.

Definition at line 111 of file EC_Reactive_ConsumerControl.h.

CORBA::ORB_var TAO_EC_Reactive_ConsumerControl::orb_ [private]
 

The ORB.

Definition at line 114 of file EC_Reactive_ConsumerControl.h.

CORBA::PolicyCurrent_var TAO_EC_Reactive_ConsumerControl::policy_current_ [private]
 

To control the timeout policy in the thread.

Definition at line 117 of file EC_Reactive_ConsumerControl.h.

CORBA::PolicyList TAO_EC_Reactive_ConsumerControl::policy_list_ [private]
 

Precomputed policy list to the set timeout.

Definition at line 120 of file EC_Reactive_ConsumerControl.h.

ACE_Time_Value TAO_EC_Reactive_ConsumerControl::rate_ [private]
 

The polling rate.

Definition at line 102 of file EC_Reactive_ConsumerControl.h.

Referenced by activate().

ACE_Reactor* TAO_EC_Reactive_ConsumerControl::reactor_ [private]
 

The ORB reactor.

Definition at line 123 of file EC_Reactive_ConsumerControl.h.

ACE_Time_Value TAO_EC_Reactive_ConsumerControl::timeout_ [private]
 

The polling timeout.

Definition at line 105 of file EC_Reactive_ConsumerControl.h.

long TAO_EC_Reactive_ConsumerControl::timer_id_ [private]
 

The timer id.

Definition at line 127 of file EC_Reactive_ConsumerControl.h.


The documentation for this class was generated from the following files:
Generated on Sun Jan 27 13:42:52 2008 for TAO_RTEvent by doxygen 1.3.6