TAO_ECG_Reactive_ConsumerEC_Control Class Reference

Consumer event channel control. More...

#include <ECG_Reactive_ConsumerEC_Control.h>

Inheritance diagram for TAO_ECG_Reactive_ConsumerEC_Control:

Inheritance graph
[legend]
Collaboration diagram for TAO_ECG_Reactive_ConsumerEC_Control:

Collaboration graph
[legend]
List of all members.

Public Member Functions

 TAO_ECG_Reactive_ConsumerEC_Control (const ACE_Time_Value &rate, const ACE_Time_Value &timeout, TAO_EC_Gateway_IIOP *gateway, CORBA::ORB_ptr orb)
virtual ~TAO_ECG_Reactive_ConsumerEC_Control (void)
 Destructor.

void handle_timeout (const ACE_Time_Value &tv, const void *arg)
 Receive the timeout from the adapter.

virtual int activate (void)
virtual int shutdown (void)
virtual void event_channel_not_exist (TAO_EC_Gateway_IIOP *gateway)
virtual void system_exception (TAO_EC_Gateway_IIOP *gateway, CORBA::SystemException &)

Private Member Functions

void query_eventchannel ()

Private Attributes

ACE_Time_Value rate_
 The polling rate.

ACE_Time_Value timeout_
 The polling timeout.

TAO_ECG_Reactive_ConsumerEC_Control_Adapter adapter_
 The Adapter for the reactor events.

TAO_EC_Gateway_IIOPgateway_
 The gateway.

CORBA::ORB_var orb_
 The ORB.

CORBA::PolicyCurrent_var policy_current_
 To control the timeout policy in the thread.

CORBA::PolicyList policy_list_
 Precomputed policy list to the set timeout.

ACE_Reactorreactor_
 The ORB reactor.

long timer_id_
 The timer id.


Detailed Description

Consumer event channel control.

Defines the interface for the consumer event channel control strategy. This strategy handles misbehaving or failing consumer event channels.

Definition at line 66 of file ECG_Reactive_ConsumerEC_Control.h.


Constructor & Destructor Documentation

TAO_BEGIN_VERSIONED_NAMESPACE_DECL TAO_ECG_Reactive_ConsumerEC_Control::TAO_ECG_Reactive_ConsumerEC_Control const ACE_Time_Value rate,
const ACE_Time_Value timeout,
TAO_EC_Gateway_IIOP gateway,
CORBA::ORB_ptr  orb
 

Constructor. It does not assume ownership of the event_channel parameter.

Definition at line 15 of file ECG_Reactive_ConsumerEC_Control.cpp.

References TAO_HAS_CORBA_MESSAGING.

00019   : rate_ (rate),
00020     timeout_ (timeout),
00021     adapter_ (this),
00022     gateway_ (gateway),
00023     orb_ (CORBA::ORB::_duplicate (orb))
00024 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00025     , timer_id_ (-1)
00026 #endif /* TAO_HAS_CORBA_MESSAGING != 0*/
00027 {
00028   this->reactor_ =
00029     this->orb_->orb_core ()->reactor ();
00030 }

TAO_ECG_Reactive_ConsumerEC_Control::~TAO_ECG_Reactive_ConsumerEC_Control void   )  [virtual]
 

Destructor.

Definition at line 32 of file ECG_Reactive_ConsumerEC_Control.cpp.

00033 {
00034 }


Member Function Documentation

int TAO_ECG_Reactive_ConsumerEC_Control::activate void   )  [virtual]
 

Activate any internal threads or timers used to poll the state of the event channel.

Reimplemented from TAO_ECG_ConsumerEC_Control.

Definition at line 124 of file ECG_Reactive_ConsumerEC_Control.cpp.

References CORBA::PolicyCurrent::_narrow(), ACE_CATCHANY, ACE_ENDTRY, ACE_ENV_ARG_PARAMETER, ACE_TRY_CHECK, ACE_TRY_NEW_ENV, TAO::unbounded_object_reference_sequence< Policy, Policy_var >::length(), ACE_Reactor::schedule_timer(), and ACE_Time_Value::usec().

