00001
00002
00003 #include "orbsvcs/Event/EC_Gateway_IIOP.h"
00004 #include "orbsvcs/Event/ECG_Reconnect_ConsumerEC_Control.h"
00005 #include "tao/Messaging/Messaging.h"
00006 #include "tao/ORB_Core.h"
00007 #include "ace/Reactor.h"
00008
00009 ACE_RCSID(Event, ECG_Reconnect_ConsumerEventChannelControl, "ECG_Reconnect_ConsumerEC_Control.cpp,v 1.7 2006/03/14 06:14:25 jtc Exp")
00010
00011 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00012
00013 TAO_ECG_Reconnect_ConsumerEC_Control::
00014 TAO_ECG_Reconnect_ConsumerEC_Control (const ACE_Time_Value &rate,
00015 const ACE_Time_Value &timeout,
00016 TAO_EC_Gateway_IIOP* gateway,
00017 CORBA::ORB_ptr orb)
00018 : rate_ (rate),
00019 timeout_ (timeout),
00020 adapter_ (this),
00021 gateway_ (gateway),
00022 orb_ (CORBA::ORB::_duplicate (orb)),
00023 is_consumer_ec_connected_ (1)
00024 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00025 , timer_id_ (-1)
00026 #endif
00027 {
00028 this->reactor_ =
00029 this->orb_->orb_core ()->reactor ();
00030 }
00031
00032 TAO_ECG_Reconnect_ConsumerEC_Control::~TAO_ECG_Reconnect_ConsumerEC_Control (void)
00033 {
00034 }
00035
00036 void
00037 TAO_ECG_Reconnect_ConsumerEC_Control::try_reconnect (
00038 ACE_ENV_SINGLE_ARG_DECL)
00039 {
00040 ACE_TRY
00041 {
00042 CORBA::Boolean disconnected;
00043 CORBA::Boolean non_existent =
00044 gateway_->consumer_ec_non_existent (disconnected
00045 ACE_ENV_ARG_PARAMETER);
00046 ACE_TRY_CHECK;
00047 if (!non_existent)
00048 {
00049 this->reconnect(ACE_ENV_SINGLE_ARG_PARAMETER);
00050 ACE_TRY_CHECK;
00051 }
00052 }
00053 ACE_CATCHANY
00054 {
00055
00056 }
00057 ACE_ENDTRY;
00058 }
00059
00060 void
00061 TAO_ECG_Reconnect_ConsumerEC_Control::reconnect (
00062 ACE_ENV_SINGLE_ARG_DECL)
00063 {
00064 ACE_TRY
00065 {
00066 is_consumer_ec_connected_ = 1;
00067
00068 gateway_->reconnect_consumer_ec(ACE_ENV_SINGLE_ARG_PARAMETER);
00069 ACE_TRY_CHECK;
00070 }
00071 ACE_CATCHANY
00072 {
00073
00074 }
00075 ACE_ENDTRY;
00076 }
00077
00078 void
00079 TAO_ECG_Reconnect_ConsumerEC_Control::query_eventchannel (
00080 ACE_ENV_SINGLE_ARG_DECL)
00081 {
00082 ACE_TRY
00083 {
00084 if (is_consumer_ec_connected_ == 1)
00085 {
00086 CORBA::Boolean disconnected;
00087 CORBA::Boolean non_existent =
00088 gateway_->consumer_ec_non_existent (disconnected
00089 ACE_ENV_ARG_PARAMETER);
00090 ACE_TRY_CHECK;
00091 if (non_existent && !disconnected)
00092 {
00093 this->event_channel_not_exist (gateway_ ACE_ENV_ARG_PARAMETER);
00094 ACE_TRY_CHECK;
00095 }
00096 }
00097 else
00098 {
00099 this->try_reconnect(ACE_ENV_SINGLE_ARG_PARAMETER);
00100 ACE_TRY_CHECK;
00101 }
00102 }
00103 ACE_CATCH (CORBA::OBJECT_NOT_EXIST, ex)
00104 {
00105 this->event_channel_not_exist (gateway_ ACE_ENV_ARG_PARAMETER);
00106 ACE_TRY_CHECK;
00107 }
00108 ACE_CATCH (CORBA::TRANSIENT, transient)
00109 {
00110
00111
00112
00113 this->event_channel_not_exist (gateway_ ACE_ENV_ARG_PARAMETER);
00114 ACE_TRY_CHECK;
00115 }
00116 ACE_CATCHANY
00117 {
00118
00119 }
00120 ACE_ENDTRY;
00121 }
00122
00123 void
00124 TAO_ECG_Reconnect_ConsumerEC_Control::handle_timeout (
00125 const ACE_Time_Value &,
00126 const void *)
00127 {
00128
00129
00130
00131
00132
00133
00134
00135 ACE_TRY_NEW_ENV
00136 {
00137
00138
00139 CORBA::PolicyTypeSeq types;
00140 CORBA::PolicyList_var policies =
00141 this->policy_current_->get_policy_overrides (types
00142 ACE_ENV_ARG_PARAMETER);
00143 ACE_TRY_CHECK;
00144
00145
00146 this->policy_current_->set_policy_overrides (this->policy_list_,
00147 CORBA::ADD_OVERRIDE
00148 ACE_ENV_ARG_PARAMETER);
00149 ACE_TRY_CHECK;
00150
00151
00152 this->query_eventchannel (ACE_ENV_SINGLE_ARG_PARAMETER);
00153 ACE_TRY_CHECK;
00154
00155 this->policy_current_->set_policy_overrides (policies.in (),
00156 CORBA::SET_OVERRIDE
00157 ACE_ENV_ARG_PARAMETER);
00158 ACE_TRY_CHECK;
00159 for (CORBA::ULong i = 0; i != policies->length (); ++i)
00160 {
00161 policies[i]->destroy (ACE_ENV_SINGLE_ARG_PARAMETER);
00162 ACE_TRY_CHECK;
00163 }
00164 }
00165 ACE_CATCHANY
00166 {
00167
00168 }
00169 ACE_ENDTRY;
00170 }
00171
00172 int
00173 TAO_ECG_Reconnect_ConsumerEC_Control::activate (void)
00174 {
00175 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00176 ACE_TRY_NEW_ENV
00177 {
00178
00179 CORBA::Object_var tmp =
00180 this->orb_->resolve_initial_references ("PolicyCurrent"
00181 ACE_ENV_ARG_PARAMETER);
00182 ACE_TRY_CHECK;
00183
00184 this->policy_current_ =
00185 CORBA::PolicyCurrent::_narrow (tmp.in ()
00186 ACE_ENV_ARG_PARAMETER);
00187 ACE_TRY_CHECK;
00188
00189
00190 TimeBase::TimeT timeout = timeout_.usec() * 10;
00191 CORBA::Any any;
00192 any <<= timeout;
00193
00194 this->policy_list_.length (1);
00195 this->policy_list_[0] =
00196 this->orb_->create_policy (
00197 Messaging::RELATIVE_RT_TIMEOUT_POLICY_TYPE,
00198 any
00199 ACE_ENV_ARG_PARAMETER);
00200 ACE_TRY_CHECK;
00201
00202
00203 if (this->rate_ != ACE_Time_Value::zero)
00204 {
00205
00206
00207
00208 timer_id_ = this->reactor_->schedule_timer (&this->adapter_,
00209 0,
00210 this->rate_,
00211 this->rate_);
00212 if (timer_id_ == -1)
00213 return -1;
00214 }
00215 }
00216 ACE_CATCHANY
00217 {
00218 return -1;
00219 }
00220 ACE_ENDTRY;
00221 #endif
00222
00223 return 0;
00224 }
00225
00226 int
00227 TAO_ECG_Reconnect_ConsumerEC_Control::shutdown (void)
00228 {
00229 int r = 0;
00230
00231 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00232 r = this->reactor_->cancel_timer (timer_id_);
00233 #endif
00234
00235 this->adapter_.reactor (0);
00236 return r;
00237 }
00238
00239 void
00240 TAO_ECG_Reconnect_ConsumerEC_Control::event_channel_not_exist (
00241 TAO_EC_Gateway_IIOP* gateway
00242 ACE_ENV_ARG_DECL)
00243 {
00244 ACE_TRY
00245 {
00246
00247
00248
00249 is_consumer_ec_connected_ = 0;
00250
00251 gateway->suspend_supplier_ec (ACE_ENV_SINGLE_ARG_PARAMETER);
00252 ACE_TRY_CHECK;
00253
00254 gateway->cleanup_consumer_proxies (ACE_ENV_SINGLE_ARG_PARAMETER);
00255 ACE_TRY_CHECK;
00256 }
00257 ACE_CATCHANY
00258 {
00259 ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
00260 "TAO_EC_Reconnect_ConsumerControl::event_channel_not_exist");
00261
00262 }
00263 ACE_ENDTRY;
00264 }
00265
00266 void
00267 TAO_ECG_Reconnect_ConsumerEC_Control::system_exception (
00268 TAO_EC_Gateway_IIOP* gateway,
00269 CORBA::SystemException &
00270 ACE_ENV_ARG_DECL)
00271 {
00272 ACE_TRY
00273 {
00274
00275
00276
00277
00278
00279
00280
00281
00282
00283
00284
00285
00286
00287 is_consumer_ec_connected_ = 0;
00288
00289 gateway->suspend_supplier_ec (ACE_ENV_SINGLE_ARG_PARAMETER);
00290 ACE_TRY_CHECK;
00291
00292 gateway->cleanup_consumer_proxies (ACE_ENV_SINGLE_ARG_PARAMETER);
00293 ACE_TRY_CHECK;
00294 }
00295 ACE_CATCHANY
00296 {
00297
00298 }
00299 ACE_ENDTRY;
00300 }
00301
00302
00303
00304 TAO_ECG_Reconnect_ConsumerEC_Control_Adapter::TAO_ECG_Reconnect_ConsumerEC_Control_Adapter (
00305 TAO_ECG_Reconnect_ConsumerEC_Control *adaptee)
00306 : adaptee_ (adaptee)
00307 {
00308 }
00309
00310 int
00311 TAO_ECG_Reconnect_ConsumerEC_Control_Adapter::handle_timeout (
00312 const ACE_Time_Value &tv,
00313 const void *arg)
00314 {
00315 this->adaptee_->handle_timeout (tv, arg);
00316 return 0;
00317 }
00318
00319 TAO_END_VERSIONED_NAMESPACE_DECL