00001
00002
00003 #include "orbsvcs/Event/EC_Gateway_IIOP.h"
00004 #include "orbsvcs/Event/ECG_Reactive_ConsumerEC_Control.h"
00005 #include "tao/Messaging/Messaging.h"
00006 #include "tao/ORB_Core.h"
00007
00008 #include "ace/Reactor.h"
00009
00010 ACE_RCSID(Event, ECG_Reactive_ConsumerEventChannelControl, "$Id: ECG_Reactive_ConsumerEC_Control.cpp 76626 2007-01-26 13:50:03Z elliott_c $")
00011
00012 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00013
00014 TAO_ECG_Reactive_ConsumerEC_Control::
00015 TAO_ECG_Reactive_ConsumerEC_Control (const ACE_Time_Value &rate,
00016 const ACE_Time_Value &timeout,
00017 TAO_EC_Gateway_IIOP* gateway,
00018 CORBA::ORB_ptr orb)
00019 : rate_ (rate),
00020 timeout_ (timeout),
00021 adapter_ (this),
00022 gateway_ (gateway),
00023 orb_ (CORBA::ORB::_duplicate (orb))
00024 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00025 , timer_id_ (-1)
00026 #endif
00027 {
00028 this->reactor_ =
00029 this->orb_->orb_core ()->reactor ();
00030 }
00031
00032 TAO_ECG_Reactive_ConsumerEC_Control::~TAO_ECG_Reactive_ConsumerEC_Control (void)
00033 {
00034 }
00035
00036 void
00037 TAO_ECG_Reactive_ConsumerEC_Control::query_eventchannel ()
00038 {
00039 try
00040 {
00041 CORBA::Boolean disconnected;
00042 CORBA::Boolean non_existent =
00043 gateway_->consumer_ec_non_existent (disconnected);
00044 if (non_existent && !disconnected)
00045 {
00046 this->event_channel_not_exist (gateway_);
00047 }
00048 }
00049 catch (const CORBA::OBJECT_NOT_EXIST&)
00050 {
00051 this->event_channel_not_exist (gateway_);
00052 }
00053 catch (const CORBA::TRANSIENT&)
00054 {
00055
00056
00057
00058 this->event_channel_not_exist (gateway_);
00059 }
00060 catch (const CORBA::Exception&)
00061 {
00062
00063 }
00064 }
00065
00066 void
00067 TAO_ECG_Reactive_ConsumerEC_Control::handle_timeout (
00068 const ACE_Time_Value &,
00069 const void *)
00070 {
00071
00072
00073
00074
00075
00076
00077
00078
00079 try
00080 {
00081
00082
00083 CORBA::PolicyTypeSeq types;
00084 CORBA::PolicyList_var policies =
00085 this->policy_current_->get_policy_overrides (types);
00086
00087
00088 this->policy_current_->set_policy_overrides (this->policy_list_,
00089 CORBA::ADD_OVERRIDE);
00090
00091
00092 this->query_eventchannel ();
00093
00094 this->policy_current_->set_policy_overrides (policies.in (),
00095 CORBA::SET_OVERRIDE);
00096 for (CORBA::ULong i = 0; i != policies->length (); ++i)
00097 {
00098 policies[i]->destroy ();
00099 }
00100 }
00101 catch (const CORBA::Exception&)
00102 {
00103
00104 }
00105 }
00106
00107 int
00108 TAO_ECG_Reactive_ConsumerEC_Control::activate (void)
00109 {
00110 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00111 try
00112 {
00113
00114 CORBA::Object_var tmp =
00115 this->orb_->resolve_initial_references ("PolicyCurrent");
00116
00117 this->policy_current_ =
00118 CORBA::PolicyCurrent::_narrow (tmp.in ());
00119
00120
00121 TimeBase::TimeT timeout = timeout_.usec() * 10;
00122 CORBA::Any any;
00123 any <<= timeout;
00124
00125 this->policy_list_.length (1);
00126 this->policy_list_[0] =
00127 this->orb_->create_policy (
00128 Messaging::RELATIVE_RT_TIMEOUT_POLICY_TYPE,
00129 any);
00130
00131
00132 if (this->rate_ != ACE_Time_Value::zero)
00133 {
00134
00135
00136
00137 timer_id_ = this->reactor_->schedule_timer (&this->adapter_,
00138 0,
00139 this->rate_,
00140 this->rate_);
00141 if (timer_id_ == -1)
00142 return -1;
00143 }
00144 }
00145 catch (const CORBA::Exception&)
00146 {
00147 return -1;
00148 }
00149 #endif
00150
00151 return 0;
00152 }
00153
00154 int
00155 TAO_ECG_Reactive_ConsumerEC_Control::shutdown (void)
00156 {
00157 int r = 0;
00158
00159 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00160 r = this->reactor_->cancel_timer (timer_id_);
00161 #endif
00162
00163 this->adapter_.reactor (0);
00164 return r;
00165 }
00166
00167 void
00168 TAO_ECG_Reactive_ConsumerEC_Control::event_channel_not_exist (
00169 TAO_EC_Gateway_IIOP* gateway)
00170 {
00171 try
00172 {
00173 ACE_DEBUG ((LM_DEBUG,
00174 "EC_Reactive_ConsumerControl(%P|%t) - "
00175 "channel %x does not exists\n"));
00176 gateway->cleanup_consumer_ec ();
00177
00178 gateway->cleanup_consumer_proxies ();
00179
00180 }
00181 catch (const CORBA::Exception& ex)
00182 {
00183 ex._tao_print_exception (
00184 "TAO_EC_Reactive_ConsumerControl::event_channel_not_exist");
00185
00186 }
00187 }
00188
00189 void
00190 TAO_ECG_Reactive_ConsumerEC_Control::system_exception (
00191 TAO_EC_Gateway_IIOP* gateway,
00192 CORBA::SystemException & )
00193 {
00194 try
00195 {
00196 gateway->cleanup_consumer_ec ();
00197
00198 gateway->cleanup_consumer_proxies ();
00199
00200 }
00201 catch (const CORBA::Exception&)
00202 {
00203
00204 }
00205 }
00206
00207
00208
00209 TAO_ECG_Reactive_ConsumerEC_Control_Adapter::TAO_ECG_Reactive_ConsumerEC_Control_Adapter (
00210 TAO_ECG_Reactive_ConsumerEC_Control *adaptee)
00211 : adaptee_ (adaptee)
00212 {
00213 }
00214
00215 int
00216 TAO_ECG_Reactive_ConsumerEC_Control_Adapter::handle_timeout (
00217 const ACE_Time_Value &tv,
00218 const void *arg)
00219 {
00220 this->adaptee_->handle_timeout (tv, arg);
00221 return 0;
00222 }
00223
00224 TAO_END_VERSIONED_NAMESPACE_DECL