00001
00002
00003 #include "orbsvcs/Event/EC_ConsumerAdmin.h"
00004 #include "orbsvcs/Event/EC_Reactive_ConsumerControl.h"
00005 #include "orbsvcs/Event/EC_Event_Channel_Base.h"
00006 #include "orbsvcs/Event/EC_ProxySupplier.h"
00007 #include "orbsvcs/Event/EC_ProxyConsumer.h"
00008
00009 #include "tao/Messaging/Messaging.h"
00010 #include "tao/ORB_Core.h"
00011
00012 #include "ace/Reactor.h"
00013
00014 #if ! defined (__ACE_INLINE__)
00015 #include "orbsvcs/Event/EC_Reactive_ConsumerControl.inl"
00016 #endif
00017
00018 ACE_RCSID(Event, EC_Reactive_ConsumerControl, "$Id: EC_Reactive_ConsumerControl.cpp 76626 2007-01-26 13:50:03Z elliott_c $")
00019
00020 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00021
00022 TAO_EC_Reactive_ConsumerControl::
00023 TAO_EC_Reactive_ConsumerControl (const ACE_Time_Value &rate,
00024 const ACE_Time_Value &timeout,
00025 TAO_EC_Event_Channel_Base *ec,
00026 CORBA::ORB_ptr orb)
00027 : rate_ (rate),
00028 timeout_ (timeout),
00029 adapter_ (this),
00030 event_channel_ (ec),
00031 orb_ (CORBA::ORB::_duplicate (orb))
00032 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00033 , timer_id_ (-1)
00034 #endif
00035 {
00036 this->reactor_ =
00037 this->orb_->orb_core ()->reactor ();
00038 }
00039
00040 TAO_EC_Reactive_ConsumerControl::~TAO_EC_Reactive_ConsumerControl (void)
00041 {
00042 }
00043
00044 void
00045 TAO_EC_Reactive_ConsumerControl::query_consumers ()
00046 {
00047 TAO_EC_Ping_Consumer worker (this);
00048 this->event_channel_->for_each_consumer (&worker);
00049 }
00050
00051 void
00052 TAO_EC_Reactive_ConsumerControl::handle_timeout (
00053 const ACE_Time_Value &,
00054 const void *)
00055 {
00056
00057
00058
00059
00060
00061
00062
00063
00064 try
00065 {
00066
00067
00068 CORBA::PolicyTypeSeq types;
00069 CORBA::PolicyList_var policies =
00070 this->policy_current_->get_policy_overrides (types);
00071
00072
00073 this->policy_current_->set_policy_overrides (this->policy_list_,
00074 CORBA::ADD_OVERRIDE);
00075
00076
00077 this->query_consumers ();
00078
00079 this->policy_current_->set_policy_overrides (policies.in (),
00080 CORBA::SET_OVERRIDE);
00081 for (CORBA::ULong i = 0; i != policies->length (); ++i)
00082 {
00083 policies[i]->destroy ();
00084 }
00085 }
00086 catch (const CORBA::Exception&)
00087 {
00088
00089 }
00090 }
00091
00092 int
00093 TAO_EC_Reactive_ConsumerControl::activate (void)
00094 {
00095 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00096 try
00097 {
00098
00099 CORBA::Object_var tmp =
00100 this->orb_->resolve_initial_references ("PolicyCurrent");
00101
00102 this->policy_current_ =
00103 CORBA::PolicyCurrent::_narrow (tmp.in ());
00104
00105
00106 TimeBase::TimeT timeout = timeout_.usec() * 10;
00107 CORBA::Any any;
00108 any <<= timeout;
00109
00110 this->policy_list_.length (1);
00111 this->policy_list_[0] =
00112 this->orb_->create_policy (
00113 Messaging::RELATIVE_RT_TIMEOUT_POLICY_TYPE,
00114 any);
00115
00116
00117 if (this->rate_ != ACE_Time_Value::zero)
00118 {
00119
00120
00121
00122 timer_id_ = this->reactor_->schedule_timer (&this->adapter_,
00123 0,
00124 this->rate_,
00125 this->rate_);
00126 if (timer_id_ == -1)
00127 return -1;
00128 }
00129 }
00130 catch (const CORBA::Exception&)
00131 {
00132 return -1;
00133 }
00134 #endif
00135
00136 return 0;
00137 }
00138
00139 int
00140 TAO_EC_Reactive_ConsumerControl::shutdown (void)
00141 {
00142 int r = 0;
00143
00144 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00145 r = this->reactor_->cancel_timer (timer_id_);
00146 #endif
00147 this->adapter_.reactor (0);
00148 return r;
00149 }
00150
00151 void
00152 TAO_EC_Reactive_ConsumerControl::consumer_not_exist (
00153 TAO_EC_ProxyPushSupplier *proxy)
00154 {
00155 try
00156 {
00157
00158
00159
00160 proxy->disconnect_push_supplier ();
00161 }
00162 catch (const CORBA::Exception& ex)
00163 {
00164 ex._tao_print_exception ("Reactive_ConsumerControl::consumer_not_exist");
00165
00166 }
00167 }
00168
00169 void
00170 TAO_EC_Reactive_ConsumerControl::system_exception (
00171 TAO_EC_ProxyPushSupplier *proxy,
00172 CORBA::SystemException & )
00173 {
00174 try
00175 {
00176
00177
00178
00179
00180
00181
00182
00183
00184
00185
00186 proxy->disconnect_push_supplier ();
00187 }
00188 catch (const CORBA::Exception&)
00189 {
00190
00191 }
00192 }
00193
00194
00195
00196 TAO_EC_ConsumerControl_Adapter::TAO_EC_ConsumerControl_Adapter (
00197 TAO_EC_Reactive_ConsumerControl *adaptee)
00198 : adaptee_ (adaptee)
00199 {
00200 }
00201
00202 int
00203 TAO_EC_ConsumerControl_Adapter::handle_timeout (
00204 const ACE_Time_Value &tv,
00205 const void *arg)
00206 {
00207 this->adaptee_->handle_timeout (tv, arg);
00208 return 0;
00209 }
00210
00211
00212
00213 void
00214 TAO_EC_Ping_Consumer::work (TAO_EC_ProxyPushSupplier *supplier)
00215 {
00216 try
00217 {
00218 CORBA::Boolean disconnected;
00219 CORBA::Boolean non_existent =
00220 supplier->consumer_non_existent (disconnected);
00221 if (non_existent && !disconnected)
00222 {
00223 this->control_->consumer_not_exist (supplier);
00224 }
00225 }
00226 catch (const CORBA::OBJECT_NOT_EXIST&)
00227 {
00228 this->control_->consumer_not_exist (supplier);
00229 }
00230 catch (const CORBA::TRANSIENT&)
00231 {
00232
00233
00234
00235 this->control_->consumer_not_exist (supplier);
00236 }
00237 catch (const CORBA::Exception&)
00238 {
00239
00240 }
00241 }
00242
00243 TAO_END_VERSIONED_NAMESPACE_DECL