00001
00002
00003 #include "orbsvcs/Event/EC_SupplierAdmin.h"
00004 #include "orbsvcs/Event/EC_Reactive_SupplierControl.h"
00005 #include "orbsvcs/Event/EC_Event_Channel_Base.h"
00006 #include "orbsvcs/Event/EC_ProxyConsumer.h"
00007 #include "orbsvcs/Event/EC_ProxySupplier.h"
00008
00009 #include "tao/Messaging/Messaging.h"
00010 #include "tao/ORB_Core.h"
00011
00012 #include "ace/Reactor.h"
00013
00014 #if ! defined (__ACE_INLINE__)
00015 #include "orbsvcs/Event/EC_Reactive_SupplierControl.inl"
00016 #endif
00017
00018 ACE_RCSID(Event, EC_Reactive_SupplierControl, "$Id: EC_Reactive_SupplierControl.cpp 76626 2007-01-26 13:50:03Z elliott_c $")
00019
00020 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00021
00022 TAO_EC_Reactive_SupplierControl::
00023 TAO_EC_Reactive_SupplierControl (const ACE_Time_Value &rate,
00024 const ACE_Time_Value &timeout,
00025 TAO_EC_Event_Channel_Base *ec,
00026 CORBA::ORB_ptr orb)
00027 : rate_ (rate),
00028 timeout_ (timeout),
00029 adapter_ (this),
00030 event_channel_ (ec),
00031 orb_ (CORBA::ORB::_duplicate (orb))
00032 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00033 , timer_id_ (-1)
00034 #endif
00035 {
00036 this->reactor_ =
00037 this->orb_->orb_core ()->reactor ();
00038 }
00039
00040 TAO_EC_Reactive_SupplierControl::~TAO_EC_Reactive_SupplierControl (void)
00041 {
00042 }
00043
00044 void
00045 TAO_EC_Reactive_SupplierControl::query_suppliers ()
00046 {
00047 TAO_EC_Ping_Supplier worker (this);
00048 this->event_channel_->for_each_supplier (&worker);
00049 }
00050
00051 void
00052 TAO_EC_Reactive_SupplierControl::handle_timeout (
00053 const ACE_Time_Value &,
00054 const void *)
00055 {
00056 try
00057 {
00058
00059
00060 CORBA::PolicyTypeSeq types;
00061 CORBA::PolicyList_var policies =
00062 this->policy_current_->get_policy_overrides (types);
00063
00064
00065 this->policy_current_->set_policy_overrides (this->policy_list_,
00066 CORBA::ADD_OVERRIDE);
00067
00068
00069 this->query_suppliers ();
00070
00071 this->policy_current_->set_policy_overrides (policies.in (),
00072 CORBA::SET_OVERRIDE);
00073 for (CORBA::ULong i = 0; i != policies->length (); ++i)
00074 {
00075 policies[i]->destroy ();
00076 }
00077 }
00078 catch (const CORBA::Exception&)
00079 {
00080
00081 }
00082 }
00083
00084 int
00085 TAO_EC_Reactive_SupplierControl::activate (void)
00086 {
00087 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00088 try
00089 {
00090
00091 CORBA::Object_var tmp =
00092 this->orb_->resolve_initial_references ("PolicyCurrent");
00093
00094 this->policy_current_ =
00095 CORBA::PolicyCurrent::_narrow (tmp.in ());
00096
00097
00098 TimeBase::TimeT timeout = timeout_.usec() * 10;
00099 CORBA::Any any;
00100 any <<= timeout;
00101
00102 this->policy_list_.length (1);
00103 this->policy_list_[0] =
00104 this->orb_->create_policy (
00105 Messaging::RELATIVE_RT_TIMEOUT_POLICY_TYPE,
00106 any);
00107
00108
00109 if (this->rate_ != ACE_Time_Value::zero)
00110 {
00111
00112
00113
00114 timer_id_ = this->reactor_->schedule_timer (&this->adapter_,
00115 0,
00116 this->rate_,
00117 this->rate_);
00118 if (timer_id_ == -1)
00119 return -1;
00120 }
00121 }
00122 catch (const CORBA::Exception&)
00123 {
00124 return -1;
00125 }
00126 #endif
00127
00128 return 0;
00129 }
00130
00131 int
00132 TAO_EC_Reactive_SupplierControl::shutdown (void)
00133 {
00134 int r = 0;
00135
00136 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00137 r = this->reactor_->cancel_timer (timer_id_);
00138 #endif
00139 this->adapter_.reactor (0);
00140 return r;
00141 }
00142
00143 void
00144 TAO_EC_Reactive_SupplierControl::supplier_not_exist (
00145 TAO_EC_ProxyPushConsumer *proxy)
00146 {
00147 try
00148 {
00149
00150
00151
00152 proxy->disconnect_push_consumer ();
00153 }
00154 catch (const CORBA::Exception&)
00155 {
00156
00157 }
00158 }
00159
00160 void
00161 TAO_EC_Reactive_SupplierControl::system_exception (
00162 TAO_EC_ProxyPushConsumer *proxy,
00163 CORBA::SystemException & )
00164 {
00165 try
00166 {
00167
00168
00169
00170
00171
00172
00173
00174
00175
00176
00177 proxy->disconnect_push_consumer ();
00178 }
00179 catch (const CORBA::Exception&)
00180 {
00181
00182 }
00183 }
00184
00185
00186
00187 TAO_EC_SupplierControl_Adapter::TAO_EC_SupplierControl_Adapter (
00188 TAO_EC_Reactive_SupplierControl *adaptee)
00189 : adaptee_ (adaptee)
00190 {
00191 }
00192
00193 int
00194 TAO_EC_SupplierControl_Adapter::handle_timeout (
00195 const ACE_Time_Value &tv,
00196 const void *arg)
00197 {
00198 this->adaptee_->handle_timeout (tv, arg);
00199 return 0;
00200 }
00201
00202
00203
00204 void
00205 TAO_EC_Ping_Supplier::work (TAO_EC_ProxyPushConsumer *consumer)
00206 {
00207 try
00208 {
00209 CORBA::Boolean disconnected;
00210 CORBA::Boolean non_existent =
00211 consumer->supplier_non_existent (disconnected);
00212 if (non_existent && !disconnected)
00213 {
00214 this->control_->supplier_not_exist (consumer);
00215 }
00216 }
00217 catch (const CORBA::OBJECT_NOT_EXIST&)
00218 {
00219 this->control_->supplier_not_exist (consumer);
00220 }
00221 catch (const CORBA::TRANSIENT&)
00222 {
00223
00224
00225
00226 this->control_->supplier_not_exist (consumer);
00227 }
00228 catch (const CORBA::Exception&)
00229 {
00230
00231 }
00232 }
00233
00234 TAO_END_VERSIONED_NAMESPACE_DECL