00001
00002
00003 #include "orbsvcs/Event/EC_Gateway_IIOP.h"
00004 #include "orbsvcs/Event/ECG_Reactive_ConsumerEC_Control.h"
00005 #include "tao/Messaging/Messaging.h"
00006 #include "tao/ORB_Core.h"
00007
00008 #include "ace/Reactor.h"
00009
00010 ACE_RCSID(Event, ECG_Reactive_ConsumerEventChannelControl, "ECG_Reactive_ConsumerEC_Control.cpp,v 1.7 2006/03/14 06:14:25 jtc Exp")
00011
00012 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00013
00014 TAO_ECG_Reactive_ConsumerEC_Control::
00015 TAO_ECG_Reactive_ConsumerEC_Control (const ACE_Time_Value &rate,
00016 const ACE_Time_Value &timeout,
00017 TAO_EC_Gateway_IIOP* gateway,
00018 CORBA::ORB_ptr orb)
00019 : rate_ (rate),
00020 timeout_ (timeout),
00021 adapter_ (this),
00022 gateway_ (gateway),
00023 orb_ (CORBA::ORB::_duplicate (orb))
00024 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00025 , timer_id_ (-1)
00026 #endif
00027 {
00028 this->reactor_ =
00029 this->orb_->orb_core ()->reactor ();
00030 }
00031
00032 TAO_ECG_Reactive_ConsumerEC_Control::~TAO_ECG_Reactive_ConsumerEC_Control (void)
00033 {
00034 }
00035
00036 void
00037 TAO_ECG_Reactive_ConsumerEC_Control::query_eventchannel (
00038 ACE_ENV_SINGLE_ARG_DECL)
00039 {
00040 ACE_TRY
00041 {
00042 CORBA::Boolean disconnected;
00043 CORBA::Boolean non_existent =
00044 gateway_->consumer_ec_non_existent (disconnected
00045 ACE_ENV_ARG_PARAMETER);
00046 ACE_TRY_CHECK;
00047 if (non_existent && !disconnected)
00048 {
00049 this->event_channel_not_exist (gateway_ ACE_ENV_ARG_PARAMETER);
00050 ACE_TRY_CHECK;
00051 }
00052 }
00053 ACE_CATCH (CORBA::OBJECT_NOT_EXIST, ex)
00054 {
00055 this->event_channel_not_exist (gateway_ ACE_ENV_ARG_PARAMETER);
00056 ACE_TRY_CHECK;
00057 }
00058 ACE_CATCH (CORBA::TRANSIENT, transient)
00059 {
00060
00061
00062
00063 this->event_channel_not_exist (gateway_ ACE_ENV_ARG_PARAMETER);
00064 ACE_TRY_CHECK;
00065 }
00066 ACE_CATCHANY
00067 {
00068
00069 }
00070 ACE_ENDTRY;
00071 }
00072
00073 void
00074 TAO_ECG_Reactive_ConsumerEC_Control::handle_timeout (
00075 const ACE_Time_Value &,
00076 const void *)
00077 {
00078
00079
00080
00081
00082
00083
00084
00085
00086 ACE_TRY_NEW_ENV
00087 {
00088
00089
00090 CORBA::PolicyTypeSeq types;
00091 CORBA::PolicyList_var policies =
00092 this->policy_current_->get_policy_overrides (types
00093 ACE_ENV_ARG_PARAMETER);
00094 ACE_TRY_CHECK;
00095
00096
00097 this->policy_current_->set_policy_overrides (this->policy_list_,
00098 CORBA::ADD_OVERRIDE
00099 ACE_ENV_ARG_PARAMETER);
00100 ACE_TRY_CHECK;
00101
00102
00103 this->query_eventchannel (ACE_ENV_SINGLE_ARG_PARAMETER);
00104 ACE_TRY_CHECK;
00105
00106 this->policy_current_->set_policy_overrides (policies.in (),
00107 CORBA::SET_OVERRIDE
00108 ACE_ENV_ARG_PARAMETER);
00109 ACE_TRY_CHECK;
00110 for (CORBA::ULong i = 0; i != policies->length (); ++i)
00111 {
00112 policies[i]->destroy (ACE_ENV_SINGLE_ARG_PARAMETER);
00113 ACE_TRY_CHECK;
00114 }
00115 }
00116 ACE_CATCHANY
00117 {
00118
00119 }
00120 ACE_ENDTRY;
00121 }
00122
00123 int
00124 TAO_ECG_Reactive_ConsumerEC_Control::activate (void)
00125 {
00126 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00127 ACE_TRY_NEW_ENV
00128 {
00129
00130 CORBA::Object_var tmp =
00131 this->orb_->resolve_initial_references ("PolicyCurrent"
00132 ACE_ENV_ARG_PARAMETER);
00133 ACE_TRY_CHECK;
00134
00135 this->policy_current_ =
00136 CORBA::PolicyCurrent::_narrow (tmp.in ()
00137 ACE_ENV_ARG_PARAMETER);
00138 ACE_TRY_CHECK;
00139
00140
00141 TimeBase::TimeT timeout = timeout_.usec() * 10;
00142 CORBA::Any any;
00143 any <<= timeout;
00144
00145 this->policy_list_.length (1);
00146 this->policy_list_[0] =
00147 this->orb_->create_policy (
00148 Messaging::RELATIVE_RT_TIMEOUT_POLICY_TYPE,
00149 any
00150 ACE_ENV_ARG_PARAMETER);
00151 ACE_TRY_CHECK;
00152
00153
00154 if (this->rate_ != ACE_Time_Value::zero)
00155 {
00156
00157
00158
00159 timer_id_ = this->reactor_->schedule_timer (&this->adapter_,
00160 0,
00161 this->rate_,
00162 this->rate_);
00163 if (timer_id_ == -1)
00164 return -1;
00165 }
00166 }
00167 ACE_CATCHANY
00168 {
00169 return -1;
00170 }
00171 ACE_ENDTRY;
00172 #endif
00173
00174 return 0;
00175 }
00176
00177 int
00178 TAO_ECG_Reactive_ConsumerEC_Control::shutdown (void)
00179 {
00180 int r = 0;
00181
00182 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00183 r = this->reactor_->cancel_timer (timer_id_);
00184 #endif
00185
00186 this->adapter_.reactor (0);
00187 return r;
00188 }
00189
00190 void
00191 TAO_ECG_Reactive_ConsumerEC_Control::event_channel_not_exist (
00192 TAO_EC_Gateway_IIOP* gateway
00193 ACE_ENV_ARG_DECL)
00194 {
00195 ACE_TRY
00196 {
00197 ACE_DEBUG ((LM_DEBUG,
00198 "EC_Reactive_ConsumerControl(%P|%t) - "
00199 "channel %x does not exists\n"));
00200 gateway->cleanup_consumer_ec ();
00201
00202 gateway->cleanup_consumer_proxies (ACE_ENV_SINGLE_ARG_PARAMETER);
00203 ACE_TRY_CHECK;
00204
00205 }
00206 ACE_CATCHANY
00207 {
00208 ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
00209 "TAO_EC_Reactive_ConsumerControl::event_channel_not_exist");
00210
00211 }
00212 ACE_ENDTRY;
00213 }
00214
00215 void
00216 TAO_ECG_Reactive_ConsumerEC_Control::system_exception (
00217 TAO_EC_Gateway_IIOP* gateway,
00218 CORBA::SystemException &
00219 ACE_ENV_ARG_DECL)
00220 {
00221 ACE_TRY
00222 {
00223 gateway->cleanup_consumer_ec ();
00224
00225 gateway->cleanup_consumer_proxies (ACE_ENV_SINGLE_ARG_PARAMETER);
00226 ACE_TRY_CHECK;
00227
00228 }
00229 ACE_CATCHANY
00230 {
00231
00232 }
00233 ACE_ENDTRY;
00234 }
00235
00236
00237
00238 TAO_ECG_Reactive_ConsumerEC_Control_Adapter::TAO_ECG_Reactive_ConsumerEC_Control_Adapter (
00239 TAO_ECG_Reactive_ConsumerEC_Control *adaptee)
00240 : adaptee_ (adaptee)
00241 {
00242 }
00243
00244 int
00245 TAO_ECG_Reactive_ConsumerEC_Control_Adapter::handle_timeout (
00246 const ACE_Time_Value &tv,
00247 const void *arg)
00248 {
00249 this->adaptee_->handle_timeout (tv, arg);
00250 return 0;
00251 }
00252
00253 TAO_END_VERSIONED_NAMESPACE_DECL