ECG_Reactive_ConsumerEC_Control.cpp

Go to the documentation of this file.
00001 // $Id: ECG_Reactive_ConsumerEC_Control.cpp 76626 2007-01-26 13:50:03Z elliott_c $
00002 
00003 #include "orbsvcs/Event/EC_Gateway_IIOP.h"
00004 #include "orbsvcs/Event/ECG_Reactive_ConsumerEC_Control.h"
00005 #include "tao/Messaging/Messaging.h"
00006 #include "tao/ORB_Core.h"
00007 
00008 #include "ace/Reactor.h"
00009 
00010 ACE_RCSID(Event, ECG_Reactive_ConsumerEventChannelControl, "$Id: ECG_Reactive_ConsumerEC_Control.cpp 76626 2007-01-26 13:50:03Z elliott_c $")
00011 
00012 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00013 
00014 TAO_ECG_Reactive_ConsumerEC_Control::
00015      TAO_ECG_Reactive_ConsumerEC_Control (const ACE_Time_Value &rate,
00016                                           const ACE_Time_Value &timeout,
00017                                           TAO_EC_Gateway_IIOP* gateway,
00018                                           CORBA::ORB_ptr orb)
00019   : rate_ (rate),
00020     timeout_ (timeout),
00021     adapter_ (this),
00022     gateway_ (gateway),
00023     orb_ (CORBA::ORB::_duplicate (orb))
00024 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00025     , timer_id_ (-1)
00026 #endif /* TAO_HAS_CORBA_MESSAGING != 0*/
00027 {
00028   this->reactor_ =
00029     this->orb_->orb_core ()->reactor ();
00030 }
00031 
00032 TAO_ECG_Reactive_ConsumerEC_Control::~TAO_ECG_Reactive_ConsumerEC_Control (void)
00033 {
00034 }
00035 
00036 void
00037 TAO_ECG_Reactive_ConsumerEC_Control::query_eventchannel ()
00038 {
00039   try
00040     {
00041       CORBA::Boolean disconnected;
00042       CORBA::Boolean non_existent =
00043         gateway_->consumer_ec_non_existent (disconnected);
00044       if (non_existent && !disconnected)
00045         {
00046           this->event_channel_not_exist (gateway_);
00047         }
00048     }
00049   catch (const CORBA::OBJECT_NOT_EXIST&)
00050     {
00051       this->event_channel_not_exist (gateway_);
00052     }
00053   catch (const CORBA::TRANSIENT&)
00054     {
00055       // This is TAO's minor code for a failed connection, we may
00056       // want to be more lenient in the future..
00057       // if (transient.minor () == 0x54410085)
00058       this->event_channel_not_exist (gateway_);
00059     }
00060   catch (const CORBA::Exception&)
00061     {
00062       // Ignore all exceptions
00063     }
00064 }
00065 
00066 void
00067 TAO_ECG_Reactive_ConsumerEC_Control::handle_timeout (
00068       const ACE_Time_Value &,
00069       const void *)
00070 {
00071   // NOTE, setting RELATIVE_RT_TIMEOUT_POLICY for the duration of
00072   // query_eventchannel () below has greater impact than desired.  For
00073   // example, while we are pinging ec here, a nested upcall,
00074   // which requires making remote calls may come into the ORB.  Those
00075   // remote calls will be carried out with with
00076   // RELATIVE_RT_TIMEOUT_POLICY set here in effect.
00077 
00078   // @@ TODO: should use Guard to set and reset policies.
00079   try
00080     {
00081       // Query the state of the Current object *before* we initiate
00082       // the iteration...
00083       CORBA::PolicyTypeSeq types;
00084       CORBA::PolicyList_var policies =
00085         this->policy_current_->get_policy_overrides (types);
00086 
00087       // Change the timeout
00088       this->policy_current_->set_policy_overrides (this->policy_list_,
00089                                                    CORBA::ADD_OVERRIDE);
00090 
00091       // Query the state of the consumers...
00092       this->query_eventchannel ();
00093 
00094       this->policy_current_->set_policy_overrides (policies.in (),
00095                                                    CORBA::SET_OVERRIDE);
00096       for (CORBA::ULong i = 0; i != policies->length (); ++i)
00097         {
00098           policies[i]->destroy ();
00099         }
00100     }
00101   catch (const CORBA::Exception&)
00102     {
00103       // Ignore all exceptions
00104     }
00105 }
00106 
00107 int
00108 TAO_ECG_Reactive_ConsumerEC_Control::activate (void)
00109 {
00110 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00111   try
00112     {
00113       // Get the PolicyCurrent object
00114       CORBA::Object_var tmp =
00115         this->orb_->resolve_initial_references ("PolicyCurrent");
00116 
00117       this->policy_current_ =
00118         CORBA::PolicyCurrent::_narrow (tmp.in ());
00119 
00120       // Timeout for polling state (default = 10 msec)
00121       TimeBase::TimeT timeout = timeout_.usec() * 10;
00122       CORBA::Any any;
00123       any <<= timeout;
00124 
00125       this->policy_list_.length (1);
00126       this->policy_list_[0] =
00127         this->orb_->create_policy (
00128                Messaging::RELATIVE_RT_TIMEOUT_POLICY_TYPE,
00129                any);
00130 
00131       // Only schedule the timer, when the rate is not zero
00132       if (this->rate_ != ACE_Time_Value::zero)
00133       {
00134         // Schedule the timer after these policies has been set, because the
00135         // handle_timeout uses these policies, if done in front, the channel
00136         // can crash when the timeout expires before initiazation is ready.
00137         timer_id_ = this->reactor_->schedule_timer (&this->adapter_,
00138                                                     0,
00139                                                     this->rate_,
00140                                                     this->rate_);
00141         if (timer_id_ == -1)
00142           return -1;
00143       }
00144     }
00145   catch (const CORBA::Exception&)
00146     {
00147       return -1;
00148     }
00149 #endif /* TAO_HAS_CORBA_MESSAGING */
00150 
00151   return 0;
00152 }
00153 
00154 int
00155 TAO_ECG_Reactive_ConsumerEC_Control::shutdown (void)
00156 {
00157   int r = 0;
00158 
00159 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00160   r = this->reactor_->cancel_timer (timer_id_);
00161 #endif /* TAO_HAS_CORBA_MESSAGING */
00162 
00163   this->adapter_.reactor (0);
00164   return r;
00165 }
00166 
00167 void
00168 TAO_ECG_Reactive_ConsumerEC_Control::event_channel_not_exist (
00169       TAO_EC_Gateway_IIOP* gateway)
00170 {
00171   try
00172     {
00173       ACE_DEBUG ((LM_DEBUG,
00174                   "EC_Reactive_ConsumerControl(%P|%t) - "
00175                   "channel %x does not exists\n"));
00176       gateway->cleanup_consumer_ec ();
00177 
00178       gateway->cleanup_consumer_proxies ();
00179 
00180     }
00181   catch (const CORBA::Exception& ex)
00182     {
00183       ex._tao_print_exception (
00184         "TAO_EC_Reactive_ConsumerControl::event_channel_not_exist");
00185       // Ignore all exceptions..
00186     }
00187 }
00188 
00189 void
00190 TAO_ECG_Reactive_ConsumerEC_Control::system_exception (
00191       TAO_EC_Gateway_IIOP* gateway,
00192       CORBA::SystemException & /* exception */)
00193 {
00194   try
00195     {
00196       gateway->cleanup_consumer_ec ();
00197 
00198       gateway->cleanup_consumer_proxies ();
00199 
00200     }
00201   catch (const CORBA::Exception&)
00202     {
00203       // Ignore all exceptions..
00204     }
00205 }
00206 
00207 // ****************************************************************
00208 
00209 TAO_ECG_Reactive_ConsumerEC_Control_Adapter::TAO_ECG_Reactive_ConsumerEC_Control_Adapter (
00210       TAO_ECG_Reactive_ConsumerEC_Control *adaptee)
00211   :  adaptee_ (adaptee)
00212 {
00213 }
00214 
00215 int
00216 TAO_ECG_Reactive_ConsumerEC_Control_Adapter::handle_timeout (
00217       const ACE_Time_Value &tv,
00218       const void *arg)
00219 {
00220   this->adaptee_->handle_timeout (tv, arg);
00221   return 0;
00222 }
00223 
00224 TAO_END_VERSIONED_NAMESPACE_DECL

Generated on Tue Feb 2 17:44:06 2010 for TAO_RTEvent by  doxygen 1.4.7