CEC_Reactive_SupplierControl.cpp

Go to the documentation of this file.
00001 // $Id: CEC_Reactive_SupplierControl.cpp 76626 2007-01-26 13:50:03Z elliott_c $
00002 
00003 // Note: This class controls the behaviour of suppliers connected to both
00004 // the Typed and Un-typed Event Channels.  A check must be made in the code
00005 // to ensure the correct EC is referenced.
00006 
00007 #include "orbsvcs/CosEvent/CEC_EventChannel.h"
00008 #include "orbsvcs/CosEvent/CEC_SupplierAdmin.h"
00009 #include "orbsvcs/CosEvent/CEC_ProxyPushConsumer.h"
00010 #include "orbsvcs/CosEvent/CEC_Reactive_SupplierControl.h"
00011 
00012 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00013 #include "orbsvcs/CosEvent/CEC_TypedEventChannel.h"
00014 #include "orbsvcs/CosEvent/CEC_TypedSupplierAdmin.h"
00015 #include "orbsvcs/CosEvent/CEC_TypedProxyPushConsumer.h"
00016 #endif /* TAO_HAS_TYPED_EVENT_CHANNEL */
00017 
00018 #include "orbsvcs/CosEvent/CEC_ProxyPullConsumer.h"
00019 
00020 #include "orbsvcs/Time_Utilities.h"
00021 
00022 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00023 #include "tao/Messaging/Messaging.h"
00024 #endif
00025 
00026 #include "tao/ORB_Core.h"
00027 
00028 #include "ace/Reactor.h"
00029 
00030 #if ! defined (__ACE_INLINE__)
00031 #include "orbsvcs/CosEvent/CEC_Reactive_SupplierControl.inl"
00032 #endif /* __ACE_INLINE__ */
00033 
00034 ACE_RCSID (CosEvent,
00035            CEC_Reactive_SupplierControl,
00036            "$Id: CEC_Reactive_SupplierControl.cpp 76626 2007-01-26 13:50:03Z elliott_c $")
00037 
00038 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00039 
00040 // TAO_CEC_Reactive_SupplierControl constructor for the Un-typed EC
00041 TAO_CEC_Reactive_SupplierControl::
00042      TAO_CEC_Reactive_SupplierControl (const ACE_Time_Value &rate,
00043                                        const ACE_Time_Value &timeout,
00044                                        unsigned int retries,
00045                                        TAO_CEC_EventChannel *ec,
00046                                        CORBA::ORB_ptr orb)
00047   : rate_ (rate),
00048     timeout_ (timeout),
00049     retries_ (retries),
00050     adapter_ (this),
00051     event_channel_ (ec),
00052 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00053     typed_event_channel_ (0),
00054 #endif /* TAO_HAS_TYPED_EVENT_CHANNEL */
00055     orb_ (CORBA::ORB::_duplicate (orb))
00056 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00057    // Initialise timer_id_ to an invalid timer id, so that in case we don't
00058    // schedule a timer, we don't cancel a random timer at shutdown
00059    , timer_id_ (-1)
00060 #endif /* TAO_HAS_CORBA_MESSAGING */
00061 {
00062   this->reactor_ =
00063     this->orb_->orb_core ()->reactor ();
00064 }
00065 
// Constructor used when the control object serves a typed event
// channel.  Identical to the un-typed variant except that the typed
// channel pointer is stored and the un-typed one is cleared.
#if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
TAO_CEC_Reactive_SupplierControl::
     TAO_CEC_Reactive_SupplierControl (const ACE_Time_Value &rate,
                                       const ACE_Time_Value &timeout,
                                       unsigned int retries,
                                       TAO_CEC_TypedEventChannel *ec,
                                       CORBA::ORB_ptr orb)
  : rate_ (rate)
  , timeout_ (timeout)
  , retries_ (retries)
  , adapter_ (this)
  , event_channel_ (0)
  , typed_event_channel_ (ec)
  , orb_ (CORBA::ORB::_duplicate (orb))
#if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
  // -1 marks "no timer scheduled", so shutdown() never cancels an
  // unrelated timer when activate() did not schedule one.
  , timer_id_ (-1)
