00001
00002
00003
00004
00005
00006
00007 #include "orbsvcs/CosEvent/CEC_EventChannel.h"
00008 #include "orbsvcs/CosEvent/CEC_ConsumerAdmin.h"
00009 #include "orbsvcs/CosEvent/CEC_Reactive_ConsumerControl.h"
00010
00011 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00012 #include "orbsvcs/CosEvent/CEC_TypedEventChannel.h"
00013 #include "orbsvcs/CosEvent/CEC_TypedConsumerAdmin.h"
00014 #endif
00015
00016 #include "orbsvcs/CosEvent/CEC_ProxyPushSupplier.h"
00017 #include "orbsvcs/CosEvent/CEC_ProxyPullSupplier.h"
00018
00019 #include "orbsvcs/Time_Utilities.h"
00020
00021 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00022 #include "tao/Messaging/Messaging.h"
00023 #endif
00024
00025 #include "tao/ORB_Core.h"
00026 #include "tao/debug.h"
00027 #include "ace/Reactor.h"
00028
00029 #if ! defined (__ACE_INLINE__)
00030 #include "orbsvcs/CosEvent/CEC_Reactive_ConsumerControl.i"
00031 #endif
00032
00033 ACE_RCSID (CosEvent,
00034 CEC_Reactive_ConsumerControl,
00035 "CEC_Reactive_ConsumerControl.cpp,v 1.24 2006/03/14 06:14:25 jtc Exp")
00036
00037 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00038
00039
00040 TAO_CEC_Reactive_ConsumerControl::
00041 TAO_CEC_Reactive_ConsumerControl (const ACE_Time_Value &rate,
00042 const ACE_Time_Value &timeout,
00043 unsigned int retries,
00044 TAO_CEC_EventChannel *ec,
00045 CORBA::ORB_ptr orb)
00046 : rate_ (rate),
00047 timeout_ (timeout),
00048 retries_ (retries),
00049 adapter_ (this),
00050 event_channel_ (ec),
00051 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00052 typed_event_channel_ (0),
00053 #endif
00054 orb_ (CORBA::ORB::_duplicate (orb))
00055 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00056
00057
00058 , timer_id_ (-1)
00059 #endif
00060 {
00061 this->reactor_ =
00062 this->orb_->orb_core ()->reactor ();
00063 }
00064
00065
00066 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00067 TAO_CEC_Reactive_ConsumerControl::
00068 TAO_CEC_Reactive_ConsumerControl (const ACE_Time_Value &rate,
00069 const ACE_Time_Value &timeout,
00070 unsigned int retries,
00071 TAO_CEC_TypedEventChannel *ec,
00072 CORBA::ORB_ptr orb)
00073 : rate_ (rate),
00074 timeout_ (timeout),
00075 retries_ (retries),
00076 adapter_ (this),
00077 event_channel_ (0),
00078 typed_event_channel_ (ec),
00079 orb_ (CORBA::ORB::_duplicate (orb))
00080 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00081
00082
00083 , timer_id_ (-1)
00084 #endif
00085 {
00086 this->reactor_ =
00087 this->orb_->orb_core ()->reactor ();
00088 }
00089 #endif
00090
00091 TAO_CEC_Reactive_ConsumerControl::~TAO_CEC_Reactive_ConsumerControl (void)
00092 {
00093 }
00094
00095 void
00096 TAO_CEC_Reactive_ConsumerControl::query_consumers (
00097 ACE_ENV_SINGLE_ARG_DECL)
00098 {
00099 TAO_CEC_Ping_Push_Consumer push_worker (this);
00100
00101 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00102 if (this->typed_event_channel_)
00103 {
00104
00105 this->typed_event_channel_->typed_consumer_admin ()->for_each (&push_worker
00106 ACE_ENV_ARG_PARAMETER);
00107 ACE_CHECK;
00108 }
00109 else
00110 {
00111 #endif
00112
00113
00114 this->event_channel_->consumer_admin ()->for_each (&push_worker
00115 ACE_ENV_ARG_PARAMETER);
00116 ACE_CHECK;
00117
00118 TAO_CEC_Ping_Pull_Consumer pull_worker (this);
00119 this->event_channel_->consumer_admin ()->for_each (&pull_worker
00120 ACE_ENV_ARG_PARAMETER);
00121 ACE_CHECK;
00122
00123 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00124 }
00125 #endif
00126 }
00127
00128 bool
00129 TAO_CEC_Reactive_ConsumerControl::need_to_disconnect (
00130 PortableServer::ServantBase* proxy)
00131 {
00132 bool disconnect = true;
00133
00134 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00135 if (this->typed_event_channel_)
00136 {
00137
00138 TAO_CEC_TypedEventChannel::ServantRetryMap::ENTRY* entry = 0;
00139 if (this->typed_event_channel_->
00140 get_servant_retry_map ().find (proxy, entry) == 0)
00141 {
00142 ++entry->int_id_;
00143 if (entry->int_id_ <= this->retries_)
00144 {
00145 disconnect = false;
00146 }
00147 }
00148 }
00149 else
00150 {
00151 #endif
00152
00153
00154 TAO_CEC_EventChannel::ServantRetryMap::ENTRY* entry = 0;
00155 if (this->event_channel_->
00156 get_servant_retry_map ().find (proxy, entry) == 0)
00157 {
00158 ++entry->int_id_;
00159 if (entry->int_id_ <= this->retries_)
00160 {
00161 disconnect = false;
00162 }
00163 }
00164
00165 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00166 }
00167 #endif
00168
00169 return disconnect;
00170 }
00171
00172 void
00173 TAO_CEC_Reactive_ConsumerControl::successful_transmission (
00174 PortableServer::ServantBase* proxy)
00175 {
00176
00177 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00178 if (this->typed_event_channel_)
00179 {
00180
00181 TAO_CEC_TypedEventChannel::ServantRetryMap::ENTRY* entry = 0;
00182 if (this->typed_event_channel_->
00183 get_servant_retry_map ().find (proxy, entry) == 0)
00184 {
00185 entry->int_id_ = 0;
00186 }
00187 }
00188 else
00189 {
00190 #endif
00191
00192
00193 TAO_CEC_EventChannel::ServantRetryMap::ENTRY* entry = 0;
00194 if (this->event_channel_->
00195 get_servant_retry_map ().find (proxy, entry) == 0)
00196 {
00197 entry->int_id_ = 0;
00198 }
00199
00200 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00201 }
00202 #endif
00203
00204 }
00205
00206 void
00207 TAO_CEC_Reactive_ConsumerControl::handle_timeout (
00208 const ACE_Time_Value &,
00209 const void *)
00210 {
00211 ACE_TRY_NEW_ENV
00212 {
00213
00214
00215 CORBA::PolicyTypeSeq types;
00216 CORBA::PolicyList_var policies =
00217 this->policy_current_->get_policy_overrides (types
00218 ACE_ENV_ARG_PARAMETER);
00219 ACE_TRY_CHECK;
00220
00221
00222 this->policy_current_->set_policy_overrides (this->policy_list_,
00223 CORBA::ADD_OVERRIDE
00224 ACE_ENV_ARG_PARAMETER);
00225 ACE_TRY_CHECK;
00226
00227 ACE_TRY_EX (query)
00228 {
00229
00230 this->query_consumers (ACE_ENV_SINGLE_ARG_PARAMETER);
00231 ACE_TRY_CHECK_EX (query);
00232 }
00233 ACE_CATCHANY
00234 {
00235
00236 }
00237 ACE_ENDTRY;
00238
00239 this->policy_current_->set_policy_overrides (policies.in (),
00240 CORBA::SET_OVERRIDE
00241 ACE_ENV_ARG_PARAMETER);
00242 ACE_TRY_CHECK;
00243 for (CORBA::ULong i = 0; i != policies->length (); ++i)
00244 {
00245 policies[i]->destroy (ACE_ENV_SINGLE_ARG_PARAMETER);
00246 ACE_TRY_CHECK;
00247 }
00248 }
00249 ACE_CATCHANY
00250 {
00251
00252 }
00253 ACE_ENDTRY;
00254 }
00255
00256 int
00257 TAO_CEC_Reactive_ConsumerControl::activate (void)
00258 {
00259 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00260 ACE_TRY_NEW_ENV
00261 {
00262
00263 CORBA::Object_var tmp =
00264 this->orb_->resolve_initial_references ("PolicyCurrent"
00265 ACE_ENV_ARG_PARAMETER);
00266 ACE_TRY_CHECK;
00267
00268 this->policy_current_ =
00269 CORBA::PolicyCurrent::_narrow (tmp.in ()
00270 ACE_ENV_ARG_PARAMETER);
00271 ACE_TRY_CHECK;
00272
00273
00274
00275
00276 TimeBase::TimeT timeout;
00277 ORBSVCS_Time::Time_Value_to_TimeT (timeout,
00278 this->timeout_);
00279 CORBA::Any any;
00280 any <<= timeout;
00281
00282 this->policy_list_.length (1);
00283 this->policy_list_[0] =
00284 this->orb_->create_policy (
00285 Messaging::RELATIVE_RT_TIMEOUT_POLICY_TYPE,
00286 any
00287 ACE_ENV_ARG_PARAMETER);
00288 ACE_TRY_CHECK;
00289
00290
00291 if (this->rate_ != ACE_Time_Value::zero)
00292 {
00293
00294
00295
00296 timer_id_ = this->reactor_->schedule_timer (&this->adapter_,
00297 0,
00298 this->rate_,
00299 this->rate_);
00300 if (timer_id_ == -1)
00301 return -1;
00302 }
00303 }
00304 ACE_CATCHANY
00305 {
00306 return -1;
00307 }
00308 ACE_ENDTRY;
00309 #endif
00310
00311 return 0;
00312 }
00313
00314 int
00315 TAO_CEC_Reactive_ConsumerControl::shutdown (void)
00316 {
00317 int r = 0;
00318
00319 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00320 r = this->reactor_->cancel_timer (timer_id_);
00321 #endif
00322 this->adapter_.reactor (0);
00323 return r;
00324 }
00325
00326 void
00327 TAO_CEC_Reactive_ConsumerControl::consumer_not_exist (
00328 TAO_CEC_ProxyPushSupplier *proxy
00329 ACE_ENV_ARG_DECL)
00330 {
00331 ACE_TRY
00332 {
00333 proxy->disconnect_push_supplier (ACE_ENV_SINGLE_ARG_PARAMETER);
00334 ACE_TRY_CHECK;
00335
00336 if (TAO_debug_level >= 10)
00337 {
00338 ACE_DEBUG ((LM_DEBUG,
00339 ACE_TEXT ("ProxyPushSupplier disconnected due to consumer_not_exist\n")));
00340 }
00341 }
00342 ACE_CATCHANY
00343 {
00344 ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
00345 ACE_TEXT ("Reactive_ConsumerControl::consumer_not_exist"));
00346
00347 }
00348 ACE_ENDTRY;
00349 }
00350
00351 void
00352 TAO_CEC_Reactive_ConsumerControl::consumer_not_exist (
00353 TAO_CEC_ProxyPullSupplier *proxy
00354 ACE_ENV_ARG_DECL)
00355 {
00356 ACE_TRY
00357 {
00358 proxy->disconnect_pull_supplier (ACE_ENV_SINGLE_ARG_PARAMETER);
00359 ACE_TRY_CHECK;
00360 }
00361 ACE_CATCHANY
00362 {
00363 ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
00364 ACE_TEXT ("Reactive_ConsumerControl::consumer_not_exist"));
00365
00366 }
00367 ACE_ENDTRY;
00368 }
00369
00370 void
00371 TAO_CEC_Reactive_ConsumerControl::system_exception (
00372 TAO_CEC_ProxyPushSupplier *proxy,
00373 CORBA::SystemException &
00374 ACE_ENV_ARG_DECL)
00375 {
00376 ACE_TRY
00377 {
00378 if (this->need_to_disconnect (proxy))
00379 {
00380 proxy->disconnect_push_supplier (ACE_ENV_SINGLE_ARG_PARAMETER);
00381 ACE_TRY_CHECK;
00382
00383 if (TAO_debug_level >= 10)
00384 {
00385 ACE_DEBUG ((LM_DEBUG,
00386 ACE_TEXT ("ProxyPushSupplier disconnected due to consumer_not_exist\n")));
00387 }
00388 }
00389 }
00390 ACE_CATCHANY
00391 {
00392
00393 }
00394 ACE_ENDTRY;
00395 }
00396
00397
00398
00399 TAO_CEC_ConsumerControl_Adapter::TAO_CEC_ConsumerControl_Adapter (
00400 TAO_CEC_Reactive_ConsumerControl *adaptee)
00401 : adaptee_ (adaptee)
00402 {
00403 }
00404
00405 int
00406 TAO_CEC_ConsumerControl_Adapter::handle_timeout (
00407 const ACE_Time_Value &tv,
00408 const void *arg)
00409 {
00410 this->adaptee_->handle_timeout (tv, arg);
00411 return 0;
00412 }
00413
00414
00415
00416 void
00417 TAO_CEC_Ping_Push_Consumer::work (TAO_CEC_ProxyPushSupplier *supplier
00418 ACE_ENV_ARG_DECL)
00419 {
00420 ACE_TRY
00421 {
00422 CORBA::Boolean disconnected;
00423 CORBA::Boolean non_existent =
00424 supplier->consumer_non_existent (disconnected
00425 ACE_ENV_ARG_PARAMETER);
00426 ACE_TRY_CHECK;
00427 if (non_existent && !disconnected)
00428 {
00429 this->control_->consumer_not_exist (supplier ACE_ENV_ARG_PARAMETER);
00430 ACE_TRY_CHECK;
00431 }
00432 }
00433 ACE_CATCH (CORBA::OBJECT_NOT_EXIST, ex)
00434 {
00435 this->control_->consumer_not_exist (supplier ACE_ENV_ARG_PARAMETER);
00436 ACE_TRY_CHECK;
00437 }
00438 ACE_CATCH (CORBA::TRANSIENT, transient)
00439 {
00440 if (this->control_->need_to_disconnect (supplier))
00441 {
00442 this->control_->consumer_not_exist (supplier ACE_ENV_ARG_PARAMETER);
00443 ACE_TRY_CHECK;
00444 }
00445 }
00446 ACE_CATCHANY
00447 {
00448
00449 }
00450 ACE_ENDTRY;
00451 }
00452
00453
00454
00455 void
00456 TAO_CEC_Ping_Pull_Consumer::work (TAO_CEC_ProxyPullSupplier *supplier
00457 ACE_ENV_ARG_DECL)
00458 {
00459 ACE_TRY
00460 {
00461 CORBA::Boolean disconnected;
00462 CORBA::Boolean non_existent =
00463 supplier->consumer_non_existent (disconnected
00464 ACE_ENV_ARG_PARAMETER);
00465 ACE_TRY_CHECK;
00466 if (non_existent && !disconnected)
00467 {
00468 this->control_->consumer_not_exist (supplier ACE_ENV_ARG_PARAMETER);
00469 ACE_TRY_CHECK;
00470 }
00471 }
00472 ACE_CATCH (CORBA::OBJECT_NOT_EXIST, ex)
00473 {
00474 this->control_->consumer_not_exist (supplier ACE_ENV_ARG_PARAMETER);
00475 ACE_TRY_CHECK;
00476 }
00477 ACE_CATCH (CORBA::TRANSIENT, transient)
00478 {
00479 if (this->control_->need_to_disconnect (supplier))
00480 {
00481 this->control_->consumer_not_exist (supplier ACE_ENV_ARG_PARAMETER);
00482 ACE_TRY_CHECK;
00483 }
00484 }
00485 ACE_CATCHANY
00486 {
00487
00488 }
00489 ACE_ENDTRY;
00490 }
00491
00492 TAO_END_VERSIONED_NAMESPACE_DECL