CEC_Reactive_Pulling_Strategy.cpp

Go to the documentation of this file.
00001 // $Id: CEC_Reactive_Pulling_Strategy.cpp 76626 2007-01-26 13:50:03Z elliott_c $
00002 
00003 #include "orbsvcs/CosEvent/CEC_ProxyPullConsumer.h"
00004 #include "orbsvcs/CosEvent/CEC_EventChannel.h"
00005 #include "orbsvcs/CosEvent/CEC_SupplierAdmin.h"
00006 #include "orbsvcs/CosEvent/CEC_ConsumerAdmin.h"
00007 #include "orbsvcs/CosEvent/CEC_Reactive_Pulling_Strategy.h"
00008 
00009 #include "orbsvcs/Time_Utilities.h"
00010 
00011 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00012 #include "tao/Messaging/Messaging.h"
00013 #endif
00014 
00015 #include "tao/ORB_Core.h"
00016 
00017 #include "ace/Reactor.h"
00018 
00019 #if ! defined (__ACE_INLINE__)
00020 #include "orbsvcs/CosEvent/CEC_Reactive_Pulling_Strategy.inl"
00021 #endif /* __ACE_INLINE__ */
00022 
00023 ACE_RCSID(CosEvent, CEC_Reactive_Pulling_Strategy, "$Id: CEC_Reactive_Pulling_Strategy.cpp 76626 2007-01-26 13:50:03Z elliott_c $")
00024 
00025 
00026 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00027 
00028 TAO_CEC_Reactive_Pulling_Strategy::
00029     TAO_CEC_Reactive_Pulling_Strategy (const ACE_Time_Value &rate,
00030                                        const ACE_Time_Value &relative_timeout,
00031                                        TAO_CEC_EventChannel *event_channel,
00032                                        CORBA::ORB_ptr orb)
00033   :  adapter_ (this),
00034      rate_ (rate),
00035      relative_timeout_ (relative_timeout),
00036      event_channel_ (event_channel),
00037      orb_ (CORBA::ORB::_duplicate (orb))
00038 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00039    // Initialise timer_id_ to an invalid timer id, so that in case we don't
00040    // schedule a timer, we don't cancel a random timer at shutdown
00041    , timer_id_ (-1)
00042 #endif /* TAO_HAS_CORBA_MESSAGING */
00043 {
00044   this->reactor_ =
00045     this->orb_->orb_core ()->reactor ();
00046 }
00047 
00048 void
00049 TAO_CEC_Reactive_Pulling_Strategy::handle_timeout (
00050       const ACE_Time_Value &,
00051       const void *)
00052 {
00053   try
00054     {
00055       // Query the state of the Current object *before* we initiate
00056       // the iteration...
00057       CORBA::PolicyTypeSeq types;
00058       CORBA::PolicyList_var policies =
00059         this->policy_current_->get_policy_overrides (types);
00060 
00061       // Change the timeout
00062       this->policy_current_->set_policy_overrides (this->policy_list_,
00063                                                    CORBA::ADD_OVERRIDE);
00064 
00065       try
00066         {
00067           TAO_CEC_Pull_Event worker (this->event_channel_->consumer_admin (),
00068                                      this->event_channel_->supplier_control ());
00069 
00070           this->event_channel_->supplier_admin ()->for_each (&worker);
00071         }
00072       catch (const CORBA::Exception&)
00073         {
00074           // Ignore all exceptions
00075         }
00076 
00077       this->policy_current_->set_policy_overrides (policies.in (),
00078                                                    CORBA::SET_OVERRIDE);
00079       for (CORBA::ULong i = 0; i != policies->length (); ++i)
00080         {
00081           policies[i]->destroy ();
00082         }
00083     }
00084   catch (const CORBA::Exception&)
00085     {
00086       // Ignore all exceptions
00087     }
00088 }
00089 
00090 void
00091 TAO_CEC_Reactive_Pulling_Strategy::activate (void)
00092 {
00093 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00094   timer_id_ = this->reactor_->schedule_timer (&this->adapter_,
00095                                             0,
00096                                             this->rate_,
00097                                             this->rate_);
00098   if (timer_id_ == -1)
00099     return;
00100 
00101   try
00102     {
00103       // Get the PolicyCurrent object
00104       CORBA::Object_var tmp =
00105         this->orb_->resolve_initial_references ("PolicyCurrent");
00106 
00107       this->policy_current_ =
00108         CORBA::PolicyCurrent::_narrow (tmp.in ());
00109 
00110       // Pre-compute the policy list to the set the right timeout
00111       // value...
00112       // We need to convert the relative timeout into 100's of nano seconds.
00113       TimeBase::TimeT timeout;
00114       ORBSVCS_Time::Time_Value_to_TimeT (timeout,
00115                                          this->relative_timeout_);
00116       CORBA::Any any;
00117       any <<= timeout;
00118 
00119       this->policy_list_.length (1);
00120       this->policy_list_[0] =
00121         this->orb_->create_policy (
00122                Messaging::RELATIVE_RT_TIMEOUT_POLICY_TYPE,
00123                any);
00124     }
00125   catch (const CORBA::Exception&)
00126     {
00127     }
00128 #endif /* TAO_HAS_CORBA_MESSAGING */
00129 }
00130 
00131 void
00132 TAO_CEC_Reactive_Pulling_Strategy::shutdown (void)
00133 {
00134 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00135   this->reactor_->cancel_timer (timer_id_);
00136 #endif /* TAO_HAS_CORBA_MESSAGING */
00137   this->adapter_.reactor (0);
00138 }
00139 
00140 // ****************************************************************
00141 
00142 TAO_CEC_Pulling_Strategy_Adapter::TAO_CEC_Pulling_Strategy_Adapter (
00143       TAO_CEC_Reactive_Pulling_Strategy *adaptee)
00144   :  adaptee_ (adaptee)
00145 {
00146 }
00147 
00148 int
00149 TAO_CEC_Pulling_Strategy_Adapter::handle_timeout (
00150       const ACE_Time_Value &tv,
00151       const void *arg)
00152 {
00153   this->adaptee_->handle_timeout (tv, arg);
00154   return 0;
00155 }
00156 
00157 // ****************************************************************
00158 
00159 void
00160 TAO_CEC_Pull_Event::work (TAO_CEC_ProxyPullConsumer *consumer)
00161 {
00162   CORBA::Boolean has_event = 0;
00163   CORBA::Any_var any;
00164 
00165   try
00166     {
00167       any = consumer->try_pull_from_supplier (has_event);
00168     }
00169   catch (const CORBA::Exception&)
00170     {
00171       // Ignore all exceptions
00172       return;
00173     }
00174 
00175   if (has_event)
00176     {
00177       this->consumer_admin_->push (any.in ());
00178     }
00179 }
00180 
00181 TAO_END_VERSIONED_NAMESPACE_DECL

Generated on Tue Feb 2 17:44:51 2010 for TAO_CosEvent by  doxygen 1.4.7