00125 {
00126 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00127   ACE_TRY_NEW_ENV
00128     {
00129       // Get the PolicyCurrent object
00130       CORBA::Object_var tmp =
00131         this->orb_->resolve_initial_references ("PolicyCurrent"
00132                                                  ACE_ENV_ARG_PARAMETER);
00133       ACE_TRY_CHECK;
00134 
00135       this->policy_current_ =
00136         CORBA::PolicyCurrent::_narrow (tmp.in ()
00137                                         ACE_ENV_ARG_PARAMETER);
00138       ACE_TRY_CHECK;
00139 
00140       // Timeout for polling state (default = 10 msec)
00141       TimeBase::TimeT timeout = timeout_.usec() * 10;
00142       CORBA::Any any;
00143       any <<= timeout;
00144 
00145       this->policy_list_.length (1);
00146       this->policy_list_[0] =
00147         this->orb_->create_policy (
00148                Messaging::RELATIVE_RT_TIMEOUT_POLICY_TYPE,
00149                any
00150                ACE_ENV_ARG_PARAMETER);
00151       ACE_TRY_CHECK;
00152 
00153       // Only schedule the timer, when the rate is not zero
00154       if (this->rate_ != ACE_Time_Value::zero)
00155       {
00156         // Schedule the timer after these policies has been set, because the
00157         // handle_timeout uses these policies, if done in front, the channel
00158         // can crash when the timeout expires before initiazation is ready.
00159         timer_id_ = this->reactor_->schedule_timer (&this->adapter_,
00160                                                     0,
00161                                                     this->rate_,
00162                                                     this->rate_);
00163         if (timer_id_ == -1)
00164           return -1;
00165       }
00166     }
00167   ACE_CATCHANY
00168     {
00169       return -1;
00170     }
00171   ACE_ENDTRY;
00172 #endif /* TAO_HAS_CORBA_MESSAGING */
00173 
00174   return 0;
00175 }

void TAO_ECG_Reactive_ConsumerEC_Control::event_channel_not_exist TAO_EC_Gateway_IIOP gateway  )  [virtual]
 

When pushing an event to the event channel a CORBA::OBJECT_NOT_EXIST exception was raised. The only interpretation is that the object has been destroyed. The strategy has to (at the very least), reclaim all the resources attached to that object.

Reimplemented from TAO_ECG_ConsumerEC_Control.

Definition at line 191 of file ECG_Reactive_ConsumerEC_Control.cpp.

References ACE_ANY_EXCEPTION, ACE_CATCHANY, ACE_DEBUG, ACE_ENDTRY, ACE_ENV_SINGLE_ARG_PARAMETER, ACE_PRINT_EXCEPTION, ACE_TRY, ACE_TRY_CHECK, and LM_DEBUG.

Referenced by query_eventchannel().

00194 {
00195   ACE_TRY
00196     {
00197       ACE_DEBUG ((LM_DEBUG,
00198                   "EC_Reactive_ConsumerControl(%P|%t) - "
00199                   "channel %x does not exists\n"));
00200       gateway->cleanup_consumer_ec ();
00201 
00202       gateway->cleanup_consumer_proxies (ACE_ENV_SINGLE_ARG_PARAMETER);
00203       ACE_TRY_CHECK;
00204 
00205     }
00206   ACE_CATCHANY
00207     {
00208       ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
00209                            "TAO_EC_Reactive_ConsumerControl::event_channel_not_exist");
00210       // Ignore all exceptions..
00211     }
00212   ACE_ENDTRY;
00213 }

void TAO_ECG_Reactive_ConsumerEC_Control::handle_timeout const ACE_Time_Value tv,
const void *  arg
 

Receive the timeout from the adapter.

Definition at line 74 of file ECG_Reactive_ConsumerEC_Control.cpp.

References ACE_CATCHANY, ACE_ENDTRY, ACE_ENV_ARG_PARAMETER, ACE_ENV_SINGLE_ARG_PARAMETER, ACE_TRY_CHECK, ACE_TRY_NEW_ENV, and query_eventchannel().

