CEC_Reactive_SupplierControl.cpp

Go to the documentation of this file.
00001 // CEC_Reactive_SupplierControl.cpp,v 1.24 2006/03/14 06:14:25 jtc Exp
00002 
00003 // Note: This class controls the behaviour of suppliers connected to both
00004 // the Typed and Un-typed Event Channels.  A check must be made in the code
00005 // to ensure the correct EC is referenced. 
00006 
00007 #include "orbsvcs/CosEvent/CEC_EventChannel.h"
00008 #include "orbsvcs/CosEvent/CEC_SupplierAdmin.h"
00009 #include "orbsvcs/CosEvent/CEC_ProxyPushConsumer.h"
00010 #include "orbsvcs/CosEvent/CEC_Reactive_SupplierControl.h"
00011 
00012 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00013 #include "orbsvcs/CosEvent/CEC_TypedEventChannel.h"
00014 #include "orbsvcs/CosEvent/CEC_TypedSupplierAdmin.h"
00015 #include "orbsvcs/CosEvent/CEC_TypedProxyPushConsumer.h"
00016 #endif /* TAO_HAS_TYPED_EVENT_CHANNEL */
00017 
00018 #include "orbsvcs/CosEvent/CEC_ProxyPullConsumer.h"
00019 
00020 #include "orbsvcs/Time_Utilities.h"
00021 
00022 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00023 #include "tao/Messaging/Messaging.h"
00024 #endif
00025 
00026 #include "tao/ORB_Core.h"
00027 
00028 #include "ace/Reactor.h"
00029 
00030 #if ! defined (__ACE_INLINE__)
00031 #include "orbsvcs/CosEvent/CEC_Reactive_SupplierControl.i"
00032 #endif /* __ACE_INLINE__ */
00033 
00034 ACE_RCSID (CosEvent,
00035            CEC_Reactive_SupplierControl,
00036            "CEC_Reactive_SupplierControl.cpp,v 1.24 2006/03/14 06:14:25 jtc Exp")
00037 
00038 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00039 
00040 // TAO_CEC_Reactive_SupplierControl constructor for the Un-typed EC
00041 TAO_CEC_Reactive_SupplierControl::
00042      TAO_CEC_Reactive_SupplierControl (const ACE_Time_Value &rate,
00043                                        const ACE_Time_Value &timeout,
00044                                        unsigned int retries,
00045                                        TAO_CEC_EventChannel *ec,
00046                                        CORBA::ORB_ptr orb)
00047   : rate_ (rate),
00048     timeout_ (timeout),
00049     retries_ (retries),
00050     adapter_ (this),
00051     event_channel_ (ec),
00052 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00053     typed_event_channel_ (0),
00054 #endif /* TAO_HAS_TYPED_EVENT_CHANNEL */
00055     orb_ (CORBA::ORB::_duplicate (orb))
00056 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00057    // Initialise timer_id_ to an invalid timer id, so that in case we don't
00058    // schedule a timer, we don't cancel a random timer at shutdown
00059    , timer_id_ (-1)
00060 #endif /* TAO_HAS_CORBA_MESSAGING */
00061 {
00062   this->reactor_ =
00063     this->orb_->orb_core ()->reactor ();
00064 }
00065 
00066 // TAO_CEC_Reactive_SupplierControl constructor for the Typed EC
00067 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00068 TAO_CEC_Reactive_SupplierControl::
00069      TAO_CEC_Reactive_SupplierControl (const ACE_Time_Value &rate,
00070                                        const ACE_Time_Value &timeout,
00071                                        unsigned int retries,
00072                                        TAO_CEC_TypedEventChannel *ec,
00073                                        CORBA::ORB_ptr orb)
00074   : rate_ (rate),
00075     timeout_ (timeout),
00076     retries_ (retries),
00077     adapter_ (this),
00078     event_channel_ (0),
00079     typed_event_channel_ (ec),
00080     orb_ (CORBA::ORB::_duplicate (orb))
00081 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00082    // Initialise timer_id_ to an invalid timer id, so that in case we don't
00083    // schedule a timer, we don't cancel a random timer at shutdown
00084    , timer_id_ (-1)
00085 #endif /* TAO_HAS_CORBA_MESSAGING */
00086 {
00087   this->reactor_ =
00088     this->orb_->orb_core ()->reactor ();
00089 }
00090 #endif /* TAO_HAS_TYPED_EVENT_CHANNEL */
00091 
00092 TAO_CEC_Reactive_SupplierControl::~TAO_CEC_Reactive_SupplierControl (void)
00093 {
00094 }
00095 
00096 void
00097 TAO_CEC_Reactive_SupplierControl::query_suppliers (
00098       ACE_ENV_SINGLE_ARG_DECL)
00099 {
00100 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00101   if (this->typed_event_channel_)
00102     {
00103       // Typed EC
00104       TAO_CEC_Ping_Typed_Push_Supplier push_worker (this);
00105 
00106       this->typed_event_channel_->typed_supplier_admin ()->for_each (&push_worker
00107                                                                      ACE_ENV_ARG_PARAMETER);
00108       ACE_CHECK;
00109     }
00110   else
00111     {
00112 #endif /* TAO_HAS_TYPED_EVENT_CHANNEL */
00113 
00114   // Un-typed EC
00115   TAO_CEC_Ping_Push_Supplier push_worker (this);
00116   this->event_channel_->supplier_admin ()->for_each (&push_worker
00117                                                      ACE_ENV_ARG_PARAMETER);
00118   ACE_CHECK;
00119 
00120   TAO_CEC_Ping_Pull_Supplier pull_worker (this);
00121   this->event_channel_->supplier_admin ()->for_each (&pull_worker
00122                                                      ACE_ENV_ARG_PARAMETER);
00123   ACE_CHECK;
00124 
00125 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00126     }
00127 #endif /* TAO_HAS_TYPED_EVENT_CHANNEL */
00128 }
00129 
00130 bool
00131 TAO_CEC_Reactive_SupplierControl::need_to_disconnect (
00132                                     PortableServer::ServantBase* proxy)
00133 {
00134   bool disconnect = true;
00135 
00136 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00137   if (this->typed_event_channel_)
00138     {
00139       // Typed EC
00140       TAO_CEC_TypedEventChannel::ServantRetryMap::ENTRY* entry = 0;
00141       if (this->typed_event_channel_->
00142           get_servant_retry_map ().find (proxy, entry) == 0)
00143         {
00144           ++entry->int_id_;
00145           if (entry->int_id_ <= this->retries_)
00146             {
00147               disconnect = false;
00148             }
00149         }
00150     }
00151   else
00152     {
00153 #endif /* TAO_HAS_TYPED_EVENT_CHANNEL */
00154 
00155   // Un-typed EC
00156   TAO_CEC_EventChannel::ServantRetryMap::ENTRY* entry = 0;
00157   if (this->event_channel_->
00158       get_servant_retry_map ().find (proxy, entry) == 0)
00159     {
00160       ++entry->int_id_;
00161       if (entry->int_id_ <= this->retries_)
00162         {
00163           disconnect = false;
00164         }
00165     }
00166 
00167 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00168     }
00169 #endif /* TAO_HAS_TYPED_EVENT_CHANNEL */
00170 
00171   return disconnect;
00172 }
00173 
00174 void
00175 TAO_CEC_Reactive_SupplierControl::successful_transmission (
00176                                     PortableServer::ServantBase* proxy)
00177 {
00178 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00179   if (this->typed_event_channel_)
00180     {
00181       // Typed EC
00182       TAO_CEC_TypedEventChannel::ServantRetryMap::ENTRY* entry = 0;
00183       if (this->typed_event_channel_->
00184           get_servant_retry_map ().find (proxy, entry) == 0)
00185         {
00186           entry->int_id_ = 0;
00187         }
00188     }
00189   else
00190     {
00191 #endif /* TAO_HAS_TYPED_EVENT_CHANNEL */
00192 
00193   // Un-typed EC
00194   TAO_CEC_EventChannel::ServantRetryMap::ENTRY* entry = 0;
00195   if (this->event_channel_->
00196       get_servant_retry_map ().find (proxy, entry) == 0)
00197     {
00198       entry->int_id_ = 0;
00199     }
00200 
00201 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00202     }
00203 #endif /* TAO_HAS_TYPED_EVENT_CHANNEL */
00204 
00205 }
00206 
00207 void
00208 TAO_CEC_Reactive_SupplierControl::handle_timeout (
00209       const ACE_Time_Value &,
00210       const void *)
00211 {
00212   ACE_TRY_NEW_ENV
00213     {
00214       // Query the state of the Current object *before* we initiate
00215       // the iteration...
00216       CORBA::PolicyTypeSeq types;
00217       CORBA::PolicyList_var policies =
00218         this->policy_current_->get_policy_overrides (types
00219                                                      ACE_ENV_ARG_PARAMETER);
00220       ACE_TRY_CHECK;
00221 
00222       // Change the timeout
00223       this->policy_current_->set_policy_overrides (this->policy_list_,
00224                                                    CORBA::ADD_OVERRIDE
00225                                                    ACE_ENV_ARG_PARAMETER);
00226       ACE_TRY_CHECK;
00227 
00228       ACE_TRY_EX (query)
00229         {
00230           // Query the state of the suppliers...
00231           this->query_suppliers (ACE_ENV_SINGLE_ARG_PARAMETER);
00232           ACE_TRY_CHECK_EX (query);
00233         }
00234       ACE_CATCHANY
00235         {
00236           // Ignore all exceptions
00237         }
00238       ACE_ENDTRY;
00239 
00240       this->policy_current_->set_policy_overrides (policies.in (),
00241                                                    CORBA::SET_OVERRIDE
00242                                                    ACE_ENV_ARG_PARAMETER);
00243       ACE_TRY_CHECK;
00244       for (CORBA::ULong i = 0; i != policies->length (); ++i)
00245         {
00246           policies[i]->destroy (ACE_ENV_SINGLE_ARG_PARAMETER);
00247           ACE_TRY_CHECK;
00248         }
00249     }
00250   ACE_CATCHANY
00251     {
00252       // Ignore all exceptions
00253     }
00254   ACE_ENDTRY;
00255 }
00256 
00257 int
00258 TAO_CEC_Reactive_SupplierControl::activate (void)
00259 {
00260 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00261   ACE_TRY_NEW_ENV
00262     {
00263       // Get the PolicyCurrent object
00264       CORBA::Object_var tmp =
00265         this->orb_->resolve_initial_references ("PolicyCurrent"
00266                                                 ACE_ENV_ARG_PARAMETER);
00267       ACE_TRY_CHECK;
00268 
00269       this->policy_current_ =
00270         CORBA::PolicyCurrent::_narrow (tmp.in ()
00271                                        ACE_ENV_ARG_PARAMETER);
00272       ACE_TRY_CHECK;
00273 
00274       // Pre-compute the policy list to the set the right timeout
00275       // value...
00276       // We need to convert the relative timeout into 100's of nano seconds.
00277       TimeBase::TimeT timeout;
00278       ORBSVCS_Time::Time_Value_to_TimeT (timeout,
00279                                          this->timeout_);
00280       CORBA::Any any;
00281       any <<= timeout;
00282 
00283       this->policy_list_.length (1);
00284       this->policy_list_[0] =
00285         this->orb_->create_policy (
00286                Messaging::RELATIVE_RT_TIMEOUT_POLICY_TYPE,
00287                any
00288                ACE_ENV_ARG_PARAMETER);
00289       ACE_TRY_CHECK;
00290 
00291       // Only schedule the timer, when the rate is not zero
00292       if (this->rate_ != ACE_Time_Value::zero)
00293       {
00294         // Schedule the timer after these policies has been set, because the
00295         // handle_timeout uses these policies, if done in front, the channel
00296         // can crash when the timeout expires before initiazation is ready.
00297         timer_id_ = this->reactor_->schedule_timer (&this->adapter_,
00298                                                     0,
00299                                                     this->rate_,
00300                                                     this->rate_);
00301         if (timer_id_ == -1)
00302           return -1;
00303       }
00304     }
00305   ACE_CATCHANY
00306     {
00307       return -1;
00308     }
00309   ACE_ENDTRY;
00310 #endif /* TAO_HAS_CORBA_MESSAGING */
00311 
00312   return 0;
00313 }
00314 
00315 int
00316 TAO_CEC_Reactive_SupplierControl::shutdown (void)
00317 {
00318   int r = 0;
00319 
00320 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00321   r = this->reactor_->cancel_timer (timer_id_);
00322 #endif /* TAO_HAS_CORBA_MESSAGING */
00323   this->adapter_.reactor (0);
00324   return r;
00325 }
00326 
00327 void
00328 TAO_CEC_Reactive_SupplierControl::supplier_not_exist (
00329       TAO_CEC_ProxyPushConsumer *proxy
00330       ACE_ENV_ARG_DECL)
00331 {
00332   ACE_TRY
00333     {
00334       proxy->disconnect_push_consumer (ACE_ENV_SINGLE_ARG_PARAMETER);
00335       ACE_TRY_CHECK;
00336     }
00337   ACE_CATCHANY
00338     {
00339       // Ignore all exceptions..
00340     }
00341   ACE_ENDTRY;
00342 }
00343 
00344 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00345 void
00346 TAO_CEC_Reactive_SupplierControl::supplier_not_exist (
00347       TAO_CEC_TypedProxyPushConsumer *proxy
00348       ACE_ENV_ARG_DECL)
00349 {
00350   ACE_TRY
00351     {
00352       proxy->disconnect_push_consumer (ACE_ENV_SINGLE_ARG_PARAMETER);
00353       ACE_TRY_CHECK;
00354     }
00355   ACE_CATCHANY
00356     {
00357       // Ignore all exceptions..
00358     }
00359   ACE_ENDTRY;
00360 }
00361 #endif /* TAO_HAS_TYPED_EVENT_CHANNEL */
00362 
00363 void
00364 TAO_CEC_Reactive_SupplierControl::supplier_not_exist (
00365       TAO_CEC_ProxyPullConsumer *proxy
00366       ACE_ENV_ARG_DECL)
00367 {
00368   ACE_TRY
00369     {
00370       proxy->disconnect_pull_consumer (ACE_ENV_SINGLE_ARG_PARAMETER);
00371       ACE_TRY_CHECK;
00372     }
00373   ACE_CATCHANY
00374     {
00375       // Ignore all exceptions..
00376     }
00377   ACE_ENDTRY;
00378 }
00379 
00380 void
00381 TAO_CEC_Reactive_SupplierControl::system_exception (
00382       TAO_CEC_ProxyPullConsumer *proxy,
00383       CORBA::SystemException & /* exception */
00384       ACE_ENV_ARG_DECL)
00385 {
00386   ACE_TRY
00387     {
00388       if (this->need_to_disconnect (proxy))
00389         {
00390           proxy->disconnect_pull_consumer (ACE_ENV_SINGLE_ARG_PARAMETER);
00391           ACE_TRY_CHECK;
00392         }
00393     }
00394   ACE_CATCHANY
00395     {
00396       // Ignore all exceptions..
00397     }
00398   ACE_ENDTRY;
00399 }
00400 
00401 // ****************************************************************
00402 
00403 TAO_CEC_SupplierControl_Adapter::TAO_CEC_SupplierControl_Adapter (
00404       TAO_CEC_Reactive_SupplierControl *adaptee)
00405   :  adaptee_ (adaptee)
00406 {
00407 }
00408 
00409 int
00410 TAO_CEC_SupplierControl_Adapter::handle_timeout (
00411       const ACE_Time_Value &tv,
00412       const void *arg)
00413 {
00414   this->adaptee_->handle_timeout (tv, arg);
00415   return 0;
00416 }
00417 
00418 // ****************************************************************
00419 
00420 void
00421 TAO_CEC_Ping_Push_Supplier::work (TAO_CEC_ProxyPushConsumer *consumer
00422                                   ACE_ENV_ARG_DECL)
00423 {
00424   ACE_TRY
00425     {
00426       CORBA::Boolean disconnected;
00427       CORBA::Boolean non_existent =
00428         consumer->supplier_non_existent (disconnected
00429                                          ACE_ENV_ARG_PARAMETER);
00430       ACE_TRY_CHECK;
00431       if (non_existent && !disconnected)
00432         {
00433           this->control_->supplier_not_exist (consumer ACE_ENV_ARG_PARAMETER);
00434           ACE_TRY_CHECK;
00435         }
00436     }
00437   ACE_CATCH (CORBA::OBJECT_NOT_EXIST, ex)
00438     {
00439       this->control_->supplier_not_exist (consumer ACE_ENV_ARG_PARAMETER);
00440       ACE_TRY_CHECK;
00441     }
00442   ACE_CATCH (CORBA::TRANSIENT, transient)
00443     {
00444       if (this->control_->need_to_disconnect (consumer))
00445         {
00446           this->control_->supplier_not_exist (consumer ACE_ENV_ARG_PARAMETER);
00447           ACE_TRY_CHECK;
00448         }
00449     }
00450   ACE_CATCHANY
00451     {
00452       // Ignore all exceptions
00453     }
00454   ACE_ENDTRY;
00455 }
00456 
00457 // ****************************************************************
00458 
00459 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00460 void
00461 TAO_CEC_Ping_Typed_Push_Supplier::work (TAO_CEC_TypedProxyPushConsumer *consumer
00462                                         ACE_ENV_ARG_DECL)
00463 {
00464   ACE_TRY
00465     {
00466       CORBA::Boolean disconnected;
00467       CORBA::Boolean non_existent =
00468         consumer->supplier_non_existent (disconnected
00469                                          ACE_ENV_ARG_PARAMETER);
00470       ACE_TRY_CHECK;
00471       if (non_existent && !disconnected)
00472         {
00473           this->control_->supplier_not_exist (consumer ACE_ENV_ARG_PARAMETER);
00474           ACE_TRY_CHECK;
00475         }
00476     }
00477   ACE_CATCH (CORBA::OBJECT_NOT_EXIST, ex)
00478     {
00479       this->control_->supplier_not_exist (consumer ACE_ENV_ARG_PARAMETER);
00480       ACE_TRY_CHECK;
00481     }
00482   ACE_CATCH (CORBA::TRANSIENT, transient)
00483     {
00484       if (this->control_->need_to_disconnect (consumer))
00485         {
00486           this->control_->supplier_not_exist (consumer ACE_ENV_ARG_PARAMETER);
00487           ACE_TRY_CHECK;
00488         }
00489     }
00490   ACE_CATCHANY
00491     {
00492       // Ignore all exceptions
00493     }
00494   ACE_ENDTRY;
00495 }
00496 #endif /* TAO_HAS_TYPED_EVENT_CHANNEL */
00497 
00498 // ****************************************************************
00499 
00500 void
00501 TAO_CEC_Ping_Pull_Supplier::work (TAO_CEC_ProxyPullConsumer *consumer
00502                                   ACE_ENV_ARG_DECL)
00503 {
00504   ACE_TRY
00505     {
00506       CORBA::Boolean disconnected;
00507       CORBA::Boolean non_existent =
00508         consumer->supplier_non_existent (disconnected
00509                                          ACE_ENV_ARG_PARAMETER);
00510       ACE_TRY_CHECK;
00511       if (non_existent && !disconnected)
00512         {
00513           this->control_->supplier_not_exist (consumer ACE_ENV_ARG_PARAMETER);
00514           ACE_TRY_CHECK;
00515         }
00516     }
00517   ACE_CATCH (CORBA::OBJECT_NOT_EXIST, ex)
00518     {
00519       this->control_->supplier_not_exist (consumer ACE_ENV_ARG_PARAMETER);
00520       ACE_TRY_CHECK;
00521     }
00522   ACE_CATCH (CORBA::TRANSIENT, transient)
00523     {
00524       if (this->control_->need_to_disconnect (consumer))
00525         {
00526           this->control_->supplier_not_exist (consumer ACE_ENV_ARG_PARAMETER);
00527           ACE_TRY_CHECK;
00528         }
00529     }
00530   ACE_CATCHANY
00531     {
00532       // Ignore all exceptions
00533     }
00534   ACE_ENDTRY;
00535 }
00536 
00537 TAO_END_VERSIONED_NAMESPACE_DECL

Generated on Thu Nov 9 13:18:17 2006 for TAO_CosEvent by doxygen 1.3.6