EC_Reactive_SupplierControl.cpp

Go to the documentation of this file.
00001 // EC_Reactive_SupplierControl.cpp,v 1.26 2006/03/14 06:14:25 jtc Exp
00002 
00003 #include "orbsvcs/Event/EC_SupplierAdmin.h"
00004 #include "orbsvcs/Event/EC_Reactive_SupplierControl.h"
00005 #include "orbsvcs/Event/EC_Event_Channel_Base.h"
00006 #include "orbsvcs/Event/EC_ProxyConsumer.h"
00007 #include "orbsvcs/Event/EC_ProxySupplier.h" // @@ MSVC 6 bug
00008 
00009 #include "tao/Messaging/Messaging.h"
00010 #include "tao/ORB_Core.h"
00011 
00012 #include "ace/Reactor.h"
00013 
00014 #if ! defined (__ACE_INLINE__)
00015 #include "orbsvcs/Event/EC_Reactive_SupplierControl.i"
00016 #endif /* __ACE_INLINE__ */
00017 
00018 ACE_RCSID(Event, EC_Reactive_SupplierControl, "EC_Reactive_SupplierControl.cpp,v 1.26 2006/03/14 06:14:25 jtc Exp")
00019 
00020 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00021 
00022 TAO_EC_Reactive_SupplierControl::
00023      TAO_EC_Reactive_SupplierControl (const ACE_Time_Value &rate,
00024                                       const ACE_Time_Value &timeout,
00025                                       TAO_EC_Event_Channel_Base *ec,
00026                                       CORBA::ORB_ptr orb)
00027   : rate_ (rate),
00028     timeout_ (timeout),
00029     adapter_ (this),
00030     event_channel_ (ec),
00031     orb_ (CORBA::ORB::_duplicate (orb))
00032 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00033     , timer_id_ (-1)
00034 #endif /* TAO_HAS_CORBA_MESSAGING != 0*/
00035 {
00036   this->reactor_ =
00037     this->orb_->orb_core ()->reactor ();
00038 }
00039 
00040 TAO_EC_Reactive_SupplierControl::~TAO_EC_Reactive_SupplierControl (void)
00041 {
00042 }
00043 
00044 void
00045 TAO_EC_Reactive_SupplierControl::query_suppliers (
00046       ACE_ENV_SINGLE_ARG_DECL)
00047 {
00048   TAO_EC_Ping_Supplier worker (this);
00049   this->event_channel_->for_each_supplier (&worker
00050                                            ACE_ENV_ARG_PARAMETER);
00051   ACE_CHECK;
00052 }
00053 
00054 void
00055 TAO_EC_Reactive_SupplierControl::handle_timeout (
00056       const ACE_Time_Value &,
00057       const void *)
00058 {
00059   ACE_TRY_NEW_ENV
00060     {
00061       // Query the state of the Current object *before* we initiate
00062       // the iteration...
00063       CORBA::PolicyTypeSeq types;
00064       CORBA::PolicyList_var policies =
00065         this->policy_current_->get_policy_overrides (types
00066                                                       ACE_ENV_ARG_PARAMETER);
00067       ACE_TRY_CHECK;
00068 
00069       // Change the timeout
00070       this->policy_current_->set_policy_overrides (this->policy_list_,
00071                                                    CORBA::ADD_OVERRIDE
00072                                                     ACE_ENV_ARG_PARAMETER);
00073       ACE_TRY_CHECK;
00074 
00075       // Query the state of the suppliers...
00076       this->query_suppliers (ACE_ENV_SINGLE_ARG_PARAMETER);
00077       ACE_TRY_CHECK;
00078 
00079       this->policy_current_->set_policy_overrides (policies.in (),
00080                                                    CORBA::SET_OVERRIDE
00081                                                     ACE_ENV_ARG_PARAMETER);
00082       ACE_TRY_CHECK;
00083       for (CORBA::ULong i = 0; i != policies->length (); ++i)
00084         {
00085           policies[i]->destroy (ACE_ENV_SINGLE_ARG_PARAMETER);
00086           ACE_TRY_CHECK;
00087         }
00088     }
00089   ACE_CATCHANY
00090     {
00091       // Ignore all exceptions
00092     }
00093   ACE_ENDTRY;
00094 }
00095 
00096 int
00097 TAO_EC_Reactive_SupplierControl::activate (void)
00098 {
00099 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00100   ACE_TRY_NEW_ENV
00101     {
00102       // Get the PolicyCurrent object
00103       CORBA::Object_var tmp =
00104         this->orb_->resolve_initial_references ("PolicyCurrent"
00105                                                  ACE_ENV_ARG_PARAMETER);
00106       ACE_TRY_CHECK;
00107 
00108       this->policy_current_ =
00109         CORBA::PolicyCurrent::_narrow (tmp.in ()
00110                                         ACE_ENV_ARG_PARAMETER);
00111       ACE_TRY_CHECK;
00112 
00113       // Timeout for polling state (default = 10 msec)
00114       TimeBase::TimeT timeout = timeout_.usec() * 10;
00115       CORBA::Any any;
00116       any <<= timeout;
00117 
00118       this->policy_list_.length (1);
00119       this->policy_list_[0] =
00120         this->orb_->create_policy (
00121                Messaging::RELATIVE_RT_TIMEOUT_POLICY_TYPE,
00122                any
00123                ACE_ENV_ARG_PARAMETER);
00124       ACE_TRY_CHECK;
00125 
00126       // Only schedule the timer, when the rate is not zero
00127       if (this->rate_ != ACE_Time_Value::zero)
00128       {
00129         // Schedule the timer after these policies has been set, because the
00130         // handle_timeout uses these policies, if done in front, the channel
00131         // can crash when the timeout expires before initiazation is ready.
00132         timer_id_ = this->reactor_->schedule_timer (&this->adapter_,
00133                                                     0,
00134                                                     this->rate_,
00135                                                     this->rate_);
00136         if (timer_id_ == -1)
00137           return -1;
00138       }
00139     }
00140   ACE_CATCHANY
00141     {
00142       return -1;
00143     }
00144   ACE_ENDTRY;
00145 #endif /* TAO_HAS_CORBA_MESSAGING */
00146 
00147   return 0;
00148 }
00149 
00150 int
00151 TAO_EC_Reactive_SupplierControl::shutdown (void)
00152 {
00153   int r = 0;
00154 
00155 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00156   r = this->reactor_->cancel_timer (timer_id_);
00157 #endif /* TAO_HAS_CORBA_MESSAGING */
00158   this->adapter_.reactor (0);
00159   return r;
00160 }
00161 
00162 void
00163 TAO_EC_Reactive_SupplierControl::supplier_not_exist (
00164       TAO_EC_ProxyPushConsumer *proxy
00165       ACE_ENV_ARG_DECL)
00166 {
00167   ACE_TRY
00168     {
00169       //ACE_DEBUG ((LM_DEBUG,
00170       //            "EC_Reactive_SupplierControl(%P|%t) - "
00171       //            "Consumer %x does not exists\n", long(proxy)));
00172       proxy->disconnect_push_consumer (ACE_ENV_SINGLE_ARG_PARAMETER);
00173       ACE_TRY_CHECK;
00174     }
00175   ACE_CATCHANY
00176     {
00177       // Ignore all exceptions..
00178     }
00179   ACE_ENDTRY;
00180 }
00181 
00182 void
00183 TAO_EC_Reactive_SupplierControl::system_exception (
00184       TAO_EC_ProxyPushConsumer *proxy,
00185       CORBA::SystemException & /* exception */
00186       ACE_ENV_ARG_DECL)
00187 {
00188   ACE_TRY
00189     {
00190       // The current implementation is very strict, and kicks out a
00191       // client on the first system exception. We may
00192       // want to be more lenient in the future, for example,
00193       // this is TAO's minor code for a failed connection.
00194       //
00195       // if (CORBA::TRANSIENT::_narrow (&exception) != 0
00196       //     && exception->minor () == 0x54410085)
00197       //   return;
00198 
00199       // Anything else is serious, including timeouts...
00200       proxy->disconnect_push_consumer (ACE_ENV_SINGLE_ARG_PARAMETER);
00201       ACE_TRY_CHECK;
00202     }
00203   ACE_CATCHANY
00204     {
00205       // Ignore all exceptions..
00206     }
00207   ACE_ENDTRY;
00208 }
00209 
00210 // ****************************************************************
00211 
00212 TAO_EC_SupplierControl_Adapter::TAO_EC_SupplierControl_Adapter (
00213       TAO_EC_Reactive_SupplierControl *adaptee)
00214   :  adaptee_ (adaptee)
00215 {
00216 }
00217 
00218 int
00219 TAO_EC_SupplierControl_Adapter::handle_timeout (
00220       const ACE_Time_Value &tv,
00221       const void *arg)
00222 {
00223   this->adaptee_->handle_timeout (tv, arg);
00224   return 0;
00225 }
00226 
00227 // ****************************************************************
00228 
00229 void
00230 TAO_EC_Ping_Supplier::work (TAO_EC_ProxyPushConsumer *consumer
00231                             ACE_ENV_ARG_DECL)
00232 {
00233   ACE_TRY
00234     {
00235       CORBA::Boolean disconnected;
00236       CORBA::Boolean non_existent =
00237         consumer->supplier_non_existent (disconnected
00238                                           ACE_ENV_ARG_PARAMETER);
00239       ACE_TRY_CHECK;
00240       if (non_existent && !disconnected)
00241         {
00242           this->control_->supplier_not_exist (consumer ACE_ENV_ARG_PARAMETER);
00243           ACE_TRY_CHECK;
00244         }
00245     }
00246   ACE_CATCH (CORBA::OBJECT_NOT_EXIST, ex)
00247     {
00248       this->control_->supplier_not_exist (consumer ACE_ENV_ARG_PARAMETER);
00249       ACE_TRY_CHECK;
00250     }
00251   ACE_CATCH (CORBA::TRANSIENT, transient)
00252     {
00253       // This is TAO's minor code for a failed connection, we may
00254       // want to be more lenient in the future..
00255       // if (transient.minor () == 0x54410085)
00256       this->control_->supplier_not_exist (consumer ACE_ENV_ARG_PARAMETER);
00257       ACE_TRY_CHECK;
00258     }
00259   ACE_CATCHANY
00260     {
00261       // Ignore all exceptions
00262     }
00263   ACE_ENDTRY;
00264 }
00265 
00266 TAO_END_VERSIONED_NAMESPACE_DECL

Generated on Thu Nov 9 13:11:09 2006 for TAO_RTEvent by doxygen 1.3.6