Referenced by TAO_ECG_Reactive_ConsumerEC_Control_Adapter::handle_timeout().

00077 {
00078   // NOTE, setting RELATIVE_RT_TIMEOUT_POLICY for the duration of
00079   // query_eventchannel () below has greater impact than desired.  For
00080   // example, while we are pinging ec here, a nested upcall,
00081   // which requires making remote calls may come into the ORB.  Those
00082   // remote calls will be carried out with with
00083   // RELATIVE_RT_TIMEOUT_POLICY set here in effect.
00084 
00085   // @@ TODO: should use Guard to set and reset policies.
00086   ACE_TRY_NEW_ENV
00087     {
00088       // Query the state of the Current object *before* we initiate
00089       // the iteration...
00090       CORBA::PolicyTypeSeq types;
00091       CORBA::PolicyList_var policies =
00092         this->policy_current_->get_policy_overrides (types
00093                                                       ACE_ENV_ARG_PARAMETER);
00094       ACE_TRY_CHECK;
00095 
00096       // Change the timeout
00097       this->policy_current_->set_policy_overrides (this->policy_list_,
00098                                                    CORBA::ADD_OVERRIDE
00099                                                     ACE_ENV_ARG_PARAMETER);
00100       ACE_TRY_CHECK;
00101 
00102       // Query the state of the consumers...
00103       this->query_eventchannel (ACE_ENV_SINGLE_ARG_PARAMETER);
00104       ACE_TRY_CHECK;
00105 
00106       this->policy_current_->set_policy_overrides (policies.in (),
00107                                                    CORBA::SET_OVERRIDE
00108                                                     ACE_ENV_ARG_PARAMETER);
00109       ACE_TRY_CHECK;
00110       for (CORBA::ULong i = 0; i != policies->length (); ++i)
00111         {
00112           policies[i]->destroy (ACE_ENV_SINGLE_ARG_PARAMETER);
00113           ACE_TRY_CHECK;
00114         }
00115     }
00116   ACE_CATCHANY
00117     {
00118       // Ignore all exceptions
00119     }
00120   ACE_ENDTRY;
00121 }

void TAO_ECG_Reactive_ConsumerEC_Control::query_eventchannel  )  [private]
 

Check if the consumers still exists. It is a helper method for handle_timeout() to isolate the exceptions.

Definition at line 37 of file ECG_Reactive_ConsumerEC_Control.cpp.

References ACE_CATCH, ACE_CATCHANY, ACE_ENDTRY, ACE_ENV_ARG_PARAMETER, ACE_TRY, ACE_TRY_CHECK, TAO_EC_Gateway_IIOP::consumer_ec_non_existent(), event_channel_not_exist(), and gateway_.

Referenced by handle_timeout().

00039 {
00040   ACE_TRY
00041     {
00042       CORBA::Boolean disconnected;
00043       CORBA::Boolean non_existent =
00044         gateway_->consumer_ec_non_existent (disconnected
00045                                             ACE_ENV_ARG_PARAMETER);
00046       ACE_TRY_CHECK;
00047       if (non_existent && !disconnected)
00048         {
00049           this->event_channel_not_exist (gateway_ ACE_ENV_ARG_PARAMETER);
00050           ACE_TRY_CHECK;
00051         }
00052     }
00053   ACE_CATCH (CORBA::OBJECT_NOT_EXIST, ex)
00054     {
00055       this->event_channel_not_exist (gateway_ ACE_ENV_ARG_PARAMETER);
00056       ACE_TRY_CHECK;
00057     }
00058   ACE_CATCH (CORBA::TRANSIENT, transient)
00059     {
00060       // This is TAO's minor code for a failed connection, we may
00061       // want to be more lenient in the future..
00062       // if (transient.minor () == 0x54410085)
00063       this->event_channel_not_exist (gateway_ ACE_ENV_ARG_PARAMETER);
00064       ACE_TRY_CHECK;
00065     }
00066   ACE_CATCHANY
00067     {
00068       // Ignore all exceptions
00069     }
00070   ACE_ENDTRY;
00071 }

