00001
00002
00003 #include "orbsvcs/Event/EC_SupplierAdmin.h"
00004 #include "orbsvcs/Event/EC_Reactive_SupplierControl.h"
00005 #include "orbsvcs/Event/EC_Event_Channel_Base.h"
00006 #include "orbsvcs/Event/EC_ProxyConsumer.h"
00007 #include "orbsvcs/Event/EC_ProxySupplier.h"
00008
00009 #include "tao/Messaging/Messaging.h"
00010 #include "tao/ORB_Core.h"
00011
00012 #include "ace/Reactor.h"
00013
00014 #if ! defined (__ACE_INLINE__)
00015 #include "orbsvcs/Event/EC_Reactive_SupplierControl.i"
00016 #endif
00017
00018 ACE_RCSID(Event, EC_Reactive_SupplierControl, "EC_Reactive_SupplierControl.cpp,v 1.26 2006/03/14 06:14:25 jtc Exp")
00019
00020 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00021
00022 TAO_EC_Reactive_SupplierControl::
00023 TAO_EC_Reactive_SupplierControl (const ACE_Time_Value &rate,
00024 const ACE_Time_Value &timeout,
00025 TAO_EC_Event_Channel_Base *ec,
00026 CORBA::ORB_ptr orb)
00027 : rate_ (rate),
00028 timeout_ (timeout),
00029 adapter_ (this),
00030 event_channel_ (ec),
00031 orb_ (CORBA::ORB::_duplicate (orb))
00032 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00033 , timer_id_ (-1)
00034 #endif
00035 {
00036 this->reactor_ =
00037 this->orb_->orb_core ()->reactor ();
00038 }
00039
00040 TAO_EC_Reactive_SupplierControl::~TAO_EC_Reactive_SupplierControl (void)
00041 {
00042 }
00043
00044 void
00045 TAO_EC_Reactive_SupplierControl::query_suppliers (
00046 ACE_ENV_SINGLE_ARG_DECL)
00047 {
00048 TAO_EC_Ping_Supplier worker (this);
00049 this->event_channel_->for_each_supplier (&worker
00050 ACE_ENV_ARG_PARAMETER);
00051 ACE_CHECK;
00052 }
00053
00054 void
00055 TAO_EC_Reactive_SupplierControl::handle_timeout (
00056 const ACE_Time_Value &,
00057 const void *)
00058 {
00059 ACE_TRY_NEW_ENV
00060 {
00061
00062
00063 CORBA::PolicyTypeSeq types;
00064 CORBA::PolicyList_var policies =
00065 this->policy_current_->get_policy_overrides (types
00066 ACE_ENV_ARG_PARAMETER);
00067 ACE_TRY_CHECK;
00068
00069
00070 this->policy_current_->set_policy_overrides (this->policy_list_,
00071 CORBA::ADD_OVERRIDE
00072 ACE_ENV_ARG_PARAMETER);
00073 ACE_TRY_CHECK;
00074
00075
00076 this->query_suppliers (ACE_ENV_SINGLE_ARG_PARAMETER);
00077 ACE_TRY_CHECK;
00078
00079 this->policy_current_->set_policy_overrides (policies.in (),
00080 CORBA::SET_OVERRIDE
00081 ACE_ENV_ARG_PARAMETER);
00082 ACE_TRY_CHECK;
00083 for (CORBA::ULong i = 0; i != policies->length (); ++i)
00084 {
00085 policies[i]->destroy (ACE_ENV_SINGLE_ARG_PARAMETER);
00086 ACE_TRY_CHECK;
00087 }
00088 }
00089 ACE_CATCHANY
00090 {
00091
00092 }
00093 ACE_ENDTRY;
00094 }
00095
00096 int
00097 TAO_EC_Reactive_SupplierControl::activate (void)
00098 {
00099 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00100 ACE_TRY_NEW_ENV
00101 {
00102
00103 CORBA::Object_var tmp =
00104 this->orb_->resolve_initial_references ("PolicyCurrent"
00105 ACE_ENV_ARG_PARAMETER);
00106 ACE_TRY_CHECK;
00107
00108 this->policy_current_ =
00109 CORBA::PolicyCurrent::_narrow (tmp.in ()
00110 ACE_ENV_ARG_PARAMETER);
00111 ACE_TRY_CHECK;
00112
00113
00114 TimeBase::TimeT timeout = timeout_.usec() * 10;
00115 CORBA::Any any;
00116 any <<= timeout;
00117
00118 this->policy_list_.length (1);
00119 this->policy_list_[0] =
00120 this->orb_->create_policy (
00121 Messaging::RELATIVE_RT_TIMEOUT_POLICY_TYPE,
00122 any
00123 ACE_ENV_ARG_PARAMETER);
00124 ACE_TRY_CHECK;
00125
00126
00127 if (this->rate_ != ACE_Time_Value::zero)
00128 {
00129
00130
00131
00132 timer_id_ = this->reactor_->schedule_timer (&this->adapter_,
00133 0,
00134 this->rate_,
00135 this->rate_);
00136 if (timer_id_ == -1)
00137 return -1;
00138 }
00139 }
00140 ACE_CATCHANY
00141 {
00142 return -1;
00143 }
00144 ACE_ENDTRY;
00145 #endif
00146
00147 return 0;
00148 }
00149
00150 int
00151 TAO_EC_Reactive_SupplierControl::shutdown (void)
00152 {
00153 int r = 0;
00154
00155 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00156 r = this->reactor_->cancel_timer (timer_id_);
00157 #endif
00158 this->adapter_.reactor (0);
00159 return r;
00160 }
00161
00162 void
00163 TAO_EC_Reactive_SupplierControl::supplier_not_exist (
00164 TAO_EC_ProxyPushConsumer *proxy
00165 ACE_ENV_ARG_DECL)
00166 {
00167 ACE_TRY
00168 {
00169
00170
00171
00172 proxy->disconnect_push_consumer (ACE_ENV_SINGLE_ARG_PARAMETER);
00173 ACE_TRY_CHECK;
00174 }
00175 ACE_CATCHANY
00176 {
00177
00178 }
00179 ACE_ENDTRY;
00180 }
00181
00182 void
00183 TAO_EC_Reactive_SupplierControl::system_exception (
00184 TAO_EC_ProxyPushConsumer *proxy,
00185 CORBA::SystemException &
00186 ACE_ENV_ARG_DECL)
00187 {
00188 ACE_TRY
00189 {
00190
00191
00192
00193
00194
00195
00196
00197
00198
00199
00200 proxy->disconnect_push_consumer (ACE_ENV_SINGLE_ARG_PARAMETER);
00201 ACE_TRY_CHECK;
00202 }
00203 ACE_CATCHANY
00204 {
00205
00206 }
00207 ACE_ENDTRY;
00208 }
00209
00210
00211
00212 TAO_EC_SupplierControl_Adapter::TAO_EC_SupplierControl_Adapter (
00213 TAO_EC_Reactive_SupplierControl *adaptee)
00214 : adaptee_ (adaptee)
00215 {
00216 }
00217
00218 int
00219 TAO_EC_SupplierControl_Adapter::handle_timeout (
00220 const ACE_Time_Value &tv,
00221 const void *arg)
00222 {
00223 this->adaptee_->handle_timeout (tv, arg);
00224 return 0;
00225 }
00226
00227
00228
00229 void
00230 TAO_EC_Ping_Supplier::work (TAO_EC_ProxyPushConsumer *consumer
00231 ACE_ENV_ARG_DECL)
00232 {
00233 ACE_TRY
00234 {
00235 CORBA::Boolean disconnected;
00236 CORBA::Boolean non_existent =
00237 consumer->supplier_non_existent (disconnected
00238 ACE_ENV_ARG_PARAMETER);
00239 ACE_TRY_CHECK;
00240 if (non_existent && !disconnected)
00241 {
00242 this->control_->supplier_not_exist (consumer ACE_ENV_ARG_PARAMETER);
00243 ACE_TRY_CHECK;
00244 }
00245 }
00246 ACE_CATCH (CORBA::OBJECT_NOT_EXIST, ex)
00247 {
00248 this->control_->supplier_not_exist (consumer ACE_ENV_ARG_PARAMETER);
00249 ACE_TRY_CHECK;
00250 }
00251 ACE_CATCH (CORBA::TRANSIENT, transient)
00252 {
00253
00254
00255
00256 this->control_->supplier_not_exist (consumer ACE_ENV_ARG_PARAMETER);
00257 ACE_TRY_CHECK;
00258 }
00259 ACE_CATCHANY
00260 {
00261
00262 }
00263 ACE_ENDTRY;
00264 }
00265
00266 TAO_END_VERSIONED_NAMESPACE_DECL