ECG_Reactive_ConsumerEC_Control.cpp

Go to the documentation of this file.
00001 // ECG_Reactive_ConsumerEC_Control.cpp,v 1.7 2006/03/14 06:14:25 jtc Exp
00002 
00003 #include "orbsvcs/Event/EC_Gateway_IIOP.h"
00004 #include "orbsvcs/Event/ECG_Reactive_ConsumerEC_Control.h"
00005 #include "tao/Messaging/Messaging.h"
00006 #include "tao/ORB_Core.h"
00007 
00008 #include "ace/Reactor.h"
00009 
00010 ACE_RCSID(Event, ECG_Reactive_ConsumerEventChannelControl, "ECG_Reactive_ConsumerEC_Control.cpp,v 1.7 2006/03/14 06:14:25 jtc Exp")
00011 
00012 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00013 
00014 TAO_ECG_Reactive_ConsumerEC_Control::
00015      TAO_ECG_Reactive_ConsumerEC_Control (const ACE_Time_Value &rate,
00016                                           const ACE_Time_Value &timeout,
00017                                           TAO_EC_Gateway_IIOP* gateway,
00018                                           CORBA::ORB_ptr orb)
00019   : rate_ (rate),
00020     timeout_ (timeout),
00021     adapter_ (this),
00022     gateway_ (gateway),
00023     orb_ (CORBA::ORB::_duplicate (orb))
00024 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00025     , timer_id_ (-1)
00026 #endif /* TAO_HAS_CORBA_MESSAGING != 0*/
00027 {
00028   this->reactor_ =
00029     this->orb_->orb_core ()->reactor ();
00030 }
00031 
00032 TAO_ECG_Reactive_ConsumerEC_Control::~TAO_ECG_Reactive_ConsumerEC_Control (void)
00033 {
00034 }
00035 
00036 void
00037 TAO_ECG_Reactive_ConsumerEC_Control::query_eventchannel (
00038       ACE_ENV_SINGLE_ARG_DECL)
00039 {
00040   ACE_TRY
00041     {
00042       CORBA::Boolean disconnected;
00043       CORBA::Boolean non_existent =
00044         gateway_->consumer_ec_non_existent (disconnected
00045                                             ACE_ENV_ARG_PARAMETER);
00046       ACE_TRY_CHECK;
00047       if (non_existent && !disconnected)
00048         {
00049           this->event_channel_not_exist (gateway_ ACE_ENV_ARG_PARAMETER);
00050           ACE_TRY_CHECK;
00051         }
00052     }
00053   ACE_CATCH (CORBA::OBJECT_NOT_EXIST, ex)
00054     {
00055       this->event_channel_not_exist (gateway_ ACE_ENV_ARG_PARAMETER);
00056       ACE_TRY_CHECK;
00057     }
00058   ACE_CATCH (CORBA::TRANSIENT, transient)
00059     {
00060       // This is TAO's minor code for a failed connection, we may
00061       // want to be more lenient in the future..
00062       // if (transient.minor () == 0x54410085)
00063       this->event_channel_not_exist (gateway_ ACE_ENV_ARG_PARAMETER);
00064       ACE_TRY_CHECK;
00065     }
00066   ACE_CATCHANY
00067     {
00068       // Ignore all exceptions
00069     }
00070   ACE_ENDTRY;
00071 }
00072 
00073 void
00074 TAO_ECG_Reactive_ConsumerEC_Control::handle_timeout (
00075       const ACE_Time_Value &,
00076       const void *)
00077 {
00078   // NOTE, setting RELATIVE_RT_TIMEOUT_POLICY for the duration of
00079   // query_eventchannel () below has greater impact than desired.  For
00080   // example, while we are pinging ec here, a nested upcall,
00081   // which requires making remote calls may come into the ORB.  Those
00082   // remote calls will be carried out with with
00083   // RELATIVE_RT_TIMEOUT_POLICY set here in effect.
00084 
00085   // @@ TODO: should use Guard to set and reset policies.
00086   ACE_TRY_NEW_ENV
00087     {
00088       // Query the state of the Current object *before* we initiate
00089       // the iteration...
00090       CORBA::PolicyTypeSeq types;
00091       CORBA::PolicyList_var policies =
00092         this->policy_current_->get_policy_overrides (types
00093                                                       ACE_ENV_ARG_PARAMETER);
00094       ACE_TRY_CHECK;
00095 
00096       // Change the timeout
00097       this->policy_current_->set_policy_overrides (this->policy_list_,
00098                                                    CORBA::ADD_OVERRIDE
00099                                                     ACE_ENV_ARG_PARAMETER);
00100       ACE_TRY_CHECK;
00101 
00102       // Query the state of the consumers...
00103       this->query_eventchannel (ACE_ENV_SINGLE_ARG_PARAMETER);
00104       ACE_TRY_CHECK;
00105 
00106       this->policy_current_->set_policy_overrides (policies.in (),
00107                                                    CORBA::SET_OVERRIDE
00108                                                     ACE_ENV_ARG_PARAMETER);
00109       ACE_TRY_CHECK;
00110       for (CORBA::ULong i = 0; i != policies->length (); ++i)
00111         {
00112           policies[i]->destroy (ACE_ENV_SINGLE_ARG_PARAMETER);
00113           ACE_TRY_CHECK;
00114         }
00115     }
00116   ACE_CATCHANY
00117     {
00118       // Ignore all exceptions
00119     }
00120   ACE_ENDTRY;
00121 }
00122 
00123 int
00124 TAO_ECG_Reactive_ConsumerEC_Control::activate (void)
00125 {
00126 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00127   ACE_TRY_NEW_ENV
00128     {
00129       // Get the PolicyCurrent object
00130       CORBA::Object_var tmp =
00131         this->orb_->resolve_initial_references ("PolicyCurrent"
00132                                                  ACE_ENV_ARG_PARAMETER);
00133       ACE_TRY_CHECK;
00134 
00135       this->policy_current_ =
00136         CORBA::PolicyCurrent::_narrow (tmp.in ()
00137                                         ACE_ENV_ARG_PARAMETER);
00138       ACE_TRY_CHECK;
00139 
00140       // Timeout for polling state (default = 10 msec)
00141       TimeBase::TimeT timeout = timeout_.usec() * 10;
00142       CORBA::Any any;
00143       any <<= timeout;
00144 
00145       this->policy_list_.length (1);
00146       this->policy_list_[0] =
00147         this->orb_->create_policy (
00148                Messaging::RELATIVE_RT_TIMEOUT_POLICY_TYPE,
00149                any
00150                ACE_ENV_ARG_PARAMETER);
00151       ACE_TRY_CHECK;
00152 
00153       // Only schedule the timer, when the rate is not zero
00154       if (this->rate_ != ACE_Time_Value::zero)
00155       {
00156         // Schedule the timer after these policies has been set, because the
00157         // handle_timeout uses these policies, if done in front, the channel
00158         // can crash when the timeout expires before initiazation is ready.
00159         timer_id_ = this->reactor_->schedule_timer (&this->adapter_,
00160                                                     0,
00161                                                     this->rate_,
00162                                                     this->rate_);
00163         if (timer_id_ == -1)
00164           return -1;
00165       }
00166     }
00167   ACE_CATCHANY
00168     {
00169       return -1;
00170     }
00171   ACE_ENDTRY;
00172 #endif /* TAO_HAS_CORBA_MESSAGING */
00173 
00174   return 0;
00175 }
00176 
00177 int
00178 TAO_ECG_Reactive_ConsumerEC_Control::shutdown (void)
00179 {
00180   int r = 0;
00181 
00182 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00183   r = this->reactor_->cancel_timer (timer_id_);
00184 #endif /* TAO_HAS_CORBA_MESSAGING */
00185 
00186   this->adapter_.reactor (0);
00187   return r;
00188 }
00189 
00190 void
00191 TAO_ECG_Reactive_ConsumerEC_Control::event_channel_not_exist (
00192       TAO_EC_Gateway_IIOP* gateway
00193       ACE_ENV_ARG_DECL)
00194 {
00195   ACE_TRY
00196     {
00197       ACE_DEBUG ((LM_DEBUG,
00198                   "EC_Reactive_ConsumerControl(%P|%t) - "
00199                   "channel %x does not exists\n"));
00200       gateway->cleanup_consumer_ec ();
00201 
00202       gateway->cleanup_consumer_proxies (ACE_ENV_SINGLE_ARG_PARAMETER);
00203       ACE_TRY_CHECK;
00204 
00205     }
00206   ACE_CATCHANY
00207     {
00208       ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
00209                            "TAO_EC_Reactive_ConsumerControl::event_channel_not_exist");
00210       // Ignore all exceptions..
00211     }
00212   ACE_ENDTRY;
00213 }
00214 
00215 void
00216 TAO_ECG_Reactive_ConsumerEC_Control::system_exception (
00217       TAO_EC_Gateway_IIOP* gateway,
00218       CORBA::SystemException & /* exception */
00219       ACE_ENV_ARG_DECL)
00220 {
00221   ACE_TRY
00222     {
00223       gateway->cleanup_consumer_ec ();
00224 
00225       gateway->cleanup_consumer_proxies (ACE_ENV_SINGLE_ARG_PARAMETER);
00226       ACE_TRY_CHECK;
00227 
00228     }
00229   ACE_CATCHANY
00230     {
00231       // Ignore all exceptions..
00232     }
00233   ACE_ENDTRY;
00234 }
00235 
00236 // ****************************************************************
00237 
00238 TAO_ECG_Reactive_ConsumerEC_Control_Adapter::TAO_ECG_Reactive_ConsumerEC_Control_Adapter (
00239       TAO_ECG_Reactive_ConsumerEC_Control *adaptee)
00240   :  adaptee_ (adaptee)
00241 {
00242 }
00243 
00244 int
00245 TAO_ECG_Reactive_ConsumerEC_Control_Adapter::handle_timeout (
00246       const ACE_Time_Value &tv,
00247       const void *arg)
00248 {
00249   this->adaptee_->handle_timeout (tv, arg);
00250   return 0;
00251 }
00252 
00253 TAO_END_VERSIONED_NAMESPACE_DECL

Generated on Thu Nov 9 13:11:10 2006 for TAO_RTEvent by doxygen 1.3.6