TAO_ECG_Reconnect_ConsumerEC_Control Class Reference

Consumer event channel control.

#include <ECG_Reconnect_ConsumerEC_Control.h>

Public Member Functions

 TAO_ECG_Reconnect_ConsumerEC_Control (const ACE_Time_Value &rate, const ACE_Time_Value &timeout, TAO_EC_Gateway_IIOP *gateway, CORBA::ORB_ptr orb)
virtual ~TAO_ECG_Reconnect_ConsumerEC_Control (void)
 Destructor.

void handle_timeout (const ACE_Time_Value &tv, const void *arg)
 Receive the timeout from the adapter.

virtual int activate (void)
virtual int shutdown (void)
virtual void event_channel_not_exist (TAO_EC_Gateway_IIOP *gateway)
virtual void system_exception (TAO_EC_Gateway_IIOP *gateway, CORBA::SystemException &)

Private Member Functions

void query_eventchannel ()
void try_reconnect ()
 Check whether we can reconnect.

void reconnect ()
 Reconnect to the consumer ec.


Private Attributes

ACE_Time_Value rate_
 The polling rate.

ACE_Time_Value timeout_
 The polling timeout.

TAO_ECG_Reconnect_ConsumerEC_Control_Adapter adapter_
 The Adapter for the reactor events.

TAO_EC_Gateway_IIOP * gateway_
 The gateway.

CORBA::ORB_var orb_
 The ORB.

CORBA::PolicyCurrent_var policy_current_
 To control the timeout policy in the thread.

CORBA::PolicyList policy_list_
 Precomputed policy list used to set the timeout.

ACE_Reactor * reactor_
 The ORB reactor.

int is_consumer_ec_connected_
 Whether we currently have a connection to the consumer ec.

long timer_id_
 The timer id.


Detailed Description

Consumer event channel control.

Defines the interface for the consumer event channel control strategy. This strategy handles misbehaving or failing consumer event channels.

This control tries to reconnect to the consumer ec in case of a connection loss, and reconnects once the consumer ec is reachable again. If a restart of the consumer ec must also be handled correctly, the consumer ec must use a persistent IOR.
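
The behaviour described above can be pictured as a two-state polling loop. The following is only a minimal sketch under stated assumptions: FakeGateway and ReconnectControlModel are hypothetical stand-ins written for this illustration, no ORB or reactor is involved, and the disconnected out-parameter and the exception handling seen in the listings further down are left out.

```cpp
// Hypothetical stand-in for TAO_EC_Gateway_IIOP; only the calls used by the
// control strategy are modelled, and none of them talk to a real ORB.
struct FakeGateway {
  bool ec_reachable = true;  // what a ping of the consumer ec would report
  int suspends = 0;          // calls to suspend_supplier_ec()
  int cleanups = 0;          // calls to cleanup_consumer_proxies()
  int reconnects = 0;        // calls to reconnect_consumer_ec()

  bool consumer_ec_non_existent() const { return !ec_reachable; }
  void suspend_supplier_ec() { ++suspends; }
  void cleanup_consumer_proxies() { ++cleanups; }
  void reconnect_consumer_ec() { ++reconnects; }
};

// Simplified model of the polling logic: one call to poll() corresponds to
// one timer expiry.
class ReconnectControlModel {
public:
  explicit ReconnectControlModel(FakeGateway &gateway) : gateway_(gateway) {}

  void poll() {
    if (connected_) {
      if (gateway_.consumer_ec_non_existent()) {
        // Connection lost: stop the supplier side and drop stale proxies.
        connected_ = false;
        gateway_.suspend_supplier_ec();
        gateway_.cleanup_consumer_proxies();
      }
    } else if (!gateway_.consumer_ec_non_existent()) {
      // The consumer ec answers again: re-establish the connection.
      connected_ = true;
      gateway_.reconnect_consumer_ec();
    }
  }

  bool connected() const { return connected_; }

private:
  FakeGateway &gateway_;
  bool connected_ = true;  // mirrors is_consumer_ec_connected_ starting at 1
};
```

Calling poll() repeatedly while ec_reachable is false leaves the model in the disconnected state without further gateway calls. The first poll() after ec_reachable becomes true again triggers exactly one reconnect.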

Definition at line 71 of file ECG_Reconnect_ConsumerEC_Control.h.


Constructor & Destructor Documentation

TAO_ECG_Reconnect_ConsumerEC_Control::TAO_ECG_Reconnect_ConsumerEC_Control (const ACE_Time_Value & rate,
const ACE_Time_Value & timeout,
TAO_EC_Gateway_IIOP * gateway,
CORBA::ORB_ptr orb)
 

Constructor. It does not assume ownership of the gateway parameter.

Definition at line 14 of file ECG_Reconnect_ConsumerEC_Control.cpp.

References TAO_HAS_CORBA_MESSAGING.

00018   : rate_ (rate),
00019     timeout_ (timeout),
00020     adapter_ (this),
00021     gateway_ (gateway),
00022     orb_ (CORBA::ORB::_duplicate (orb)),
00023     is_consumer_ec_connected_ (1)
00024 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00025     , timer_id_ (-1)
00026 #endif /* TAO_HAS_CORBA_MESSAGING != 0*/
00027 {
00028   this->reactor_ =
00029     this->orb_->orb_core ()->reactor ();
00030 }

TAO_ECG_Reconnect_ConsumerEC_Control::~TAO_ECG_Reconnect_ConsumerEC_Control (void) [virtual]
 

Destructor.

Definition at line 32 of file ECG_Reconnect_ConsumerEC_Control.cpp.

00033 {
00034 }


Member Function Documentation

int TAO_ECG_Reconnect_ConsumerEC_Control::activate (void) [virtual]
 

Activate any internal threads or timers used to poll the state of the event channel.

Reimplemented from TAO_ECG_ConsumerEC_Control.

Definition at line 173 of file ECG_Reconnect_ConsumerEC_Control.cpp.

References CORBA::PolicyCurrent::_narrow(), ACE_CATCHANY, ACE_ENDTRY, ACE_ENV_ARG_PARAMETER, ACE_TRY_CHECK, ACE_TRY_NEW_ENV, TAO::unbounded_object_reference_sequence< Policy, Policy_var >::length(), ACE_Reactor::schedule_timer(), and ACE_Time_Value::usec().

00174 {
00175 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00176   ACE_TRY_NEW_ENV
00177     {
00178       // Get the PolicyCurrent object
00179       CORBA::Object_var tmp =
00180         this->orb_->resolve_initial_references ("PolicyCurrent"
00181                                                  ACE_ENV_ARG_PARAMETER);
00182       ACE_TRY_CHECK;
00183 
00184       this->policy_current_ =
00185         CORBA::PolicyCurrent::_narrow (tmp.in ()
00186                                         ACE_ENV_ARG_PARAMETER);
00187       ACE_TRY_CHECK;
00188 
00189       // Timeout for polling state (default = 10 msec)
00190       TimeBase::TimeT timeout = timeout_.usec() * 10;
00191       CORBA::Any any;
00192       any <<= timeout;
00193 
00194       this->policy_list_.length (1);
00195       this->policy_list_[0] =
00196         this->orb_->create_policy (
00197                Messaging::RELATIVE_RT_TIMEOUT_POLICY_TYPE,
00198                any
00199                ACE_ENV_ARG_PARAMETER);
00200       ACE_TRY_CHECK;
00201 
00202       // Only schedule the timer, when the rate is not zero
00203       if (this->rate_ != ACE_Time_Value::zero)
00204       {
00205         // Schedule the timer after these policies have been set, because
00206         // handle_timeout uses these policies; if scheduled earlier, the channel
00207         // can crash when the timeout expires before initialization is complete.
00208         timer_id_ = this->reactor_->schedule_timer (&this->adapter_,
00209                                                     0,
00210                                                     this->rate_,
00211                                                     this->rate_);
00212         if (timer_id_ == -1)
00213           return -1;
00214       }
00215     }
00216   ACE_CATCHANY
00217     {
00218       return -1;
00219     }
00220   ACE_ENDTRY;
00221 #endif /* TAO_HAS_CORBA_MESSAGING */
00222 
00223   return 0;
00224 }
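
The timeout arithmetic in the listing can be checked in isolation. TimeBase::TimeT counts 100-nanosecond ticks, so one microsecond corresponds to 10 ticks. The sketch below is not TAO code: TimeValue is a hypothetical stand-in for ACE_Time_Value, and it assumes that usec() yields only the sub-second microsecond field, in which case whole seconds in the timeout would not contribute to the policy value. The second function shows one possible way to include them.

```cpp
#include <cstdint>

// Hypothetical stand-in for an ACE_Time_Value-like pair of fields.
struct TimeValue {
  std::int64_t sec;   // whole seconds
  std::int64_t usec;  // microseconds within the second, 0..999999
};

// TimeBase::TimeT counts 100-nanosecond ticks: 1 microsecond == 10 ticks.
constexpr std::uint64_t kTicksPerUsec = 10;
constexpr std::uint64_t kUsecPerSec = 1000000;

// Conversion as written in the listing: only the microsecond field is used.
constexpr std::uint64_t to_timet_usec_only(const TimeValue &tv) {
  return static_cast<std::uint64_t>(tv.usec) * kTicksPerUsec;
}

// Variant that also accounts for whole seconds (an assumption about what a
// caller passing a timeout of one second or more would expect).
constexpr std::uint64_t to_timet_full(const TimeValue &tv) {
  return (static_cast<std::uint64_t>(tv.sec) * kUsecPerSec +
          static_cast<std::uint64_t>(tv.usec)) * kTicksPerUsec;
}
```

For the 10 msec default mentioned in the listing's comment, both functions give 100000 ticks. They differ only once the seconds field is non-zero.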

void TAO_ECG_Reconnect_ConsumerEC_Control::event_channel_not_exist (TAO_EC_Gateway_IIOP * gateway) [virtual]
 

When pushing an event to the event channel, a CORBA::OBJECT_NOT_EXIST exception was raised. The only interpretation is that the object has been destroyed. The strategy has to, at the very least, reclaim all the resources attached to that object.

Reimplemented from TAO_ECG_ConsumerEC_Control.

Definition at line 240 of file ECG_Reconnect_ConsumerEC_Control.cpp.

References ACE_ANY_EXCEPTION, ACE_CATCHANY, ACE_ENDTRY, ACE_ENV_SINGLE_ARG_PARAMETER, ACE_PRINT_EXCEPTION, ACE_TRY, ACE_TRY_CHECK, and is_consumer_ec_connected_.

Referenced by query_eventchannel().

00243 {
00244   ACE_TRY
00245     {
00246       //ACE_DEBUG ((LM_DEBUG,
00247       //            "ECG_Reconnect_ConsumerControl(%P|%t) - "
00248       //            "channel %x does not exists\n"));
00249       is_consumer_ec_connected_ = 0;
00250 
00251       gateway->suspend_supplier_ec (ACE_ENV_SINGLE_ARG_PARAMETER);
00252       ACE_TRY_CHECK;
00253 
00254       gateway->cleanup_consumer_proxies (ACE_ENV_SINGLE_ARG_PARAMETER);
00255       ACE_TRY_CHECK;
00256     }
00257   ACE_CATCHANY
00258     {
00259       ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
00260                            "TAO_EC_Reconnect_ConsumerControl::event_channel_not_exist");
00261       // Ignore all exceptions..
00262     }
00263   ACE_ENDTRY;
00264 }

void TAO_ECG_Reconnect_ConsumerEC_Control::handle_timeout (const ACE_Time_Value & tv,
const void * arg)
 

Receive the timeout from the adapter.

Definition at line 124 of file ECG_Reconnect_ConsumerEC_Control.cpp.

References ACE_CATCHANY, ACE_ENDTRY, ACE_ENV_ARG_PARAMETER, ACE_ENV_SINGLE_ARG_PARAMETER, ACE_TRY_CHECK, ACE_TRY_NEW_ENV, and query_eventchannel().

Referenced by TAO_ECG_Reconnect_ConsumerEC_Control_Adapter::handle_timeout().

00127 {
00128   // NOTE, setting RELATIVE_RT_TIMEOUT_POLICY for the duration of
00129   // query_eventchannel () below has greater impact than desired.  For
00130   // example, while we are pinging consumers here, a nested upcall,
00131   // which requires making remote calls may come into the ORB.  Those
00132   // remote calls will be carried out with the
00133   // RELATIVE_RT_TIMEOUT_POLICY set here in effect.
00134   // @@ TODO: should use Guard to set and reset policies.
00135   ACE_TRY_NEW_ENV
00136     {
00137       // Query the state of the Current object *before* we initiate
00138       // the iteration...
00139       CORBA::PolicyTypeSeq types;
00140       CORBA::PolicyList_var policies =
00141         this->policy_current_->get_policy_overrides (types
00142                                                       ACE_ENV_ARG_PARAMETER);
00143       ACE_TRY_CHECK;
00144 
00145       // Change the timeout
00146       this->policy_current_->set_policy_overrides (this->policy_list_,
00147                                                    CORBA::ADD_OVERRIDE
00148                                                     ACE_ENV_ARG_PARAMETER);
00149       ACE_TRY_CHECK;
00150 
00151       // Query the state of the consumers...
00152       this->query_eventchannel (ACE_ENV_SINGLE_ARG_PARAMETER);
00153       ACE_TRY_CHECK;
00154 
00155       this->policy_current_->set_policy_overrides (policies.in (),
00156                                                    CORBA::SET_OVERRIDE
00157                                                     ACE_ENV_ARG_PARAMETER);
00158       ACE_TRY_CHECK;
00159       for (CORBA::ULong i = 0; i != policies->length (); ++i)
00160         {
00161           policies[i]->destroy (ACE_ENV_SINGLE_ARG_PARAMETER);
00162           ACE_TRY_CHECK;
00163         }
00164     }
00165   ACE_CATCHANY
00166     {
00167       // Ignore all exceptions
00168     }
00169   ACE_ENDTRY;
00170 }
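
The TODO in the listing suggests using a guard to set and reset the policies. A minimal sketch of that idea follows, assuming a scope-guard (RAII) design. FakePolicyCurrent and PolicyOverrideGuard are hypothetical names invented for this illustration, policies are reduced to plain integers, and the ADD_OVERRIDE / SET_OVERRIDE distinction is collapsed into a simple replace. A real guard around CORBA::PolicyCurrent would also have to deal with exceptions thrown while restoring.

```cpp
#include <utility>
#include <vector>

// Hypothetical stand-in for CORBA::PolicyCurrent: it only stores a list of
// integer "policies" so the save/restore pattern can be shown in isolation.
struct FakePolicyCurrent {
  std::vector<int> overrides;

  std::vector<int> get_policy_overrides() const { return overrides; }
  void set_policy_overrides(std::vector<int> policies) {
    overrides = std::move(policies);
  }
};

// Scope guard: installs temporary overrides in the constructor and restores
// the previous ones in the destructor, also when an exception unwinds.
class PolicyOverrideGuard {
public:
  PolicyOverrideGuard(FakePolicyCurrent &current, std::vector<int> temporary)
      : current_(current), saved_(current.get_policy_overrides()) {
    current_.set_policy_overrides(std::move(temporary));
  }

  ~PolicyOverrideGuard() {
    // Restoring a vector of ints cannot throw in this simplified model.
    current_.set_policy_overrides(std::move(saved_));
  }

  PolicyOverrideGuard(const PolicyOverrideGuard &) = delete;
  PolicyOverrideGuard &operator=(const PolicyOverrideGuard &) = delete;

private:
  FakePolicyCurrent &current_;
  std::vector<int> saved_;  // overrides in effect before the guard
};
```

With such a guard, the temporary timeout policy is in effect only for the enclosing scope, and the restore step no longer depends on every statement in between completing normally.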

void TAO_ECG_Reconnect_ConsumerEC_Control::query_eventchannel () [private]
 

Check if the consumer event channel still exists. It is a helper method for handle_timeout() to isolate the exceptions.

Definition at line 79 of file ECG_Reconnect_ConsumerEC_Control.cpp.

References ACE_CATCH, ACE_CATCHANY, ACE_ENDTRY, ACE_ENV_ARG_PARAMETER, ACE_ENV_SINGLE_ARG_PARAMETER, ACE_TRY, ACE_TRY_CHECK, TAO_EC_Gateway_IIOP::consumer_ec_non_existent(), event_channel_not_exist(), is_consumer_ec_connected_, and try_reconnect().

Referenced by handle_timeout().

00081 {
00082   ACE_TRY
00083     {
00084       if (is_consumer_ec_connected_ == 1)
00085         {
00086           CORBA::Boolean disconnected;
00087           CORBA::Boolean non_existent =
00088             gateway_->consumer_ec_non_existent (disconnected
00089                                                 ACE_ENV_ARG_PARAMETER);
00090           ACE_TRY_CHECK;
00091           if (non_existent && !disconnected)
00092             {
00093               this->event_channel_not_exist (gateway_ ACE_ENV_ARG_PARAMETER);
00094               ACE_TRY_CHECK;
00095             }
00096         }
00097       else
00098         {
00099            this->try_reconnect(ACE_ENV_SINGLE_ARG_PARAMETER);
00100            ACE_TRY_CHECK;
00101         }
00102     }
00103   ACE_CATCH (CORBA::OBJECT_NOT_EXIST, ex)
00104     {
00105       this->event_channel_not_exist (gateway_ ACE_ENV_ARG_PARAMETER);
00106       ACE_TRY_CHECK;
00107     }
00108   ACE_CATCH (CORBA::TRANSIENT, transient)
00109     {
00110       // This is TAO's minor code for a failed connection, we may
00111       // want to be more lenient in the future..
00112       // if (transient.minor () == 0x54410085)
00113       this->event_channel_not_exist (gateway_ ACE_ENV_ARG_PARAMETER);
00114       ACE_TRY_CHECK;
00115     }
00116   ACE_CATCHANY
00117     {
00118       // Ignore all exceptions
00119     }
00120   ACE_ENDTRY;
00121 }

void TAO_ECG_Reconnect_ConsumerEC_Control::reconnect () [private]
 

Reconnect to the consumer ec.

Definition at line 61 of file ECG_Reconnect_ConsumerEC_Control.cpp.

References ACE_CATCHANY, ACE_ENDTRY, ACE_ENV_SINGLE_ARG_PARAMETER, ACE_TRY, ACE_TRY_CHECK, is_consumer_ec_connected_, and TAO_EC_Gateway_IIOP::reconnect_consumer_ec().

Referenced by try_reconnect().

00063 {
00064   ACE_TRY
00065     {
00066       is_consumer_ec_connected_ = 1;
00067 
00068       gateway_->reconnect_consumer_ec(ACE_ENV_SINGLE_ARG_PARAMETER);
00069       ACE_TRY_CHECK;
00070     }
00071   ACE_CATCHANY
00072     {
00073       // Ignore all exceptions
00074     }
00075   ACE_ENDTRY;
00076 }

int TAO_ECG_Reconnect_ConsumerEC_Control::shutdown (void) [virtual]
 

Shut down any internal threads or timers used to poll the state of the event channel.

Reimplemented from TAO_ECG_ConsumerEC_Control.

Definition at line 227 of file ECG_Reconnect_ConsumerEC_Control.cpp.

References ACE_Reactor::cancel_timer(), and ACE_Event_Handler::reactor().

00228 {
00229   int r = 0;
00230 
00231 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00232   r = this->reactor_->cancel_timer (timer_id_);
00233 #endif /* TAO_HAS_CORBA_MESSAGING */
00234 
00235   this->adapter_.reactor (0);
00236   return r;
00237 }

void TAO_ECG_Reconnect_ConsumerEC_Control::system_exception (TAO_EC_Gateway_IIOP * gateway,
CORBA::SystemException &) [virtual]
 

Some system exception was raised while trying to contact the event channel.

Reimplemented from TAO_ECG_ConsumerEC_Control.

Definition at line 267 of file ECG_Reconnect_ConsumerEC_Control.cpp.

References ACE_CATCHANY, ACE_ENDTRY, ACE_ENV_SINGLE_ARG_PARAMETER, ACE_TRY, ACE_TRY_CHECK, TAO_EC_Gateway_IIOP::cleanup_consumer_proxies(), is_consumer_ec_connected_, and TAO_EC_Gateway_IIOP::suspend_supplier_ec().

00271 {
00272   ACE_TRY
00273     {
00274       // The current implementation is very strict, and kicks out a
00275       // client on the first system exception. We may
00276       // want to be more lenient in the future, for example,
00277       // this is TAO's minor code for a failed connection.
00278       //
00279       // if (CORBA::TRANSIENT::_narrow (&exception) != 0
00280       //     && exception->minor () == 0x54410085)
00281       //   return;
00282 
00283       // Anything else is serious, including timeouts...
00284       //ACE_DEBUG ((LM_DEBUG,
00285       //            "ECG_Reconnect_ConsumerControl(%P|%t) - "
00286       //            "channel %x does not exists system except\n"));
00287       is_consumer_ec_connected_ = 0;
00288 
00289       gateway->suspend_supplier_ec (ACE_ENV_SINGLE_ARG_PARAMETER);
00290       ACE_TRY_CHECK;
00291 
00292       gateway->cleanup_consumer_proxies (ACE_ENV_SINGLE_ARG_PARAMETER);
00293       ACE_TRY_CHECK;
00294     }
00295   ACE_CATCHANY
00296     {
00297       // Ignore all exceptions..
00298     }
00299   ACE_ENDTRY;
00300 }
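
The commented-out check in the listing hints at a more lenient rule that would tolerate a TRANSIENT exception carrying TAO's failed-connection minor code. The sketch below only illustrates that decision in isolation. SystemExceptionInfo and should_mark_disconnected are hypothetical names, the exception is reduced to a name and a minor code, and the lenient flag is an assumption of this example rather than an existing option of the class.

```cpp
#include <cstdint>
#include <string>

// Hypothetical, simplified view of a CORBA system exception: just the
// exception name and its minor code.
struct SystemExceptionInfo {
  std::string name;
  std::uint32_t minor;
};

// Minor code quoted in the listing's comments as TAO's code for a failed
// connection.
constexpr std::uint32_t kFailedConnectionMinor = 0x54410085;

// Returns true when the consumer ec should be treated as lost.
// lenient == false: every system exception counts, as in the listing.
// lenient == true: a TRANSIENT with the failed-connection minor code is
// tolerated and everything else still counts.
inline bool should_mark_disconnected(const SystemExceptionInfo &ex,
                                     bool lenient) {
  if (lenient && ex.name == "TRANSIENT" &&
      ex.minor == kFailedConnectionMinor)
    return false;
  return true;
}
```

Keeping the decision in one predicate like this would make it straightforward to compare the strict and lenient behaviour without touching the cleanup calls.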

void TAO_ECG_Reconnect_ConsumerEC_Control::try_reconnect () [private]
 

Check whether we can reconnect.

Definition at line 37 of file ECG_Reconnect_ConsumerEC_Control.cpp.

References ACE_CATCHANY, ACE_ENDTRY, ACE_ENV_ARG_PARAMETER, ACE_ENV_SINGLE_ARG_PARAMETER, ACE_TRY, ACE_TRY_CHECK, TAO_EC_Gateway_IIOP::consumer_ec_non_existent(), and reconnect().

Referenced by query_eventchannel().

00039 {
00040   ACE_TRY
00041     {
00042       CORBA::Boolean disconnected;
00043       CORBA::Boolean non_existent =
00044       gateway_->consumer_ec_non_existent (disconnected
00045                                            ACE_ENV_ARG_PARAMETER);
00046       ACE_TRY_CHECK;
00047       if (!non_existent)
00048         {
00049           this->reconnect(ACE_ENV_SINGLE_ARG_PARAMETER);
00050           ACE_TRY_CHECK;
00051         }
00052     }
00053   ACE_CATCHANY
00054     {
00055       // Ignore all exceptions
00056     }
00057   ACE_ENDTRY;
00058 }


Member Data Documentation

TAO_ECG_Reconnect_ConsumerEC_Control_Adapter TAO_ECG_Reconnect_ConsumerEC_Control::adapter_ [private]
 

The Adapter for the reactor events.

Definition at line 117 of file ECG_Reconnect_ConsumerEC_Control.h.

TAO_EC_Gateway_IIOP* TAO_ECG_Reconnect_ConsumerEC_Control::gateway_ [private]
 

The gateway.

Definition at line 120 of file ECG_Reconnect_ConsumerEC_Control.h.

int TAO_ECG_Reconnect_ConsumerEC_Control::is_consumer_ec_connected_ [private]
 

Whether we currently have a connection to the consumer ec.

Definition at line 135 of file ECG_Reconnect_ConsumerEC_Control.h.

Referenced by event_channel_not_exist(), query_eventchannel(), reconnect(), and system_exception().

CORBA::ORB_var TAO_ECG_Reconnect_ConsumerEC_Control::orb_ [private]
 

The ORB.

Definition at line 123 of file ECG_Reconnect_ConsumerEC_Control.h.

CORBA::PolicyCurrent_var TAO_ECG_Reconnect_ConsumerEC_Control::policy_current_ [private]
 

To control the timeout policy in the thread.

Definition at line 126 of file ECG_Reconnect_ConsumerEC_Control.h.

CORBA::PolicyList TAO_ECG_Reconnect_ConsumerEC_Control::policy_list_ [private]
 

Precomputed policy list used to set the timeout.

Definition at line 129 of file ECG_Reconnect_ConsumerEC_Control.h.

ACE_Time_Value TAO_ECG_Reconnect_ConsumerEC_Control::rate_ [private]
 

The polling rate.

Definition at line 111 of file ECG_Reconnect_ConsumerEC_Control.h.

ACE_Reactor* TAO_ECG_Reconnect_ConsumerEC_Control::reactor_ [private]
 

The ORB reactor.

Definition at line 132 of file ECG_Reconnect_ConsumerEC_Control.h.

ACE_Time_Value TAO_ECG_Reconnect_ConsumerEC_Control::timeout_ [private]
 

The polling timeout.

Definition at line 114 of file ECG_Reconnect_ConsumerEC_Control.h.

long TAO_ECG_Reconnect_ConsumerEC_Control::timer_id_ [private]
 

The timer id.

Definition at line 139 of file ECG_Reconnect_ConsumerEC_Control.h.


The documentation for this class was generated from the following files:

ECG_Reconnect_ConsumerEC_Control.h
ECG_Reconnect_ConsumerEC_Control.cpp

Generated on Thu Nov 9 13:16:20 2006 for TAO_RTEvent by doxygen 1.3.6