#include <ECG_Reactive_ConsumerEC_Control.h>
Inheritance diagram for TAO_ECG_Reactive_ConsumerEC_Control:
Public Member Functions | |
TAO_ECG_Reactive_ConsumerEC_Control (const ACE_Time_Value &rate, const ACE_Time_Value &timeout, TAO_EC_Gateway_IIOP *gateway, CORBA::ORB_ptr orb) | |
virtual | ~TAO_ECG_Reactive_ConsumerEC_Control (void) |
Destructor. | |
void | handle_timeout (const ACE_Time_Value &tv, const void *arg) |
Receive the timeout from the adapter. | |
virtual int | activate (void) |
virtual int | shutdown (void) |
virtual void | event_channel_not_exist (TAO_EC_Gateway_IIOP *gateway) |
virtual void | system_exception (TAO_EC_Gateway_IIOP *gateway, CORBA::SystemException &) |
Private Member Functions | |
void | query_eventchannel (void) |
Private Attributes | |
ACE_Time_Value | rate_ |
The polling rate. | |
ACE_Time_Value | timeout_ |
The polling timeout. | |
TAO_ECG_Reactive_ConsumerEC_Control_Adapter | adapter_ |
The Adapter for the reactor events. | |
TAO_EC_Gateway_IIOP * | gateway_ |
The gateway. | |
CORBA::ORB_var | orb_ |
The ORB. | |
CORBA::PolicyCurrent_var | policy_current_ |
To control the timeout policy in the thread. | |
CORBA::PolicyList | policy_list_ |
Precomputed policy list to the set timeout. | |
ACE_Reactor * | reactor_ |
The ORB reactor. | |
long | timer_id_ |
The timer id. |
Defines the interface for the consumer event channel control strategy. This strategy handles misbehaving or failing consumer event channels.
Definition at line 66 of file ECG_Reactive_ConsumerEC_Control.h.
TAO_BEGIN_VERSIONED_NAMESPACE_DECL TAO_ECG_Reactive_ConsumerEC_Control::TAO_ECG_Reactive_ConsumerEC_Control | ( | const ACE_Time_Value & | rate, | |
const ACE_Time_Value & | timeout, | |||
TAO_EC_Gateway_IIOP * | gateway, | |||
CORBA::ORB_ptr | orb | |||
) |
Constructor. It does not assume ownership of the event_channel parameter.
Definition at line 15 of file ECG_Reactive_ConsumerEC_Control.cpp.
00019 : rate_ (rate), 00020 timeout_ (timeout), 00021 adapter_ (this), 00022 gateway_ (gateway), 00023 orb_ (CORBA::ORB::_duplicate (orb)) 00024 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0 00025 , timer_id_ (-1) 00026 #endif /* TAO_HAS_CORBA_MESSAGING != 0*/ 00027 { 00028 this->reactor_ = 00029 this->orb_->orb_core ()->reactor (); 00030 }
TAO_ECG_Reactive_ConsumerEC_Control::~TAO_ECG_Reactive_ConsumerEC_Control | ( | void | ) | [virtual] |
int TAO_ECG_Reactive_ConsumerEC_Control::activate | ( | void | ) | [virtual] |
Activate any internal threads or timers used to poll the state of the event channel.
Reimplemented from TAO_ECG_ConsumerEC_Control.
Definition at line 108 of file ECG_Reactive_ConsumerEC_Control.cpp.
References TAO_Pseudo_Var_T< T >::in(), orb_, policy_current_, policy_list_, reactor_, ACE_Reactor::schedule_timer(), timeout_, timer_id_, ACE_Time_Value::usec(), and ACE_Time_Value::zero.
00109 { 00110 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0 00111 try 00112 { 00113 // Get the PolicyCurrent object 00114 CORBA::Object_var tmp = 00115 this->orb_->resolve_initial_references ("PolicyCurrent"); 00116 00117 this->policy_current_ = 00118 CORBA::PolicyCurrent::_narrow (tmp.in ()); 00119 00120 // Timeout for polling state (default = 10 msec) 00121 TimeBase::TimeT timeout = timeout_.usec() * 10; 00122 CORBA::Any any; 00123 any <<= timeout; 00124 00125 this->policy_list_.length (1); 00126 this->policy_list_[0] = 00127 this->orb_->create_policy ( 00128 Messaging::RELATIVE_RT_TIMEOUT_POLICY_TYPE, 00129 any); 00130 00131 // Only schedule the timer, when the rate is not zero 00132 if (this->rate_ != ACE_Time_Value::zero) 00133 { 00134 // Schedule the timer after these policies has been set, because the 00135 // handle_timeout uses these policies, if done in front, the channel 00136 // can crash when the timeout expires before initiazation is ready. 00137 timer_id_ = this->reactor_->schedule_timer (&this->adapter_, 00138 0, 00139 this->rate_, 00140 this->rate_); 00141 if (timer_id_ == -1) 00142 return -1; 00143 } 00144 } 00145 catch (const CORBA::Exception&) 00146 { 00147 return -1; 00148 } 00149 #endif /* TAO_HAS_CORBA_MESSAGING */ 00150 00151 return 0; 00152 }
void TAO_ECG_Reactive_ConsumerEC_Control::event_channel_not_exist | ( | TAO_EC_Gateway_IIOP * | gateway | ) | [virtual] |
When pushing an event to the event channel a CORBA::OBJECT_NOT_EXIST exception was raised. The only interpretation is that the object has been destroyed. The strategy has to (at the very least), reclaim all the resources attached to that object.
Reimplemented from TAO_ECG_ConsumerEC_Control.
Definition at line 168 of file ECG_Reactive_ConsumerEC_Control.cpp.
References CORBA::Exception::_tao_print_exception(), ACE_DEBUG, TAO_EC_Gateway_IIOP::cleanup_consumer_ec(), TAO_EC_Gateway_IIOP::cleanup_consumer_proxies(), and LM_DEBUG.
Referenced by query_eventchannel().
00170 { 00171 try 00172 { 00173 ACE_DEBUG ((LM_DEBUG, 00174 "EC_Reactive_ConsumerControl(%P|%t) - " 00175 "channel %x does not exists\n")); 00176 gateway->cleanup_consumer_ec (); 00177 00178 gateway->cleanup_consumer_proxies (); 00179 00180 } 00181 catch (const CORBA::Exception& ex) 00182 { 00183 ex._tao_print_exception ( 00184 "TAO_EC_Reactive_ConsumerControl::event_channel_not_exist"); 00185 // Ignore all exceptions.. 00186 } 00187 }
void TAO_ECG_Reactive_ConsumerEC_Control::handle_timeout | ( | const ACE_Time_Value & | tv, | |
const void * | arg | |||
) |
Receive the timeout from the adapter.
Definition at line 67 of file ECG_Reactive_ConsumerEC_Control.cpp.
References policy_current_, and query_eventchannel().
Referenced by TAO_ECG_Reactive_ConsumerEC_Control_Adapter::handle_timeout().
00070 { 00071 // NOTE, setting RELATIVE_RT_TIMEOUT_POLICY for the duration of 00072 // query_eventchannel () below has greater impact than desired. For 00073 // example, while we are pinging ec here, a nested upcall, 00074 // which requires making remote calls may come into the ORB. Those 00075 // remote calls will be carried out with with 00076 // RELATIVE_RT_TIMEOUT_POLICY set here in effect. 00077 00078 // @@ TODO: should use Guard to set and reset policies. 00079 try 00080 { 00081 // Query the state of the Current object *before* we initiate 00082 // the iteration... 00083 CORBA::PolicyTypeSeq types; 00084 CORBA::PolicyList_var policies = 00085 this->policy_current_->get_policy_overrides (types); 00086 00087 // Change the timeout 00088 this->policy_current_->set_policy_overrides (this->policy_list_, 00089 CORBA::ADD_OVERRIDE); 00090 00091 // Query the state of the consumers... 00092 this->query_eventchannel (); 00093 00094 this->policy_current_->set_policy_overrides (policies.in (), 00095 CORBA::SET_OVERRIDE); 00096 for (CORBA::ULong i = 0; i != policies->length (); ++i) 00097 { 00098 policies[i]->destroy (); 00099 } 00100 } 00101 catch (const CORBA::Exception&) 00102 { 00103 // Ignore all exceptions 00104 } 00105 }
void TAO_ECG_Reactive_ConsumerEC_Control::query_eventchannel | ( | void | ) | [private] |
Check if the consumers still exists. It is a helper method for handle_timeout() to isolate the exceptions.
Definition at line 37 of file ECG_Reactive_ConsumerEC_Control.cpp.
References TAO_EC_Gateway_IIOP::consumer_ec_non_existent(), event_channel_not_exist(), and gateway_.
Referenced by handle_timeout().
00038 { 00039 try 00040 { 00041 CORBA::Boolean disconnected; 00042 CORBA::Boolean non_existent = 00043 gateway_->consumer_ec_non_existent (disconnected); 00044 if (non_existent && !disconnected) 00045 { 00046 this->event_channel_not_exist (gateway_); 00047 } 00048 } 00049 catch (const CORBA::OBJECT_NOT_EXIST&) 00050 { 00051 this->event_channel_not_exist (gateway_); 00052 } 00053 catch (const CORBA::TRANSIENT&) 00054 { 00055 // This is TAO's minor code for a failed connection, we may 00056 // want to be more lenient in the future.. 00057 // if (transient.minor () == 0x54410085) 00058 this->event_channel_not_exist (gateway_); 00059 } 00060 catch (const CORBA::Exception&) 00061 { 00062 // Ignore all exceptions 00063 } 00064 }
int TAO_ECG_Reactive_ConsumerEC_Control::shutdown | ( | void | ) | [virtual] |
Shutdown any internal threads or timers used to poll the state of the event channel.
Reimplemented from TAO_ECG_ConsumerEC_Control.
Definition at line 155 of file ECG_Reactive_ConsumerEC_Control.cpp.
References adapter_, ACE_Reactor::cancel_timer(), ACE_Event_Handler::reactor(), reactor_, and timer_id_.
00156 { 00157 int r = 0; 00158 00159 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0 00160 r = this->reactor_->cancel_timer (timer_id_); 00161 #endif /* TAO_HAS_CORBA_MESSAGING */ 00162 00163 this->adapter_.reactor (0); 00164 return r; 00165 }
void TAO_ECG_Reactive_ConsumerEC_Control::system_exception | ( | TAO_EC_Gateway_IIOP * | gateway, | |
CORBA::SystemException & | ||||
) | [virtual] |
Some system exception was raised while trying to contact the event channel
Reimplemented from TAO_ECG_ConsumerEC_Control.
Definition at line 190 of file ECG_Reactive_ConsumerEC_Control.cpp.
References TAO_EC_Gateway_IIOP::cleanup_consumer_ec(), and TAO_EC_Gateway_IIOP::cleanup_consumer_proxies().
00193 { 00194 try 00195 { 00196 gateway->cleanup_consumer_ec (); 00197 00198 gateway->cleanup_consumer_proxies (); 00199 00200 } 00201 catch (const CORBA::Exception&) 00202 { 00203 // Ignore all exceptions.. 00204 } 00205 }
The Adapter for the reactor events.
Definition at line 104 of file ECG_Reactive_ConsumerEC_Control.h.
Referenced by shutdown().
The gateway.
Definition at line 107 of file ECG_Reactive_ConsumerEC_Control.h.
Referenced by query_eventchannel().
The ORB.
Definition at line 110 of file ECG_Reactive_ConsumerEC_Control.h.
Referenced by activate().
CORBA::PolicyCurrent_var TAO_ECG_Reactive_ConsumerEC_Control::policy_current_ [private] |
To control the timeout policy in the thread.
Definition at line 113 of file ECG_Reactive_ConsumerEC_Control.h.
Referenced by activate(), and handle_timeout().
CORBA::PolicyList TAO_ECG_Reactive_ConsumerEC_Control::policy_list_ [private] |
Precomputed policy list to the set timeout.
Definition at line 116 of file ECG_Reactive_ConsumerEC_Control.h.
Referenced by activate().
The ORB reactor.
Definition at line 119 of file ECG_Reactive_ConsumerEC_Control.h.
Referenced by activate(), and shutdown().
The polling timeout.
Definition at line 101 of file ECG_Reactive_ConsumerEC_Control.h.
Referenced by activate().
long TAO_ECG_Reactive_ConsumerEC_Control::timer_id_ [private] |
The timer id.
Definition at line 123 of file ECG_Reactive_ConsumerEC_Control.h.
Referenced by activate(), and shutdown().