00001
00002
00003
00004
00005
00006
00007 #include "orbsvcs/CosEvent/CEC_EventChannel.h"
00008 #include "orbsvcs/CosEvent/CEC_SupplierAdmin.h"
00009 #include "orbsvcs/CosEvent/CEC_ProxyPushConsumer.h"
00010 #include "orbsvcs/CosEvent/CEC_Reactive_SupplierControl.h"
00011
00012 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00013 #include "orbsvcs/CosEvent/CEC_TypedEventChannel.h"
00014 #include "orbsvcs/CosEvent/CEC_TypedSupplierAdmin.h"
00015 #include "orbsvcs/CosEvent/CEC_TypedProxyPushConsumer.h"
00016 #endif
00017
00018 #include "orbsvcs/CosEvent/CEC_ProxyPullConsumer.h"
00019
00020 #include "orbsvcs/Time_Utilities.h"
00021
00022 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00023 #include "tao/Messaging/Messaging.h"
00024 #endif
00025
00026 #include "tao/ORB_Core.h"
00027
00028 #include "ace/Reactor.h"
00029
00030 #if ! defined (__ACE_INLINE__)
00031 #include "orbsvcs/CosEvent/CEC_Reactive_SupplierControl.i"
00032 #endif
00033
00034 ACE_RCSID (CosEvent,
00035 CEC_Reactive_SupplierControl,
00036 "CEC_Reactive_SupplierControl.cpp,v 1.24 2006/03/14 06:14:25 jtc Exp")
00037
00038 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00039
00040
00041 TAO_CEC_Reactive_SupplierControl::
00042 TAO_CEC_Reactive_SupplierControl (const ACE_Time_Value &rate,
00043 const ACE_Time_Value &timeout,
00044 unsigned int retries,
00045 TAO_CEC_EventChannel *ec,
00046 CORBA::ORB_ptr orb)
00047 : rate_ (rate),
00048 timeout_ (timeout),
00049 retries_ (retries),
00050 adapter_ (this),
00051 event_channel_ (ec),
00052 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00053 typed_event_channel_ (0),
00054 #endif
00055 orb_ (CORBA::ORB::_duplicate (orb))
00056 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00057
00058
00059 , timer_id_ (-1)
00060 #endif
00061 {
00062 this->reactor_ =
00063 this->orb_->orb_core ()->reactor ();
00064 }
00065
00066
#if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
// Construct a supplier control that polls the suppliers of a typed
// event channel.  Identical to the untyped constructor except that
// event_channel_ is left null and typed_event_channel_ receives ec.
//
//   rate     copied into rate_ (polling timer delay and interval).
//   timeout  copied into timeout_ (source of the timeout policy value).
//   retries  copied into retries_ (limit used by need_to_disconnect()).
//   ec       the typed event channel to supervise.
//   orb      duplicated into orb_; its ORB core supplies the reactor.
TAO_CEC_Reactive_SupplierControl::
    TAO_CEC_Reactive_SupplierControl (const ACE_Time_Value &rate,
                                      const ACE_Time_Value &timeout,
                                      unsigned int retries,
                                      TAO_CEC_TypedEventChannel *ec,
                                      CORBA::ORB_ptr orb)
  : rate_ (rate)
  , timeout_ (timeout)
  , retries_ (retries)
  , adapter_ (this)
  // This instance serves the typed channel only.
  , event_channel_ (0)
  , typed_event_channel_ (ec)
  , orb_ (CORBA::ORB::_duplicate (orb))
#if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
  // -1 marks "no timer scheduled yet".
  , timer_id_ (-1)
