00001
00002
00003 #include "orbsvcs/CosEvent/CEC_ProxyPullConsumer.h"
00004 #include "orbsvcs/CosEvent/CEC_EventChannel.h"
00005 #include "orbsvcs/CosEvent/CEC_SupplierAdmin.h"
00006 #include "orbsvcs/CosEvent/CEC_ConsumerAdmin.h"
00007 #include "orbsvcs/CosEvent/CEC_Reactive_Pulling_Strategy.h"
00008
00009 #include "orbsvcs/Time_Utilities.h"
00010
00011 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00012 #include "tao/Messaging/Messaging.h"
00013 #endif
00014
00015 #include "tao/ORB_Core.h"
00016
00017 #include "ace/Reactor.h"
00018
00019 #if ! defined (__ACE_INLINE__)
00020 #include "orbsvcs/CosEvent/CEC_Reactive_Pulling_Strategy.inl"
00021 #endif
00022
00023 ACE_RCSID(CosEvent, CEC_Reactive_Pulling_Strategy, "$Id: CEC_Reactive_Pulling_Strategy.cpp 76626 2007-01-26 13:50:03Z elliott_c $")
00024
00025
00026 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00027
00028 TAO_CEC_Reactive_Pulling_Strategy::
00029 TAO_CEC_Reactive_Pulling_Strategy (const ACE_Time_Value &rate,
00030 const ACE_Time_Value &relative_timeout,
00031 TAO_CEC_EventChannel *event_channel,
00032 CORBA::ORB_ptr orb)
00033 : adapter_ (this),
00034 rate_ (rate),
00035 relative_timeout_ (relative_timeout),
00036 event_channel_ (event_channel),
00037 orb_ (CORBA::ORB::_duplicate (orb))
00038 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00039
00040
00041 , timer_id_ (-1)
00042 #endif
00043 {
00044 this->reactor_ =
00045 this->orb_->orb_core ()->reactor ();
00046 }
00047
00048 void
00049 TAO_CEC_Reactive_Pulling_Strategy::handle_timeout (
00050 const ACE_Time_Value &,
00051 const void *)
00052 {
00053 try
00054 {
00055
00056
00057 CORBA::PolicyTypeSeq types;
00058 CORBA::PolicyList_var policies =
00059 this->policy_current_->get_policy_overrides (types);
00060
00061
00062 this->policy_current_->set_policy_overrides (this->policy_list_,
00063 CORBA::ADD_OVERRIDE);
00064
00065 try
00066 {
00067 TAO_CEC_Pull_Event worker (this->event_channel_->consumer_admin (),
00068 this->event_channel_->supplier_control ());
00069
00070 this->event_channel_->supplier_admin ()->for_each (&worker);
00071 }
00072 catch (const CORBA::Exception&)
00073 {
00074
00075 }
00076
00077 this->policy_current_->set_policy_overrides (policies.in (),
00078 CORBA::SET_OVERRIDE);
00079 for (CORBA::ULong i = 0; i != policies->length (); ++i)
00080 {
00081 policies[i]->destroy ();
00082 }
00083 }
00084 catch (const CORBA::Exception&)
00085 {
00086
00087 }
00088 }
00089
00090 void
00091 TAO_CEC_Reactive_Pulling_Strategy::activate (void)
00092 {
00093 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00094 timer_id_ = this->reactor_->schedule_timer (&this->adapter_,
00095 0,
00096 this->rate_,
00097 this->rate_);
00098 if (timer_id_ == -1)
00099 return;
00100
00101 try
00102 {
00103
00104 CORBA::Object_var tmp =
00105 this->orb_->resolve_initial_references ("PolicyCurrent");
00106
00107 this->policy_current_ =
00108 CORBA::PolicyCurrent::_narrow (tmp.in ());
00109
00110
00111
00112
00113 TimeBase::TimeT timeout;
00114 ORBSVCS_Time::Time_Value_to_TimeT (timeout,
00115 this->relative_timeout_);
00116 CORBA::Any any;
00117 any <<= timeout;
00118
00119 this->policy_list_.length (1);
00120 this->policy_list_[0] =
00121 this->orb_->create_policy (
00122 Messaging::RELATIVE_RT_TIMEOUT_POLICY_TYPE,
00123 any);
00124 }
00125 catch (const CORBA::Exception&)
00126 {
00127 }
00128 #endif
00129 }
00130
00131 void
00132 TAO_CEC_Reactive_Pulling_Strategy::shutdown (void)
00133 {
00134 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00135 this->reactor_->cancel_timer (timer_id_);
00136 #endif
00137 this->adapter_.reactor (0);
00138 }
00139
00140
00141
00142 TAO_CEC_Pulling_Strategy_Adapter::TAO_CEC_Pulling_Strategy_Adapter (
00143 TAO_CEC_Reactive_Pulling_Strategy *adaptee)
00144 : adaptee_ (adaptee)
00145 {
00146 }
00147
00148 int
00149 TAO_CEC_Pulling_Strategy_Adapter::handle_timeout (
00150 const ACE_Time_Value &tv,
00151 const void *arg)
00152 {
00153 this->adaptee_->handle_timeout (tv, arg);
00154 return 0;
00155 }
00156
00157
00158
00159 void
00160 TAO_CEC_Pull_Event::work (TAO_CEC_ProxyPullConsumer *consumer)
00161 {
00162 CORBA::Boolean has_event = 0;
00163 CORBA::Any_var any;
00164
00165 try
00166 {
00167 any = consumer->try_pull_from_supplier (has_event);
00168 }
00169 catch (const CORBA::Exception&)
00170 {
00171
00172 return;
00173 }
00174
00175 if (has_event)
00176 {
00177 this->consumer_admin_->push (any.in ());
00178 }
00179 }
00180
00181 TAO_END_VERSIONED_NAMESPACE_DECL