TAO_ECG_Reconnect_ConsumerEC_Control Class Reference

Consumer event channel control. More...

#include <ECG_Reconnect_ConsumerEC_Control.h>

Inheritance diagram for TAO_ECG_Reconnect_ConsumerEC_Control:

Inheritance graph
[legend]
Collaboration diagram for TAO_ECG_Reconnect_ConsumerEC_Control:

Collaboration graph
[legend]
List of all members.

Public Member Functions

 TAO_ECG_Reconnect_ConsumerEC_Control (const ACE_Time_Value &rate, const ACE_Time_Value &timeout, TAO_EC_Gateway_IIOP *gateway, CORBA::ORB_ptr orb)
virtual ~TAO_ECG_Reconnect_ConsumerEC_Control (void)
 Destructor.
void handle_timeout (const ACE_Time_Value &tv, const void *arg)
 Receive the timeout from the adapter.
virtual int activate (void)
virtual int shutdown (void)
virtual void event_channel_not_exist (TAO_EC_Gateway_IIOP *gateway)
virtual void system_exception (TAO_EC_Gateway_IIOP *gateway, CORBA::SystemException &)

Private Member Functions

void query_eventchannel (void)
void try_reconnect (void)
 Look if we can reconnect again.
void reconnect (void)
 Reconnect to the consumer ec.

Private Attributes

ACE_Time_Value rate_
 The polling rate.
ACE_Time_Value timeout_
 The polling timeout.
TAO_ECG_Reconnect_ConsumerEC_Control_Adapter adapter_
 The Adapter for the reactor events.
TAO_EC_Gateway_IIOPgateway_
 The gateway.
CORBA::ORB_var orb_
 The ORB.
CORBA::PolicyCurrent_var policy_current_
 To control the timeout policy in the thread.
CORBA::PolicyList policy_list_
 Precomputed policy list to the set timeout.
ACE_Reactorreactor_
 The ORB reactor.
int is_consumer_ec_connected_
 Do we have a connection to the consumer ec.
long timer_id_
 The timer id.

Detailed Description

Consumer event channel control.

Defines the interface for the consumer event channel control strategy. This strategy handles misbehaving or failing consumer event channels.

This control tries to reconnect to the consumer ec incase of a connection loss and then does a reconnect when the connection is back again. When also a restart of the consumer ec must be handled correctly, then the consumer ec must use a persistent IOR.

Definition at line 71 of file ECG_Reconnect_ConsumerEC_Control.h.


Constructor & Destructor Documentation

TAO_BEGIN_VERSIONED_NAMESPACE_DECL TAO_ECG_Reconnect_ConsumerEC_Control::TAO_ECG_Reconnect_ConsumerEC_Control ( const ACE_Time_Value rate,
const ACE_Time_Value timeout,
TAO_EC_Gateway_IIOP gateway,
CORBA::ORB_ptr  orb 
)

Constructor. It does not assume ownership of the event_channel parameter.

Definition at line 14 of file ECG_Reconnect_ConsumerEC_Control.cpp.

00018   : rate_ (rate),
00019     timeout_ (timeout),
00020     adapter_ (this),
00021     gateway_ (gateway),
00022     orb_ (CORBA::ORB::_duplicate (orb)),
00023     is_consumer_ec_connected_ (1)
00024 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00025     , timer_id_ (-1)
00026 #endif /* TAO_HAS_CORBA_MESSAGING != 0*/
00027 {
00028   this->reactor_ =
00029     this->orb_->orb_core ()->reactor ();
00030 }

TAO_ECG_Reconnect_ConsumerEC_Control::~TAO_ECG_Reconnect_ConsumerEC_Control ( void   )  [virtual]

Destructor.

Definition at line 32 of file ECG_Reconnect_ConsumerEC_Control.cpp.

00033 {
00034 }


Member Function Documentation

int TAO_ECG_Reconnect_ConsumerEC_Control::activate ( void   )  [virtual]

Activate any internal threads or timers used to poll the state of the event channel.

Reimplemented from TAO_ECG_ConsumerEC_Control.

Definition at line 148 of file ECG_Reconnect_ConsumerEC_Control.cpp.

References TAO_Pseudo_Var_T< T >::in(), orb_, policy_current_, policy_list_, reactor_, ACE_Reactor::schedule_timer(), timeout_, timer_id_, ACE_Time_Value::usec(), and ACE_Time_Value::zero.

