ECG_Reconnect_ConsumerEC_Control.cpp

Go to the documentation of this file.
00001 // $Id: ECG_Reconnect_ConsumerEC_Control.cpp 76626 2007-01-26 13:50:03Z elliott_c $
00002 
00003 #include "orbsvcs/Event/EC_Gateway_IIOP.h"
00004 #include "orbsvcs/Event/ECG_Reconnect_ConsumerEC_Control.h"
00005 #include "tao/Messaging/Messaging.h"
00006 #include "tao/ORB_Core.h"
00007 #include "ace/Reactor.h"
00008 
00009 ACE_RCSID(Event, ECG_Reconnect_ConsumerEventChannelControl, "$Id: ECG_Reconnect_ConsumerEC_Control.cpp 76626 2007-01-26 13:50:03Z elliott_c $")
00010 
00011 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00012 
00013 TAO_ECG_Reconnect_ConsumerEC_Control::
00014      TAO_ECG_Reconnect_ConsumerEC_Control (const ACE_Time_Value &rate,
00015                                            const ACE_Time_Value &timeout,
00016                                            TAO_EC_Gateway_IIOP* gateway,
00017                                            CORBA::ORB_ptr orb)
00018   : rate_ (rate),
00019     timeout_ (timeout),
00020     adapter_ (this),
00021     gateway_ (gateway),
00022     orb_ (CORBA::ORB::_duplicate (orb)),
00023     is_consumer_ec_connected_ (1)
00024 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00025     , timer_id_ (-1)
00026 #endif /* TAO_HAS_CORBA_MESSAGING != 0*/
00027 {
00028   this->reactor_ =
00029     this->orb_->orb_core ()->reactor ();
00030 }
00031 
00032 TAO_ECG_Reconnect_ConsumerEC_Control::~TAO_ECG_Reconnect_ConsumerEC_Control (void)
00033 {
00034 }
00035 
00036 void
00037 TAO_ECG_Reconnect_ConsumerEC_Control::try_reconnect ()
00038 {
00039   try
00040     {
00041       CORBA::Boolean disconnected;
00042       CORBA::Boolean non_existent =
00043       gateway_->consumer_ec_non_existent (disconnected);
00044       if (!non_existent)
00045         {
00046           this->reconnect();
00047         }
00048     }
00049   catch (const CORBA::Exception&)
00050     {
00051       // Ignore all exceptions
00052     }
00053 }
00054 
00055 void
00056 TAO_ECG_Reconnect_ConsumerEC_Control::reconnect ()
00057 {
00058   try
00059     {
00060       is_consumer_ec_connected_ = 1;
00061 
00062       gateway_->reconnect_consumer_ec();
00063     }
00064   catch (const CORBA::Exception&)
00065     {
00066       // Ignore all exceptions
00067     }
00068 }
00069 
00070 void
00071 TAO_ECG_Reconnect_ConsumerEC_Control::query_eventchannel ()
00072 {
00073   try
00074     {
00075       if (is_consumer_ec_connected_ == 1)
00076         {
00077           CORBA::Boolean disconnected;
00078           CORBA::Boolean non_existent =
00079             gateway_->consumer_ec_non_existent (disconnected);
00080           if (non_existent && !disconnected)
00081             {
00082               this->event_channel_not_exist (gateway_);
00083             }
00084         }
00085       else
00086         {
00087            this->try_reconnect();
00088         }
00089     }
00090   catch (const CORBA::OBJECT_NOT_EXIST&)
00091     {
00092       this->event_channel_not_exist (gateway_);
00093     }
00094   catch (const CORBA::TRANSIENT&)
00095     {
00096       // This is TAO's minor code for a failed connection, we may
00097       // want to be more lenient in the future..
00098       // if (transient.minor () == 0x54410085)
00099       this->event_channel_not_exist (gateway_);
00100     }
00101   catch (const CORBA::Exception&)
00102     {
00103       // Ignore all exceptions
00104     }
00105 }
00106 
00107 void
00108 TAO_ECG_Reconnect_ConsumerEC_Control::handle_timeout (
00109       const ACE_Time_Value &,
00110       const void *)
00111 {
00112   // NOTE, setting RELATIVE_RT_TIMEOUT_POLICY for the duration of
00113   // query_eventchannel () below has greater impact than desired.  For
00114   // example, while we are pinging consumers here, a nested upcall,
00115   // which requires making remote calls may come into the ORB.  Those
00116   // remote calls will be carried out with with
00117   // RELATIVE_RT_TIMEOUT_POLICY set here in effect.
00118   // @@ TODO: should use Guard to set and reset policies.
00119   try
00120     {
00121       // Query the state of the Current object *before* we initiate
00122       // the iteration...
00123       CORBA::PolicyTypeSeq types;
00124       CORBA::PolicyList_var policies =
00125         this->policy_current_->get_policy_overrides (types);
00126 
00127       // Change the timeout
00128       this->policy_current_->set_policy_overrides (this->policy_list_,
00129                                                    CORBA::ADD_OVERRIDE);
00130 
00131       // Query the state of the consumers...
00132       this->query_eventchannel ();
00133 
00134       this->policy_current_->set_policy_overrides (policies.in (),
00135                                                    CORBA::SET_OVERRIDE);
00136       for (CORBA::ULong i = 0; i != policies->length (); ++i)
00137         {
00138           policies[i]->destroy ();
00139         }
00140     }
00141   catch (const CORBA::Exception&)
00142     {
00143       // Ignore all exceptions
00144     }
00145 }
00146 
00147 int
00148 TAO_ECG_Reconnect_ConsumerEC_Control::activate (void)
00149 {
00150 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00151   try
00152     {
00153       // Get the PolicyCurrent object
00154       CORBA::Object_var tmp =
00155         this->orb_->resolve_initial_references ("PolicyCurrent");
00156 
00157       this->policy_current_ =
00158         CORBA::PolicyCurrent::_narrow (tmp.in ());
00159 
00160       // Timeout for polling state (default = 10 msec)
00161       TimeBase::TimeT timeout = timeout_.usec() * 10;
00162       CORBA::Any any;
00163       any <<= timeout;
00164 
00165       this->policy_list_.length (1);
00166       this->policy_list_[0] =
00167         this->orb_->create_policy (
00168                Messaging::RELATIVE_RT_TIMEOUT_POLICY_TYPE,
00169                any);
00170 
00171       // Only schedule the timer, when the rate is not zero
00172       if (this->rate_ != ACE_Time_Value::zero)
00173       {
00174         // Schedule the timer after these policies has been set, because the
00175         // handle_timeout uses these policies, if done in front, the channel
00176         // can crash when the timeout expires before initiazation is ready.
00177         timer_id_ = this->reactor_->schedule_timer (&this->adapter_,
00178                                                     0,
00179                                                     this->rate_,
00180                                                     this->rate_);
00181         if (timer_id_ == -1)
00182           return -1;
00183       }
00184     }
00185   catch (const CORBA::Exception&)
00186     {
00187       return -1;
00188     }
00189 #endif /* TAO_HAS_CORBA_MESSAGING */
00190 
00191   return 0;
00192 }
00193 
00194 int
00195 TAO_ECG_Reconnect_ConsumerEC_Control::shutdown (void)
00196 {
00197   int r = 0;
00198 
00199 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00200   r = this->reactor_->cancel_timer (timer_id_);
00201 #endif /* TAO_HAS_CORBA_MESSAGING */
00202 
00203   this->adapter_.reactor (0);
00204   return r;
00205 }
00206 
00207 void
00208 TAO_ECG_Reconnect_ConsumerEC_Control::event_channel_not_exist (
00209       TAO_EC_Gateway_IIOP* gateway)
00210 {
00211   try
00212     {
00213       //ACE_DEBUG ((LM_DEBUG,
00214       //            "ECG_Reconnect_ConsumerControl(%P|%t) - "
00215       //            "channel %x does not exists\n"));
00216       is_consumer_ec_connected_ = 0;
00217 
00218       gateway->suspend_supplier_ec ();
00219 
00220       gateway->cleanup_consumer_proxies ();
00221     }
00222   catch (const CORBA::Exception& ex)
00223     {
00224       ex._tao_print_exception (
00225         "TAO_EC_Reconnect_ConsumerControl::event_channel_not_exist");
00226       // Ignore all exceptions..
00227     }
00228 }
00229 
00230 void
00231 TAO_ECG_Reconnect_ConsumerEC_Control::system_exception (
00232       TAO_EC_Gateway_IIOP* gateway,
00233       CORBA::SystemException & /* exception */)
00234 {
00235   try
00236     {
00237       // The current implementation is very strict, and kicks out a
00238       // client on the first system exception. We may
00239       // want to be more lenient in the future, for example,
00240       // this is TAO's minor code for a failed connection.
00241       //
00242       // if (CORBA::TRANSIENT::_narrow (&exception) != 0
00243       //     && exception->minor () == 0x54410085)
00244       //   return;
00245 
00246       // Anything else is serious, including timeouts...
00247       //ACE_DEBUG ((LM_DEBUG,
00248       //            "ECG_Reconnect_ConsumerControl(%P|%t) - "
00249       //            "channel %x does not exists system except\n"));
00250       is_consumer_ec_connected_ = 0;
00251 
00252       gateway->suspend_supplier_ec ();
00253 
00254       gateway->cleanup_consumer_proxies ();
00255     }
00256   catch (const CORBA::Exception&)
00257     {
00258       // Ignore all exceptions..
00259     }
00260 }
00261 
00262 // ****************************************************************
00263 
00264 TAO_ECG_Reconnect_ConsumerEC_Control_Adapter::TAO_ECG_Reconnect_ConsumerEC_Control_Adapter (
00265       TAO_ECG_Reconnect_ConsumerEC_Control *adaptee)
00266   :  adaptee_ (adaptee)
00267 {
00268 }
00269 
00270 int
00271 TAO_ECG_Reconnect_ConsumerEC_Control_Adapter::handle_timeout (
00272       const ACE_Time_Value &tv,
00273       const void *arg)
00274 {
00275   this->adaptee_->handle_timeout (tv, arg);
00276   return 0;
00277 }
00278 
00279 TAO_END_VERSIONED_NAMESPACE_DECL

Generated on Tue Feb 2 17:44:06 2010 for TAO_RTEvent by  doxygen 1.4.7