CEC_Reactive_ConsumerControl.cpp

Go to the documentation of this file.
00001 // CEC_Reactive_ConsumerControl.cpp,v 1.24 2006/03/14 06:14:25 jtc Exp
00002 
00003 // Note: This class controls the behaviour of consumers connected to both
00004 // the Typed and Un-typed Event Channels.  A check must be made in the code
00005 // to ensure the correct EC is referenced. 
00006 
00007 #include "orbsvcs/CosEvent/CEC_EventChannel.h"
00008 #include "orbsvcs/CosEvent/CEC_ConsumerAdmin.h"
00009 #include "orbsvcs/CosEvent/CEC_Reactive_ConsumerControl.h"
00010 
00011 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00012 #include "orbsvcs/CosEvent/CEC_TypedEventChannel.h"
00013 #include "orbsvcs/CosEvent/CEC_TypedConsumerAdmin.h"
00014 #endif /* TAO_HAS_TYPED_EVENT_CHANNEL */
00015 
00016 #include "orbsvcs/CosEvent/CEC_ProxyPushSupplier.h"
00017 #include "orbsvcs/CosEvent/CEC_ProxyPullSupplier.h"
00018 
00019 #include "orbsvcs/Time_Utilities.h"
00020 
00021 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00022 #include "tao/Messaging/Messaging.h"
00023 #endif
00024 
00025 #include "tao/ORB_Core.h"
00026 #include "tao/debug.h"
00027 #include "ace/Reactor.h"
00028 
00029 #if ! defined (__ACE_INLINE__)
00030 #include "orbsvcs/CosEvent/CEC_Reactive_ConsumerControl.i"
00031 #endif /* __ACE_INLINE__ */
00032 
00033 ACE_RCSID (CosEvent,
00034            CEC_Reactive_ConsumerControl,
00035            "CEC_Reactive_ConsumerControl.cpp,v 1.24 2006/03/14 06:14:25 jtc Exp")
00036 
00037 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00038 
00039 // TAO_CEC_Reactive_ConsumerControl constructor for the Un-typed EC
00040 TAO_CEC_Reactive_ConsumerControl::
00041      TAO_CEC_Reactive_ConsumerControl (const ACE_Time_Value &rate,
00042                                        const ACE_Time_Value &timeout,
00043                                        unsigned int retries,
00044                                        TAO_CEC_EventChannel *ec,
00045                                        CORBA::ORB_ptr orb)
00046   : rate_ (rate),
00047     timeout_ (timeout),
00048     retries_ (retries),
00049     adapter_ (this),
00050     event_channel_ (ec),
00051 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00052     typed_event_channel_ (0),
00053 #endif /* TAO_HAS_TYPED_EVENT_CHANNEL */
00054     orb_ (CORBA::ORB::_duplicate (orb))
00055 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00056    // Initialise timer_id_ to an invalid timer id, so that in case we don't
00057    // schedule a timer, we don't cancel a random timer at shutdown
00058    , timer_id_ (-1)
00059 #endif /* TAO_HAS_CORBA_MESSAGING */
00060 {
00061   this->reactor_ =
00062     this->orb_->orb_core ()->reactor ();
00063 }
00064 
// Constructor used when this control object supervises a typed event
// channel.  Only available when typed-channel support is compiled in.
// The un-typed channel pointer is left null in this configuration.
#if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
TAO_CEC_Reactive_ConsumerControl::
     TAO_CEC_Reactive_ConsumerControl (const ACE_Time_Value &rate,
                                       const ACE_Time_Value &timeout,
                                       unsigned int retries,
                                       TAO_CEC_TypedEventChannel *ec,
                                       CORBA::ORB_ptr orb)
  : rate_ (rate),
    timeout_ (timeout),
    retries_ (retries),
    adapter_ (this),
    event_channel_ (0),
    typed_event_channel_ (ec),
    orb_ (CORBA::ORB::_duplicate (orb))
#if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
    // Start from an invalid timer id: if activate() never schedules a
    // timer, shutdown() must not end up cancelling some unrelated one.
  , timer_id_ (-1)