00149 {
00150 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00151   try
00152     {
00153       // Get the PolicyCurrent object
00154       CORBA::Object_var tmp =
00155         this->orb_->resolve_initial_references ("PolicyCurrent");
00156 
00157       this->policy_current_ =
00158         CORBA::PolicyCurrent::_narrow (tmp.in ());
00159 
00160       // Timeout for polling state (default = 10 msec)
00161       TimeBase::TimeT timeout = timeout_.usec() * 10;
00162       CORBA::Any any;
00163       any <<= timeout;
00164 
00165       this->policy_list_.length (1);
00166       this->policy_list_[0] =
00167         this->orb_->create_policy (
00168                Messaging::RELATIVE_RT_TIMEOUT_POLICY_TYPE,
00169                any);
00170 
00171       // Only schedule the timer, when the rate is not zero
00172       if (this->rate_ != ACE_Time_Value::zero)
00173       {
00174         // Schedule the timer after these policies has been set, because the
00175         // handle_timeout uses these policies, if done in front, the channel
00176         // can crash when the timeout expires before initiazation is ready.
00177         timer_id_ = this->reactor_->schedule_timer (&this->adapter_,
00178                                                     0,
00179                                                     this->rate_,
00180                                                     this->rate_);
00181         if (timer_id_ == -1)
00182           return -1;
00183       }
00184     }
00185   catch (const CORBA::Exception&)
00186     {
00187       return -1;
00188     }
00189 #endif /* TAO_HAS_CORBA_MESSAGING */
00190 
00191   return 0;
00192 }

void TAO_ECG_Reconnect_ConsumerEC_Control::event_channel_not_exist ( TAO_EC_Gateway_IIOP gateway  )  [virtual]

When pushing an event to the event channel a CORBA::OBJECT_NOT_EXIST exception was raised. The only interpretation is that the object has been destroyed. The strategy has to (at the very least), reclaim all the resources attached to that object.

Reimplemented from TAO_ECG_ConsumerEC_Control.

Definition at line 208 of file ECG_Reconnect_ConsumerEC_Control.cpp.

References CORBA::Exception::_tao_print_exception(), TAO_EC_Gateway_IIOP::cleanup_consumer_proxies(), is_consumer_ec_connected_, and TAO_EC_Gateway_IIOP::suspend_supplier_ec().

Referenced by query_eventchannel().

00210 {
00211   try
00212     {
00213       //ACE_DEBUG ((LM_DEBUG,
00214       //            "ECG_Reconnect_ConsumerControl(%P|%t) - "
00215       //            "channel %x does not exists\n"));
00216       is_consumer_ec_connected_ = 0;
00217 
00218       gateway->suspend_supplier_ec ();
00219 
00220       gateway->cleanup_consumer_proxies ();
00221     }
00222   catch (const CORBA::Exception& ex)
00223     {
00224       ex._tao_print_exception (
00225         "TAO_EC_Reconnect_ConsumerControl::event_channel_not_exist");
00226       // Ignore all exceptions..
00227     }
00228 }

void TAO_ECG_Reconnect_ConsumerEC_Control::handle_timeout ( const ACE_Time_Value tv,
const void *  arg 
)

Receive the timeout from the adapter.

Definition at line 108 of file ECG_Reconnect_ConsumerEC_Control.cpp.

References policy_current_, and query_eventchannel().

Referenced by TAO_ECG_Reconnect_ConsumerEC_Control_Adapter::handle_timeout().

00111 {
00112   // NOTE, setting RELATIVE_RT_TIMEOUT_POLICY for the duration of
00113   // query_eventchannel () below has greater impact than desired.  For
00114   // example, while we are pinging consumers here, a nested upcall,
00115   // which requires making remote calls may come into the ORB.  Those
00116   // remote calls will be carried out with with
00117   // RELATIVE_RT_TIMEOUT_POLICY set here in effect.
00118   // @@ TODO: should use Guard to set and reset policies.
00119   try
00120     {
00121       // Query the state of the Current object *before* we initiate
00122       // the iteration...
00123       CORBA::PolicyTypeSeq types;
00124       CORBA::PolicyList_var policies =
00125         this->policy_current_->get_policy_overrides (types);
00126 
00127       // Change the timeout
00128       this->policy_current_->set_policy_overrides (this->policy_list_,
00129                                                    CORBA::ADD_OVERRIDE);
00130 
00131       // Query the state of the consumers...
00132       this->query_eventchannel ();
00133 
00134       this->policy_current_->set_policy_overrides (policies.in (),
00135                                                    CORBA::SET_OVERRIDE);
00136       for (CORBA::ULong i = 0; i != policies->length (); ++i)
00137         {
00138           policies[i]->destroy ();
00139         }
00140     }
00141   catch (const CORBA::Exception&)
00142     {
00143       // Ignore all exceptions
00144     }
00145 }

