00001
00002
00003 #include "orbsvcs/Event/EC_ConsumerAdmin.h"
00004 #include "orbsvcs/Event/EC_Reactive_ConsumerControl.h"
00005 #include "orbsvcs/Event/EC_Event_Channel_Base.h"
00006 #include "orbsvcs/Event/EC_ProxySupplier.h"
00007 #include "orbsvcs/Event/EC_ProxyConsumer.h"
00008
00009 #include "tao/Messaging/Messaging.h"
00010 #include "tao/ORB_Core.h"
00011
00012 #include "ace/Reactor.h"
00013
00014 #if ! defined (__ACE_INLINE__)
00015 #include "orbsvcs/Event/EC_Reactive_ConsumerControl.i"
00016 #endif
00017
00018 ACE_RCSID(Event, EC_Reactive_ConsumerControl, "EC_Reactive_ConsumerControl.cpp,v 1.25 2006/03/14 06:14:25 jtc Exp")
00019
00020 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00021
00022 TAO_EC_Reactive_ConsumerControl::
00023 TAO_EC_Reactive_ConsumerControl (const ACE_Time_Value &rate,
00024 const ACE_Time_Value &timeout,
00025 TAO_EC_Event_Channel_Base *ec,
00026 CORBA::ORB_ptr orb)
00027 : rate_ (rate),
00028 timeout_ (timeout),
00029 adapter_ (this),
00030 event_channel_ (ec),
00031 orb_ (CORBA::ORB::_duplicate (orb))
00032 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00033 , timer_id_ (-1)
00034 #endif
00035 {
00036 this->reactor_ =
00037 this->orb_->orb_core ()->reactor ();
00038 }
00039
00040 TAO_EC_Reactive_ConsumerControl::~TAO_EC_Reactive_ConsumerControl (void)
00041 {
00042 }
00043
00044 void
00045 TAO_EC_Reactive_ConsumerControl::query_consumers (
00046 ACE_ENV_SINGLE_ARG_DECL)
00047 {
00048 TAO_EC_Ping_Consumer worker (this);
00049 this->event_channel_->for_each_consumer (&worker
00050 ACE_ENV_ARG_PARAMETER);
00051 ACE_CHECK;
00052 }
00053
00054 void
00055 TAO_EC_Reactive_ConsumerControl::handle_timeout (
00056 const ACE_Time_Value &,
00057 const void *)
00058 {
00059
00060
00061
00062
00063
00064
00065
00066
00067 ACE_TRY_NEW_ENV
00068 {
00069
00070
00071 CORBA::PolicyTypeSeq types;
00072 CORBA::PolicyList_var policies =
00073 this->policy_current_->get_policy_overrides (types
00074 ACE_ENV_ARG_PARAMETER);
00075 ACE_TRY_CHECK;
00076
00077
00078 this->policy_current_->set_policy_overrides (this->policy_list_,
00079 CORBA::ADD_OVERRIDE
00080 ACE_ENV_ARG_PARAMETER);
00081 ACE_TRY_CHECK;
00082
00083
00084 this->query_consumers (ACE_ENV_SINGLE_ARG_PARAMETER);
00085 ACE_TRY_CHECK;
00086
00087 this->policy_current_->set_policy_overrides (policies.in (),
00088 CORBA::SET_OVERRIDE
00089 ACE_ENV_ARG_PARAMETER);
00090 ACE_TRY_CHECK;
00091 for (CORBA::ULong i = 0; i != policies->length (); ++i)
00092 {
00093 policies[i]->destroy (ACE_ENV_SINGLE_ARG_PARAMETER);
00094 ACE_TRY_CHECK;
00095 }
00096 }
00097 ACE_CATCHANY
00098 {
00099
00100 }
00101 ACE_ENDTRY;
00102 }
00103
00104 int
00105 TAO_EC_Reactive_ConsumerControl::activate (void)
00106 {
00107 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00108 ACE_TRY_NEW_ENV
00109 {
00110
00111 CORBA::Object_var tmp =
00112 this->orb_->resolve_initial_references ("PolicyCurrent"
00113 ACE_ENV_ARG_PARAMETER);
00114 ACE_TRY_CHECK;
00115
00116 this->policy_current_ =
00117 CORBA::PolicyCurrent::_narrow (tmp.in ()
00118 ACE_ENV_ARG_PARAMETER);
00119 ACE_TRY_CHECK;
00120
00121
00122 TimeBase::TimeT timeout = timeout_.usec() * 10;
00123 CORBA::Any any;
00124 any <<= timeout;
00125
00126 this->policy_list_.length (1);
00127 this->policy_list_[0] =
00128 this->orb_->create_policy (
00129 Messaging::RELATIVE_RT_TIMEOUT_POLICY_TYPE,
00130 any
00131 ACE_ENV_ARG_PARAMETER);
00132 ACE_TRY_CHECK;
00133
00134
00135 if (this->rate_ != ACE_Time_Value::zero)
00136 {
00137
00138
00139
00140 timer_id_ = this->reactor_->schedule_timer (&this->adapter_,
00141 0,
00142 this->rate_,
00143 this->rate_);
00144 if (timer_id_ == -1)
00145 return -1;
00146 }
00147 }
00148 ACE_CATCHANY
00149 {
00150 return -1;
00151 }
00152 ACE_ENDTRY;
00153 #endif
00154
00155 return 0;
00156 }
00157
00158 int
00159 TAO_EC_Reactive_ConsumerControl::shutdown (void)
00160 {
00161 int r = 0;
00162
00163 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00164 r = this->reactor_->cancel_timer (timer_id_);
00165 #endif
00166 this->adapter_.reactor (0);
00167 return r;
00168 }
00169
00170 void
00171 TAO_EC_Reactive_ConsumerControl::consumer_not_exist (
00172 TAO_EC_ProxyPushSupplier *proxy
00173 ACE_ENV_ARG_DECL)
00174 {
00175 ACE_TRY
00176 {
00177
00178
00179
00180 proxy->disconnect_push_supplier (ACE_ENV_SINGLE_ARG_PARAMETER);
00181 ACE_TRY_CHECK;
00182 }
00183 ACE_CATCHANY
00184 {
00185 ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
00186 "Reactive_ConsumerControl::consumer_not_exist");
00187
00188 }
00189 ACE_ENDTRY;
00190 }
00191
00192 void
00193 TAO_EC_Reactive_ConsumerControl::system_exception (
00194 TAO_EC_ProxyPushSupplier *proxy,
00195 CORBA::SystemException &
00196 ACE_ENV_ARG_DECL)
00197 {
00198 ACE_TRY
00199 {
00200
00201
00202
00203
00204
00205
00206
00207
00208
00209
00210 proxy->disconnect_push_supplier (ACE_ENV_SINGLE_ARG_PARAMETER);
00211 ACE_TRY_CHECK;
00212 }
00213 ACE_CATCHANY
00214 {
00215
00216 }
00217 ACE_ENDTRY;
00218 }
00219
00220
00221
00222 TAO_EC_ConsumerControl_Adapter::TAO_EC_ConsumerControl_Adapter (
00223 TAO_EC_Reactive_ConsumerControl *adaptee)
00224 : adaptee_ (adaptee)
00225 {
00226 }
00227
00228 int
00229 TAO_EC_ConsumerControl_Adapter::handle_timeout (
00230 const ACE_Time_Value &tv,
00231 const void *arg)
00232 {
00233 this->adaptee_->handle_timeout (tv, arg);
00234 return 0;
00235 }
00236
00237
00238
00239 void
00240 TAO_EC_Ping_Consumer::work (TAO_EC_ProxyPushSupplier *supplier
00241 ACE_ENV_ARG_DECL)
00242 {
00243 ACE_TRY
00244 {
00245 CORBA::Boolean disconnected;
00246 CORBA::Boolean non_existent =
00247 supplier->consumer_non_existent (disconnected
00248 ACE_ENV_ARG_PARAMETER);
00249 ACE_TRY_CHECK;
00250 if (non_existent && !disconnected)
00251 {
00252 this->control_->consumer_not_exist (supplier ACE_ENV_ARG_PARAMETER);
00253 ACE_TRY_CHECK;
00254 }
00255 }
00256 ACE_CATCH (CORBA::OBJECT_NOT_EXIST, ex)
00257 {
00258 this->control_->consumer_not_exist (supplier ACE_ENV_ARG_PARAMETER);
00259 ACE_TRY_CHECK;
00260 }
00261 ACE_CATCH (CORBA::TRANSIENT, transient)
00262 {
00263
00264
00265
00266 this->control_->consumer_not_exist (supplier ACE_ENV_ARG_PARAMETER);
00267 ACE_TRY_CHECK;
00268 }
00269 ACE_CATCHANY
00270 {
00271
00272 }
00273 ACE_ENDTRY;
00274 }
00275
00276 TAO_END_VERSIONED_NAMESPACE_DECL