#endif /* TAO_HAS_CORBA_MESSAGING */
{
  // The periodic timer is scheduled on the reactor owned by the ORB core.
  TAO_ORB_Core *core = this->orb_->orb_core ();
  this->reactor_ = core->reactor ();
}
#endif /* TAO_HAS_TYPED_EVENT_CHANNEL */
00090 
00091 TAO_CEC_Reactive_ConsumerControl::~TAO_CEC_Reactive_ConsumerControl (void)
00092 {
00093 }
00094 
00095 void
00096 TAO_CEC_Reactive_ConsumerControl::query_consumers (
00097       ACE_ENV_SINGLE_ARG_DECL)
00098 {
00099   TAO_CEC_Ping_Push_Consumer push_worker (this);
00100 
00101 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00102   if (this->typed_event_channel_)
00103     {
00104       // Typed EC
00105       this->typed_event_channel_->typed_consumer_admin ()->for_each (&push_worker
00106                                                                      ACE_ENV_ARG_PARAMETER);
00107       ACE_CHECK;
00108     }
00109   else
00110     {
00111 #endif /* TAO_HAS_TYPED_EVENT_CHANNEL */
00112 
00113   // Un-typed EC
00114   this->event_channel_->consumer_admin ()->for_each (&push_worker
00115                                                      ACE_ENV_ARG_PARAMETER);
00116   ACE_CHECK;
00117 
00118   TAO_CEC_Ping_Pull_Consumer pull_worker (this);
00119   this->event_channel_->consumer_admin ()->for_each (&pull_worker
00120                                                      ACE_ENV_ARG_PARAMETER);
00121   ACE_CHECK;
00122 
00123 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00124     }
00125 #endif /* TAO_HAS_TYPED_EVENT_CHANNEL */
00126 }
00127 
00128 bool
00129 TAO_CEC_Reactive_ConsumerControl::need_to_disconnect (
00130                                     PortableServer::ServantBase* proxy)
00131 {
00132   bool disconnect = true;
00133 
00134 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00135   if (this->typed_event_channel_)
00136     {
00137       // Typed EC
00138       TAO_CEC_TypedEventChannel::ServantRetryMap::ENTRY* entry = 0;
00139       if (this->typed_event_channel_->
00140           get_servant_retry_map ().find (proxy, entry) == 0)
00141         {
00142           ++entry->int_id_;
00143           if (entry->int_id_ <= this->retries_)
00144             {
00145               disconnect = false;
00146             }
00147         }
00148     }
00149   else
00150     {
00151 #endif /* TAO_HAS_TYPED_EVENT_CHANNEL */
00152 
00153   // Un-typed EC
00154   TAO_CEC_EventChannel::ServantRetryMap::ENTRY* entry = 0;
00155   if (this->event_channel_->
00156       get_servant_retry_map ().find (proxy, entry) == 0)
00157     {
00158       ++entry->int_id_;
00159       if (entry->int_id_ <= this->retries_)
00160         {
00161           disconnect = false;
00162         }
00163     }
00164 
00165 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00166     }
00167 #endif /* TAO_HAS_TYPED_EVENT_CHANNEL */
00168 
00169   return disconnect;
00170 }
00171 
00172 void
00173 TAO_CEC_Reactive_ConsumerControl::successful_transmission (
00174                                     PortableServer::ServantBase* proxy)
00175 {
00176 
00177 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00178   if (this->typed_event_channel_)
00179     {
00180       // Typed EC
00181       TAO_CEC_TypedEventChannel::ServantRetryMap::ENTRY* entry = 0;
00182       if (this->typed_event_channel_->
00183           get_servant_retry_map ().find (proxy, entry) == 0)
00184         {
00185           entry->int_id_ = 0;
00186         }
00187     }
00188   else
00189     {
00190 #endif /* TAO_HAS_TYPED_EVENT_CHANNEL */
00191 
00192   // Un-typed EC
00193   TAO_CEC_EventChannel::ServantRetryMap::ENTRY* entry = 0;
00194   if (this->event_channel_->
00195       get_servant_retry_map ().find (proxy, entry) == 0)
00196     {
00197       entry->int_id_ = 0;
00198     }
00199 
00200 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00201     }
00202 #endif /* TAO_HAS_TYPED_EVENT_CHANNEL */
00203 
00204 }
00205 
00206 void
00207 TAO_CEC_Reactive_ConsumerControl::handle_timeout (
00208       const ACE_Time_Value &,
00209       const void *)
00210 {
00211   ACE_TRY_NEW_ENV
00212     {
00213       // Query the state of the Current object *before* we initiate
00214       // the iteration...
00215       CORBA::PolicyTypeSeq types;
00216       CORBA::PolicyList_var policies =
00217         this->policy_current_->get_policy_overrides (types
00218                                                      ACE_ENV_ARG_PARAMETER);
00219       ACE_TRY_CHECK;
00220 
00221       // Change the timeout
00222       this->policy_current_->set_policy_overrides (this->policy_list_,
00223                                                    CORBA::ADD_OVERRIDE
00224                                                    ACE_ENV_ARG_PARAMETER);
00225       ACE_TRY_CHECK;
00226 
00227       ACE_TRY_EX (query)
00228         {
00229           // Query the state of the consumers...
00230           this->query_consumers (ACE_ENV_SINGLE_ARG_PARAMETER);
00231           ACE_TRY_CHECK_EX (query);
00232         }
00233       ACE_CATCHANY
00234         {
00235           // Ignore all exceptions
00236         }
00237       ACE_ENDTRY;
00238 
00239       this->policy_current_->set_policy_overrides (policies.in (),
00240                                                    CORBA::SET_OVERRIDE
00241                                                    ACE_ENV_ARG_PARAMETER);
00242       ACE_TRY_CHECK;
00243       for (CORBA::ULong i = 0; i != policies->length (); ++i)
00244         {
00245           policies[i]->destroy (ACE_ENV_SINGLE_ARG_PARAMETER);
00246           ACE_TRY_CHECK;
00247         }
00248     }
00249   ACE_CATCHANY
00250     {
00251       // Ignore all exceptions
00252     }
00253   ACE_ENDTRY;
00254 }
00255 
00256 int
00257 TAO_CEC_Reactive_ConsumerControl::activate (void)
00258 {
00259 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00260   ACE_TRY_NEW_ENV
00261     {
00262       // Get the PolicyCurrent object
00263       CORBA::Object_var tmp =
00264         this->orb_->resolve_initial_references ("PolicyCurrent"
00265                                                 ACE_ENV_ARG_PARAMETER);
00266       ACE_TRY_CHECK;
00267 
00268       this->policy_current_ =
00269         CORBA::PolicyCurrent::_narrow (tmp.in ()
00270                                        ACE_ENV_ARG_PARAMETER);
00271       ACE_TRY_CHECK;
00272 
00273       // Pre-compute the policy list to the set the right timeout
00274       // value...
00275       // We need to convert the relative timeout into 100's of nano seconds.
00276       TimeBase::TimeT timeout;
00277       ORBSVCS_Time::Time_Value_to_TimeT (timeout,
00278                                          this->timeout_);
00279       CORBA::Any any;
00280       any <<= timeout;
00281 
00282       this->policy_list_.length (1);
00283       this->policy_list_[0] =
00284         this->orb_->create_policy (
00285                Messaging::RELATIVE_RT_TIMEOUT_POLICY_TYPE,
00286                any
00287                ACE_ENV_ARG_PARAMETER);
00288       ACE_TRY_CHECK;
00289 
00290       // Only schedule the timer, when the rate is not zero
00291       if (this->rate_ != ACE_Time_Value::zero)
00292       {
00293         // Schedule the timer after these policies has been set, because the
00294         // handle_timeout uses these policies, if done in front, the channel
00295         // can crash when the timeout expires before initiazation is ready.
00296         timer_id_ = this->reactor_->schedule_timer (&this->adapter_,
00297                                                     0,
00298                                                     this->rate_,
00299                                                     this->rate_);
00300         if (timer_id_ == -1)
00301          return -1;
00302       }
00303     }
00304   ACE_CATCHANY
00305     {
00306       return -1;
00307     }
00308   ACE_ENDTRY;
00309 #endif /* TAO_HAS_CORBA_MESSAGING */
00310 
00311   return 0;
00312 }
00313 
00314 int
00315 TAO_CEC_Reactive_ConsumerControl::shutdown (void)
00316 {
00317   int r = 0;
00318 
00319 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00320   r = this->reactor_->cancel_timer (timer_id_);
00321 #endif /* TAO_HAS_CORBA_MESSAGING */
00322   this->adapter_.reactor (0);
00323   return r;
00324 }
00325 
00326 void
00327 TAO_CEC_Reactive_ConsumerControl::consumer_not_exist (
00328       TAO_CEC_ProxyPushSupplier *proxy
00329       ACE_ENV_ARG_DECL)
00330 {
00331   ACE_TRY
00332     {
00333       proxy->disconnect_push_supplier (ACE_ENV_SINGLE_ARG_PARAMETER);
00334       ACE_TRY_CHECK;
00335 
00336       if (TAO_debug_level >= 10)
00337         {
00338           ACE_DEBUG ((LM_DEBUG,
00339                       ACE_TEXT ("ProxyPushSupplier disconnected due to consumer_not_exist\n")));
00340         }
00341     }
00342   ACE_CATCHANY
00343     {
00344       ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
00345                            ACE_TEXT ("Reactive_ConsumerControl::consumer_not_exist"));
00346       // Ignore all exceptions..
00347     }
00348   ACE_ENDTRY;
00349 }
00350 
00351 void
00352 TAO_CEC_Reactive_ConsumerControl::consumer_not_exist (
00353       TAO_CEC_ProxyPullSupplier *proxy
00354       ACE_ENV_ARG_DECL)
00355 {
00356   ACE_TRY
00357     {
00358       proxy->disconnect_pull_supplier (ACE_ENV_SINGLE_ARG_PARAMETER);
00359       ACE_TRY_CHECK;
00360     }
00361   ACE_CATCHANY
00362     {
00363       ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
00364                            ACE_TEXT ("Reactive_ConsumerControl::consumer_not_exist"));
00365       // Ignore all exceptions..
00366     }
00367   ACE_ENDTRY;
00368 }
00369 
00370 void
00371 TAO_CEC_Reactive_ConsumerControl::system_exception (
00372       TAO_CEC_ProxyPushSupplier *proxy,
00373       CORBA::SystemException & /* exception */
00374       ACE_ENV_ARG_DECL)
00375 {
00376   ACE_TRY
00377     {
00378       if (this->need_to_disconnect (proxy))
00379         {
00380           proxy->disconnect_push_supplier (ACE_ENV_SINGLE_ARG_PARAMETER);
00381           ACE_TRY_CHECK;
00382 
00383           if (TAO_debug_level >= 10)
00384             {
00385               ACE_DEBUG ((LM_DEBUG,
00386                       ACE_TEXT ("ProxyPushSupplier disconnected due to consumer_not_exist\n")));
00387             }
00388         }
00389     }
00390   ACE_CATCHANY
00391     {
00392       // Ignore all exceptions..
00393     }
00394   ACE_ENDTRY;
00395 }
00396 
00397 // ****************************************************************
00398 
00399 TAO_CEC_ConsumerControl_Adapter::TAO_CEC_ConsumerControl_Adapter (
00400       TAO_CEC_Reactive_ConsumerControl *adaptee)
00401   :  adaptee_ (adaptee)
00402 {
00403 }
00404 
00405 int
00406 TAO_CEC_ConsumerControl_Adapter::handle_timeout (
00407       const ACE_Time_Value &tv,
00408       const void *arg)
00409 {
00410   this->adaptee_->handle_timeout (tv, arg);
00411   return 0;
00412 }
00413 
00414 // ****************************************************************
00415 
00416 void
00417 TAO_CEC_Ping_Push_Consumer::work (TAO_CEC_ProxyPushSupplier *supplier
00418                                   ACE_ENV_ARG_DECL)
00419 {
00420   ACE_TRY
00421     {
00422       CORBA::Boolean disconnected;
00423       CORBA::Boolean non_existent =
00424         supplier->consumer_non_existent (disconnected
00425                                          ACE_ENV_ARG_PARAMETER);
00426       ACE_TRY_CHECK;
00427       if (non_existent && !disconnected)
00428         {
00429           this->control_->consumer_not_exist (supplier ACE_ENV_ARG_PARAMETER);
00430           ACE_TRY_CHECK;
00431         }
00432     }
00433   ACE_CATCH (CORBA::OBJECT_NOT_EXIST, ex)
00434     {
00435       this->control_->consumer_not_exist (supplier ACE_ENV_ARG_PARAMETER);
00436       ACE_TRY_CHECK;
00437     }
00438   ACE_CATCH (CORBA::TRANSIENT, transient)
00439     {
00440       if (this->control_->need_to_disconnect (supplier))
00441         {
00442           this->control_->consumer_not_exist (supplier ACE_ENV_ARG_PARAMETER);
00443           ACE_TRY_CHECK;
00444         }
00445     }
00446   ACE_CATCHANY
00447     {
00448       // Ignore all exceptions
00449     }
00450   ACE_ENDTRY;
00451 }
00452 
00453 // ****************************************************************
00454 
00455 void
00456 TAO_CEC_Ping_Pull_Consumer::work (TAO_CEC_ProxyPullSupplier *supplier
00457                                   ACE_ENV_ARG_DECL)
00458 {
00459   ACE_TRY
00460     {
00461       CORBA::Boolean disconnected;
00462       CORBA::Boolean non_existent =
00463         supplier->consumer_non_existent (disconnected
00464                                          ACE_ENV_ARG_PARAMETER);
00465       ACE_TRY_CHECK;
00466       if (non_existent && !disconnected)
00467         {
00468           this->control_->consumer_not_exist (supplier ACE_ENV_ARG_PARAMETER);
00469           ACE_TRY_CHECK;
00470         }
00471     }
00472   ACE_CATCH (CORBA::OBJECT_NOT_EXIST, ex)
00473     {
00474       this->control_->consumer_not_exist (supplier ACE_ENV_ARG_PARAMETER);
00475       ACE_TRY_CHECK;
00476     }
00477   ACE_CATCH (CORBA::TRANSIENT, transient)
00478     {
00479       if (this->control_->need_to_disconnect (supplier))
00480         {
00481           this->control_->consumer_not_exist (supplier ACE_ENV_ARG_PARAMETER);
00482           ACE_TRY_CHECK;
00483         }
00484     }
00485   ACE_CATCHANY
00486     {
00487       // Ignore all exceptions
00488     }
00489   ACE_ENDTRY;
00490 }
00491 
00492 TAO_END_VERSIONED_NAMESPACE_DECL

Generated on Thu Nov 9 13:18:17 2006 for TAO_CosEvent by doxygen 1.3.6