CEC_Reactive_ConsumerControl.cpp

Go to the documentation of this file.
00001 // $Id: CEC_Reactive_ConsumerControl.cpp 76626 2007-01-26 13:50:03Z elliott_c $
00002 
00003 // Note: This class controls the behaviour of consumers connected to both
00004 // the Typed and Un-typed Event Channels.  A check must be made in the code
00005 // to ensure the correct EC is referenced.
00006 
00007 #include "orbsvcs/CosEvent/CEC_EventChannel.h"
00008 #include "orbsvcs/CosEvent/CEC_ConsumerAdmin.h"
00009 #include "orbsvcs/CosEvent/CEC_Reactive_ConsumerControl.h"
00010 
00011 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00012 #include "orbsvcs/CosEvent/CEC_TypedEventChannel.h"
00013 #include "orbsvcs/CosEvent/CEC_TypedConsumerAdmin.h"
00014 #endif /* TAO_HAS_TYPED_EVENT_CHANNEL */
00015 
00016 #include "orbsvcs/CosEvent/CEC_ProxyPushSupplier.h"
00017 #include "orbsvcs/CosEvent/CEC_ProxyPullSupplier.h"
00018 
00019 #include "orbsvcs/Time_Utilities.h"
00020 
00021 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00022 #include "tao/Messaging/Messaging.h"
00023 #endif
00024 
00025 #include "tao/ORB_Core.h"
00026 #include "tao/debug.h"
00027 #include "ace/Reactor.h"
00028 
00029 #if ! defined (__ACE_INLINE__)
00030 #include "orbsvcs/CosEvent/CEC_Reactive_ConsumerControl.inl"
00031 #endif /* __ACE_INLINE__ */
00032 
00033 ACE_RCSID (CosEvent,
00034            CEC_Reactive_ConsumerControl,
00035            "$Id: CEC_Reactive_ConsumerControl.cpp 76626 2007-01-26 13:50:03Z elliott_c $")
00036 
00037 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00038 
00039 // TAO_CEC_Reactive_ConsumerControl constructor for the Un-typed EC
00040 TAO_CEC_Reactive_ConsumerControl::
00041      TAO_CEC_Reactive_ConsumerControl (const ACE_Time_Value &rate,
00042                                        const ACE_Time_Value &timeout,
00043                                        unsigned int retries,
00044                                        TAO_CEC_EventChannel *ec,
00045                                        CORBA::ORB_ptr orb)
00046   : rate_ (rate),
00047     timeout_ (timeout),
00048     retries_ (retries),
00049     adapter_ (this),
00050     event_channel_ (ec),
00051 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00052     typed_event_channel_ (0),
00053 #endif /* TAO_HAS_TYPED_EVENT_CHANNEL */
00054     orb_ (CORBA::ORB::_duplicate (orb))
00055 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00056    // Initialise timer_id_ to an invalid timer id, so that in case we don't
00057    // schedule a timer, we don't cancel a random timer at shutdown
00058    , timer_id_ (-1)
00059 #endif /* TAO_HAS_CORBA_MESSAGING */
00060 {
00061   this->reactor_ =
00062     this->orb_->orb_core ()->reactor ();
00063 }
00064 
00065 // TAO_CEC_Reactive_ConsumerControl constructor for the Typed EC
00066 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00067 TAO_CEC_Reactive_ConsumerControl::
00068      TAO_CEC_Reactive_ConsumerControl (const ACE_Time_Value &rate,
00069                                        const ACE_Time_Value &timeout,
00070                                        unsigned int retries,
00071                                        TAO_CEC_TypedEventChannel *ec,
00072                                        CORBA::ORB_ptr orb)
00073   : rate_ (rate),
00074     timeout_ (timeout),
00075     retries_ (retries),
00076     adapter_ (this),
00077     event_channel_ (0),
00078     typed_event_channel_ (ec),
00079     orb_ (CORBA::ORB::_duplicate (orb))
00080 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00081    // Initialise timer_id_ to an invalid timer id, so that in case we don't
00082    // schedule a timer, we don't cancel a random timer at shutdown
00083    , timer_id_ (-1)
00084 #endif /* TAO_HAS_CORBA_MESSAGING */
00085 {
00086   this->reactor_ =
00087     this->orb_->orb_core ()->reactor ();
00088 }
00089 #endif /* TAO_HAS_TYPED_EVENT_CHANNEL */
00090 
00091 TAO_CEC_Reactive_ConsumerControl::~TAO_CEC_Reactive_ConsumerControl (void)
00092 {
00093 }
00094 
00095 void
00096 TAO_CEC_Reactive_ConsumerControl::query_consumers ()
00097 {
00098   TAO_CEC_Ping_Push_Consumer push_worker (this);
00099 
00100 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00101   if (this->typed_event_channel_)
00102     {
00103       // Typed EC
00104       this->typed_event_channel_->typed_consumer_admin ()->for_each (&push_worker);
00105     }
00106   else
00107     {
00108 #endif /* TAO_HAS_TYPED_EVENT_CHANNEL */
00109 
00110   // Un-typed EC
00111   this->event_channel_->consumer_admin ()->for_each (&push_worker);
00112 
00113   TAO_CEC_Ping_Pull_Consumer pull_worker (this);
00114   this->event_channel_->consumer_admin ()->for_each (&pull_worker);
00115 
00116 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00117     }
00118 #endif /* TAO_HAS_TYPED_EVENT_CHANNEL */
00119 }
00120 
00121 bool
00122 TAO_CEC_Reactive_ConsumerControl::need_to_disconnect (
00123                                     PortableServer::ServantBase* proxy)
00124 {
00125   bool disconnect = true;
00126 
00127 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00128   if (this->typed_event_channel_)
00129     {
00130       // Typed EC
00131       TAO_CEC_TypedEventChannel::ServantRetryMap::ENTRY* entry = 0;
00132       if (this->typed_event_channel_->
00133           get_servant_retry_map ().find (proxy, entry) == 0)
00134         {
00135           ++entry->int_id_;
00136           if (entry->int_id_ <= this->retries_)
00137             {
00138               disconnect = false;
00139             }
00140         }
00141     }
00142   else
00143     {
00144 #endif /* TAO_HAS_TYPED_EVENT_CHANNEL */
00145 
00146   // Un-typed EC
00147   TAO_CEC_EventChannel::ServantRetryMap::ENTRY* entry = 0;
00148   if (this->event_channel_->
00149       get_servant_retry_map ().find (proxy, entry) == 0)
00150     {
00151       ++entry->int_id_;
00152       if (entry->int_id_ <= this->retries_)
00153         {
00154           disconnect = false;
00155         }
00156     }
00157 
00158 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00159     }
00160 #endif /* TAO_HAS_TYPED_EVENT_CHANNEL */
00161 
00162   return disconnect;
00163 }
00164 
00165 void
00166 TAO_CEC_Reactive_ConsumerControl::successful_transmission (
00167                                     PortableServer::ServantBase* proxy)
00168 {
00169 
00170 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00171   if (this->typed_event_channel_)
00172     {
00173       // Typed EC
00174       TAO_CEC_TypedEventChannel::ServantRetryMap::ENTRY* entry = 0;
00175       if (this->typed_event_channel_->
00176           get_servant_retry_map ().find (proxy, entry) == 0)
00177         {
00178           entry->int_id_ = 0;
00179         }
00180     }
00181   else
00182     {
00183 #endif /* TAO_HAS_TYPED_EVENT_CHANNEL */
00184 
00185   // Un-typed EC
00186   TAO_CEC_EventChannel::ServantRetryMap::ENTRY* entry = 0;
00187   if (this->event_channel_->
00188       get_servant_retry_map ().find (proxy, entry) == 0)
00189     {
00190       entry->int_id_ = 0;
00191     }
00192 
00193 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00194     }
00195 #endif /* TAO_HAS_TYPED_EVENT_CHANNEL */
00196 
00197 }
00198 
00199 void
00200 TAO_CEC_Reactive_ConsumerControl::handle_timeout (
00201       const ACE_Time_Value &,
00202       const void *)
00203 {
00204   try
00205     {
00206       // Query the state of the Current object *before* we initiate
00207       // the iteration...
00208       CORBA::PolicyTypeSeq types;
00209       CORBA::PolicyList_var policies =
00210         this->policy_current_->get_policy_overrides (types);
00211 
00212       // Change the timeout
00213       this->policy_current_->set_policy_overrides (this->policy_list_,
00214                                                    CORBA::ADD_OVERRIDE);
00215 
00216       try
00217         {
00218           // Query the state of the consumers...
00219           this->query_consumers ();
00220         }
00221       catch (const CORBA::Exception&)
00222         {
00223           // Ignore all exceptions
00224         }
00225 
00226       this->policy_current_->set_policy_overrides (policies.in (),
00227                                                    CORBA::SET_OVERRIDE);
00228       for (CORBA::ULong i = 0; i != policies->length (); ++i)
00229         {
00230           policies[i]->destroy ();
00231         }
00232     }
00233   catch (const CORBA::Exception&)
00234     {
00235       // Ignore all exceptions
00236     }
00237 }
00238 
00239 int
00240 TAO_CEC_Reactive_ConsumerControl::activate (void)
00241 {
00242 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00243   try
00244     {
00245       // Get the PolicyCurrent object
00246       CORBA::Object_var tmp =
00247         this->orb_->resolve_initial_references ("PolicyCurrent");
00248 
00249       this->policy_current_ =
00250         CORBA::PolicyCurrent::_narrow (tmp.in ());
00251 
00252       // Pre-compute the policy list to the set the right timeout
00253       // value...
00254       // We need to convert the relative timeout into 100's of nano seconds.
00255       TimeBase::TimeT timeout;
00256       ORBSVCS_Time::Time_Value_to_TimeT (timeout,
00257                                          this->timeout_);
00258       CORBA::Any any;
00259       any <<= timeout;
00260 
00261       this->policy_list_.length (1);
00262       this->policy_list_[0] =
00263         this->orb_->create_policy (
00264                Messaging::RELATIVE_RT_TIMEOUT_POLICY_TYPE,
00265                any);
00266 
00267       // Only schedule the timer, when the rate is not zero
00268       if (this->rate_ != ACE_Time_Value::zero)
00269       {
00270         // Schedule the timer after these policies has been set, because the
00271         // handle_timeout uses these policies, if done in front, the channel
00272         // can crash when the timeout expires before initiazation is ready.
00273         timer_id_ = this->reactor_->schedule_timer (&this->adapter_,
00274                                                     0,
00275                                                     this->rate_,
00276                                                     this->rate_);
00277         if (timer_id_ == -1)
00278          return -1;
00279       }
00280     }
00281   catch (const CORBA::Exception&)
00282     {
00283       return -1;
00284     }
00285 #endif /* TAO_HAS_CORBA_MESSAGING */
00286 
00287   return 0;
00288 }
00289 
00290 int
00291 TAO_CEC_Reactive_ConsumerControl::shutdown (void)
00292 {
00293   int r = 0;
00294 
00295 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00296   r = this->reactor_->cancel_timer (timer_id_);
00297 #endif /* TAO_HAS_CORBA_MESSAGING */
00298   this->adapter_.reactor (0);
00299   return r;
00300 }
00301 
00302 void
00303 TAO_CEC_Reactive_ConsumerControl::consumer_not_exist (
00304       TAO_CEC_ProxyPushSupplier *proxy)
00305 {
00306   try
00307     {
00308       proxy->disconnect_push_supplier ();
00309 
00310       if (TAO_debug_level >= 10)
00311         {
00312           ACE_DEBUG ((LM_DEBUG,
00313                       ACE_TEXT ("ProxyPushSupplier disconnected due to consumer_not_exist\n")));
00314         }
00315     }
00316   catch (const CORBA::Exception& ex)
00317     {
00318       ex._tao_print_exception (
00319         ACE_TEXT ("Reactive_ConsumerControl::consumer_not_exist"));
00320       // Ignore all exceptions..
00321     }
00322 }
00323 
00324 void
00325 TAO_CEC_Reactive_ConsumerControl::consumer_not_exist (
00326       TAO_CEC_ProxyPullSupplier *proxy)
00327 {
00328   try
00329     {
00330       proxy->disconnect_pull_supplier ();
00331     }
00332   catch (const CORBA::Exception& ex)
00333     {
00334       ex._tao_print_exception (
00335         ACE_TEXT ("Reactive_ConsumerControl::consumer_not_exist"));
00336       // Ignore all exceptions..
00337     }
00338 }
00339 
00340 void
00341 TAO_CEC_Reactive_ConsumerControl::system_exception (
00342       TAO_CEC_ProxyPushSupplier *proxy,
00343       CORBA::SystemException & /* exception */)
00344 {
00345   try
00346     {
00347       if (this->need_to_disconnect (proxy))
00348         {
00349           proxy->disconnect_push_supplier ();
00350 
00351           if (TAO_debug_level >= 10)
00352             {
00353               ACE_DEBUG ((LM_DEBUG,
00354                       ACE_TEXT ("ProxyPushSupplier disconnected due to consumer_not_exist\n")));
00355             }
00356         }
00357     }
00358   catch (const CORBA::Exception&)
00359     {
00360       // Ignore all exceptions..
00361     }
00362 }
00363 
00364 // ****************************************************************
00365 
00366 TAO_CEC_ConsumerControl_Adapter::TAO_CEC_ConsumerControl_Adapter (
00367       TAO_CEC_Reactive_ConsumerControl *adaptee)
00368   :  adaptee_ (adaptee)
00369 {
00370 }
00371 
00372 int
00373 TAO_CEC_ConsumerControl_Adapter::handle_timeout (
00374       const ACE_Time_Value &tv,
00375       const void *arg)
00376 {
00377   this->adaptee_->handle_timeout (tv, arg);
00378   return 0;
00379 }
00380 
00381 // ****************************************************************
00382 
00383 void
00384 TAO_CEC_Ping_Push_Consumer::work (TAO_CEC_ProxyPushSupplier *supplier)
00385 {
00386   try
00387     {
00388       CORBA::Boolean disconnected;
00389       CORBA::Boolean non_existent =
00390         supplier->consumer_non_existent (disconnected);
00391       if (non_existent && !disconnected)
00392         {
00393           this->control_->consumer_not_exist (supplier);
00394         }
00395     }
00396   catch (const CORBA::OBJECT_NOT_EXIST& )
00397     {
00398       this->control_->consumer_not_exist (supplier);
00399     }
00400   catch (const CORBA::TRANSIENT& )
00401     {
00402       if (this->control_->need_to_disconnect (supplier))
00403         {
00404           this->control_->consumer_not_exist (supplier);
00405         }
00406     }
00407   catch (const CORBA::Exception&)
00408     {
00409       // Ignore all exceptions
00410     }
00411 }
00412 
00413 // ****************************************************************
00414 
00415 void
00416 TAO_CEC_Ping_Pull_Consumer::work (TAO_CEC_ProxyPullSupplier *supplier)
00417 {
00418   try
00419     {
00420       CORBA::Boolean disconnected;
00421       CORBA::Boolean non_existent =
00422         supplier->consumer_non_existent (disconnected);
00423       if (non_existent && !disconnected)
00424         {
00425           this->control_->consumer_not_exist (supplier);
00426         }
00427     }
00428   catch (const CORBA::OBJECT_NOT_EXIST& )
00429     {
00430       this->control_->consumer_not_exist (supplier);
00431     }
00432   catch (const CORBA::TRANSIENT& )
00433     {
00434       if (this->control_->need_to_disconnect (supplier))
00435         {
00436           this->control_->consumer_not_exist (supplier);
00437         }
00438     }
00439   catch (const CORBA::Exception&)
00440     {
00441       // Ignore all exceptions
00442     }
00443 }
00444 
00445 TAO_END_VERSIONED_NAMESPACE_DECL

Generated on Tue Feb 2 17:44:51 2010 for TAO_CosEvent by  doxygen 1.4.7