00001
00002
00003
00004
00005
00006
00007 #include "orbsvcs/CosEvent/CEC_EventChannel.h"
00008 #include "orbsvcs/CosEvent/CEC_ConsumerAdmin.h"
00009 #include "orbsvcs/CosEvent/CEC_Reactive_ConsumerControl.h"
00010
00011 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00012 #include "orbsvcs/CosEvent/CEC_TypedEventChannel.h"
00013 #include "orbsvcs/CosEvent/CEC_TypedConsumerAdmin.h"
00014 #endif
00015
00016 #include "orbsvcs/CosEvent/CEC_ProxyPushSupplier.h"
00017 #include "orbsvcs/CosEvent/CEC_ProxyPullSupplier.h"
00018
00019 #include "orbsvcs/Time_Utilities.h"
00020
00021 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00022 #include "tao/Messaging/Messaging.h"
00023 #endif
00024
00025 #include "tao/ORB_Core.h"
00026 #include "tao/debug.h"
00027 #include "ace/Reactor.h"
00028
00029 #if ! defined (__ACE_INLINE__)
00030 #include "orbsvcs/CosEvent/CEC_Reactive_ConsumerControl.inl"
00031 #endif
00032
00033 ACE_RCSID (CosEvent,
00034 CEC_Reactive_ConsumerControl,
00035 "$Id: CEC_Reactive_ConsumerControl.cpp 76626 2007-01-26 13:50:03Z elliott_c $")
00036
00037 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00038
00039
00040 TAO_CEC_Reactive_ConsumerControl::
00041 TAO_CEC_Reactive_ConsumerControl (const ACE_Time_Value &rate,
00042 const ACE_Time_Value &timeout,
00043 unsigned int retries,
00044 TAO_CEC_EventChannel *ec,
00045 CORBA::ORB_ptr orb)
00046 : rate_ (rate),
00047 timeout_ (timeout),
00048 retries_ (retries),
00049 adapter_ (this),
00050 event_channel_ (ec),
00051 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00052 typed_event_channel_ (0),
00053 #endif
00054 orb_ (CORBA::ORB::_duplicate (orb))
00055 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00056
00057
00058 , timer_id_ (-1)
00059 #endif
00060 {
00061 this->reactor_ =
00062 this->orb_->orb_core ()->reactor ();
00063 }
00064
00065
00066 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00067 TAO_CEC_Reactive_ConsumerControl::
00068 TAO_CEC_Reactive_ConsumerControl (const ACE_Time_Value &rate,
00069 const ACE_Time_Value &timeout,
00070 unsigned int retries,
00071 TAO_CEC_TypedEventChannel *ec,
00072 CORBA::ORB_ptr orb)
00073 : rate_ (rate),
00074 timeout_ (timeout),
00075 retries_ (retries),
00076 adapter_ (this),
00077 event_channel_ (0),
00078 typed_event_channel_ (ec),
00079 orb_ (CORBA::ORB::_duplicate (orb))
00080 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00081
00082
00083 , timer_id_ (-1)
00084 #endif
00085 {
00086 this->reactor_ =
00087 this->orb_->orb_core ()->reactor ();
00088 }
00089 #endif
00090
00091 TAO_CEC_Reactive_ConsumerControl::~TAO_CEC_Reactive_ConsumerControl (void)
00092 {
00093 }
00094
00095 void
00096 TAO_CEC_Reactive_ConsumerControl::query_consumers ()
00097 {
00098 TAO_CEC_Ping_Push_Consumer push_worker (this);
00099
00100 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00101 if (this->typed_event_channel_)
00102 {
00103
00104 this->typed_event_channel_->typed_consumer_admin ()->for_each (&push_worker);
00105 }
00106 else
00107 {
00108 #endif
00109
00110
00111 this->event_channel_->consumer_admin ()->for_each (&push_worker);
00112
00113 TAO_CEC_Ping_Pull_Consumer pull_worker (this);
00114 this->event_channel_->consumer_admin ()->for_each (&pull_worker);
00115
00116 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00117 }
00118 #endif
00119 }
00120
00121 bool
00122 TAO_CEC_Reactive_ConsumerControl::need_to_disconnect (
00123 PortableServer::ServantBase* proxy)
00124 {
00125 bool disconnect = true;
00126
00127 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00128 if (this->typed_event_channel_)
00129 {
00130
00131 TAO_CEC_TypedEventChannel::ServantRetryMap::ENTRY* entry = 0;
00132 if (this->typed_event_channel_->
00133 get_servant_retry_map ().find (proxy, entry) == 0)
00134 {
00135 ++entry->int_id_;
00136 if (entry->int_id_ <= this->retries_)
00137 {
00138 disconnect = false;
00139 }
00140 }
00141 }
00142 else
00143 {
00144 #endif
00145
00146
00147 TAO_CEC_EventChannel::ServantRetryMap::ENTRY* entry = 0;
00148 if (this->event_channel_->
00149 get_servant_retry_map ().find (proxy, entry) == 0)
00150 {
00151 ++entry->int_id_;
00152 if (entry->int_id_ <= this->retries_)
00153 {
00154 disconnect = false;
00155 }
00156 }
00157
00158 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00159 }
00160 #endif
00161
00162 return disconnect;
00163 }
00164
00165 void
00166 TAO_CEC_Reactive_ConsumerControl::successful_transmission (
00167 PortableServer::ServantBase* proxy)
00168 {
00169
00170 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00171 if (this->typed_event_channel_)
00172 {
00173
00174 TAO_CEC_TypedEventChannel::ServantRetryMap::ENTRY* entry = 0;
00175 if (this->typed_event_channel_->
00176 get_servant_retry_map ().find (proxy, entry) == 0)
00177 {
00178 entry->int_id_ = 0;
00179 }
00180 }
00181 else
00182 {
00183 #endif
00184
00185
00186 TAO_CEC_EventChannel::ServantRetryMap::ENTRY* entry = 0;
00187 if (this->event_channel_->
00188 get_servant_retry_map ().find (proxy, entry) == 0)
00189 {
00190 entry->int_id_ = 0;
00191 }
00192
00193 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00194 }
00195 #endif
00196
00197 }
00198
00199 void
00200 TAO_CEC_Reactive_ConsumerControl::handle_timeout (
00201 const ACE_Time_Value &,
00202 const void *)
00203 {
00204 try
00205 {
00206
00207
00208 CORBA::PolicyTypeSeq types;
00209 CORBA::PolicyList_var policies =
00210 this->policy_current_->get_policy_overrides (types);
00211
00212
00213 this->policy_current_->set_policy_overrides (this->policy_list_,
00214 CORBA::ADD_OVERRIDE);
00215
00216 try
00217 {
00218
00219 this->query_consumers ();
00220 }
00221 catch (const CORBA::Exception&)
00222 {
00223
00224 }
00225
00226 this->policy_current_->set_policy_overrides (policies.in (),
00227 CORBA::SET_OVERRIDE);
00228 for (CORBA::ULong i = 0; i != policies->length (); ++i)
00229 {
00230 policies[i]->destroy ();
00231 }
00232 }
00233 catch (const CORBA::Exception&)
00234 {
00235
00236 }
00237 }
00238
00239 int
00240 TAO_CEC_Reactive_ConsumerControl::activate (void)
00241 {
00242 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00243 try
00244 {
00245
00246 CORBA::Object_var tmp =
00247 this->orb_->resolve_initial_references ("PolicyCurrent");
00248
00249 this->policy_current_ =
00250 CORBA::PolicyCurrent::_narrow (tmp.in ());
00251
00252
00253
00254
00255 TimeBase::TimeT timeout;
00256 ORBSVCS_Time::Time_Value_to_TimeT (timeout,
00257 this->timeout_);
00258 CORBA::Any any;
00259 any <<= timeout;
00260
00261 this->policy_list_.length (1);
00262 this->policy_list_[0] =
00263 this->orb_->create_policy (
00264 Messaging::RELATIVE_RT_TIMEOUT_POLICY_TYPE,
00265 any);
00266
00267
00268 if (this->rate_ != ACE_Time_Value::zero)
00269 {
00270
00271
00272
00273 timer_id_ = this->reactor_->schedule_timer (&this->adapter_,
00274 0,
00275 this->rate_,
00276 this->rate_);
00277 if (timer_id_ == -1)
00278 return -1;
00279 }
00280 }
00281 catch (const CORBA::Exception&)
00282 {
00283 return -1;
00284 }
00285 #endif
00286
00287 return 0;
00288 }
00289
00290 int
00291 TAO_CEC_Reactive_ConsumerControl::shutdown (void)
00292 {
00293 int r = 0;
00294
00295 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00296 r = this->reactor_->cancel_timer (timer_id_);
00297 #endif
00298 this->adapter_.reactor (0);
00299 return r;
00300 }
00301
00302 void
00303 TAO_CEC_Reactive_ConsumerControl::consumer_not_exist (
00304 TAO_CEC_ProxyPushSupplier *proxy)
00305 {
00306 try
00307 {
00308 proxy->disconnect_push_supplier ();
00309
00310 if (TAO_debug_level >= 10)
00311 {
00312 ACE_DEBUG ((LM_DEBUG,
00313 ACE_TEXT ("ProxyPushSupplier disconnected due to consumer_not_exist\n")));
00314 }
00315 }
00316 catch (const CORBA::Exception& ex)
00317 {
00318 ex._tao_print_exception (
00319 ACE_TEXT ("Reactive_ConsumerControl::consumer_not_exist"));
00320
00321 }
00322 }
00323
00324 void
00325 TAO_CEC_Reactive_ConsumerControl::consumer_not_exist (
00326 TAO_CEC_ProxyPullSupplier *proxy)
00327 {
00328 try
00329 {
00330 proxy->disconnect_pull_supplier ();
00331 }
00332 catch (const CORBA::Exception& ex)
00333 {
00334 ex._tao_print_exception (
00335 ACE_TEXT ("Reactive_ConsumerControl::consumer_not_exist"));
00336
00337 }
00338 }
00339
00340 void
00341 TAO_CEC_Reactive_ConsumerControl::system_exception (
00342 TAO_CEC_ProxyPushSupplier *proxy,
00343 CORBA::SystemException & )
00344 {
00345 try
00346 {
00347 if (this->need_to_disconnect (proxy))
00348 {
00349 proxy->disconnect_push_supplier ();
00350
00351 if (TAO_debug_level >= 10)
00352 {
00353 ACE_DEBUG ((LM_DEBUG,
00354 ACE_TEXT ("ProxyPushSupplier disconnected due to consumer_not_exist\n")));
00355 }
00356 }
00357 }
00358 catch (const CORBA::Exception&)
00359 {
00360
00361 }
00362 }
00363
00364
00365
00366 TAO_CEC_ConsumerControl_Adapter::TAO_CEC_ConsumerControl_Adapter (
00367 TAO_CEC_Reactive_ConsumerControl *adaptee)
00368 : adaptee_ (adaptee)
00369 {
00370 }
00371
00372 int
00373 TAO_CEC_ConsumerControl_Adapter::handle_timeout (
00374 const ACE_Time_Value &tv,
00375 const void *arg)
00376 {
00377 this->adaptee_->handle_timeout (tv, arg);
00378 return 0;
00379 }
00380
00381
00382
00383 void
00384 TAO_CEC_Ping_Push_Consumer::work (TAO_CEC_ProxyPushSupplier *supplier)
00385 {
00386 try
00387 {
00388 CORBA::Boolean disconnected;
00389 CORBA::Boolean non_existent =
00390 supplier->consumer_non_existent (disconnected);
00391 if (non_existent && !disconnected)
00392 {
00393 this->control_->consumer_not_exist (supplier);
00394 }
00395 }
00396 catch (const CORBA::OBJECT_NOT_EXIST& )
00397 {
00398 this->control_->consumer_not_exist (supplier);
00399 }
00400 catch (const CORBA::TRANSIENT& )
00401 {
00402 if (this->control_->need_to_disconnect (supplier))
00403 {
00404 this->control_->consumer_not_exist (supplier);
00405 }
00406 }
00407 catch (const CORBA::Exception&)
00408 {
00409
00410 }
00411 }
00412
00413
00414
00415 void
00416 TAO_CEC_Ping_Pull_Consumer::work (TAO_CEC_ProxyPullSupplier *supplier)
00417 {
00418 try
00419 {
00420 CORBA::Boolean disconnected;
00421 CORBA::Boolean non_existent =
00422 supplier->consumer_non_existent (disconnected);
00423 if (non_existent && !disconnected)
00424 {
00425 this->control_->consumer_not_exist (supplier);
00426 }
00427 }
00428 catch (const CORBA::OBJECT_NOT_EXIST& )
00429 {
00430 this->control_->consumer_not_exist (supplier);
00431 }
00432 catch (const CORBA::TRANSIENT& )
00433 {
00434 if (this->control_->need_to_disconnect (supplier))
00435 {
00436 this->control_->consumer_not_exist (supplier);
00437 }
00438 }
00439 catch (const CORBA::Exception&)
00440 {
00441
00442 }
00443 }
00444
00445 TAO_END_VERSIONED_NAMESPACE_DECL