CEC_Reactive_Pulling_Strategy.cpp

Go to the documentation of this file.
00001 // CEC_Reactive_Pulling_Strategy.cpp,v 1.19 2006/03/14 06:14:25 jtc Exp
00002 
00003 #include "orbsvcs/CosEvent/CEC_ProxyPullConsumer.h"
00004 #include "orbsvcs/CosEvent/CEC_EventChannel.h"
00005 #include "orbsvcs/CosEvent/CEC_SupplierAdmin.h"
00006 #include "orbsvcs/CosEvent/CEC_ConsumerAdmin.h"
00007 #include "orbsvcs/CosEvent/CEC_Reactive_Pulling_Strategy.h"
00008 
00009 #include "orbsvcs/Time_Utilities.h"
00010 
00011 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00012 #include "tao/Messaging/Messaging.h"
00013 #endif
00014 
00015 #include "tao/ORB_Core.h"
00016 
00017 #include "ace/Reactor.h"
00018 
00019 #if ! defined (__ACE_INLINE__)
00020 #include "orbsvcs/CosEvent/CEC_Reactive_Pulling_Strategy.i"
00021 #endif /* __ACE_INLINE__ */
00022 
00023 ACE_RCSID(CosEvent, CEC_Reactive_Pulling_Strategy, "CEC_Reactive_Pulling_Strategy.cpp,v 1.19 2006/03/14 06:14:25 jtc Exp")
00024 
00025 
00026 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00027 
00028 TAO_CEC_Reactive_Pulling_Strategy::
00029     TAO_CEC_Reactive_Pulling_Strategy (const ACE_Time_Value &rate,
00030                                        const ACE_Time_Value &relative_timeout,
00031                                        TAO_CEC_EventChannel *event_channel,
00032                                        CORBA::ORB_ptr orb)
00033   :  adapter_ (this),
00034      rate_ (rate),
00035      relative_timeout_ (relative_timeout),
00036      event_channel_ (event_channel),
00037      orb_ (CORBA::ORB::_duplicate (orb))
00038 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00039    // Initialise timer_id_ to an invalid timer id, so that in case we don't
00040    // schedule a timer, we don't cancel a random timer at shutdown
00041    , timer_id_ (-1)
00042 #endif /* TAO_HAS_CORBA_MESSAGING */
00043 {
00044   this->reactor_ =
00045     this->orb_->orb_core ()->reactor ();
00046 }
00047 
00048 void
00049 TAO_CEC_Reactive_Pulling_Strategy::handle_timeout (
00050       const ACE_Time_Value &,
00051       const void *)
00052 {
00053   ACE_TRY_NEW_ENV
00054     {
00055       // Query the state of the Current object *before* we initiate
00056       // the iteration...
00057       CORBA::PolicyTypeSeq types;
00058       CORBA::PolicyList_var policies =
00059         this->policy_current_->get_policy_overrides (types
00060                                                      ACE_ENV_ARG_PARAMETER);
00061       ACE_TRY_CHECK;
00062 
00063       // Change the timeout
00064       this->policy_current_->set_policy_overrides (this->policy_list_,
00065                                                    CORBA::ADD_OVERRIDE
00066                                                    ACE_ENV_ARG_PARAMETER);
00067       ACE_TRY_CHECK;
00068 
00069       ACE_TRY_EX (query)
00070         {
00071           TAO_CEC_Pull_Event worker (this->event_channel_->consumer_admin (),
00072                                      this->event_channel_->supplier_control ());
00073 
00074           this->event_channel_->supplier_admin ()->for_each (&worker
00075                                                              ACE_ENV_ARG_PARAMETER);
00076           ACE_TRY_CHECK_EX (query);
00077         }
00078       ACE_CATCHANY
00079         {
00080           // Ignore all exceptions
00081         }
00082       ACE_ENDTRY;
00083 
00084       this->policy_current_->set_policy_overrides (policies.in (),
00085                                                    CORBA::SET_OVERRIDE
00086                                                    ACE_ENV_ARG_PARAMETER);
00087       ACE_TRY_CHECK;
00088       for (CORBA::ULong i = 0; i != policies->length (); ++i)
00089         {
00090           policies[i]->destroy (ACE_ENV_SINGLE_ARG_PARAMETER);
00091           ACE_TRY_CHECK;
00092         }
00093     }
00094   ACE_CATCHANY
00095     {
00096       // Ignore all exceptions
00097     }
00098   ACE_ENDTRY;
00099 }
00100 
00101 void
00102 TAO_CEC_Reactive_Pulling_Strategy::activate (void)
00103 {
00104 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00105   timer_id_ = this->reactor_->schedule_timer (&this->adapter_,
00106                                             0,
00107                                             this->rate_,
00108                                             this->rate_);
00109   if (timer_id_ == -1)
00110     return;
00111 
00112   ACE_TRY_NEW_ENV
00113     {
00114       // Get the PolicyCurrent object
00115       CORBA::Object_var tmp =
00116         this->orb_->resolve_initial_references ("PolicyCurrent"
00117                                                 ACE_ENV_ARG_PARAMETER);
00118       ACE_TRY_CHECK;
00119 
00120       this->policy_current_ =
00121         CORBA::PolicyCurrent::_narrow (tmp.in ()
00122                                        ACE_ENV_ARG_PARAMETER);
00123       ACE_TRY_CHECK;
00124 
00125       // Pre-compute the policy list to the set the right timeout
00126       // value...
00127       // We need to convert the relative timeout into 100's of nano seconds.
00128       TimeBase::TimeT timeout;
00129       ORBSVCS_Time::Time_Value_to_TimeT (timeout,
00130                                          this->relative_timeout_);
00131       CORBA::Any any;
00132       any <<= timeout;
00133 
00134       this->policy_list_.length (1);
00135       this->policy_list_[0] =
00136         this->orb_->create_policy (
00137                Messaging::RELATIVE_RT_TIMEOUT_POLICY_TYPE,
00138                any
00139                ACE_ENV_ARG_PARAMETER);
00140       ACE_TRY_CHECK;
00141     }
00142   ACE_CATCHANY
00143     {
00144     }
00145   ACE_ENDTRY;
00146 #endif /* TAO_HAS_CORBA_MESSAGING */
00147 }
00148 
00149 void
00150 TAO_CEC_Reactive_Pulling_Strategy::shutdown (void)
00151 {
00152 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00153   this->reactor_->cancel_timer (timer_id_);
00154 #endif /* TAO_HAS_CORBA_MESSAGING */
00155   this->adapter_.reactor (0);
00156 }
00157 
00158 // ****************************************************************
00159 
00160 TAO_CEC_Pulling_Strategy_Adapter::TAO_CEC_Pulling_Strategy_Adapter (
00161       TAO_CEC_Reactive_Pulling_Strategy *adaptee)
00162   :  adaptee_ (adaptee)
00163 {
00164 }
00165 
00166 int
00167 TAO_CEC_Pulling_Strategy_Adapter::handle_timeout (
00168       const ACE_Time_Value &tv,
00169       const void *arg)
00170 {
00171   this->adaptee_->handle_timeout (tv, arg);
00172   return 0;
00173 }
00174 
00175 // ****************************************************************
00176 
00177 void
00178 TAO_CEC_Pull_Event::work (TAO_CEC_ProxyPullConsumer *consumer
00179                           ACE_ENV_ARG_DECL)
00180 {
00181   CORBA::Boolean has_event = 0;
00182   CORBA::Any_var any;
00183 
00184   ACE_TRY
00185     {
00186       any = consumer->try_pull_from_supplier (has_event
00187                                               ACE_ENV_ARG_PARAMETER);
00188       ACE_TRY_CHECK;
00189     }
00190   ACE_CATCHANY
00191     {
00192       // Ignore all exceptions
00193       return;
00194     }
00195   ACE_ENDTRY;
00196 
00197   if (has_event)
00198     {
00199       this->consumer_admin_->push (any.in () ACE_ENV_ARG_PARAMETER);
00200       ACE_CHECK;
00201     }
00202 }
00203 
00204 TAO_END_VERSIONED_NAMESPACE_DECL

Generated on Thu Nov 9 13:18:17 2006 for TAO_CosEvent by doxygen 1.3.6