#endif /* TAO_HAS_CORBA_MESSAGING */
{
  // The periodic ping is driven by the reactor owned by the ORB core.
  TAO_ORB_Core *core = this->orb_->orb_core ();
  this->reactor_ = core->reactor ();
}
#endif /* TAO_HAS_TYPED_EVENT_CHANNEL */
00091 
00092 TAO_CEC_Reactive_SupplierControl::~TAO_CEC_Reactive_SupplierControl (void)
00093 {
00094 }
00095 
00096 void
00097 TAO_CEC_Reactive_SupplierControl::query_suppliers ()
00098 {
00099 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00100   if (this->typed_event_channel_)
00101     {
00102       // Typed EC
00103       TAO_CEC_Ping_Typed_Push_Supplier push_worker (this);
00104 
00105       this->typed_event_channel_->typed_supplier_admin ()->for_each (&push_worker);
00106     }
00107   else
00108     {
00109 #endif /* TAO_HAS_TYPED_EVENT_CHANNEL */
00110 
00111   // Un-typed EC
00112   TAO_CEC_Ping_Push_Supplier push_worker (this);
00113   this->event_channel_->supplier_admin ()->for_each (&push_worker);
00114 
00115   TAO_CEC_Ping_Pull_Supplier pull_worker (this);
00116   this->event_channel_->supplier_admin ()->for_each (&pull_worker);
00117 
00118 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00119     }
00120 #endif /* TAO_HAS_TYPED_EVENT_CHANNEL */
00121 }
00122 
00123 bool
00124 TAO_CEC_Reactive_SupplierControl::need_to_disconnect (
00125                                     PortableServer::ServantBase* proxy)
00126 {
00127   bool disconnect = true;
00128 
00129 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00130   if (this->typed_event_channel_)
00131     {
00132       // Typed EC
00133       TAO_CEC_TypedEventChannel::ServantRetryMap::ENTRY* entry = 0;
00134       if (this->typed_event_channel_->
00135           get_servant_retry_map ().find (proxy, entry) == 0)
00136         {
00137           ++entry->int_id_;
00138           if (entry->int_id_ <= this->retries_)
00139             {
00140               disconnect = false;
00141             }
00142         }
00143     }
00144   else
00145     {
00146 #endif /* TAO_HAS_TYPED_EVENT_CHANNEL */
00147 
00148   // Un-typed EC
00149   TAO_CEC_EventChannel::ServantRetryMap::ENTRY* entry = 0;
00150   if (this->event_channel_->
00151       get_servant_retry_map ().find (proxy, entry) == 0)
00152     {
00153       ++entry->int_id_;
00154       if (entry->int_id_ <= this->retries_)
00155         {
00156           disconnect = false;
00157         }
00158     }
00159 
00160 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00161     }
00162 #endif /* TAO_HAS_TYPED_EVENT_CHANNEL */
00163 
00164   return disconnect;
00165 }
00166 
00167 void
00168 TAO_CEC_Reactive_SupplierControl::successful_transmission (
00169                                     PortableServer::ServantBase* proxy)
00170 {
00171 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00172   if (this->typed_event_channel_)
00173     {
00174       // Typed EC
00175       TAO_CEC_TypedEventChannel::ServantRetryMap::ENTRY* entry = 0;
00176       if (this->typed_event_channel_->
00177           get_servant_retry_map ().find (proxy, entry) == 0)
00178         {
00179           entry->int_id_ = 0;
00180         }
00181     }
00182   else
00183     {
00184 #endif /* TAO_HAS_TYPED_EVENT_CHANNEL */
00185 
00186   // Un-typed EC
00187   TAO_CEC_EventChannel::ServantRetryMap::ENTRY* entry = 0;
00188   if (this->event_channel_->
00189       get_servant_retry_map ().find (proxy, entry) == 0)
00190     {
00191       entry->int_id_ = 0;
00192     }
00193 
00194 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00195     }
00196 #endif /* TAO_HAS_TYPED_EVENT_CHANNEL */
00197 
00198 }
00199 
00200 void
00201 TAO_CEC_Reactive_SupplierControl::handle_timeout (
00202       const ACE_Time_Value &,
00203       const void *)
00204 {
00205   try
00206     {
00207       // Query the state of the Current object *before* we initiate
00208       // the iteration...
00209       CORBA::PolicyTypeSeq types;
00210       CORBA::PolicyList_var policies =
00211         this->policy_current_->get_policy_overrides (types);
00212 
00213       // Change the timeout
00214       this->policy_current_->set_policy_overrides (this->policy_list_,
00215                                                    CORBA::ADD_OVERRIDE);
00216 
00217       try
00218         {
00219           // Query the state of the suppliers...
00220           this->query_suppliers ();
00221         }
00222       catch (const CORBA::Exception&)
00223         {
00224           // Ignore all exceptions
00225         }
00226 
00227       this->policy_current_->set_policy_overrides (policies.in (),
00228                                                    CORBA::SET_OVERRIDE);
00229       for (CORBA::ULong i = 0; i != policies->length (); ++i)
00230         {
00231           policies[i]->destroy ();
00232         }
00233     }
00234   catch (const CORBA::Exception&)
00235     {
00236       // Ignore all exceptions
00237     }
00238 }
00239 
00240 int
00241 TAO_CEC_Reactive_SupplierControl::activate (void)
00242 {
00243 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00244   try
00245     {
00246       // Get the PolicyCurrent object
00247       CORBA::Object_var tmp =
00248         this->orb_->resolve_initial_references ("PolicyCurrent");
00249 
00250       this->policy_current_ =
00251         CORBA::PolicyCurrent::_narrow (tmp.in ());
00252 
00253       // Pre-compute the policy list to the set the right timeout
00254       // value...
00255       // We need to convert the relative timeout into 100's of nano seconds.
00256       TimeBase::TimeT timeout;
00257       ORBSVCS_Time::Time_Value_to_TimeT (timeout,
00258                                          this->timeout_);
00259       CORBA::Any any;
00260       any <<= timeout;
00261 
00262       this->policy_list_.length (1);
00263       this->policy_list_[0] =
00264         this->orb_->create_policy (
00265                Messaging::RELATIVE_RT_TIMEOUT_POLICY_TYPE,
00266                any);
00267 
00268       // Only schedule the timer, when the rate is not zero
00269       if (this->rate_ != ACE_Time_Value::zero)
00270       {
00271         // Schedule the timer after these policies has been set, because the
00272         // handle_timeout uses these policies, if done in front, the channel
00273         // can crash when the timeout expires before initiazation is ready.
00274         timer_id_ = this->reactor_->schedule_timer (&this->adapter_,
00275                                                     0,
00276                                                     this->rate_,
00277                                                     this->rate_);
00278         if (timer_id_ == -1)
00279           return -1;
00280       }
00281     }
00282   catch (const CORBA::Exception&)
00283     {
00284       return -1;
00285     }
00286 #endif /* TAO_HAS_CORBA_MESSAGING */
00287 
00288   return 0;
00289 }
00290 
00291 int
00292 TAO_CEC_Reactive_SupplierControl::shutdown (void)
00293 {
00294   int r = 0;
00295 
00296 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00297   r = this->reactor_->cancel_timer (timer_id_);
00298 #endif /* TAO_HAS_CORBA_MESSAGING */
00299   this->adapter_.reactor (0);
00300   return r;
00301 }
00302 
00303 void
00304 TAO_CEC_Reactive_SupplierControl::supplier_not_exist (
00305       TAO_CEC_ProxyPushConsumer *proxy)
00306 {
00307   try
00308     {
00309       proxy->disconnect_push_consumer ();
00310     }
00311   catch (const CORBA::Exception&)
00312     {
00313       // Ignore all exceptions..
00314     }
00315 }
00316 
#if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
// The supplier behind this typed push proxy is gone: disconnect the
// proxy.  Best effort only - a CORBA exception raised by the
// disconnect is discarded.
void
TAO_CEC_Reactive_SupplierControl::supplier_not_exist (
      TAO_CEC_TypedProxyPushConsumer *proxy)
{
  try
    {
      proxy->disconnect_push_consumer ();
    }
  catch (const CORBA::Exception&)
    {
      // Nothing useful can be done about a failed disconnect.
    }
}
#endif /* TAO_HAS_TYPED_EVENT_CHANNEL */
00332 
00333 void
00334 TAO_CEC_Reactive_SupplierControl::supplier_not_exist (
00335       TAO_CEC_ProxyPullConsumer *proxy)
00336 {
00337   try
00338     {
00339       proxy->disconnect_pull_consumer ();
00340     }
00341   catch (const CORBA::Exception&)
00342     {
00343       // Ignore all exceptions..
00344     }
00345 }
00346 
00347 void
00348 TAO_CEC_Reactive_SupplierControl::system_exception (
00349       TAO_CEC_ProxyPullConsumer *proxy,
00350       CORBA::SystemException & /* exception */)
00351 {
00352   try
00353     {
00354       if (this->need_to_disconnect (proxy))
00355         {
00356           proxy->disconnect_pull_consumer ();
00357         }
00358     }
00359   catch (const CORBA::Exception&)
00360     {
00361       // Ignore all exceptions..
00362     }
00363 }
00364 
00365 // ****************************************************************
00366 
00367 TAO_CEC_SupplierControl_Adapter::TAO_CEC_SupplierControl_Adapter (
00368       TAO_CEC_Reactive_SupplierControl *adaptee)
00369   :  adaptee_ (adaptee)
00370 {
00371 }
00372 
00373 int
00374 TAO_CEC_SupplierControl_Adapter::handle_timeout (
00375       const ACE_Time_Value &tv,
00376       const void *arg)
00377 {
00378   this->adaptee_->handle_timeout (tv, arg);
00379   return 0;
00380 }
00381 
00382 // ****************************************************************
00383 
00384 void
00385 TAO_CEC_Ping_Push_Supplier::work (TAO_CEC_ProxyPushConsumer *consumer)
00386 {
00387   try
00388     {
00389       CORBA::Boolean disconnected;
00390       CORBA::Boolean non_existent =
00391         consumer->supplier_non_existent (disconnected);
00392       if (non_existent && !disconnected)
00393         {
00394           this->control_->supplier_not_exist (consumer);
00395         }
00396     }
00397   catch (const CORBA::OBJECT_NOT_EXIST& )
00398     {
00399       this->control_->supplier_not_exist (consumer);
00400     }
00401   catch (const CORBA::TRANSIENT& )
00402     {
00403       if (this->control_->need_to_disconnect (consumer))
00404         {
00405           this->control_->supplier_not_exist (consumer);
00406         }
00407     }
00408   catch (const CORBA::Exception&)
00409     {
00410       // Ignore all exceptions
00411     }
00412 }
00413 
00414 // ****************************************************************
00415 
#if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
// Ping the supplier attached to one typed push proxy and tell the
// control object when it has to be dropped:
//  - supplier reported non-existent while the proxy is still
//    connected, or OBJECT_NOT_EXIST raised: drop it immediately;
//  - TRANSIENT raised: drop it only once need_to_disconnect() agrees;
//  - any other CORBA exception: ignored.
void
TAO_CEC_Ping_Typed_Push_Supplier::work (TAO_CEC_TypedProxyPushConsumer *consumer)
{
  try
    {
      CORBA::Boolean disconnected;
      const CORBA::Boolean gone =
        consumer->supplier_non_existent (disconnected);

      if (!gone || disconnected)
        return;

      this->control_->supplier_not_exist (consumer);
    }
  catch (const CORBA::OBJECT_NOT_EXIST& )
    {
      this->control_->supplier_not_exist (consumer);
    }
  catch (const CORBA::TRANSIENT& )
    {
      const bool give_up = this->control_->need_to_disconnect (consumer);
      if (give_up)
        this->control_->supplier_not_exist (consumer);
    }
  catch (const CORBA::Exception&)
    {
      // Every other CORBA exception is ignored on purpose.
    }
}
#endif /* TAO_HAS_TYPED_EVENT_CHANNEL */
00447 
00448 // ****************************************************************
00449 
00450 void
00451 TAO_CEC_Ping_Pull_Supplier::work (TAO_CEC_ProxyPullConsumer *consumer)
00452 {
00453   try
00454     {
00455       CORBA::Boolean disconnected;
00456       CORBA::Boolean non_existent =
00457         consumer->supplier_non_existent (disconnected);
00458       if (non_existent && !disconnected)
00459         {
00460           this->control_->supplier_not_exist (consumer);
00461         }
00462     }
00463   catch (const CORBA::OBJECT_NOT_EXIST& )
00464     {
00465       this->control_->supplier_not_exist (consumer);
00466     }
00467   catch (const CORBA::TRANSIENT& )
00468     {
00469       if (this->control_->need_to_disconnect (consumer))
00470         {
00471           this->control_->supplier_not_exist (consumer);
00472         }
00473     }
00474   catch (const CORBA::Exception&)
00475     {
00476       // Ignore all exceptions
00477     }
00478 }
00479 
00480 TAO_END_VERSIONED_NAMESPACE_DECL

Generated on Tue Feb 2 17:44:51 2010 for TAO_CosEvent by  doxygen 1.4.7