EC_Reactive_ConsumerControl.cpp

Go to the documentation of this file.
00001 // $Id: EC_Reactive_ConsumerControl.cpp 76626 2007-01-26 13:50:03Z elliott_c $
00002 
00003 #include "orbsvcs/Event/EC_ConsumerAdmin.h"
00004 #include "orbsvcs/Event/EC_Reactive_ConsumerControl.h"
00005 #include "orbsvcs/Event/EC_Event_Channel_Base.h"
00006 #include "orbsvcs/Event/EC_ProxySupplier.h"
00007 #include "orbsvcs/Event/EC_ProxyConsumer.h" // @@ MSVC 6 bug
00008 
00009 #include "tao/Messaging/Messaging.h"
00010 #include "tao/ORB_Core.h"
00011 
00012 #include "ace/Reactor.h"
00013 
00014 #if ! defined (__ACE_INLINE__)
00015 #include "orbsvcs/Event/EC_Reactive_ConsumerControl.inl"
00016 #endif /* __ACE_INLINE__ */
00017 
00018 ACE_RCSID(Event, EC_Reactive_ConsumerControl, "$Id: EC_Reactive_ConsumerControl.cpp 76626 2007-01-26 13:50:03Z elliott_c $")
00019 
00020 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00021 
00022 TAO_EC_Reactive_ConsumerControl::
00023      TAO_EC_Reactive_ConsumerControl (const ACE_Time_Value &rate,
00024                                       const ACE_Time_Value &timeout,
00025                                       TAO_EC_Event_Channel_Base *ec,
00026                                       CORBA::ORB_ptr orb)
00027   : rate_ (rate),
00028     timeout_ (timeout),
00029     adapter_ (this),
00030     event_channel_ (ec),
00031     orb_ (CORBA::ORB::_duplicate (orb))
00032 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00033     , timer_id_ (-1)
00034 #endif /* TAO_HAS_CORBA_MESSAGING != 0*/
00035 {
00036   this->reactor_ =
00037     this->orb_->orb_core ()->reactor ();
00038 }
00039 
00040 TAO_EC_Reactive_ConsumerControl::~TAO_EC_Reactive_ConsumerControl (void)
00041 {
00042 }
00043 
00044 void
00045 TAO_EC_Reactive_ConsumerControl::query_consumers ()
00046 {
00047   TAO_EC_Ping_Consumer worker (this);
00048   this->event_channel_->for_each_consumer (&worker);
00049 }
00050 
00051 void
00052 TAO_EC_Reactive_ConsumerControl::handle_timeout (
00053       const ACE_Time_Value &,
00054       const void *)
00055 {
00056   // NOTE, setting RELATIVE_RT_TIMEOUT_POLICY for the duration of
00057   // query_consumers () below has greater impact than desired.  For
00058   // example, while we are pinging consumers here, a nested upcall,
00059   // which requires making remote calls may come into the ORB.  Those
00060   // remote calls will be carried out with with
00061   // RELATIVE_RT_TIMEOUT_POLICY set here in effect.
00062 
00063   // @@ TODO: should use Guard to set and reset policies.
00064   try
00065     {
00066       // Query the state of the Current object *before* we initiate
00067       // the iteration...
00068       CORBA::PolicyTypeSeq types;
00069       CORBA::PolicyList_var policies =
00070         this->policy_current_->get_policy_overrides (types);
00071 
00072       // Change the timeout
00073       this->policy_current_->set_policy_overrides (this->policy_list_,
00074                                                    CORBA::ADD_OVERRIDE);
00075 
00076       // Query the state of the consumers...
00077       this->query_consumers ();
00078 
00079       this->policy_current_->set_policy_overrides (policies.in (),
00080                                                    CORBA::SET_OVERRIDE);
00081       for (CORBA::ULong i = 0; i != policies->length (); ++i)
00082         {
00083           policies[i]->destroy ();
00084         }
00085     }
00086   catch (const CORBA::Exception&)
00087     {
00088       // Ignore all exceptions
00089     }
00090 }
00091 
00092 int
00093 TAO_EC_Reactive_ConsumerControl::activate (void)
00094 {
00095 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00096   try
00097     {
00098       // Get the PolicyCurrent object
00099       CORBA::Object_var tmp =
00100         this->orb_->resolve_initial_references ("PolicyCurrent");
00101 
00102       this->policy_current_ =
00103         CORBA::PolicyCurrent::_narrow (tmp.in ());
00104 
00105       // Timeout for polling state (default = 10 msec)
00106       TimeBase::TimeT timeout = timeout_.usec() * 10;
00107       CORBA::Any any;
00108       any <<= timeout;
00109 
00110       this->policy_list_.length (1);
00111       this->policy_list_[0] =
00112         this->orb_->create_policy (
00113                Messaging::RELATIVE_RT_TIMEOUT_POLICY_TYPE,
00114                any);
00115 
00116       // Only schedule the timer, when the rate is not zero
00117       if (this->rate_ != ACE_Time_Value::zero)
00118       {
00119         // Schedule the timer after these policies has been set, because the
00120         // handle_timeout uses these policies, if done in front, the channel
00121         // can crash when the timeout expires before initiazation is ready.
00122         timer_id_ = this->reactor_->schedule_timer (&this->adapter_,
00123                                                     0,
00124                                                     this->rate_,
00125                                                     this->rate_);
00126         if (timer_id_ == -1)
00127           return -1;
00128       }
00129     }
00130   catch (const CORBA::Exception&)
00131     {
00132       return -1;
00133     }
00134 #endif /* TAO_HAS_CORBA_MESSAGING */
00135 
00136   return 0;
00137 }
00138 
00139 int
00140 TAO_EC_Reactive_ConsumerControl::shutdown (void)
00141 {
00142   int r = 0;
00143 
00144 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00145   r = this->reactor_->cancel_timer (timer_id_);
00146 #endif /* TAO_HAS_CORBA_MESSAGING */
00147   this->adapter_.reactor (0);
00148   return r;
00149 }
00150 
00151 void
00152 TAO_EC_Reactive_ConsumerControl::consumer_not_exist (
00153       TAO_EC_ProxyPushSupplier *proxy)
00154 {
00155   try
00156     {
00157       //ACE_DEBUG ((LM_DEBUG,
00158       //            "EC_Reactive_ConsumerControl(%P|%t) - "
00159       //            "Consumer %x does not exists\n", long(proxy)));
00160       proxy->disconnect_push_supplier ();
00161     }
00162   catch (const CORBA::Exception& ex)
00163     {
00164       ex._tao_print_exception ("Reactive_ConsumerControl::consumer_not_exist");
00165       // Ignore all exceptions..
00166     }
00167 }
00168 
00169 void
00170 TAO_EC_Reactive_ConsumerControl::system_exception (
00171       TAO_EC_ProxyPushSupplier *proxy,
00172       CORBA::SystemException & /* exception */)
00173 {
00174   try
00175     {
00176       // The current implementation is very strict, and kicks out a
00177       // client on the first system exception. We may
00178       // want to be more lenient in the future, for example,
00179       // this is TAO's minor code for a failed connection.
00180       //
00181       // if (CORBA::TRANSIENT::_narrow (&exception) != 0
00182       //     && exception->minor () == 0x54410085)
00183       //   return;
00184 
00185       // Anything else is serious, including timeouts...
00186       proxy->disconnect_push_supplier ();
00187     }
00188   catch (const CORBA::Exception&)
00189     {
00190       // Ignore all exceptions..
00191     }
00192 }
00193 
00194 // ****************************************************************
00195 
00196 TAO_EC_ConsumerControl_Adapter::TAO_EC_ConsumerControl_Adapter (
00197       TAO_EC_Reactive_ConsumerControl *adaptee)
00198   :  adaptee_ (adaptee)
00199 {
00200 }
00201 
00202 int
00203 TAO_EC_ConsumerControl_Adapter::handle_timeout (
00204       const ACE_Time_Value &tv,
00205       const void *arg)
00206 {
00207   this->adaptee_->handle_timeout (tv, arg);
00208   return 0;
00209 }
00210 
00211 // ****************************************************************
00212 
00213 void
00214 TAO_EC_Ping_Consumer::work (TAO_EC_ProxyPushSupplier *supplier)
00215 {
00216   try
00217     {
00218       CORBA::Boolean disconnected;
00219       CORBA::Boolean non_existent =
00220         supplier->consumer_non_existent (disconnected);
00221       if (non_existent && !disconnected)
00222         {
00223           this->control_->consumer_not_exist (supplier);
00224         }
00225     }
00226   catch (const CORBA::OBJECT_NOT_EXIST&)
00227     {
00228       this->control_->consumer_not_exist (supplier);
00229     }
00230   catch (const CORBA::TRANSIENT&)
00231     {
00232       // This is TAO's minor code for a failed connection, we may
00233       // want to be more lenient in the future..
00234       // if (transient.minor () == 0x54410085)
00235       this->control_->consumer_not_exist (supplier);
00236     }
00237   catch (const CORBA::Exception&)
00238     {
00239       // Ignore all exceptions
00240     }
00241 }
00242 
00243 TAO_END_VERSIONED_NAMESPACE_DECL

Generated on Tue Feb 2 17:44:05 2010 for TAO_RTEvent by  doxygen 1.4.7