void TAO_ECG_Reconnect_ConsumerEC_Control::query_eventchannel ( void   )  [private]

Check if the consumers still exists. It is a helper method for handle_timeout() to isolate the exceptions.

Definition at line 71 of file ECG_Reconnect_ConsumerEC_Control.cpp.

References TAO_EC_Gateway_IIOP::consumer_ec_non_existent(), event_channel_not_exist(), gateway_, is_consumer_ec_connected_, and try_reconnect().

Referenced by handle_timeout().

00072 {
00073   try
00074     {
00075       if (is_consumer_ec_connected_ == 1)
00076         {
00077           CORBA::Boolean disconnected;
00078           CORBA::Boolean non_existent =
00079             gateway_->consumer_ec_non_existent (disconnected);
00080           if (non_existent && !disconnected)
00081             {
00082               this->event_channel_not_exist (gateway_);
00083             }
00084         }
00085       else
00086         {
00087            this->try_reconnect();
00088         }
00089     }
00090   catch (const CORBA::OBJECT_NOT_EXIST&)
00091     {
00092       this->event_channel_not_exist (gateway_);
00093     }
00094   catch (const CORBA::TRANSIENT&)
00095     {
00096       // This is TAO's minor code for a failed connection, we may
00097       // want to be more lenient in the future..
00098       // if (transient.minor () == 0x54410085)
00099       this->event_channel_not_exist (gateway_);
00100     }
00101   catch (const CORBA::Exception&)
00102     {
00103       // Ignore all exceptions
00104     }
00105 }

void TAO_ECG_Reconnect_ConsumerEC_Control::reconnect ( void   )  [private]

Reconnect to the consumer ec.

Definition at line 56 of file ECG_Reconnect_ConsumerEC_Control.cpp.

References gateway_, is_consumer_ec_connected_, and TAO_EC_Gateway_IIOP::reconnect_consumer_ec().

Referenced by try_reconnect().

00057 {
00058   try
00059     {
00060       is_consumer_ec_connected_ = 1;
00061 
00062       gateway_->reconnect_consumer_ec();
00063     }
00064   catch (const CORBA::Exception&)
00065     {
00066       // Ignore all exceptions
00067     }
00068 }

int TAO_ECG_Reconnect_ConsumerEC_Control::shutdown ( void   )  [virtual]

Shutdown any internal threads or timers used to poll the state of the event channel.

Reimplemented from TAO_ECG_ConsumerEC_Control.

Definition at line 195 of file ECG_Reconnect_ConsumerEC_Control.cpp.

References adapter_, ACE_Reactor::cancel_timer(), ACE_Event_Handler::reactor(), reactor_, and timer_id_.

00196 {
00197   int r = 0;
00198 
00199 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00200   r = this->reactor_->cancel_timer (timer_id_);
00201 #endif /* TAO_HAS_CORBA_MESSAGING */
00202 
00203   this->adapter_.reactor (0);
00204   return r;
00205 }

void TAO_ECG_Reconnect_ConsumerEC_Control::system_exception ( TAO_EC_Gateway_IIOP gateway,
CORBA::SystemException  
) [virtual]

Some system exception was raised while trying to contact the event channel

Reimplemented from TAO_ECG_ConsumerEC_Control.

Definition at line 231 of file ECG_Reconnect_ConsumerEC_Control.cpp.

References TAO_EC_Gateway_IIOP::cleanup_consumer_proxies(), is_consumer_ec_connected_, and TAO_EC_Gateway_IIOP::suspend_supplier_ec().

