#include <ECG_Reconnect_ConsumerEC_Control.h>
Inheritance diagram for TAO_ECG_Reconnect_ConsumerEC_Control:
Public Member Functions | |
TAO_ECG_Reconnect_ConsumerEC_Control (const ACE_Time_Value &rate, const ACE_Time_Value &timeout, TAO_EC_Gateway_IIOP *gateway, CORBA::ORB_ptr orb) | |
virtual | ~TAO_ECG_Reconnect_ConsumerEC_Control (void) |
Destructor. | |
void | handle_timeout (const ACE_Time_Value &tv, const void *arg) |
Receive the timeout from the adapter. | |
virtual int | activate (void) |
virtual int | shutdown (void) |
virtual void | event_channel_not_exist (TAO_EC_Gateway_IIOP *gateway) |
virtual void | system_exception (TAO_EC_Gateway_IIOP *gateway, CORBA::SystemException &) |
Private Member Functions | |
void | query_eventchannel () |
void | try_reconnect () |
Look if we can reconnect again. | |
void | reconnect () |
Reconnect to the consumer ec. | |
Private Attributes | |
ACE_Time_Value | rate_ |
The polling rate. | |
ACE_Time_Value | timeout_ |
The polling timeout. | |
TAO_ECG_Reconnect_ConsumerEC_Control_Adapter | adapter_ |
The Adapter for the reactor events. | |
TAO_EC_Gateway_IIOP * | gateway_ |
The gateway. | |
CORBA::ORB_var | orb_ |
The ORB. | |
CORBA::PolicyCurrent_var | policy_current_ |
To control the timeout policy in the thread. | |
CORBA::PolicyList | policy_list_ |
Precomputed policy list to the set timeout. | |
ACE_Reactor * | reactor_ |
The ORB reactor. | |
int | is_consumer_ec_connected_ |
Do we have a connection to the consumer ec. | |
long | timer_id_ |
The timer id. |
Defines the interface for the consumer event channel control strategy. This strategy handles misbehaving or failing consumer event channels.
This control tries to reconnect to the consumer ec incase of a connection loss and then does a reconnect when the connection is back again. When also a restart of the consumer ec must be handled correctly, then the consumer ec must use a persistent IOR.
Definition at line 71 of file ECG_Reconnect_ConsumerEC_Control.h.
|
Constructor. It does not assume ownership of the event_channel parameter. Definition at line 14 of file ECG_Reconnect_ConsumerEC_Control.cpp. References TAO_HAS_CORBA_MESSAGING.
00018 : rate_ (rate), 00019 timeout_ (timeout), 00020 adapter_ (this), 00021 gateway_ (gateway), 00022 orb_ (CORBA::ORB::_duplicate (orb)), 00023 is_consumer_ec_connected_ (1) 00024 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0 00025 , timer_id_ (-1) 00026 #endif /* TAO_HAS_CORBA_MESSAGING != 0*/ 00027 { 00028 this->reactor_ = 00029 this->orb_->orb_core ()->reactor (); 00030 } |
|
Destructor.
Definition at line 32 of file ECG_Reconnect_ConsumerEC_Control.cpp.
00033 { 00034 } |
|
Activate any internal threads or timers used to poll the state of the event channel. Reimplemented from TAO_ECG_ConsumerEC_Control. Definition at line 173 of file ECG_Reconnect_ConsumerEC_Control.cpp. References CORBA::PolicyCurrent::_narrow(), ACE_CATCHANY, ACE_ENDTRY, ACE_ENV_ARG_PARAMETER, ACE_TRY_CHECK, ACE_TRY_NEW_ENV, TAO::unbounded_object_reference_sequence< Policy, Policy_var >::length(), ACE_Reactor::schedule_timer(), and ACE_Time_Value::usec().
00174 { 00175 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0 00176 ACE_TRY_NEW_ENV 00177 { 00178 // Get the PolicyCurrent object 00179 CORBA::Object_var tmp = 00180 this->orb_->resolve_initial_references ("PolicyCurrent" 00181 ACE_ENV_ARG_PARAMETER); 00182 ACE_TRY_CHECK; 00183 00184 this->policy_current_ = 00185 CORBA::PolicyCurrent::_narrow (tmp.in () 00186 ACE_ENV_ARG_PARAMETER); 00187 ACE_TRY_CHECK; 00188 00189 // Timeout for polling state (default = 10 msec) 00190 TimeBase::TimeT timeout = timeout_.usec() * 10; 00191 CORBA::Any any; 00192 any <<= timeout; 00193 00194 this->policy_list_.length (1); 00195 this->policy_list_[0] = 00196 this->orb_->create_policy ( 00197 Messaging::RELATIVE_RT_TIMEOUT_POLICY_TYPE, 00198 any 00199 ACE_ENV_ARG_PARAMETER); 00200 ACE_TRY_CHECK; 00201 00202 // Only schedule the timer, when the rate is not zero 00203 if (this->rate_ != ACE_Time_Value::zero) 00204 { 00205 // Schedule the timer after these policies has been set, because the 00206 // handle_timeout uses these policies, if done in front, the channel 00207 // can crash when the timeout expires before initiazation is ready. 00208 timer_id_ = this->reactor_->schedule_timer (&this->adapter_, 00209 0, 00210 this->rate_, 00211 this->rate_); 00212 if (timer_id_ == -1) 00213 return -1; 00214 } 00215 } 00216 ACE_CATCHANY 00217 { 00218 return -1; 00219 } 00220 ACE_ENDTRY; 00221 #endif /* TAO_HAS_CORBA_MESSAGING */ 00222 00223 return 0; 00224 } |
|
When pushing an event to the event channel a CORBA::OBJECT_NOT_EXIST exception was raised. The only interpretation is that the object has been destroyed. The strategy has to (at the very least), reclaim all the resources attached to that object. Reimplemented from TAO_ECG_ConsumerEC_Control. Definition at line 240 of file ECG_Reconnect_ConsumerEC_Control.cpp. References ACE_ANY_EXCEPTION, ACE_CATCHANY, ACE_ENDTRY, ACE_ENV_SINGLE_ARG_PARAMETER, ACE_PRINT_EXCEPTION, ACE_TRY, ACE_TRY_CHECK, and is_consumer_ec_connected_. Referenced by query_eventchannel().
00243 { 00244 ACE_TRY 00245 { 00246 //ACE_DEBUG ((LM_DEBUG, 00247 // "ECG_Reconnect_ConsumerControl(%P|%t) - " 00248 // "channel %x does not exists\n")); 00249 is_consumer_ec_connected_ = 0; 00250 00251 gateway->suspend_supplier_ec (ACE_ENV_SINGLE_ARG_PARAMETER); 00252 ACE_TRY_CHECK; 00253 00254 gateway->cleanup_consumer_proxies (ACE_ENV_SINGLE_ARG_PARAMETER); 00255 ACE_TRY_CHECK; 00256 } 00257 ACE_CATCHANY 00258 { 00259 ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, 00260 "TAO_EC_Reconnect_ConsumerControl::event_channel_not_exist"); 00261 // Ignore all exceptions.. 00262 } 00263 ACE_ENDTRY; 00264 } |
|
Receive the timeout from the adapter.
Definition at line 124 of file ECG_Reconnect_ConsumerEC_Control.cpp. References ACE_CATCHANY, ACE_ENDTRY, ACE_ENV_ARG_PARAMETER, ACE_ENV_SINGLE_ARG_PARAMETER, ACE_TRY_CHECK, ACE_TRY_NEW_ENV, and query_eventchannel(). Referenced by TAO_ECG_Reconnect_ConsumerEC_Control_Adapter::handle_timeout().
00127 { 00128 // NOTE, setting RELATIVE_RT_TIMEOUT_POLICY for the duration of 00129 // query_eventchannel () below has greater impact than desired. For 00130 // example, while we are pinging consumers here, a nested upcall, 00131 // which requires making remote calls may come into the ORB. Those 00132 // remote calls will be carried out with with 00133 // RELATIVE_RT_TIMEOUT_POLICY set here in effect. 00134 // @@ TODO: should use Guard to set and reset policies. 00135 ACE_TRY_NEW_ENV 00136 { 00137 // Query the state of the Current object *before* we initiate 00138 // the iteration... 00139 CORBA::PolicyTypeSeq types; 00140 CORBA::PolicyList_var policies = 00141 this->policy_current_->get_policy_overrides (types 00142 ACE_ENV_ARG_PARAMETER); 00143 ACE_TRY_CHECK; 00144 00145 // Change the timeout 00146 this->policy_current_->set_policy_overrides (this->policy_list_, 00147 CORBA::ADD_OVERRIDE 00148 ACE_ENV_ARG_PARAMETER); 00149 ACE_TRY_CHECK; 00150 00151 // Query the state of the consumers... 00152 this->query_eventchannel (ACE_ENV_SINGLE_ARG_PARAMETER); 00153 ACE_TRY_CHECK; 00154 00155 this->policy_current_->set_policy_overrides (policies.in (), 00156 CORBA::SET_OVERRIDE 00157 ACE_ENV_ARG_PARAMETER); 00158 ACE_TRY_CHECK; 00159 for (CORBA::ULong i = 0; i != policies->length (); ++i) 00160 { 00161 policies[i]->destroy (ACE_ENV_SINGLE_ARG_PARAMETER); 00162 ACE_TRY_CHECK; 00163 } 00164 } 00165 ACE_CATCHANY 00166 { 00167 // Ignore all exceptions 00168 } 00169 ACE_ENDTRY; 00170 } |
|
Check if the consumers still exists. It is a helper method for handle_timeout() to isolate the exceptions. Definition at line 79 of file ECG_Reconnect_ConsumerEC_Control.cpp. References ACE_CATCH, ACE_CATCHANY, ACE_ENDTRY, ACE_ENV_ARG_PARAMETER, ACE_ENV_SINGLE_ARG_PARAMETER, ACE_TRY, ACE_TRY_CHECK, TAO_EC_Gateway_IIOP::consumer_ec_non_existent(), event_channel_not_exist(), is_consumer_ec_connected_, and try_reconnect(). Referenced by handle_timeout().
00081 { 00082 ACE_TRY 00083 { 00084 if (is_consumer_ec_connected_ == 1) 00085 { 00086 CORBA::Boolean disconnected; 00087 CORBA::Boolean non_existent = 00088 gateway_->consumer_ec_non_existent (disconnected 00089 ACE_ENV_ARG_PARAMETER); 00090 ACE_TRY_CHECK; 00091 if (non_existent && !disconnected) 00092 { 00093 this->event_channel_not_exist (gateway_ ACE_ENV_ARG_PARAMETER); 00094 ACE_TRY_CHECK; 00095 } 00096 } 00097 else 00098 { 00099 this->try_reconnect(ACE_ENV_SINGLE_ARG_PARAMETER); 00100 ACE_TRY_CHECK; 00101 } 00102 } 00103 ACE_CATCH (CORBA::OBJECT_NOT_EXIST, ex) 00104 { 00105 this->event_channel_not_exist (gateway_ ACE_ENV_ARG_PARAMETER); 00106 ACE_TRY_CHECK; 00107 } 00108 ACE_CATCH (CORBA::TRANSIENT, transient) 00109 { 00110 // This is TAO's minor code for a failed connection, we may 00111 // want to be more lenient in the future.. 00112 // if (transient.minor () == 0x54410085) 00113 this->event_channel_not_exist (gateway_ ACE_ENV_ARG_PARAMETER); 00114 ACE_TRY_CHECK; 00115 } 00116 ACE_CATCHANY 00117 { 00118 // Ignore all exceptions 00119 } 00120 ACE_ENDTRY; 00121 } |
|
Reconnect to the consumer ec.
Definition at line 61 of file ECG_Reconnect_ConsumerEC_Control.cpp. References ACE_CATCHANY, ACE_ENDTRY, ACE_ENV_SINGLE_ARG_PARAMETER, ACE_TRY, ACE_TRY_CHECK, is_consumer_ec_connected_, and TAO_EC_Gateway_IIOP::reconnect_consumer_ec(). Referenced by try_reconnect().
00063 { 00064 ACE_TRY 00065 { 00066 is_consumer_ec_connected_ = 1; 00067 00068 gateway_->reconnect_consumer_ec(ACE_ENV_SINGLE_ARG_PARAMETER); 00069 ACE_TRY_CHECK; 00070 } 00071 ACE_CATCHANY 00072 { 00073 // Ignore all exceptions 00074 } 00075 ACE_ENDTRY; 00076 } |
|
Shutdown any internal threads or timers used to poll the state of the event channel. Reimplemented from TAO_ECG_ConsumerEC_Control. Definition at line 227 of file ECG_Reconnect_ConsumerEC_Control.cpp. References ACE_Reactor::cancel_timer(), and ACE_Event_Handler::reactor().
00228 { 00229 int r = 0; 00230 00231 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0 00232 r = this->reactor_->cancel_timer (timer_id_); 00233 #endif /* TAO_HAS_CORBA_MESSAGING */ 00234 00235 this->adapter_.reactor (0); 00236 return r; 00237 } |
|
Some system exception was raised while trying to contact the event channel Reimplemented from TAO_ECG_ConsumerEC_Control. Definition at line 267 of file ECG_Reconnect_ConsumerEC_Control.cpp. References ACE_CATCHANY, ACE_ENDTRY, ACE_ENV_SINGLE_ARG_PARAMETER, ACE_TRY, ACE_TRY_CHECK, TAO_EC_Gateway_IIOP::cleanup_consumer_proxies(), is_consumer_ec_connected_, and TAO_EC_Gateway_IIOP::suspend_supplier_ec().
00271 { 00272 ACE_TRY 00273 { 00274 // The current implementation is very strict, and kicks out a 00275 // client on the first system exception. We may 00276 // want to be more lenient in the future, for example, 00277 // this is TAO's minor code for a failed connection. 00278 // 00279 // if (CORBA::TRANSIENT::_narrow (&exception) != 0 00280 // && exception->minor () == 0x54410085) 00281 // return; 00282 00283 // Anything else is serious, including timeouts... 00284 //ACE_DEBUG ((LM_DEBUG, 00285 // "ECG_Reconnect_ConsumerControl(%P|%t) - " 00286 // "channel %x does not exists system except\n")); 00287 is_consumer_ec_connected_ = 0; 00288 00289 gateway->suspend_supplier_ec (ACE_ENV_SINGLE_ARG_PARAMETER); 00290 ACE_TRY_CHECK; 00291 00292 gateway->cleanup_consumer_proxies (ACE_ENV_SINGLE_ARG_PARAMETER); 00293 ACE_TRY_CHECK; 00294 } 00295 ACE_CATCHANY 00296 { 00297 // Ignore all exceptions.. 00298 } 00299 ACE_ENDTRY; 00300 } |
|
Look if we can reconnect again.
Definition at line 37 of file ECG_Reconnect_ConsumerEC_Control.cpp. References ACE_CATCHANY, ACE_ENDTRY, ACE_ENV_ARG_PARAMETER, ACE_ENV_SINGLE_ARG_PARAMETER, ACE_TRY, ACE_TRY_CHECK, TAO_EC_Gateway_IIOP::consumer_ec_non_existent(), and reconnect(). Referenced by query_eventchannel().
00039 { 00040 ACE_TRY 00041 { 00042 CORBA::Boolean disconnected; 00043 CORBA::Boolean non_existent = 00044 gateway_->consumer_ec_non_existent (disconnected 00045 ACE_ENV_ARG_PARAMETER); 00046 ACE_TRY_CHECK; 00047 if (!non_existent) 00048 { 00049 this->reconnect(ACE_ENV_SINGLE_ARG_PARAMETER); 00050 ACE_TRY_CHECK; 00051 } 00052 } 00053 ACE_CATCHANY 00054 { 00055 // Ignore all exceptions 00056 } 00057 ACE_ENDTRY; 00058 } |
|
The Adapter for the reactor events.
Definition at line 117 of file ECG_Reconnect_ConsumerEC_Control.h. |
|
The gateway.
Definition at line 120 of file ECG_Reconnect_ConsumerEC_Control.h. |
|
Do we have a connection to the consumer ec.
Definition at line 135 of file ECG_Reconnect_ConsumerEC_Control.h. Referenced by event_channel_not_exist(), query_eventchannel(), reconnect(), and system_exception(). |
|
The ORB.
Definition at line 123 of file ECG_Reconnect_ConsumerEC_Control.h. |
|
To control the timeout policy in the thread.
Definition at line 126 of file ECG_Reconnect_ConsumerEC_Control.h. |
|
Precomputed policy list to the set timeout.
Definition at line 129 of file ECG_Reconnect_ConsumerEC_Control.h. |
|
The polling rate.
Definition at line 111 of file ECG_Reconnect_ConsumerEC_Control.h. |
|
The ORB reactor.
Definition at line 132 of file ECG_Reconnect_ConsumerEC_Control.h. |
|
The polling timeout.
Definition at line 114 of file ECG_Reconnect_ConsumerEC_Control.h. |
|
The timer id.
Definition at line 139 of file ECG_Reconnect_ConsumerEC_Control.h. |