00001
00002
00003 #include "orbsvcs/Event/EC_Gateway_IIOP.h"
00004 #include "orbsvcs/Event/ECG_Reconnect_ConsumerEC_Control.h"
00005 #include "tao/Messaging/Messaging.h"
00006 #include "tao/ORB_Core.h"
00007 #include "ace/Reactor.h"
00008
00009 ACE_RCSID(Event, ECG_Reconnect_ConsumerEventChannelControl, "$Id: ECG_Reconnect_ConsumerEC_Control.cpp 76626 2007-01-26 13:50:03Z elliott_c $")
00010
00011 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00012
00013 TAO_ECG_Reconnect_ConsumerEC_Control::
00014 TAO_ECG_Reconnect_ConsumerEC_Control (const ACE_Time_Value &rate,
00015 const ACE_Time_Value &timeout,
00016 TAO_EC_Gateway_IIOP* gateway,
00017 CORBA::ORB_ptr orb)
00018 : rate_ (rate),
00019 timeout_ (timeout),
00020 adapter_ (this),
00021 gateway_ (gateway),
00022 orb_ (CORBA::ORB::_duplicate (orb)),
00023 is_consumer_ec_connected_ (1)
00024 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00025 , timer_id_ (-1)
00026 #endif
00027 {
00028 this->reactor_ =
00029 this->orb_->orb_core ()->reactor ();
00030 }
00031
00032 TAO_ECG_Reconnect_ConsumerEC_Control::~TAO_ECG_Reconnect_ConsumerEC_Control (void)
00033 {
00034 }
00035
00036 void
00037 TAO_ECG_Reconnect_ConsumerEC_Control::try_reconnect ()
00038 {
00039 try
00040 {
00041 CORBA::Boolean disconnected;
00042 CORBA::Boolean non_existent =
00043 gateway_->consumer_ec_non_existent (disconnected);
00044 if (!non_existent)
00045 {
00046 this->reconnect();
00047 }
00048 }
00049 catch (const CORBA::Exception&)
00050 {
00051
00052 }
00053 }
00054
00055 void
00056 TAO_ECG_Reconnect_ConsumerEC_Control::reconnect ()
00057 {
00058 try
00059 {
00060 is_consumer_ec_connected_ = 1;
00061
00062 gateway_->reconnect_consumer_ec();
00063 }
00064 catch (const CORBA::Exception&)
00065 {
00066
00067 }
00068 }
00069
00070 void
00071 TAO_ECG_Reconnect_ConsumerEC_Control::query_eventchannel ()
00072 {
00073 try
00074 {
00075 if (is_consumer_ec_connected_ == 1)
00076 {
00077 CORBA::Boolean disconnected;
00078 CORBA::Boolean non_existent =
00079 gateway_->consumer_ec_non_existent (disconnected);
00080 if (non_existent && !disconnected)
00081 {
00082 this->event_channel_not_exist (gateway_);
00083 }
00084 }
00085 else
00086 {
00087 this->try_reconnect();
00088 }
00089 }
00090 catch (const CORBA::OBJECT_NOT_EXIST&)
00091 {
00092 this->event_channel_not_exist (gateway_);
00093 }
00094 catch (const CORBA::TRANSIENT&)
00095 {
00096
00097
00098
00099 this->event_channel_not_exist (gateway_);
00100 }
00101 catch (const CORBA::Exception&)
00102 {
00103
00104 }
00105 }
00106
00107 void
00108 TAO_ECG_Reconnect_ConsumerEC_Control::handle_timeout (
00109 const ACE_Time_Value &,
00110 const void *)
00111 {
00112
00113
00114
00115
00116
00117
00118
00119 try
00120 {
00121
00122
00123 CORBA::PolicyTypeSeq types;
00124 CORBA::PolicyList_var policies =
00125 this->policy_current_->get_policy_overrides (types);
00126
00127
00128 this->policy_current_->set_policy_overrides (this->policy_list_,
00129 CORBA::ADD_OVERRIDE);
00130
00131
00132 this->query_eventchannel ();
00133
00134 this->policy_current_->set_policy_overrides (policies.in (),
00135 CORBA::SET_OVERRIDE);
00136 for (CORBA::ULong i = 0; i != policies->length (); ++i)
00137 {
00138 policies[i]->destroy ();
00139 }
00140 }
00141 catch (const CORBA::Exception&)
00142 {
00143
00144 }
00145 }
00146
00147 int
00148 TAO_ECG_Reconnect_ConsumerEC_Control::activate (void)
00149 {
00150 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00151 try
00152 {
00153
00154 CORBA::Object_var tmp =
00155 this->orb_->resolve_initial_references ("PolicyCurrent");
00156
00157 this->policy_current_ =
00158 CORBA::PolicyCurrent::_narrow (tmp.in ());
00159
00160
00161 TimeBase::TimeT timeout = timeout_.usec() * 10;
00162 CORBA::Any any;
00163 any <<= timeout;
00164
00165 this->policy_list_.length (1);
00166 this->policy_list_[0] =
00167 this->orb_->create_policy (
00168 Messaging::RELATIVE_RT_TIMEOUT_POLICY_TYPE,
00169 any);
00170
00171
00172 if (this->rate_ != ACE_Time_Value::zero)
00173 {
00174
00175
00176
00177 timer_id_ = this->reactor_->schedule_timer (&this->adapter_,
00178 0,
00179 this->rate_,
00180 this->rate_);
00181 if (timer_id_ == -1)
00182 return -1;
00183 }
00184 }
00185 catch (const CORBA::Exception&)
00186 {
00187 return -1;
00188 }
00189 #endif
00190
00191 return 0;
00192 }
00193
00194 int
00195 TAO_ECG_Reconnect_ConsumerEC_Control::shutdown (void)
00196 {
00197 int r = 0;
00198
00199 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00200 r = this->reactor_->cancel_timer (timer_id_);
00201 #endif
00202
00203 this->adapter_.reactor (0);
00204 return r;
00205 }
00206
00207 void
00208 TAO_ECG_Reconnect_ConsumerEC_Control::event_channel_not_exist (
00209 TAO_EC_Gateway_IIOP* gateway)
00210 {
00211 try
00212 {
00213
00214
00215
00216 is_consumer_ec_connected_ = 0;
00217
00218 gateway->suspend_supplier_ec ();
00219
00220 gateway->cleanup_consumer_proxies ();
00221 }
00222 catch (const CORBA::Exception& ex)
00223 {
00224 ex._tao_print_exception (
00225 "TAO_EC_Reconnect_ConsumerControl::event_channel_not_exist");
00226
00227 }
00228 }
00229
00230 void
00231 TAO_ECG_Reconnect_ConsumerEC_Control::system_exception (
00232 TAO_EC_Gateway_IIOP* gateway,
00233 CORBA::SystemException & )
00234 {
00235 try
00236 {
00237
00238
00239
00240
00241
00242
00243
00244
00245
00246
00247
00248
00249
00250 is_consumer_ec_connected_ = 0;
00251
00252 gateway->suspend_supplier_ec ();
00253
00254 gateway->cleanup_consumer_proxies ();
00255 }
00256 catch (const CORBA::Exception&)
00257 {
00258
00259 }
00260 }
00261
00262
00263
00264 TAO_ECG_Reconnect_ConsumerEC_Control_Adapter::TAO_ECG_Reconnect_ConsumerEC_Control_Adapter (
00265 TAO_ECG_Reconnect_ConsumerEC_Control *adaptee)
00266 : adaptee_ (adaptee)
00267 {
00268 }
00269
00270 int
00271 TAO_ECG_Reconnect_ConsumerEC_Control_Adapter::handle_timeout (
00272 const ACE_Time_Value &tv,
00273 const void *arg)
00274 {
00275 this->adaptee_->handle_timeout (tv, arg);
00276 return 0;
00277 }
00278
00279 TAO_END_VERSIONED_NAMESPACE_DECL