00234 {
00235   try
00236     {
00237       // The current implementation is very strict, and kicks out a
00238       // client on the first system exception. We may
00239       // want to be more lenient in the future, for example,
00240       // this is TAO's minor code for a failed connection.
00241       //
00242       // if (CORBA::TRANSIENT::_narrow (&exception) != 0
00243       //     && exception->minor () == 0x54410085)
00244       //   return;
00245 
00246       // Anything else is serious, including timeouts...
00247       //ACE_DEBUG ((LM_DEBUG,
00248       //            "ECG_Reconnect_ConsumerControl(%P|%t) - "
00249       //            "channel %x does not exists system except\n"));
00250       is_consumer_ec_connected_ = 0;
00251 
00252       gateway->suspend_supplier_ec ();
00253 
00254       gateway->cleanup_consumer_proxies ();
00255     }
00256   catch (const CORBA::Exception&)
00257     {
00258       // Ignore all exceptions..
00259     }
00260 }

void TAO_ECG_Reconnect_ConsumerEC_Control::try_reconnect ( void   )  [private]

Look if we can reconnect again.

Definition at line 37 of file ECG_Reconnect_ConsumerEC_Control.cpp.

References TAO_EC_Gateway_IIOP::consumer_ec_non_existent(), gateway_, and reconnect().

Referenced by query_eventchannel().

00038 {
00039   try
00040     {
00041       CORBA::Boolean disconnected;
00042       CORBA::Boolean non_existent =
00043       gateway_->consumer_ec_non_existent (disconnected);
00044       if (!non_existent)
00045         {
00046           this->reconnect();
00047         }
00048     }
00049   catch (const CORBA::Exception&)
00050     {
00051       // Ignore all exceptions
00052     }
00053 }


Member Data Documentation

TAO_ECG_Reconnect_ConsumerEC_Control_Adapter TAO_ECG_Reconnect_ConsumerEC_Control::adapter_ [private]

The Adapter for the reactor events.

Definition at line 115 of file ECG_Reconnect_ConsumerEC_Control.h.

Referenced by shutdown().

TAO_EC_Gateway_IIOP* TAO_ECG_Reconnect_ConsumerEC_Control::gateway_ [private]

The gateway.

Definition at line 118 of file ECG_Reconnect_ConsumerEC_Control.h.

Referenced by query_eventchannel(), reconnect(), and try_reconnect().

int TAO_ECG_Reconnect_ConsumerEC_Control::is_consumer_ec_connected_ [private]

Do we have a connection to the consumer ec.

Definition at line 133 of file ECG_Reconnect_ConsumerEC_Control.h.

Referenced by event_channel_not_exist(), query_eventchannel(), reconnect(), and system_exception().

CORBA::ORB_var TAO_ECG_Reconnect_ConsumerEC_Control::orb_ [private]

The ORB.

Definition at line 121 of file ECG_Reconnect_ConsumerEC_Control.h.

Referenced by activate().

CORBA::PolicyCurrent_var TAO_ECG_Reconnect_ConsumerEC_Control::policy_current_ [private]

To control the timeout policy in the thread.

Definition at line 124 of file ECG_Reconnect_ConsumerEC_Control.h.

Referenced by activate(), and handle_timeout().

CORBA::PolicyList TAO_ECG_Reconnect_ConsumerEC_Control::policy_list_ [private]

Precomputed policy list to the set timeout.

Definition at line 127 of file ECG_Reconnect_ConsumerEC_Control.h.

Referenced by activate().

ACE_Time_Value TAO_ECG_Reconnect_ConsumerEC_Control::rate_ [private]

The polling rate.

Definition at line 109 of file ECG_Reconnect_ConsumerEC_Control.h.

ACE_Reactor* TAO_ECG_Reconnect_ConsumerEC_Control::reactor_ [private]

The ORB reactor.

Definition at line 130 of file ECG_Reconnect_ConsumerEC_Control.h.

Referenced by activate(), and shutdown().

ACE_Time_Value TAO_ECG_Reconnect_ConsumerEC_Control::timeout_ [private]

The polling timeout.

Definition at line 112 of file ECG_Reconnect_ConsumerEC_Control.h.

Referenced by activate().

long TAO_ECG_Reconnect_ConsumerEC_Control::timer_id_ [private]

The timer id.

Definition at line 137 of file ECG_Reconnect_ConsumerEC_Control.h.

Referenced by activate(), and shutdown().


The documentation for this class was generated from the following files:
Generated on Tue Feb 2 17:44:39 2010 for TAO_RTEvent by  doxygen 1.4.7