#endif /* TAO_HAS_CORBA_MESSAGING */
{
  // The reactor is taken from the ORB core of the ORB we were given.
  TAO_ORB_Core *core = this->orb_->orb_core ();
  this->reactor_ = core->reactor ();
}
#endif /* TAO_HAS_TYPED_EVENT_CHANNEL */
00091
00092 TAO_CEC_Reactive_SupplierControl::~TAO_CEC_Reactive_SupplierControl (void)
00093 {
00094 }
00095
00096 void
00097 TAO_CEC_Reactive_SupplierControl::query_suppliers (
00098 ACE_ENV_SINGLE_ARG_DECL)
00099 {
00100 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00101 if (this->typed_event_channel_)
00102 {
00103
00104 TAO_CEC_Ping_Typed_Push_Supplier push_worker (this);
00105
00106 this->typed_event_channel_->typed_supplier_admin ()->for_each (&push_worker
00107 ACE_ENV_ARG_PARAMETER);
00108 ACE_CHECK;
00109 }
00110 else
00111 {
00112 #endif
00113
00114
00115 TAO_CEC_Ping_Push_Supplier push_worker (this);
00116 this->event_channel_->supplier_admin ()->for_each (&push_worker
00117 ACE_ENV_ARG_PARAMETER);
00118 ACE_CHECK;
00119
00120 TAO_CEC_Ping_Pull_Supplier pull_worker (this);
00121 this->event_channel_->supplier_admin ()->for_each (&pull_worker
00122 ACE_ENV_ARG_PARAMETER);
00123 ACE_CHECK;
00124
00125 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00126 }
00127 #endif
00128 }
00129
00130 bool
00131 TAO_CEC_Reactive_SupplierControl::need_to_disconnect (
00132 PortableServer::ServantBase* proxy)
00133 {
00134 bool disconnect = true;
00135
00136 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00137 if (this->typed_event_channel_)
00138 {
00139
00140 TAO_CEC_TypedEventChannel::ServantRetryMap::ENTRY* entry = 0;
00141 if (this->typed_event_channel_->
00142 get_servant_retry_map ().find (proxy, entry) == 0)
00143 {
00144 ++entry->int_id_;
00145 if (entry->int_id_ <= this->retries_)
00146 {
00147 disconnect = false;
00148 }
00149 }
00150 }
00151 else
00152 {
00153 #endif
00154
00155
00156 TAO_CEC_EventChannel::ServantRetryMap::ENTRY* entry = 0;
00157 if (this->event_channel_->
00158 get_servant_retry_map ().find (proxy, entry) == 0)
00159 {
00160 ++entry->int_id_;
00161 if (entry->int_id_ <= this->retries_)
00162 {
00163 disconnect = false;
00164 }
00165 }
00166
00167 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00168 }
00169 #endif
00170
00171 return disconnect;
00172 }
00173
00174 void
00175 TAO_CEC_Reactive_SupplierControl::successful_transmission (
00176 PortableServer::ServantBase* proxy)
00177 {
00178 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00179 if (this->typed_event_channel_)
00180 {
00181
00182 TAO_CEC_TypedEventChannel::ServantRetryMap::ENTRY* entry = 0;
00183 if (this->typed_event_channel_->
00184 get_servant_retry_map ().find (proxy, entry) == 0)
00185 {
00186 entry->int_id_ = 0;
00187 }
00188 }
00189 else
00190 {
00191 #endif
00192
00193
00194 TAO_CEC_EventChannel::ServantRetryMap::ENTRY* entry = 0;
00195 if (this->event_channel_->
00196 get_servant_retry_map ().find (proxy, entry) == 0)
00197 {
00198 entry->int_id_ = 0;
00199 }
00200
00201 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00202 }
00203 #endif
00204
00205 }
00206
00207 void
00208 TAO_CEC_Reactive_SupplierControl::handle_timeout (
00209 const ACE_Time_Value &,
00210 const void *)
00211 {
00212 ACE_TRY_NEW_ENV
00213 {
00214
00215
00216 CORBA::PolicyTypeSeq types;
00217 CORBA::PolicyList_var policies =
00218 this->policy_current_->get_policy_overrides (types
00219 ACE_ENV_ARG_PARAMETER);
00220 ACE_TRY_CHECK;
00221
00222
00223 this->policy_current_->set_policy_overrides (this->policy_list_,
00224 CORBA::ADD_OVERRIDE
00225 ACE_ENV_ARG_PARAMETER);
00226 ACE_TRY_CHECK;
00227
00228 ACE_TRY_EX (query)
00229 {
00230
00231 this->query_suppliers (ACE_ENV_SINGLE_ARG_PARAMETER);
00232 ACE_TRY_CHECK_EX (query);
00233 }
00234 ACE_CATCHANY
00235 {
00236
00237 }
00238 ACE_ENDTRY;
00239
00240 this->policy_current_->set_policy_overrides (policies.in (),
00241 CORBA::SET_OVERRIDE
00242 ACE_ENV_ARG_PARAMETER);
00243 ACE_TRY_CHECK;
00244 for (CORBA::ULong i = 0; i != policies->length (); ++i)
00245 {
00246 policies[i]->destroy (ACE_ENV_SINGLE_ARG_PARAMETER);
00247 ACE_TRY_CHECK;
00248 }
00249 }
00250 ACE_CATCHANY
00251 {
00252
00253 }
00254 ACE_ENDTRY;
00255 }
00256
00257 int
00258 TAO_CEC_Reactive_SupplierControl::activate (void)
00259 {
00260 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00261 ACE_TRY_NEW_ENV
00262 {
00263
00264 CORBA::Object_var tmp =
00265 this->orb_->resolve_initial_references ("PolicyCurrent"
00266 ACE_ENV_ARG_PARAMETER);
00267 ACE_TRY_CHECK;
00268
00269 this->policy_current_ =
00270 CORBA::PolicyCurrent::_narrow (tmp.in ()
00271 ACE_ENV_ARG_PARAMETER);
00272 ACE_TRY_CHECK;
00273
00274
00275
00276
00277 TimeBase::TimeT timeout;
00278 ORBSVCS_Time::Time_Value_to_TimeT (timeout,
00279 this->timeout_);
00280 CORBA::Any any;
00281 any <<= timeout;
00282
00283 this->policy_list_.length (1);
00284 this->policy_list_[0] =
00285 this->orb_->create_policy (
00286 Messaging::RELATIVE_RT_TIMEOUT_POLICY_TYPE,
00287 any
00288 ACE_ENV_ARG_PARAMETER);
00289 ACE_TRY_CHECK;
00290
00291
00292 if (this->rate_ != ACE_Time_Value::zero)
00293 {
00294
00295
00296
00297 timer_id_ = this->reactor_->schedule_timer (&this->adapter_,
00298 0,
00299 this->rate_,
00300 this->rate_);
00301 if (timer_id_ == -1)
00302 return -1;
00303 }
00304 }
00305 ACE_CATCHANY
00306 {
00307 return -1;
00308 }
00309 ACE_ENDTRY;
00310 #endif
00311
00312 return 0;
00313 }
00314
00315 int
00316 TAO_CEC_Reactive_SupplierControl::shutdown (void)
00317 {
00318 int r = 0;
00319
00320 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00321 r = this->reactor_->cancel_timer (timer_id_);
00322 #endif
00323 this->adapter_.reactor (0);
00324 return r;
00325 }
00326
00327 void
00328 TAO_CEC_Reactive_SupplierControl::supplier_not_exist (
00329 TAO_CEC_ProxyPushConsumer *proxy
00330 ACE_ENV_ARG_DECL)
00331 {
00332 ACE_TRY
00333 {
00334 proxy->disconnect_push_consumer (ACE_ENV_SINGLE_ARG_PARAMETER);
00335 ACE_TRY_CHECK;
00336 }
00337 ACE_CATCHANY
00338 {
00339
00340 }
00341 ACE_ENDTRY;
00342 }
00343
#if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
// Called when the supplier behind a typed push proxy is found not to
// exist: disconnect the proxy.  This is best-effort; any exception
// raised by the disconnect is swallowed and never reaches the caller.
void
TAO_CEC_Reactive_SupplierControl::supplier_not_exist (
    TAO_CEC_TypedProxyPushConsumer *proxy
    ACE_ENV_ARG_DECL)
{
  ACE_TRY
    {
      proxy->disconnect_push_consumer (ACE_ENV_SINGLE_ARG_PARAMETER);
      ACE_TRY_CHECK;
    }
  ACE_CATCHANY
    {
      // Deliberately ignored.
    }
  ACE_ENDTRY;
}
#endif /* TAO_HAS_TYPED_EVENT_CHANNEL */
00362
00363 void
00364 TAO_CEC_Reactive_SupplierControl::supplier_not_exist (
00365 TAO_CEC_ProxyPullConsumer *proxy
00366 ACE_ENV_ARG_DECL)
00367 {
00368 ACE_TRY
00369 {
00370 proxy->disconnect_pull_consumer (ACE_ENV_SINGLE_ARG_PARAMETER);
00371 ACE_TRY_CHECK;
00372 }
00373 ACE_CATCHANY
00374 {
00375
00376 }
00377 ACE_ENDTRY;
00378 }
00379
00380 void
00381 TAO_CEC_Reactive_SupplierControl::system_exception (
00382 TAO_CEC_ProxyPullConsumer *proxy,
00383 CORBA::SystemException &
00384 ACE_ENV_ARG_DECL)
00385 {
00386 ACE_TRY
00387 {
00388 if (this->need_to_disconnect (proxy))
00389 {
00390 proxy->disconnect_pull_consumer (ACE_ENV_SINGLE_ARG_PARAMETER);
00391 ACE_TRY_CHECK;
00392 }
00393 }
00394 ACE_CATCHANY
00395 {
00396
00397 }
00398 ACE_ENDTRY;
00399 }
00400
00401
00402
00403 TAO_CEC_SupplierControl_Adapter::TAO_CEC_SupplierControl_Adapter (
00404 TAO_CEC_Reactive_SupplierControl *adaptee)
00405 : adaptee_ (adaptee)
00406 {
00407 }
00408
00409 int
00410 TAO_CEC_SupplierControl_Adapter::handle_timeout (
00411 const ACE_Time_Value &tv,
00412 const void *arg)
00413 {
00414 this->adaptee_->handle_timeout (tv, arg);
00415 return 0;
00416 }
00417
00418
00419
00420 void
00421 TAO_CEC_Ping_Push_Supplier::work (TAO_CEC_ProxyPushConsumer *consumer
00422 ACE_ENV_ARG_DECL)
00423 {
00424 ACE_TRY
00425 {
00426 CORBA::Boolean disconnected;
00427 CORBA::Boolean non_existent =
00428 consumer->supplier_non_existent (disconnected
00429 ACE_ENV_ARG_PARAMETER);
00430 ACE_TRY_CHECK;
00431 if (non_existent && !disconnected)
00432 {
00433 this->control_->supplier_not_exist (consumer ACE_ENV_ARG_PARAMETER);
00434 ACE_TRY_CHECK;
00435 }
00436 }
00437 ACE_CATCH (CORBA::OBJECT_NOT_EXIST, ex)
00438 {
00439 this->control_->supplier_not_exist (consumer ACE_ENV_ARG_PARAMETER);
00440 ACE_TRY_CHECK;
00441 }
00442 ACE_CATCH (CORBA::TRANSIENT, transient)
00443 {
00444 if (this->control_->need_to_disconnect (consumer))
00445 {
00446 this->control_->supplier_not_exist (consumer ACE_ENV_ARG_PARAMETER);
00447 ACE_TRY_CHECK;
00448 }
00449 }
00450 ACE_CATCHANY
00451 {
00452
00453 }
00454 ACE_ENDTRY;
00455 }
00456
00457
00458
#if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
// Worker applied to each typed push proxy: check whether the supplier
// behind @a consumer still exists and tell the control object when it
// does not.
//
//   - supplier reported non-existent while the proxy is not already
//     disconnected, or the check raises OBJECT_NOT_EXIST:
//     supplier_not_exist() is invoked.
//   - the check raises TRANSIENT: supplier_not_exist() is invoked only
//     if need_to_disconnect() says the retry budget is used up.
//   - any other exception is ignored.
void
TAO_CEC_Ping_Typed_Push_Supplier::work (TAO_CEC_TypedProxyPushConsumer *consumer
                                        ACE_ENV_ARG_DECL)
{
  ACE_TRY
    {
      CORBA::Boolean already_disconnected;
      const CORBA::Boolean supplier_gone =
        consumer->supplier_non_existent (already_disconnected
                                         ACE_ENV_ARG_PARAMETER);
      ACE_TRY_CHECK;

      if (supplier_gone)
        {
          if (!already_disconnected)
            {
              this->control_->supplier_not_exist (consumer
                                                  ACE_ENV_ARG_PARAMETER);
              ACE_TRY_CHECK;
            }
        }
    }
  ACE_CATCH (CORBA::OBJECT_NOT_EXIST, not_exist_ex)
    {
      this->control_->supplier_not_exist (consumer
                                          ACE_ENV_ARG_PARAMETER);
      ACE_TRY_CHECK;
    }
  ACE_CATCH (CORBA::TRANSIENT, transient_ex)
    {
      // Transient failures are tolerated until the retry limit is hit.
      if (this->control_->need_to_disconnect (consumer))
        {
          this->control_->supplier_not_exist (consumer
                                              ACE_ENV_ARG_PARAMETER);
          ACE_TRY_CHECK;
        }
    }
  ACE_CATCHANY
    {
      // Deliberately ignored.
    }
  ACE_ENDTRY;
}
#endif /* TAO_HAS_TYPED_EVENT_CHANNEL */
00497
00498
00499
00500 void
00501 TAO_CEC_Ping_Pull_Supplier::work (TAO_CEC_ProxyPullConsumer *consumer
00502 ACE_ENV_ARG_DECL)
00503 {
00504 ACE_TRY
00505 {
00506 CORBA::Boolean disconnected;
00507 CORBA::Boolean non_existent =
00508 consumer->supplier_non_existent (disconnected
00509 ACE_ENV_ARG_PARAMETER);
00510 ACE_TRY_CHECK;
00511 if (non_existent && !disconnected)
00512 {
00513 this->control_->supplier_not_exist (consumer ACE_ENV_ARG_PARAMETER);
00514 ACE_TRY_CHECK;
00515 }
00516 }
00517 ACE_CATCH (CORBA::OBJECT_NOT_EXIST, ex)
00518 {
00519 this->control_->supplier_not_exist (consumer ACE_ENV_ARG_PARAMETER);
00520 ACE_TRY_CHECK;
00521 }
00522 ACE_CATCH (CORBA::TRANSIENT, transient)
00523 {
00524 if (this->control_->need_to_disconnect (consumer))
00525 {
00526 this->control_->supplier_not_exist (consumer ACE_ENV_ARG_PARAMETER);
00527 ACE_TRY_CHECK;
00528 }
00529 }
00530 ACE_CATCHANY
00531 {
00532
00533 }
00534 ACE_ENDTRY;
00535 }
00536
00537 TAO_END_VERSIONED_NAMESPACE_DECL