EC_Reactive_SupplierControl.cpp

Go to the documentation of this file.
00001 // $Id: EC_Reactive_SupplierControl.cpp 76626 2007-01-26 13:50:03Z elliott_c $
00002 
00003 #include "orbsvcs/Event/EC_SupplierAdmin.h"
00004 #include "orbsvcs/Event/EC_Reactive_SupplierControl.h"
00005 #include "orbsvcs/Event/EC_Event_Channel_Base.h"
00006 #include "orbsvcs/Event/EC_ProxyConsumer.h"
00007 #include "orbsvcs/Event/EC_ProxySupplier.h" // @@ MSVC 6 bug
00008 
00009 #include "tao/Messaging/Messaging.h"
00010 #include "tao/ORB_Core.h"
00011 
00012 #include "ace/Reactor.h"
00013 
00014 #if ! defined (__ACE_INLINE__)
00015 #include "orbsvcs/Event/EC_Reactive_SupplierControl.inl"
00016 #endif /* __ACE_INLINE__ */
00017 
00018 ACE_RCSID(Event, EC_Reactive_SupplierControl, "$Id: EC_Reactive_SupplierControl.cpp 76626 2007-01-26 13:50:03Z elliott_c $")
00019 
00020 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00021 
00022 TAO_EC_Reactive_SupplierControl::
00023      TAO_EC_Reactive_SupplierControl (const ACE_Time_Value &rate,
00024                                       const ACE_Time_Value &timeout,
00025                                       TAO_EC_Event_Channel_Base *ec,
00026                                       CORBA::ORB_ptr orb)
00027   : rate_ (rate),
00028     timeout_ (timeout),
00029     adapter_ (this),
00030     event_channel_ (ec),
00031     orb_ (CORBA::ORB::_duplicate (orb))
00032 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00033     , timer_id_ (-1)
00034 #endif /* TAO_HAS_CORBA_MESSAGING != 0*/
00035 {
00036   this->reactor_ =
00037     this->orb_->orb_core ()->reactor ();
00038 }
00039 
00040 TAO_EC_Reactive_SupplierControl::~TAO_EC_Reactive_SupplierControl (void)
00041 {
00042 }
00043 
00044 void
00045 TAO_EC_Reactive_SupplierControl::query_suppliers ()
00046 {
00047   TAO_EC_Ping_Supplier worker (this);
00048   this->event_channel_->for_each_supplier (&worker);
00049 }
00050 
00051 void
00052 TAO_EC_Reactive_SupplierControl::handle_timeout (
00053       const ACE_Time_Value &,
00054       const void *)
00055 {
00056   try
00057     {
00058       // Query the state of the Current object *before* we initiate
00059       // the iteration...
00060       CORBA::PolicyTypeSeq types;
00061       CORBA::PolicyList_var policies =
00062         this->policy_current_->get_policy_overrides (types);
00063 
00064       // Change the timeout
00065       this->policy_current_->set_policy_overrides (this->policy_list_,
00066                                                    CORBA::ADD_OVERRIDE);
00067 
00068       // Query the state of the suppliers...
00069       this->query_suppliers ();
00070 
00071       this->policy_current_->set_policy_overrides (policies.in (),
00072                                                    CORBA::SET_OVERRIDE);
00073       for (CORBA::ULong i = 0; i != policies->length (); ++i)
00074         {
00075           policies[i]->destroy ();
00076         }
00077     }
00078   catch (const CORBA::Exception&)
00079     {
00080       // Ignore all exceptions
00081     }
00082 }
00083 
00084 int
00085 TAO_EC_Reactive_SupplierControl::activate (void)
00086 {
00087 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00088   try
00089     {
00090       // Get the PolicyCurrent object
00091       CORBA::Object_var tmp =
00092         this->orb_->resolve_initial_references ("PolicyCurrent");
00093 
00094       this->policy_current_ =
00095         CORBA::PolicyCurrent::_narrow (tmp.in ());
00096 
00097       // Timeout for polling state (default = 10 msec)
00098       TimeBase::TimeT timeout = timeout_.usec() * 10;
00099       CORBA::Any any;
00100       any <<= timeout;
00101 
00102       this->policy_list_.length (1);
00103       this->policy_list_[0] =
00104         this->orb_->create_policy (
00105                Messaging::RELATIVE_RT_TIMEOUT_POLICY_TYPE,
00106                any);
00107 
00108       // Only schedule the timer, when the rate is not zero
00109       if (this->rate_ != ACE_Time_Value::zero)
00110       {
00111         // Schedule the timer after these policies has been set, because the
00112         // handle_timeout uses these policies, if done in front, the channel
00113         // can crash when the timeout expires before initiazation is ready.
00114         timer_id_ = this->reactor_->schedule_timer (&this->adapter_,
00115                                                     0,
00116                                                     this->rate_,
00117                                                     this->rate_);
00118         if (timer_id_ == -1)
00119           return -1;
00120       }
00121     }
00122   catch (const CORBA::Exception&)
00123     {
00124       return -1;
00125     }
00126 #endif /* TAO_HAS_CORBA_MESSAGING */
00127 
00128   return 0;
00129 }
00130 
00131 int
00132 TAO_EC_Reactive_SupplierControl::shutdown (void)
00133 {
00134   int r = 0;
00135 
00136 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00137   r = this->reactor_->cancel_timer (timer_id_);
00138 #endif /* TAO_HAS_CORBA_MESSAGING */
00139   this->adapter_.reactor (0);
00140   return r;
00141 }
00142 
00143 void
00144 TAO_EC_Reactive_SupplierControl::supplier_not_exist (
00145       TAO_EC_ProxyPushConsumer *proxy)
00146 {
00147   try
00148     {
00149       //ACE_DEBUG ((LM_DEBUG,
00150       //            "EC_Reactive_SupplierControl(%P|%t) - "
00151       //            "Consumer %x does not exists\n", long(proxy)));
00152       proxy->disconnect_push_consumer ();
00153     }
00154   catch (const CORBA::Exception&)
00155     {
00156       // Ignore all exceptions..
00157     }
00158 }
00159 
00160 void
00161 TAO_EC_Reactive_SupplierControl::system_exception (
00162       TAO_EC_ProxyPushConsumer *proxy,
00163       CORBA::SystemException & /* exception */)
00164 {
00165   try
00166     {
00167       // The current implementation is very strict, and kicks out a
00168       // client on the first system exception. We may
00169       // want to be more lenient in the future, for example,
00170       // this is TAO's minor code for a failed connection.
00171       //
00172       // if (CORBA::TRANSIENT::_narrow (&exception) != 0
00173       //     && exception->minor () == 0x54410085)
00174       //   return;
00175 
00176       // Anything else is serious, including timeouts...
00177       proxy->disconnect_push_consumer ();
00178     }
00179   catch (const CORBA::Exception&)
00180     {
00181       // Ignore all exceptions..
00182     }
00183 }
00184 
00185 // ****************************************************************
00186 
00187 TAO_EC_SupplierControl_Adapter::TAO_EC_SupplierControl_Adapter (
00188       TAO_EC_Reactive_SupplierControl *adaptee)
00189   :  adaptee_ (adaptee)
00190 {
00191 }
00192 
00193 int
00194 TAO_EC_SupplierControl_Adapter::handle_timeout (
00195       const ACE_Time_Value &tv,
00196       const void *arg)
00197 {
00198   this->adaptee_->handle_timeout (tv, arg);
00199   return 0;
00200 }
00201 
00202 // ****************************************************************
00203 
00204 void
00205 TAO_EC_Ping_Supplier::work (TAO_EC_ProxyPushConsumer *consumer)
00206 {
00207   try
00208     {
00209       CORBA::Boolean disconnected;
00210       CORBA::Boolean non_existent =
00211         consumer->supplier_non_existent (disconnected);
00212       if (non_existent && !disconnected)
00213         {
00214           this->control_->supplier_not_exist (consumer);
00215         }
00216     }
00217   catch (const CORBA::OBJECT_NOT_EXIST&)
00218     {
00219       this->control_->supplier_not_exist (consumer);
00220     }
00221   catch (const CORBA::TRANSIENT&)
00222     {
00223       // This is TAO's minor code for a failed connection, we may
00224       // want to be more lenient in the future..
00225       // if (transient.minor () == 0x54410085)
00226       this->control_->supplier_not_exist (consumer);
00227     }
00228   catch (const CORBA::Exception&)
00229     {
00230       // Ignore all exceptions
00231     }
00232 }
00233 
00234 TAO_END_VERSIONED_NAMESPACE_DECL

Generated on Tue Feb 2 17:44:05 2010 for TAO_RTEvent by  doxygen 1.4.7