#include <ECG_Reconnect_ConsumerEC_Control.h>
Inheritance diagram for TAO_ECG_Reconnect_ConsumerEC_Control:
Public Member Functions | |
TAO_ECG_Reconnect_ConsumerEC_Control (const ACE_Time_Value &rate, const ACE_Time_Value &timeout, TAO_EC_Gateway_IIOP *gateway, CORBA::ORB_ptr orb) | |
virtual | ~TAO_ECG_Reconnect_ConsumerEC_Control (void) |
Destructor. | |
void | handle_timeout (const ACE_Time_Value &tv, const void *arg) |
Receive the timeout from the adapter. | |
virtual int | activate (void) |
virtual int | shutdown (void) |
virtual void | event_channel_not_exist (TAO_EC_Gateway_IIOP *gateway) |
virtual void | system_exception (TAO_EC_Gateway_IIOP *gateway, CORBA::SystemException &) |
Private Member Functions | |
void | query_eventchannel (void) |
void | try_reconnect (void) |
Check whether we can reconnect again. | |
void | reconnect (void) |
Reconnect to the consumer ec. | |
Private Attributes | |
ACE_Time_Value | rate_ |
The polling rate. | |
ACE_Time_Value | timeout_ |
The polling timeout. | |
TAO_ECG_Reconnect_ConsumerEC_Control_Adapter | adapter_ |
The Adapter for the reactor events. | |
TAO_EC_Gateway_IIOP * | gateway_ |
The gateway. | |
CORBA::ORB_var | orb_ |
The ORB. | |
CORBA::PolicyCurrent_var | policy_current_ |
To control the timeout policy in the thread. | |
CORBA::PolicyList | policy_list_ |
Precomputed policy list used to set the timeout. | |
ACE_Reactor * | reactor_ |
The ORB reactor. | |
int | is_consumer_ec_connected_ |
Flag telling whether we have a connection to the consumer EC (1 = connected, 0 = not connected). | |
long | timer_id_ |
The timer id. |
Defines the interface for the consumer event channel control strategy. This strategy handles misbehaving or failing consumer event channels.
In case of a connection loss, this control keeps checking whether the consumer EC can be reached and reconnects to it once the connection is back again. If a restart of the consumer EC must also be handled correctly, the consumer EC must use a persistent IOR.
Definition at line 71 of file ECG_Reconnect_ConsumerEC_Control.h.
TAO_BEGIN_VERSIONED_NAMESPACE_DECL TAO_ECG_Reconnect_ConsumerEC_Control::TAO_ECG_Reconnect_ConsumerEC_Control | ( | const ACE_Time_Value & | rate, | |
const ACE_Time_Value & | timeout, | |||
TAO_EC_Gateway_IIOP * | gateway, | |||
CORBA::ORB_ptr | orb | |||
) |
Constructor. It does not assume ownership of the gateway parameter.
Definition at line 14 of file ECG_Reconnect_ConsumerEC_Control.cpp.
00018 : rate_ (rate), 00019 timeout_ (timeout), 00020 adapter_ (this), 00021 gateway_ (gateway), 00022 orb_ (CORBA::ORB::_duplicate (orb)), 00023 is_consumer_ec_connected_ (1) 00024 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0 00025 , timer_id_ (-1) 00026 #endif /* TAO_HAS_CORBA_MESSAGING != 0*/ 00027 { 00028 this->reactor_ = 00029 this->orb_->orb_core ()->reactor (); 00030 }
TAO_ECG_Reconnect_ConsumerEC_Control::~TAO_ECG_Reconnect_ConsumerEC_Control | ( | void | ) | [virtual] |
int TAO_ECG_Reconnect_ConsumerEC_Control::activate | ( | void | ) | [virtual] |
Activate any internal threads or timers used to poll the state of the event channel.
Reimplemented from TAO_ECG_ConsumerEC_Control.
Definition at line 148 of file ECG_Reconnect_ConsumerEC_Control.cpp.
References TAO_Pseudo_Var_T< T >::in(), orb_, policy_current_, policy_list_, reactor_, ACE_Reactor::schedule_timer(), timeout_, timer_id_, ACE_Time_Value::usec(), and ACE_Time_Value::zero.
00149 { 00150 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0 00151 try 00152 { 00153 // Get the PolicyCurrent object 00154 CORBA::Object_var tmp = 00155 this->orb_->resolve_initial_references ("PolicyCurrent"); 00156 00157 this->policy_current_ = 00158 CORBA::PolicyCurrent::_narrow (tmp.in ()); 00159 00160 // Timeout for polling state (default = 10 msec) 00161 TimeBase::TimeT timeout = timeout_.usec() * 10; 00162 CORBA::Any any; 00163 any <<= timeout; 00164 00165 this->policy_list_.length (1); 00166 this->policy_list_[0] = 00167 this->orb_->create_policy ( 00168 Messaging::RELATIVE_RT_TIMEOUT_POLICY_TYPE, 00169 any); 00170 00171 // Only schedule the timer, when the rate is not zero 00172 if (this->rate_ != ACE_Time_Value::zero) 00173 { 00174 // Schedule the timer after these policies has been set, because the 00175 // handle_timeout uses these policies, if done in front, the channel 00176 // can crash when the timeout expires before initiazation is ready. 00177 timer_id_ = this->reactor_->schedule_timer (&this->adapter_, 00178 0, 00179 this->rate_, 00180 this->rate_); 00181 if (timer_id_ == -1) 00182 return -1; 00183 } 00184 } 00185 catch (const CORBA::Exception&) 00186 { 00187 return -1; 00188 } 00189 #endif /* TAO_HAS_CORBA_MESSAGING */ 00190 00191 return 0; 00192 }
void TAO_ECG_Reconnect_ConsumerEC_Control::event_channel_not_exist | ( | TAO_EC_Gateway_IIOP * | gateway | ) | [virtual] |
When pushing an event to the event channel, a CORBA::OBJECT_NOT_EXIST exception was raised. The only interpretation is that the object has been destroyed. The strategy has to (at the very least) reclaim all the resources attached to that object.
Reimplemented from TAO_ECG_ConsumerEC_Control.
Definition at line 208 of file ECG_Reconnect_ConsumerEC_Control.cpp.
References CORBA::Exception::_tao_print_exception(), TAO_EC_Gateway_IIOP::cleanup_consumer_proxies(), is_consumer_ec_connected_, and TAO_EC_Gateway_IIOP::suspend_supplier_ec().
Referenced by query_eventchannel().
00210 { 00211 try 00212 { 00213 //ACE_DEBUG ((LM_DEBUG, 00214 // "ECG_Reconnect_ConsumerControl(%P|%t) - " 00215 // "channel %x does not exists\n")); 00216 is_consumer_ec_connected_ = 0; 00217 00218 gateway->suspend_supplier_ec (); 00219 00220 gateway->cleanup_consumer_proxies (); 00221 } 00222 catch (const CORBA::Exception& ex) 00223 { 00224 ex._tao_print_exception ( 00225 "TAO_EC_Reconnect_ConsumerControl::event_channel_not_exist"); 00226 // Ignore all exceptions.. 00227 } 00228 }
void TAO_ECG_Reconnect_ConsumerEC_Control::handle_timeout | ( | const ACE_Time_Value & | tv, | |
const void * | arg | |||
) |
Receive the timeout from the adapter.
Definition at line 108 of file ECG_Reconnect_ConsumerEC_Control.cpp.
References policy_current_, and query_eventchannel().
Referenced by TAO_ECG_Reconnect_ConsumerEC_Control_Adapter::handle_timeout().
00111 { 00112 // NOTE, setting RELATIVE_RT_TIMEOUT_POLICY for the duration of 00113 // query_eventchannel () below has greater impact than desired. For 00114 // example, while we are pinging consumers here, a nested upcall, 00115 // which requires making remote calls may come into the ORB. Those 00116 // remote calls will be carried out with with 00117 // RELATIVE_RT_TIMEOUT_POLICY set here in effect. 00118 // @@ TODO: should use Guard to set and reset policies. 00119 try 00120 { 00121 // Query the state of the Current object *before* we initiate 00122 // the iteration... 00123 CORBA::PolicyTypeSeq types; 00124 CORBA::PolicyList_var policies = 00125 this->policy_current_->get_policy_overrides (types); 00126 00127 // Change the timeout 00128 this->policy_current_->set_policy_overrides (this->policy_list_, 00129 CORBA::ADD_OVERRIDE); 00130 00131 // Query the state of the consumers... 00132 this->query_eventchannel (); 00133 00134 this->policy_current_->set_policy_overrides (policies.in (), 00135 CORBA::SET_OVERRIDE); 00136 for (CORBA::ULong i = 0; i != policies->length (); ++i) 00137 { 00138 policies[i]->destroy (); 00139 } 00140 } 00141 catch (const CORBA::Exception&) 00142 { 00143 // Ignore all exceptions 00144 } 00145 }
void TAO_ECG_Reconnect_ConsumerEC_Control::query_eventchannel | ( | void | ) | [private] |
Check whether the consumer event channel still exists; if the connection has already been lost, try to reconnect instead. It is a helper method for handle_timeout() to isolate the exceptions.
Definition at line 71 of file ECG_Reconnect_ConsumerEC_Control.cpp.
References TAO_EC_Gateway_IIOP::consumer_ec_non_existent(), event_channel_not_exist(), gateway_, is_consumer_ec_connected_, and try_reconnect().
Referenced by handle_timeout().
00072 { 00073 try 00074 { 00075 if (is_consumer_ec_connected_ == 1) 00076 { 00077 CORBA::Boolean disconnected; 00078 CORBA::Boolean non_existent = 00079 gateway_->consumer_ec_non_existent (disconnected); 00080 if (non_existent && !disconnected) 00081 { 00082 this->event_channel_not_exist (gateway_); 00083 } 00084 } 00085 else 00086 { 00087 this->try_reconnect(); 00088 } 00089 } 00090 catch (const CORBA::OBJECT_NOT_EXIST&) 00091 { 00092 this->event_channel_not_exist (gateway_); 00093 } 00094 catch (const CORBA::TRANSIENT&) 00095 { 00096 // This is TAO's minor code for a failed connection, we may 00097 // want to be more lenient in the future.. 00098 // if (transient.minor () == 0x54410085) 00099 this->event_channel_not_exist (gateway_); 00100 } 00101 catch (const CORBA::Exception&) 00102 { 00103 // Ignore all exceptions 00104 } 00105 }
void TAO_ECG_Reconnect_ConsumerEC_Control::reconnect | ( | void | ) | [private] |
Reconnect to the consumer ec.
Definition at line 56 of file ECG_Reconnect_ConsumerEC_Control.cpp.
References gateway_, is_consumer_ec_connected_, and TAO_EC_Gateway_IIOP::reconnect_consumer_ec().
Referenced by try_reconnect().
00057 { 00058 try 00059 { 00060 is_consumer_ec_connected_ = 1; 00061 00062 gateway_->reconnect_consumer_ec(); 00063 } 00064 catch (const CORBA::Exception&) 00065 { 00066 // Ignore all exceptions 00067 } 00068 }
int TAO_ECG_Reconnect_ConsumerEC_Control::shutdown | ( | void | ) | [virtual] |
Shutdown any internal threads or timers used to poll the state of the event channel.
Reimplemented from TAO_ECG_ConsumerEC_Control.
Definition at line 195 of file ECG_Reconnect_ConsumerEC_Control.cpp.
References adapter_, ACE_Reactor::cancel_timer(), ACE_Event_Handler::reactor(), reactor_, and timer_id_.
00196 { 00197 int r = 0; 00198 00199 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0 00200 r = this->reactor_->cancel_timer (timer_id_); 00201 #endif /* TAO_HAS_CORBA_MESSAGING */ 00202 00203 this->adapter_.reactor (0); 00204 return r; 00205 }
void TAO_ECG_Reconnect_ConsumerEC_Control::system_exception | ( | TAO_EC_Gateway_IIOP * | gateway, | |
CORBA::SystemException & | ||||
) | [virtual] |
Some system exception was raised while trying to contact the event channel.
Reimplemented from TAO_ECG_ConsumerEC_Control.
Definition at line 231 of file ECG_Reconnect_ConsumerEC_Control.cpp.
References TAO_EC_Gateway_IIOP::cleanup_consumer_proxies(), is_consumer_ec_connected_, and TAO_EC_Gateway_IIOP::suspend_supplier_ec().
00234 { 00235 try 00236 { 00237 // The current implementation is very strict, and kicks out a 00238 // client on the first system exception. We may 00239 // want to be more lenient in the future, for example, 00240 // this is TAO's minor code for a failed connection. 00241 // 00242 // if (CORBA::TRANSIENT::_narrow (&exception) != 0 00243 // && exception->minor () == 0x54410085) 00244 // return; 00245 00246 // Anything else is serious, including timeouts... 00247 //ACE_DEBUG ((LM_DEBUG, 00248 // "ECG_Reconnect_ConsumerControl(%P|%t) - " 00249 // "channel %x does not exists system except\n")); 00250 is_consumer_ec_connected_ = 0; 00251 00252 gateway->suspend_supplier_ec (); 00253 00254 gateway->cleanup_consumer_proxies (); 00255 } 00256 catch (const CORBA::Exception&) 00257 { 00258 // Ignore all exceptions.. 00259 } 00260 }
void TAO_ECG_Reconnect_ConsumerEC_Control::try_reconnect | ( | void | ) | [private] |
Check whether the consumer EC exists again and, if so, reconnect to it.
Definition at line 37 of file ECG_Reconnect_ConsumerEC_Control.cpp.
References TAO_EC_Gateway_IIOP::consumer_ec_non_existent(), gateway_, and reconnect().
Referenced by query_eventchannel().
00038 { 00039 try 00040 { 00041 CORBA::Boolean disconnected; 00042 CORBA::Boolean non_existent = 00043 gateway_->consumer_ec_non_existent (disconnected); 00044 if (!non_existent) 00045 { 00046 this->reconnect(); 00047 } 00048 } 00049 catch (const CORBA::Exception&) 00050 { 00051 // Ignore all exceptions 00052 } 00053 }
TAO_ECG_Reconnect_ConsumerEC_Control_Adapter TAO_ECG_Reconnect_ConsumerEC_Control::adapter_ [private] |
The Adapter for the reactor events.
Definition at line 115 of file ECG_Reconnect_ConsumerEC_Control.h.
Referenced by shutdown().
TAO_EC_Gateway_IIOP* TAO_ECG_Reconnect_ConsumerEC_Control::gateway_ [private] |
The gateway.
Definition at line 118 of file ECG_Reconnect_ConsumerEC_Control.h.
Referenced by query_eventchannel(), reconnect(), and try_reconnect().
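int TAO_ECG_Reconnect_ConsumerEC_Control::is_consumer_ec_connected_ [private] |
Flag telling whether we have a connection to the consumer EC (1 = connected, 0 = not connected).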
Definition at line 133 of file ECG_Reconnect_ConsumerEC_Control.h.
Referenced by event_channel_not_exist(), query_eventchannel(), reconnect(), and system_exception().
CORBA::ORB_var TAO_ECG_Reconnect_ConsumerEC_Control::orb_ [private] |
The ORB.
Definition at line 121 of file ECG_Reconnect_ConsumerEC_Control.h.
Referenced by activate().
CORBA::PolicyCurrent_var TAO_ECG_Reconnect_ConsumerEC_Control::policy_current_ [private] |
To control the timeout policy in the thread.
Definition at line 124 of file ECG_Reconnect_ConsumerEC_Control.h.
Referenced by activate(), and handle_timeout().
CORBA::PolicyList TAO_ECG_Reconnect_ConsumerEC_Control::policy_list_ [private] |
Precomputed policy list used to set the timeout.
Definition at line 127 of file ECG_Reconnect_ConsumerEC_Control.h.
Referenced by activate().
ACE_Reactor* TAO_ECG_Reconnect_ConsumerEC_Control::reactor_ [private] |
The ORB reactor.
Definition at line 130 of file ECG_Reconnect_ConsumerEC_Control.h.
Referenced by activate(), and shutdown().
ACE_Time_Value TAO_ECG_Reconnect_ConsumerEC_Control::timeout_ [private] |
The polling timeout.
Definition at line 112 of file ECG_Reconnect_ConsumerEC_Control.h.
Referenced by activate().
long TAO_ECG_Reconnect_ConsumerEC_Control::timer_id_ [private] |
The timer id.
Definition at line 137 of file ECG_Reconnect_ConsumerEC_Control.h.
Referenced by activate(), and shutdown().