EC_Reactive_ConsumerControl.cpp

Go to the documentation of this file.
00001 // EC_Reactive_ConsumerControl.cpp,v 1.25 2006/03/14 06:14:25 jtc Exp
00002 
00003 #include "orbsvcs/Event/EC_ConsumerAdmin.h"
00004 #include "orbsvcs/Event/EC_Reactive_ConsumerControl.h"
00005 #include "orbsvcs/Event/EC_Event_Channel_Base.h"
00006 #include "orbsvcs/Event/EC_ProxySupplier.h"
00007 #include "orbsvcs/Event/EC_ProxyConsumer.h" // @@ MSVC 6 bug
00008 
00009 #include "tao/Messaging/Messaging.h"
00010 #include "tao/ORB_Core.h"
00011 
00012 #include "ace/Reactor.h"
00013 
00014 #if ! defined (__ACE_INLINE__)
00015 #include "orbsvcs/Event/EC_Reactive_ConsumerControl.i"
00016 #endif /* __ACE_INLINE__ */
00017 
00018 ACE_RCSID(Event, EC_Reactive_ConsumerControl, "EC_Reactive_ConsumerControl.cpp,v 1.25 2006/03/14 06:14:25 jtc Exp")
00019 
00020 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00021 
00022 TAO_EC_Reactive_ConsumerControl::
00023      TAO_EC_Reactive_ConsumerControl (const ACE_Time_Value &rate,
00024                                       const ACE_Time_Value &timeout,
00025                                       TAO_EC_Event_Channel_Base *ec,
00026                                       CORBA::ORB_ptr orb)
00027   : rate_ (rate),
00028     timeout_ (timeout),
00029     adapter_ (this),
00030     event_channel_ (ec),
00031     orb_ (CORBA::ORB::_duplicate (orb))
00032 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00033     , timer_id_ (-1)
00034 #endif /* TAO_HAS_CORBA_MESSAGING != 0*/
00035 {
00036   this->reactor_ =
00037     this->orb_->orb_core ()->reactor ();
00038 }
00039 
00040 TAO_EC_Reactive_ConsumerControl::~TAO_EC_Reactive_ConsumerControl (void)
00041 {
00042 }
00043 
00044 void
00045 TAO_EC_Reactive_ConsumerControl::query_consumers (
00046       ACE_ENV_SINGLE_ARG_DECL)
00047 {
00048   TAO_EC_Ping_Consumer worker (this);
00049   this->event_channel_->for_each_consumer (&worker
00050                                            ACE_ENV_ARG_PARAMETER);
00051   ACE_CHECK;
00052 }
00053 
00054 void
00055 TAO_EC_Reactive_ConsumerControl::handle_timeout (
00056       const ACE_Time_Value &,
00057       const void *)
00058 {
00059   // NOTE, setting RELATIVE_RT_TIMEOUT_POLICY for the duration of
00060   // query_consumers () below has greater impact than desired.  For
00061   // example, while we are pinging consumers here, a nested upcall,
00062   // which requires making remote calls may come into the ORB.  Those
00063   // remote calls will be carried out with with
00064   // RELATIVE_RT_TIMEOUT_POLICY set here in effect.
00065 
00066   // @@ TODO: should use Guard to set and reset policies.
00067   ACE_TRY_NEW_ENV
00068     {
00069       // Query the state of the Current object *before* we initiate
00070       // the iteration...
00071       CORBA::PolicyTypeSeq types;
00072       CORBA::PolicyList_var policies =
00073         this->policy_current_->get_policy_overrides (types
00074                                                       ACE_ENV_ARG_PARAMETER);
00075       ACE_TRY_CHECK;
00076 
00077       // Change the timeout
00078       this->policy_current_->set_policy_overrides (this->policy_list_,
00079                                                    CORBA::ADD_OVERRIDE
00080                                                     ACE_ENV_ARG_PARAMETER);
00081       ACE_TRY_CHECK;
00082 
00083       // Query the state of the consumers...
00084       this->query_consumers (ACE_ENV_SINGLE_ARG_PARAMETER);
00085       ACE_TRY_CHECK;
00086 
00087       this->policy_current_->set_policy_overrides (policies.in (),
00088                                                    CORBA::SET_OVERRIDE
00089                                                     ACE_ENV_ARG_PARAMETER);
00090       ACE_TRY_CHECK;
00091       for (CORBA::ULong i = 0; i != policies->length (); ++i)
00092         {
00093           policies[i]->destroy (ACE_ENV_SINGLE_ARG_PARAMETER);
00094           ACE_TRY_CHECK;
00095         }
00096     }
00097   ACE_CATCHANY
00098     {
00099       // Ignore all exceptions
00100     }
00101   ACE_ENDTRY;
00102 }
00103 
00104 int
00105 TAO_EC_Reactive_ConsumerControl::activate (void)
00106 {
00107 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00108   ACE_TRY_NEW_ENV
00109     {
00110       // Get the PolicyCurrent object
00111       CORBA::Object_var tmp =
00112         this->orb_->resolve_initial_references ("PolicyCurrent"
00113                                                  ACE_ENV_ARG_PARAMETER);
00114       ACE_TRY_CHECK;
00115 
00116       this->policy_current_ =
00117         CORBA::PolicyCurrent::_narrow (tmp.in ()
00118                                         ACE_ENV_ARG_PARAMETER);
00119       ACE_TRY_CHECK;
00120 
00121       // Timeout for polling state (default = 10 msec)
00122       TimeBase::TimeT timeout = timeout_.usec() * 10;
00123       CORBA::Any any;
00124       any <<= timeout;
00125 
00126       this->policy_list_.length (1);
00127       this->policy_list_[0] =
00128         this->orb_->create_policy (
00129                Messaging::RELATIVE_RT_TIMEOUT_POLICY_TYPE,
00130                any
00131                 ACE_ENV_ARG_PARAMETER);
00132       ACE_TRY_CHECK;
00133 
00134       // Only schedule the timer, when the rate is not zero
00135       if (this->rate_ != ACE_Time_Value::zero)
00136       {
00137         // Schedule the timer after these policies has been set, because the
00138         // handle_timeout uses these policies, if done in front, the channel
00139         // can crash when the timeout expires before initiazation is ready.
00140         timer_id_ = this->reactor_->schedule_timer (&this->adapter_,
00141                                                     0,
00142                                                     this->rate_,
00143                                                     this->rate_);
00144         if (timer_id_ == -1)
00145           return -1;
00146       }
00147     }
00148   ACE_CATCHANY
00149     {
00150       return -1;
00151     }
00152   ACE_ENDTRY;
00153 #endif /* TAO_HAS_CORBA_MESSAGING */
00154 
00155   return 0;
00156 }
00157 
00158 int
00159 TAO_EC_Reactive_ConsumerControl::shutdown (void)
00160 {
00161   int r = 0;
00162 
00163 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00164   r = this->reactor_->cancel_timer (timer_id_);
00165 #endif /* TAO_HAS_CORBA_MESSAGING */
00166   this->adapter_.reactor (0);
00167   return r;
00168 }
00169 
00170 void
00171 TAO_EC_Reactive_ConsumerControl::consumer_not_exist (
00172       TAO_EC_ProxyPushSupplier *proxy
00173       ACE_ENV_ARG_DECL)
00174 {
00175   ACE_TRY
00176     {
00177       //ACE_DEBUG ((LM_DEBUG,
00178       //            "EC_Reactive_ConsumerControl(%P|%t) - "
00179       //            "Consumer %x does not exists\n", long(proxy)));
00180       proxy->disconnect_push_supplier (ACE_ENV_SINGLE_ARG_PARAMETER);
00181       ACE_TRY_CHECK;
00182     }
00183   ACE_CATCHANY
00184     {
00185       ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
00186                            "Reactive_ConsumerControl::consumer_not_exist");
00187       // Ignore all exceptions..
00188     }
00189   ACE_ENDTRY;
00190 }
00191 
00192 void
00193 TAO_EC_Reactive_ConsumerControl::system_exception (
00194       TAO_EC_ProxyPushSupplier *proxy,
00195       CORBA::SystemException & /* exception */
00196       ACE_ENV_ARG_DECL)
00197 {
00198   ACE_TRY
00199     {
00200       // The current implementation is very strict, and kicks out a
00201       // client on the first system exception. We may
00202       // want to be more lenient in the future, for example,
00203       // this is TAO's minor code for a failed connection.
00204       //
00205       // if (CORBA::TRANSIENT::_narrow (&exception) != 0
00206       //     && exception->minor () == 0x54410085)
00207       //   return;
00208 
00209       // Anything else is serious, including timeouts...
00210       proxy->disconnect_push_supplier (ACE_ENV_SINGLE_ARG_PARAMETER);
00211       ACE_TRY_CHECK;
00212     }
00213   ACE_CATCHANY
00214     {
00215       // Ignore all exceptions..
00216     }
00217   ACE_ENDTRY;
00218 }
00219 
00220 // ****************************************************************
00221 
00222 TAO_EC_ConsumerControl_Adapter::TAO_EC_ConsumerControl_Adapter (
00223       TAO_EC_Reactive_ConsumerControl *adaptee)
00224   :  adaptee_ (adaptee)
00225 {
00226 }
00227 
00228 int
00229 TAO_EC_ConsumerControl_Adapter::handle_timeout (
00230       const ACE_Time_Value &tv,
00231       const void *arg)
00232 {
00233   this->adaptee_->handle_timeout (tv, arg);
00234   return 0;
00235 }
00236 
00237 // ****************************************************************
00238 
00239 void
00240 TAO_EC_Ping_Consumer::work (TAO_EC_ProxyPushSupplier *supplier
00241                             ACE_ENV_ARG_DECL)
00242 {
00243   ACE_TRY
00244     {
00245       CORBA::Boolean disconnected;
00246       CORBA::Boolean non_existent =
00247         supplier->consumer_non_existent (disconnected
00248                                           ACE_ENV_ARG_PARAMETER);
00249       ACE_TRY_CHECK;
00250       if (non_existent && !disconnected)
00251         {
00252           this->control_->consumer_not_exist (supplier ACE_ENV_ARG_PARAMETER);
00253           ACE_TRY_CHECK;
00254         }
00255     }
00256   ACE_CATCH (CORBA::OBJECT_NOT_EXIST, ex)
00257     {
00258       this->control_->consumer_not_exist (supplier ACE_ENV_ARG_PARAMETER);
00259       ACE_TRY_CHECK;
00260     }
00261   ACE_CATCH (CORBA::TRANSIENT, transient)
00262     {
00263       // This is TAO's minor code for a failed connection, we may
00264       // want to be more lenient in the future..
00265       // if (transient.minor () == 0x54410085)
00266       this->control_->consumer_not_exist (supplier ACE_ENV_ARG_PARAMETER);
00267       ACE_TRY_CHECK;
00268     }
00269   ACE_CATCHANY
00270     {
00271       // Ignore all exceptions
00272     }
00273   ACE_ENDTRY;
00274 }
00275 
00276 TAO_END_VERSIONED_NAMESPACE_DECL

Generated on Thu Nov 9 13:11:09 2006 for TAO_RTEvent by doxygen 1.3.6