00001
00002
00003 #include "orbsvcs/CosEvent/CEC_ProxyPullConsumer.h"
00004 #include "orbsvcs/CosEvent/CEC_EventChannel.h"
00005 #include "orbsvcs/CosEvent/CEC_SupplierAdmin.h"
00006 #include "orbsvcs/CosEvent/CEC_ConsumerAdmin.h"
00007 #include "orbsvcs/CosEvent/CEC_Reactive_Pulling_Strategy.h"
00008
00009 #include "orbsvcs/Time_Utilities.h"
00010
00011 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00012 #include "tao/Messaging/Messaging.h"
00013 #endif
00014
00015 #include "tao/ORB_Core.h"
00016
00017 #include "ace/Reactor.h"
00018
00019 #if ! defined (__ACE_INLINE__)
00020 #include "orbsvcs/CosEvent/CEC_Reactive_Pulling_Strategy.i"
00021 #endif
00022
00023 ACE_RCSID(CosEvent, CEC_Reactive_Pulling_Strategy, "CEC_Reactive_Pulling_Strategy.cpp,v 1.19 2006/03/14 06:14:25 jtc Exp")
00024
00025
00026 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00027
00028 TAO_CEC_Reactive_Pulling_Strategy::
00029 TAO_CEC_Reactive_Pulling_Strategy (const ACE_Time_Value &rate,
00030 const ACE_Time_Value &relative_timeout,
00031 TAO_CEC_EventChannel *event_channel,
00032 CORBA::ORB_ptr orb)
00033 : adapter_ (this),
00034 rate_ (rate),
00035 relative_timeout_ (relative_timeout),
00036 event_channel_ (event_channel),
00037 orb_ (CORBA::ORB::_duplicate (orb))
00038 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00039
00040
00041 , timer_id_ (-1)
00042 #endif
00043 {
00044 this->reactor_ =
00045 this->orb_->orb_core ()->reactor ();
00046 }
00047
00048 void
00049 TAO_CEC_Reactive_Pulling_Strategy::handle_timeout (
00050 const ACE_Time_Value &,
00051 const void *)
00052 {
00053 ACE_TRY_NEW_ENV
00054 {
00055
00056
00057 CORBA::PolicyTypeSeq types;
00058 CORBA::PolicyList_var policies =
00059 this->policy_current_->get_policy_overrides (types
00060 ACE_ENV_ARG_PARAMETER);
00061 ACE_TRY_CHECK;
00062
00063
00064 this->policy_current_->set_policy_overrides (this->policy_list_,
00065 CORBA::ADD_OVERRIDE
00066 ACE_ENV_ARG_PARAMETER);
00067 ACE_TRY_CHECK;
00068
00069 ACE_TRY_EX (query)
00070 {
00071 TAO_CEC_Pull_Event worker (this->event_channel_->consumer_admin (),
00072 this->event_channel_->supplier_control ());
00073
00074 this->event_channel_->supplier_admin ()->for_each (&worker
00075 ACE_ENV_ARG_PARAMETER);
00076 ACE_TRY_CHECK_EX (query);
00077 }
00078 ACE_CATCHANY
00079 {
00080
00081 }
00082 ACE_ENDTRY;
00083
00084 this->policy_current_->set_policy_overrides (policies.in (),
00085 CORBA::SET_OVERRIDE
00086 ACE_ENV_ARG_PARAMETER);
00087 ACE_TRY_CHECK;
00088 for (CORBA::ULong i = 0; i != policies->length (); ++i)
00089 {
00090 policies[i]->destroy (ACE_ENV_SINGLE_ARG_PARAMETER);
00091 ACE_TRY_CHECK;
00092 }
00093 }
00094 ACE_CATCHANY
00095 {
00096
00097 }
00098 ACE_ENDTRY;
00099 }
00100
00101 void
00102 TAO_CEC_Reactive_Pulling_Strategy::activate (void)
00103 {
00104 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00105 timer_id_ = this->reactor_->schedule_timer (&this->adapter_,
00106 0,
00107 this->rate_,
00108 this->rate_);
00109 if (timer_id_ == -1)
00110 return;
00111
00112 ACE_TRY_NEW_ENV
00113 {
00114
00115 CORBA::Object_var tmp =
00116 this->orb_->resolve_initial_references ("PolicyCurrent"
00117 ACE_ENV_ARG_PARAMETER);
00118 ACE_TRY_CHECK;
00119
00120 this->policy_current_ =
00121 CORBA::PolicyCurrent::_narrow (tmp.in ()
00122 ACE_ENV_ARG_PARAMETER);
00123 ACE_TRY_CHECK;
00124
00125
00126
00127
00128 TimeBase::TimeT timeout;
00129 ORBSVCS_Time::Time_Value_to_TimeT (timeout,
00130 this->relative_timeout_);
00131 CORBA::Any any;
00132 any <<= timeout;
00133
00134 this->policy_list_.length (1);
00135 this->policy_list_[0] =
00136 this->orb_->create_policy (
00137 Messaging::RELATIVE_RT_TIMEOUT_POLICY_TYPE,
00138 any
00139 ACE_ENV_ARG_PARAMETER);
00140 ACE_TRY_CHECK;
00141 }
00142 ACE_CATCHANY
00143 {
00144 }
00145 ACE_ENDTRY;
00146 #endif
00147 }
00148
00149 void
00150 TAO_CEC_Reactive_Pulling_Strategy::shutdown (void)
00151 {
00152 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00153 this->reactor_->cancel_timer (timer_id_);
00154 #endif
00155 this->adapter_.reactor (0);
00156 }
00157
00158
00159
00160 TAO_CEC_Pulling_Strategy_Adapter::TAO_CEC_Pulling_Strategy_Adapter (
00161 TAO_CEC_Reactive_Pulling_Strategy *adaptee)
00162 : adaptee_ (adaptee)
00163 {
00164 }
00165
00166 int
00167 TAO_CEC_Pulling_Strategy_Adapter::handle_timeout (
00168 const ACE_Time_Value &tv,
00169 const void *arg)
00170 {
00171 this->adaptee_->handle_timeout (tv, arg);
00172 return 0;
00173 }
00174
00175
00176
00177 void
00178 TAO_CEC_Pull_Event::work (TAO_CEC_ProxyPullConsumer *consumer
00179 ACE_ENV_ARG_DECL)
00180 {
00181 CORBA::Boolean has_event = 0;
00182 CORBA::Any_var any;
00183
00184 ACE_TRY
00185 {
00186 any = consumer->try_pull_from_supplier (has_event
00187 ACE_ENV_ARG_PARAMETER);
00188 ACE_TRY_CHECK;
00189 }
00190 ACE_CATCHANY
00191 {
00192
00193 return;
00194 }
00195 ACE_ENDTRY;
00196
00197 if (has_event)
00198 {
00199 this->consumer_admin_->push (any.in () ACE_ENV_ARG_PARAMETER);
00200 ACE_CHECK;
00201 }
00202 }
00203
00204 TAO_END_VERSIONED_NAMESPACE_DECL