ECG_Reconnect_ConsumerEC_Control.cpp

Go to the documentation of this file.
00001 // ECG_Reconnect_ConsumerEC_Control.cpp,v 1.7 2006/03/14 06:14:25 jtc Exp
00002 
00003 #include "orbsvcs/Event/EC_Gateway_IIOP.h"
00004 #include "orbsvcs/Event/ECG_Reconnect_ConsumerEC_Control.h"
00005 #include "tao/Messaging/Messaging.h"
00006 #include "tao/ORB_Core.h"
00007 #include "ace/Reactor.h"
00008 
00009 ACE_RCSID(Event, ECG_Reconnect_ConsumerEventChannelControl, "ECG_Reconnect_ConsumerEC_Control.cpp,v 1.7 2006/03/14 06:14:25 jtc Exp")
00010 
00011 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00012 
00013 TAO_ECG_Reconnect_ConsumerEC_Control::
00014      TAO_ECG_Reconnect_ConsumerEC_Control (const ACE_Time_Value &rate,
00015                                            const ACE_Time_Value &timeout,
00016                                            TAO_EC_Gateway_IIOP* gateway,
00017                                            CORBA::ORB_ptr orb)
00018   : rate_ (rate),
00019     timeout_ (timeout),
00020     adapter_ (this),
00021     gateway_ (gateway),
00022     orb_ (CORBA::ORB::_duplicate (orb)),
00023     is_consumer_ec_connected_ (1)
00024 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00025     , timer_id_ (-1)
00026 #endif /* TAO_HAS_CORBA_MESSAGING != 0*/
00027 {
00028   this->reactor_ =
00029     this->orb_->orb_core ()->reactor ();
00030 }
00031 
00032 TAO_ECG_Reconnect_ConsumerEC_Control::~TAO_ECG_Reconnect_ConsumerEC_Control (void)
00033 {
00034 }
00035 
00036 void
00037 TAO_ECG_Reconnect_ConsumerEC_Control::try_reconnect (
00038       ACE_ENV_SINGLE_ARG_DECL)
00039 {
00040   ACE_TRY
00041     {
00042       CORBA::Boolean disconnected;
00043       CORBA::Boolean non_existent =
00044       gateway_->consumer_ec_non_existent (disconnected
00045                                            ACE_ENV_ARG_PARAMETER);
00046       ACE_TRY_CHECK;
00047       if (!non_existent)
00048         {
00049           this->reconnect(ACE_ENV_SINGLE_ARG_PARAMETER);
00050           ACE_TRY_CHECK;
00051         }
00052     }
00053   ACE_CATCHANY
00054     {
00055       // Ignore all exceptions
00056     }
00057   ACE_ENDTRY;
00058 }
00059 
00060 void
00061 TAO_ECG_Reconnect_ConsumerEC_Control::reconnect (
00062       ACE_ENV_SINGLE_ARG_DECL)
00063 {
00064   ACE_TRY
00065     {
00066       is_consumer_ec_connected_ = 1;
00067 
00068       gateway_->reconnect_consumer_ec(ACE_ENV_SINGLE_ARG_PARAMETER);
00069       ACE_TRY_CHECK;
00070     }
00071   ACE_CATCHANY
00072     {
00073       // Ignore all exceptions
00074     }
00075   ACE_ENDTRY;
00076 }
00077 
00078 void
00079 TAO_ECG_Reconnect_ConsumerEC_Control::query_eventchannel (
00080       ACE_ENV_SINGLE_ARG_DECL)
00081 {
00082   ACE_TRY
00083     {
00084       if (is_consumer_ec_connected_ == 1)
00085         {
00086           CORBA::Boolean disconnected;
00087           CORBA::Boolean non_existent =
00088             gateway_->consumer_ec_non_existent (disconnected
00089                                                 ACE_ENV_ARG_PARAMETER);
00090           ACE_TRY_CHECK;
00091           if (non_existent && !disconnected)
00092             {
00093               this->event_channel_not_exist (gateway_ ACE_ENV_ARG_PARAMETER);
00094               ACE_TRY_CHECK;
00095             }
00096         }
00097       else
00098         {
00099            this->try_reconnect(ACE_ENV_SINGLE_ARG_PARAMETER);
00100            ACE_TRY_CHECK;
00101         }
00102     }
00103   ACE_CATCH (CORBA::OBJECT_NOT_EXIST, ex)
00104     {
00105       this->event_channel_not_exist (gateway_ ACE_ENV_ARG_PARAMETER);
00106       ACE_TRY_CHECK;
00107     }
00108   ACE_CATCH (CORBA::TRANSIENT, transient)
00109     {
00110       // This is TAO's minor code for a failed connection, we may
00111       // want to be more lenient in the future..
00112       // if (transient.minor () == 0x54410085)
00113       this->event_channel_not_exist (gateway_ ACE_ENV_ARG_PARAMETER);
00114       ACE_TRY_CHECK;
00115     }
00116   ACE_CATCHANY
00117     {
00118       // Ignore all exceptions
00119     }
00120   ACE_ENDTRY;
00121 }
00122 
00123 void
00124 TAO_ECG_Reconnect_ConsumerEC_Control::handle_timeout (
00125       const ACE_Time_Value &,
00126       const void *)
00127 {
00128   // NOTE, setting RELATIVE_RT_TIMEOUT_POLICY for the duration of
00129   // query_eventchannel () below has greater impact than desired.  For
00130   // example, while we are pinging consumers here, a nested upcall,
00131   // which requires making remote calls may come into the ORB.  Those
00132   // remote calls will be carried out with with
00133   // RELATIVE_RT_TIMEOUT_POLICY set here in effect.
00134   // @@ TODO: should use Guard to set and reset policies.
00135   ACE_TRY_NEW_ENV
00136     {
00137       // Query the state of the Current object *before* we initiate
00138       // the iteration...
00139       CORBA::PolicyTypeSeq types;
00140       CORBA::PolicyList_var policies =
00141         this->policy_current_->get_policy_overrides (types
00142                                                       ACE_ENV_ARG_PARAMETER);
00143       ACE_TRY_CHECK;
00144 
00145       // Change the timeout
00146       this->policy_current_->set_policy_overrides (this->policy_list_,
00147                                                    CORBA::ADD_OVERRIDE
00148                                                     ACE_ENV_ARG_PARAMETER);
00149       ACE_TRY_CHECK;
00150 
00151       // Query the state of the consumers...
00152       this->query_eventchannel (ACE_ENV_SINGLE_ARG_PARAMETER);
00153       ACE_TRY_CHECK;
00154 
00155       this->policy_current_->set_policy_overrides (policies.in (),
00156                                                    CORBA::SET_OVERRIDE
00157                                                     ACE_ENV_ARG_PARAMETER);
00158       ACE_TRY_CHECK;
00159       for (CORBA::ULong i = 0; i != policies->length (); ++i)
00160         {
00161           policies[i]->destroy (ACE_ENV_SINGLE_ARG_PARAMETER);
00162           ACE_TRY_CHECK;
00163         }
00164     }
00165   ACE_CATCHANY
00166     {
00167       // Ignore all exceptions
00168     }
00169   ACE_ENDTRY;
00170 }
00171 
00172 int
00173 TAO_ECG_Reconnect_ConsumerEC_Control::activate (void)
00174 {
00175 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00176   ACE_TRY_NEW_ENV
00177     {
00178       // Get the PolicyCurrent object
00179       CORBA::Object_var tmp =
00180         this->orb_->resolve_initial_references ("PolicyCurrent"
00181                                                  ACE_ENV_ARG_PARAMETER);
00182       ACE_TRY_CHECK;
00183 
00184       this->policy_current_ =
00185         CORBA::PolicyCurrent::_narrow (tmp.in ()
00186                                         ACE_ENV_ARG_PARAMETER);
00187       ACE_TRY_CHECK;
00188 
00189       // Timeout for polling state (default = 10 msec)
00190       TimeBase::TimeT timeout = timeout_.usec() * 10;
00191       CORBA::Any any;
00192       any <<= timeout;
00193 
00194       this->policy_list_.length (1);
00195       this->policy_list_[0] =
00196         this->orb_->create_policy (
00197                Messaging::RELATIVE_RT_TIMEOUT_POLICY_TYPE,
00198                any
00199                ACE_ENV_ARG_PARAMETER);
00200       ACE_TRY_CHECK;
00201 
00202       // Only schedule the timer, when the rate is not zero
00203       if (this->rate_ != ACE_Time_Value::zero)
00204       {
00205         // Schedule the timer after these policies has been set, because the
00206         // handle_timeout uses these policies, if done in front, the channel
00207         // can crash when the timeout expires before initiazation is ready.
00208         timer_id_ = this->reactor_->schedule_timer (&this->adapter_,
00209                                                     0,
00210                                                     this->rate_,
00211                                                     this->rate_);
00212         if (timer_id_ == -1)
00213           return -1;
00214       }
00215     }
00216   ACE_CATCHANY
00217     {
00218       return -1;
00219     }
00220   ACE_ENDTRY;
00221 #endif /* TAO_HAS_CORBA_MESSAGING */
00222 
00223   return 0;
00224 }
00225 
00226 int
00227 TAO_ECG_Reconnect_ConsumerEC_Control::shutdown (void)
00228 {
00229   int r = 0;
00230 
00231 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00232   r = this->reactor_->cancel_timer (timer_id_);
00233 #endif /* TAO_HAS_CORBA_MESSAGING */
00234 
00235   this->adapter_.reactor (0);
00236   return r;
00237 }
00238 
00239 void
00240 TAO_ECG_Reconnect_ConsumerEC_Control::event_channel_not_exist (
00241       TAO_EC_Gateway_IIOP* gateway
00242       ACE_ENV_ARG_DECL)
00243 {
00244   ACE_TRY
00245     {
00246       //ACE_DEBUG ((LM_DEBUG,
00247       //            "ECG_Reconnect_ConsumerControl(%P|%t) - "
00248       //            "channel %x does not exists\n"));
00249       is_consumer_ec_connected_ = 0;
00250 
00251       gateway->suspend_supplier_ec (ACE_ENV_SINGLE_ARG_PARAMETER);
00252       ACE_TRY_CHECK;
00253 
00254       gateway->cleanup_consumer_proxies (ACE_ENV_SINGLE_ARG_PARAMETER);
00255       ACE_TRY_CHECK;
00256     }
00257   ACE_CATCHANY
00258     {
00259       ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
00260                            "TAO_EC_Reconnect_ConsumerControl::event_channel_not_exist");
00261       // Ignore all exceptions..
00262     }
00263   ACE_ENDTRY;
00264 }
00265 
00266 void
00267 TAO_ECG_Reconnect_ConsumerEC_Control::system_exception (
00268       TAO_EC_Gateway_IIOP* gateway,
00269       CORBA::SystemException & /* exception */
00270       ACE_ENV_ARG_DECL)
00271 {
00272   ACE_TRY
00273     {
00274       // The current implementation is very strict, and kicks out a
00275       // client on the first system exception. We may
00276       // want to be more lenient in the future, for example,
00277       // this is TAO's minor code for a failed connection.
00278       //
00279       // if (CORBA::TRANSIENT::_narrow (&exception) != 0
00280       //     && exception->minor () == 0x54410085)
00281       //   return;
00282 
00283       // Anything else is serious, including timeouts...
00284       //ACE_DEBUG ((LM_DEBUG,
00285       //            "ECG_Reconnect_ConsumerControl(%P|%t) - "
00286       //            "channel %x does not exists system except\n"));
00287       is_consumer_ec_connected_ = 0;
00288 
00289       gateway->suspend_supplier_ec (ACE_ENV_SINGLE_ARG_PARAMETER);
00290       ACE_TRY_CHECK;
00291 
00292       gateway->cleanup_consumer_proxies (ACE_ENV_SINGLE_ARG_PARAMETER);
00293       ACE_TRY_CHECK;
00294     }
00295   ACE_CATCHANY
00296     {
00297       // Ignore all exceptions..
00298     }
00299   ACE_ENDTRY;
00300 }
00301 
00302 // ****************************************************************
00303 
00304 TAO_ECG_Reconnect_ConsumerEC_Control_Adapter::TAO_ECG_Reconnect_ConsumerEC_Control_Adapter (
00305       TAO_ECG_Reconnect_ConsumerEC_Control *adaptee)
00306   :  adaptee_ (adaptee)
00307 {
00308 }
00309 
00310 int
00311 TAO_ECG_Reconnect_ConsumerEC_Control_Adapter::handle_timeout (
00312       const ACE_Time_Value &tv,
00313       const void *arg)
00314 {
00315   this->adaptee_->handle_timeout (tv, arg);
00316   return 0;
00317 }
00318 
00319 TAO_END_VERSIONED_NAMESPACE_DECL

Generated on Thu Nov 9 13:11:10 2006 for TAO_RTEvent by doxygen 1.3.6