00001
00002
00003
00004
00005
00006
00007 #include "orbsvcs/CosEvent/CEC_EventChannel.h"
00008 #include "orbsvcs/CosEvent/CEC_SupplierAdmin.h"
00009 #include "orbsvcs/CosEvent/CEC_ProxyPushConsumer.h"
00010 #include "orbsvcs/CosEvent/CEC_Reactive_SupplierControl.h"
00011
00012 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00013 #include "orbsvcs/CosEvent/CEC_TypedEventChannel.h"
00014 #include "orbsvcs/CosEvent/CEC_TypedSupplierAdmin.h"
00015 #include "orbsvcs/CosEvent/CEC_TypedProxyPushConsumer.h"
00016 #endif
00017
00018 #include "orbsvcs/CosEvent/CEC_ProxyPullConsumer.h"
00019
00020 #include "orbsvcs/Time_Utilities.h"
00021
00022 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00023 #include "tao/Messaging/Messaging.h"
00024 #endif
00025
00026 #include "tao/ORB_Core.h"
00027
00028 #include "ace/Reactor.h"
00029
00030 #if ! defined (__ACE_INLINE__)
00031 #include "orbsvcs/CosEvent/CEC_Reactive_SupplierControl.inl"
00032 #endif
00033
00034 ACE_RCSID (CosEvent,
00035 CEC_Reactive_SupplierControl,
00036 "$Id: CEC_Reactive_SupplierControl.cpp 76626 2007-01-26 13:50:03Z elliott_c $")
00037
00038 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00039
00040
00041 TAO_CEC_Reactive_SupplierControl::
00042 TAO_CEC_Reactive_SupplierControl (const ACE_Time_Value &rate,
00043 const ACE_Time_Value &timeout,
00044 unsigned int retries,
00045 TAO_CEC_EventChannel *ec,
00046 CORBA::ORB_ptr orb)
00047 : rate_ (rate),
00048 timeout_ (timeout),
00049 retries_ (retries),
00050 adapter_ (this),
00051 event_channel_ (ec),
00052 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00053 typed_event_channel_ (0),
00054 #endif
00055 orb_ (CORBA::ORB::_duplicate (orb))
00056 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00057
00058
00059 , timer_id_ (-1)
00060 #endif
00061 {
00062 this->reactor_ =
00063 this->orb_->orb_core ()->reactor ();
00064 }
00065
00066
00067 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00068 TAO_CEC_Reactive_SupplierControl::
00069 TAO_CEC_Reactive_SupplierControl (const ACE_Time_Value &rate,
00070 const ACE_Time_Value &timeout,
00071 unsigned int retries,
00072 TAO_CEC_TypedEventChannel *ec,
00073 CORBA::ORB_ptr orb)
00074 : rate_ (rate),
00075 timeout_ (timeout),
00076 retries_ (retries),
00077 adapter_ (this),
00078 event_channel_ (0),
00079 typed_event_channel_ (ec),
00080 orb_ (CORBA::ORB::_duplicate (orb))
00081 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00082
00083
00084 , timer_id_ (-1)
00085 #endif
00086 {
00087 this->reactor_ =
00088 this->orb_->orb_core ()->reactor ();
00089 }
00090 #endif
00091
00092 TAO_CEC_Reactive_SupplierControl::~TAO_CEC_Reactive_SupplierControl (void)
00093 {
00094 }
00095
00096 void
00097 TAO_CEC_Reactive_SupplierControl::query_suppliers ()
00098 {
00099 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00100 if (this->typed_event_channel_)
00101 {
00102
00103 TAO_CEC_Ping_Typed_Push_Supplier push_worker (this);
00104
00105 this->typed_event_channel_->typed_supplier_admin ()->for_each (&push_worker);
00106 }
00107 else
00108 {
00109 #endif
00110
00111
00112 TAO_CEC_Ping_Push_Supplier push_worker (this);
00113 this->event_channel_->supplier_admin ()->for_each (&push_worker);
00114
00115 TAO_CEC_Ping_Pull_Supplier pull_worker (this);
00116 this->event_channel_->supplier_admin ()->for_each (&pull_worker);
00117
00118 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00119 }
00120 #endif
00121 }
00122
00123 bool
00124 TAO_CEC_Reactive_SupplierControl::need_to_disconnect (
00125 PortableServer::ServantBase* proxy)
00126 {
00127 bool disconnect = true;
00128
00129 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00130 if (this->typed_event_channel_)
00131 {
00132
00133 TAO_CEC_TypedEventChannel::ServantRetryMap::ENTRY* entry = 0;
00134 if (this->typed_event_channel_->
00135 get_servant_retry_map ().find (proxy, entry) == 0)
00136 {
00137 ++entry->int_id_;
00138 if (entry->int_id_ <= this->retries_)
00139 {
00140 disconnect = false;
00141 }
00142 }
00143 }
00144 else
00145 {
00146 #endif
00147
00148
00149 TAO_CEC_EventChannel::ServantRetryMap::ENTRY* entry = 0;
00150 if (this->event_channel_->
00151 get_servant_retry_map ().find (proxy, entry) == 0)
00152 {
00153 ++entry->int_id_;
00154 if (entry->int_id_ <= this->retries_)
00155 {
00156 disconnect = false;
00157 }
00158 }
00159
00160 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00161 }
00162 #endif
00163
00164 return disconnect;
00165 }
00166
00167 void
00168 TAO_CEC_Reactive_SupplierControl::successful_transmission (
00169 PortableServer::ServantBase* proxy)
00170 {
00171 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00172 if (this->typed_event_channel_)
00173 {
00174
00175 TAO_CEC_TypedEventChannel::ServantRetryMap::ENTRY* entry = 0;
00176 if (this->typed_event_channel_->
00177 get_servant_retry_map ().find (proxy, entry) == 0)
00178 {
00179 entry->int_id_ = 0;
00180 }
00181 }
00182 else
00183 {
00184 #endif
00185
00186
00187 TAO_CEC_EventChannel::ServantRetryMap::ENTRY* entry = 0;
00188 if (this->event_channel_->
00189 get_servant_retry_map ().find (proxy, entry) == 0)
00190 {
00191 entry->int_id_ = 0;
00192 }
00193
00194 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00195 }
00196 #endif
00197
00198 }
00199
00200 void
00201 TAO_CEC_Reactive_SupplierControl::handle_timeout (
00202 const ACE_Time_Value &,
00203 const void *)
00204 {
00205 try
00206 {
00207
00208
00209 CORBA::PolicyTypeSeq types;
00210 CORBA::PolicyList_var policies =
00211 this->policy_current_->get_policy_overrides (types);
00212
00213
00214 this->policy_current_->set_policy_overrides (this->policy_list_,
00215 CORBA::ADD_OVERRIDE);
00216
00217 try
00218 {
00219
00220 this->query_suppliers ();
00221 }
00222 catch (const CORBA::Exception&)
00223 {
00224
00225 }
00226
00227 this->policy_current_->set_policy_overrides (policies.in (),
00228 CORBA::SET_OVERRIDE);
00229 for (CORBA::ULong i = 0; i != policies->length (); ++i)
00230 {
00231 policies[i]->destroy ();
00232 }
00233 }
00234 catch (const CORBA::Exception&)
00235 {
00236
00237 }
00238 }
00239
00240 int
00241 TAO_CEC_Reactive_SupplierControl::activate (void)
00242 {
00243 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00244 try
00245 {
00246
00247 CORBA::Object_var tmp =
00248 this->orb_->resolve_initial_references ("PolicyCurrent");
00249
00250 this->policy_current_ =
00251 CORBA::PolicyCurrent::_narrow (tmp.in ());
00252
00253
00254
00255
00256 TimeBase::TimeT timeout;
00257 ORBSVCS_Time::Time_Value_to_TimeT (timeout,
00258 this->timeout_);
00259 CORBA::Any any;
00260 any <<= timeout;
00261
00262 this->policy_list_.length (1);
00263 this->policy_list_[0] =
00264 this->orb_->create_policy (
00265 Messaging::RELATIVE_RT_TIMEOUT_POLICY_TYPE,
00266 any);
00267
00268
00269 if (this->rate_ != ACE_Time_Value::zero)
00270 {
00271
00272
00273
00274 timer_id_ = this->reactor_->schedule_timer (&this->adapter_,
00275 0,
00276 this->rate_,
00277 this->rate_);
00278 if (timer_id_ == -1)
00279 return -1;
00280 }
00281 }
00282 catch (const CORBA::Exception&)
00283 {
00284 return -1;
00285 }
00286 #endif
00287
00288 return 0;
00289 }
00290
00291 int
00292 TAO_CEC_Reactive_SupplierControl::shutdown (void)
00293 {
00294 int r = 0;
00295
00296 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00297 r = this->reactor_->cancel_timer (timer_id_);
00298 #endif
00299 this->adapter_.reactor (0);
00300 return r;
00301 }
00302
00303 void
00304 TAO_CEC_Reactive_SupplierControl::supplier_not_exist (
00305 TAO_CEC_ProxyPushConsumer *proxy)
00306 {
00307 try
00308 {
00309 proxy->disconnect_push_consumer ();
00310 }
00311 catch (const CORBA::Exception&)
00312 {
00313
00314 }
00315 }
00316
00317 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00318 void
00319 TAO_CEC_Reactive_SupplierControl::supplier_not_exist (
00320 TAO_CEC_TypedProxyPushConsumer *proxy)
00321 {
00322 try
00323 {
00324 proxy->disconnect_push_consumer ();
00325 }
00326 catch (const CORBA::Exception&)
00327 {
00328
00329 }
00330 }
00331 #endif
00332
00333 void
00334 TAO_CEC_Reactive_SupplierControl::supplier_not_exist (
00335 TAO_CEC_ProxyPullConsumer *proxy)
00336 {
00337 try
00338 {
00339 proxy->disconnect_pull_consumer ();
00340 }
00341 catch (const CORBA::Exception&)
00342 {
00343
00344 }
00345 }
00346
00347 void
00348 TAO_CEC_Reactive_SupplierControl::system_exception (
00349 TAO_CEC_ProxyPullConsumer *proxy,
00350 CORBA::SystemException & )
00351 {
00352 try
00353 {
00354 if (this->need_to_disconnect (proxy))
00355 {
00356 proxy->disconnect_pull_consumer ();
00357 }
00358 }
00359 catch (const CORBA::Exception&)
00360 {
00361
00362 }
00363 }
00364
00365
00366
00367 TAO_CEC_SupplierControl_Adapter::TAO_CEC_SupplierControl_Adapter (
00368 TAO_CEC_Reactive_SupplierControl *adaptee)
00369 : adaptee_ (adaptee)
00370 {
00371 }
00372
00373 int
00374 TAO_CEC_SupplierControl_Adapter::handle_timeout (
00375 const ACE_Time_Value &tv,
00376 const void *arg)
00377 {
00378 this->adaptee_->handle_timeout (tv, arg);
00379 return 0;
00380 }
00381
00382
00383
00384 void
00385 TAO_CEC_Ping_Push_Supplier::work (TAO_CEC_ProxyPushConsumer *consumer)
00386 {
00387 try
00388 {
00389 CORBA::Boolean disconnected;
00390 CORBA::Boolean non_existent =
00391 consumer->supplier_non_existent (disconnected);
00392 if (non_existent && !disconnected)
00393 {
00394 this->control_->supplier_not_exist (consumer);
00395 }
00396 }
00397 catch (const CORBA::OBJECT_NOT_EXIST& )
00398 {
00399 this->control_->supplier_not_exist (consumer);
00400 }
00401 catch (const CORBA::TRANSIENT& )
00402 {
00403 if (this->control_->need_to_disconnect (consumer))
00404 {
00405 this->control_->supplier_not_exist (consumer);
00406 }
00407 }
00408 catch (const CORBA::Exception&)
00409 {
00410
00411 }
00412 }
00413
00414
00415
00416 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00417 void
00418 TAO_CEC_Ping_Typed_Push_Supplier::work (TAO_CEC_TypedProxyPushConsumer *consumer)
00419 {
00420 try
00421 {
00422 CORBA::Boolean disconnected;
00423 CORBA::Boolean non_existent =
00424 consumer->supplier_non_existent (disconnected);
00425 if (non_existent && !disconnected)
00426 {
00427 this->control_->supplier_not_exist (consumer);
00428 }
00429 }
00430 catch (const CORBA::OBJECT_NOT_EXIST& )
00431 {
00432 this->control_->supplier_not_exist (consumer);
00433 }
00434 catch (const CORBA::TRANSIENT& )
00435 {
00436 if (this->control_->need_to_disconnect (consumer))
00437 {
00438 this->control_->supplier_not_exist (consumer);
00439 }
00440 }
00441 catch (const CORBA::Exception&)
00442 {
00443
00444 }
00445 }
00446 #endif
00447
00448
00449
00450 void
00451 TAO_CEC_Ping_Pull_Supplier::work (TAO_CEC_ProxyPullConsumer *consumer)
00452 {
00453 try
00454 {
00455 CORBA::Boolean disconnected;
00456 CORBA::Boolean non_existent =
00457 consumer->supplier_non_existent (disconnected);
00458 if (non_existent && !disconnected)
00459 {
00460 this->control_->supplier_not_exist (consumer);
00461 }
00462 }
00463 catch (const CORBA::OBJECT_NOT_EXIST& )
00464 {
00465 this->control_->supplier_not_exist (consumer);
00466 }
00467 catch (const CORBA::TRANSIENT& )
00468 {
00469 if (this->control_->need_to_disconnect (consumer))
00470 {
00471 this->control_->supplier_not_exist (consumer);
00472 }
00473 }
00474 catch (const CORBA::Exception&)
00475 {
00476
00477 }
00478 }
00479
00480 TAO_END_VERSIONED_NAMESPACE_DECL