int TAO_ECG_Reactive_ConsumerEC_Control::shutdown void   )  [virtual]
 

Shutdown any internal threads or timers used to poll the state of the event channel.

Reimplemented from TAO_ECG_ConsumerEC_Control.

Definition at line 178 of file ECG_Reactive_ConsumerEC_Control.cpp.

References ACE_Reactor::cancel_timer(), and ACE_Event_Handler::reactor().

00179 {
00180   int r = 0;
00181 
00182 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00183   r = this->reactor_->cancel_timer (timer_id_);
00184 #endif /* TAO_HAS_CORBA_MESSAGING */
00185 
00186   this->adapter_.reactor (0);
00187   return r;
00188 }

void TAO_ECG_Reactive_ConsumerEC_Control::system_exception TAO_EC_Gateway_IIOP gateway,
CORBA::SystemException
[virtual]
 

Some system exception was raised while trying to contact the event channel

Reimplemented from TAO_ECG_ConsumerEC_Control.

Definition at line 216 of file ECG_Reactive_ConsumerEC_Control.cpp.

References ACE_CATCHANY, ACE_ENDTRY, ACE_ENV_SINGLE_ARG_PARAMETER, ACE_TRY, ACE_TRY_CHECK, TAO_EC_Gateway_IIOP::cleanup_consumer_ec(), and TAO_EC_Gateway_IIOP::cleanup_consumer_proxies().

00220 {
00221   ACE_TRY
00222     {
00223       gateway->cleanup_consumer_ec ();
00224 
00225       gateway->cleanup_consumer_proxies (ACE_ENV_SINGLE_ARG_PARAMETER);
00226       ACE_TRY_CHECK;
00227 
00228     }
00229   ACE_CATCHANY
00230     {
00231       // Ignore all exceptions..
00232     }
00233   ACE_ENDTRY;
00234 }


Member Data Documentation

TAO_ECG_Reactive_ConsumerEC_Control_Adapter TAO_ECG_Reactive_ConsumerEC_Control::adapter_ [private]
 

The Adapter for the reactor events.

Definition at line 106 of file ECG_Reactive_ConsumerEC_Control.h.

TAO_EC_Gateway_IIOP* TAO_ECG_Reactive_ConsumerEC_Control::gateway_ [private]
 

The gateway.

Definition at line 109 of file ECG_Reactive_ConsumerEC_Control.h.

Referenced by query_eventchannel().

CORBA::ORB_var TAO_ECG_Reactive_ConsumerEC_Control::orb_ [private]
 

The ORB.

Definition at line 112 of file ECG_Reactive_ConsumerEC_Control.h.

CORBA::PolicyCurrent_var TAO_ECG_Reactive_ConsumerEC_Control::policy_current_ [private]
 

To control the timeout policy in the thread.

Definition at line 115 of file ECG_Reactive_ConsumerEC_Control.h.

CORBA::PolicyList TAO_ECG_Reactive_ConsumerEC_Control::policy_list_ [private]
 

Precomputed policy list to the set timeout.

Definition at line 118 of file ECG_Reactive_ConsumerEC_Control.h.

ACE_Time_Value TAO_ECG_Reactive_ConsumerEC_Control::rate_ [private]
 

The polling rate.

Definition at line 100 of file ECG_Reactive_ConsumerEC_Control.h.

ACE_Reactor* TAO_ECG_Reactive_ConsumerEC_Control::reactor_ [private]
 

The ORB reactor.

Definition at line 121 of file ECG_Reactive_ConsumerEC_Control.h.

ACE_Time_Value TAO_ECG_Reactive_ConsumerEC_Control::timeout_ [private]
 

The polling timeout.

Definition at line 103 of file ECG_Reactive_ConsumerEC_Control.h.

long TAO_ECG_Reactive_ConsumerEC_Control::timer_id_ [private]
 

The timer id.

Definition at line 125 of file ECG_Reactive_ConsumerEC_Control.h.


The documentation for this class was generated from the following files:
Generated on Thu Nov 9 13:16:17 2006 for TAO_RTEvent by doxygen 1.3.6