TAO_CEC_Reactive_SupplierControl Class Reference

SupplierControl.

#include <CEC_Reactive_SupplierControl.h>

Public Member Functions

 TAO_CEC_Reactive_SupplierControl (const ACE_Time_Value &rate, const ACE_Time_Value &timeout, unsigned int retries, TAO_CEC_EventChannel *event_channel, CORBA::ORB_ptr orb)
virtual ~TAO_CEC_Reactive_SupplierControl (void)
 Destructor.
void handle_timeout (const ACE_Time_Value &tv, const void *arg)
 Receive the timeout from the adapter.
virtual int activate (void)
virtual int shutdown (void)
virtual void supplier_not_exist (TAO_CEC_ProxyPushConsumer *proxy)
virtual void supplier_not_exist (TAO_CEC_ProxyPullConsumer *proxy)
virtual void system_exception (TAO_CEC_ProxyPullConsumer *proxy, CORBA::SystemException &)
 Some system exception was raised while trying to push an event.
virtual bool need_to_disconnect (PortableServer::ServantBase *proxy)
virtual void successful_transmission (PortableServer::ServantBase *proxy)
 Allow others to inform us when a send or receive was successful.

Private Member Functions

void query_suppliers (void)

Private Attributes

ACE_Time_Value rate_
 The polling rate.
ACE_Time_Value timeout_
 The polling timeout.
unsigned int retries_
 The number of retries per proxy until it is disconnected.
TAO_CEC_SupplierControl_Adapter adapter_
 The Adapter for the reactor events.
TAO_CEC_EventChannel * event_channel_
 The event channel.
CORBA::ORB_var orb_
 The ORB.
CORBA::PolicyCurrent_var policy_current_
 To control the timeout policy in the thread.
CORBA::PolicyList policy_list_
 Precomputed policy list used to set the timeout.
ACE_Reactor * reactor_
 The ORB reactor.
long timer_id_
 The timer id.

Detailed Description

SupplierControl.

Defines the interface for the supplier control strategy. This strategy handles misbehaving or failing suppliers.

= MEMORY MANAGEMENT
= LOCKING
= TODO

Definition at line 79 of file CEC_Reactive_SupplierControl.h.


Constructor & Destructor Documentation

TAO_CEC_Reactive_SupplierControl::TAO_CEC_Reactive_SupplierControl ( const ACE_Time_Value & rate,
const ACE_Time_Value & timeout,
unsigned int  retries,
TAO_CEC_EventChannel * event_channel,
CORBA::ORB_ptr  orb 
)

Constructor. It does not assume ownership of the <event_channel> parameter.

Definition at line 42 of file CEC_Reactive_SupplierControl.cpp.

References orb_.

  : rate_ (rate),
    timeout_ (timeout),
    retries_ (retries),
    adapter_ (this),
    event_channel_ (ec),
#if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
    typed_event_channel_ (0),
#endif /* TAO_HAS_TYPED_EVENT_CHANNEL */
    orb_ (CORBA::ORB::_duplicate (orb))
#if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
   // Initialise timer_id_ to an invalid timer id, so that in case we don't
   // schedule a timer, we don't cancel a random timer at shutdown
   , timer_id_ (-1)
#endif /* TAO_HAS_CORBA_MESSAGING */
{
  this->reactor_ =
    this->orb_->orb_core ()->reactor ();
}

TAO_CEC_Reactive_SupplierControl::~TAO_CEC_Reactive_SupplierControl ( void   )  [virtual]

Destructor.

Definition at line 92 of file CEC_Reactive_SupplierControl.cpp.

{
}


Member Function Documentation

int TAO_CEC_Reactive_SupplierControl::activate ( void   )  [virtual]

Activate any internal threads or timers used to poll the state of the suppliers.

Reimplemented from TAO_CEC_SupplierControl.

Definition at line 241 of file CEC_Reactive_SupplierControl.cpp.

References TAO_Pseudo_Var_T< T >::in(), orb_, policy_current_, policy_list_, reactor_, ACE_Reactor::schedule_timer(), timer_id_, and ACE_Time_Value::zero.

{
#if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
  try
    {
      // Get the PolicyCurrent object
      CORBA::Object_var tmp =
        this->orb_->resolve_initial_references ("PolicyCurrent");

      this->policy_current_ =
        CORBA::PolicyCurrent::_narrow (tmp.in ());

      // Pre-compute the policy list used to set the right timeout
      // value...
      // We need to convert the relative timeout into units of 100 nanoseconds.
      TimeBase::TimeT timeout;
      ORBSVCS_Time::Time_Value_to_TimeT (timeout,
                                         this->timeout_);
      CORBA::Any any;
      any <<= timeout;

      this->policy_list_.length (1);
      this->policy_list_[0] =
        this->orb_->create_policy (
               Messaging::RELATIVE_RT_TIMEOUT_POLICY_TYPE,
               any);

      // Only schedule the timer when the rate is not zero
      if (this->rate_ != ACE_Time_Value::zero)
      {
        // Schedule the timer after these policies have been set, because
        // handle_timeout uses them; if the timer were scheduled first, the
        // channel could crash when the timeout expires before initialization
        // is complete.
        timer_id_ = this->reactor_->schedule_timer (&this->adapter_,
                                                    0,
                                                    this->rate_,
                                                    this->rate_);
        if (timer_id_ == -1)
          return -1;
      }
    }
  catch (const CORBA::Exception&)
    {
      return -1;
    }
#endif /* TAO_HAS_CORBA_MESSAGING */

  return 0;
}
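
The comment in activate() about converting the relative timeout into units of 100 nanoseconds can be illustrated with a minimal standalone sketch. It does not use ACE or TAO: SimpleTimeValue and to_100ns_units are hypothetical stand-ins for ACE_Time_Value and ORBSVCS_Time::Time_Value_to_TimeT, and the arithmetic assumes a non-negative seconds/microseconds pair.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical stand-in for ACE_Time_Value: whole seconds plus microseconds.
struct SimpleTimeValue
{
  std::int64_t sec;
  std::int64_t usec;
};

// Hypothetical stand-in for the Time_Value_to_TimeT conversion.
// One second is 10,000,000 units of 100 ns; one microsecond is 10 units.
inline std::uint64_t to_100ns_units (const SimpleTimeValue &tv)
{
  assert (tv.sec >= 0 && tv.usec >= 0);
  return static_cast<std::uint64_t> (tv.sec) * 10000000ULL
       + static_cast<std::uint64_t> (tv.usec) * 10ULL;
}
```

Under these assumptions, a timeout of 250 milliseconds (0 seconds, 250000 microseconds) corresponds to 2,500,000 units of 100 ns.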

void TAO_CEC_Reactive_SupplierControl::handle_timeout ( const ACE_Time_Value & tv,
const void *  arg 
)

Receive the timeout from the adapter.

Definition at line 201 of file CEC_Reactive_SupplierControl.cpp.

References policy_current_, and query_suppliers().

Referenced by TAO_CEC_SupplierControl_Adapter::handle_timeout().

{
  try
    {
      // Query the state of the Current object *before* we initiate
      // the iteration...
      CORBA::PolicyTypeSeq types;
      CORBA::PolicyList_var policies =
        this->policy_current_->get_policy_overrides (types);

      // Change the timeout
      this->policy_current_->set_policy_overrides (this->policy_list_,
                                                   CORBA::ADD_OVERRIDE);

      try
        {
          // Query the state of the suppliers...
          this->query_suppliers ();
        }
      catch (const CORBA::Exception&)
        {
          // Ignore all exceptions
        }

      this->policy_current_->set_policy_overrides (policies.in (),
                                                   CORBA::SET_OVERRIDE);
      for (CORBA::ULong i = 0; i != policies->length (); ++i)
        {
          policies[i]->destroy ();
        }
    }
  catch (const CORBA::Exception&)
    {
      // Ignore all exceptions
    }
}
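
The save, override, and restore sequence in handle_timeout() can be sketched without CORBA as follows. This is only an illustration: Overrides, TIMEOUT_POLICY, and run_with_timeout are hypothetical names, and a std::map stands in for the per-thread PolicyCurrent object.

```cpp
#include <cassert>
#include <exception>
#include <functional>
#include <map>
#include <stdexcept>

// Hypothetical stand-in for the per-thread PolicyCurrent: policy type -> value.
using Overrides = std::map<int, long>;

const int TIMEOUT_POLICY = 1;  // hypothetical policy type id

// Run `work` with a temporary timeout override, then restore the saved state.
// Exceptions thrown by `work` are swallowed, as in handle_timeout().
inline void run_with_timeout (Overrides &current, long timeout,
                              const std::function<void ()> &work)
{
  assert (static_cast<bool> (work));
  const Overrides saved = current;     // query the state *before* changing it
  current[TIMEOUT_POLICY] = timeout;   // add the override (like ADD_OVERRIDE)
  try
    {
      work ();
    }
  catch (const std::exception &)
    {
      // Ignore failures so that the restore below always runs.
    }
  current = saved;                     // replace everything (like SET_OVERRIDE)
}
```

The point of the pattern is that the polling pass sees the short timeout, while any overrides that were in place before the pass are back in effect afterwards, even when the pass itself fails.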

bool TAO_CEC_Reactive_SupplierControl::need_to_disconnect ( PortableServer::ServantBase * proxy  )  [virtual]

Do we need to disconnect this supplier? The proxy parameter has type PortableServer::ServantBase* because this method is used for both TAO_CEC_ProxyPushSupplier and TAO_CEC_ProxyPullSupplier objects.

Reimplemented from TAO_CEC_SupplierControl.

Definition at line 124 of file CEC_Reactive_SupplierControl.cpp.

{
  bool disconnect = true;

#if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
  if (this->typed_event_channel_)
    {
      // Typed EC
      TAO_CEC_TypedEventChannel::ServantRetryMap::ENTRY* entry = 0;
      if (this->typed_event_channel_->
          get_servant_retry_map ().find (proxy, entry) == 0)
        {
          ++entry->int_id_;
          if (entry->int_id_ <= this->retries_)
            {
              disconnect = false;
            }
        }
    }
  else
    {
#endif /* TAO_HAS_TYPED_EVENT_CHANNEL */

  // Un-typed EC
  TAO_CEC_EventChannel::ServantRetryMap::ENTRY* entry = 0;
  if (this->event_channel_->
      get_servant_retry_map ().find (proxy, entry) == 0)
    {
      ++entry->int_id_;
      if (entry->int_id_ <= this->retries_)
        {
          disconnect = false;
        }
    }

#if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
    }
#endif /* TAO_HAS_TYPED_EVENT_CHANNEL */

  return disconnect;
}
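
The retry bookkeeping shown above (increment a per-proxy counter on each failure, disconnect once it exceeds the retry limit, reset it on success) can be sketched as a small standalone class. RetryTracker and its add() method are hypothetical; a std::unordered_map stands in for the channel's ServantRetryMap, and no TAO types are used.

```cpp
#include <cassert>
#include <unordered_map>

// Hypothetical stand-in for the channel's ServantRetryMap:
// proxy address -> number of consecutive failures.
class RetryTracker
{
public:
  explicit RetryTracker (unsigned int retries) : retries_ (retries) {}

  // Register a proxy so that failures can be counted for it.
  void add (const void *proxy)
  {
    assert (proxy != nullptr);
    counts_[proxy] = 0;
  }

  // Mirrors need_to_disconnect(): count the failure and report whether the
  // retry budget is exhausted. Unknown proxies are always disconnected.
  bool need_to_disconnect (const void *proxy)
  {
    auto it = counts_.find (proxy);
    if (it == counts_.end ())
      return true;
    ++it->second;
    return it->second > retries_;
  }

  // Mirrors successful_transmission(): a success resets the counter.
  void successful_transmission (const void *proxy)
  {
    auto it = counts_.find (proxy);
    if (it != counts_.end ())
      it->second = 0;
  }

private:
  unsigned int retries_;
  std::unordered_map<const void *, unsigned int> counts_;
};
```

With a retry limit of 2, the first two consecutive failures are tolerated and the third one reports that the proxy should be disconnected, unless a successful transmission resets the counter in between.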

void TAO_CEC_Reactive_SupplierControl::query_suppliers ( void   )  [private]

Check whether the suppliers still exist. It is a helper method for handle_timeout() to isolate the exceptions.

Definition at line 97 of file CEC_Reactive_SupplierControl.cpp.

References event_channel_, TAO_CEC_SupplierAdmin::for_each(), and TAO_CEC_EventChannel::supplier_admin().

Referenced by handle_timeout().

{
#if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
  if (this->typed_event_channel_)
    {
      // Typed EC
      TAO_CEC_Ping_Typed_Push_Supplier push_worker (this);

      this->typed_event_channel_->typed_supplier_admin ()->for_each (&push_worker);
    }
  else
    {
#endif /* TAO_HAS_TYPED_EVENT_CHANNEL */

  // Un-typed EC
  TAO_CEC_Ping_Push_Supplier push_worker (this);
  this->event_channel_->supplier_admin ()->for_each (&push_worker);

  TAO_CEC_Ping_Pull_Supplier pull_worker (this);
  this->event_channel_->supplier_admin ()->for_each (&pull_worker);

#if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
    }
#endif /* TAO_HAS_TYPED_EVENT_CHANNEL */
}
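
query_suppliers() hands a worker object to for_each(), which applies it to every connected proxy. A minimal standalone sketch of that worker pattern follows. Proxy, Worker, Admin, and PingWorker are hypothetical types invented for illustration; a boolean flag stands in for the remote liveness check that the real ping workers perform.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical stand-in for a proxy connected to one supplier.
struct Proxy
{
  std::string name;
  bool supplier_alive;
};

// Worker interface applied to every proxy, in the style of for_each().
struct Worker
{
  virtual ~Worker () = default;
  virtual void work (Proxy &proxy) = 0;
};

// Hypothetical stand-in for the supplier admin that owns the proxies.
class Admin
{
public:
  void connect (const Proxy &proxy) { proxies_.push_back (proxy); }

  void for_each (Worker *worker)
  {
    assert (worker != nullptr);
    for (Proxy &proxy : proxies_)
      worker->work (proxy);
  }

private:
  std::vector<Proxy> proxies_;
};

// Ping worker: records every proxy whose supplier no longer responds.
struct PingWorker : Worker
{
  std::vector<std::string> dead;

  void work (Proxy &proxy) override
  {
    if (!proxy.supplier_alive)
      dead.push_back (proxy.name);
  }
};
```

Keeping the iteration in the admin and the per-proxy decision in the worker lets the same traversal serve different checks, which is how the listing above reuses for_each() for both the push and the pull ping workers.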

int TAO_CEC_Reactive_SupplierControl::shutdown ( void   )  [virtual]

Reimplemented from TAO_CEC_SupplierControl.

Definition at line 292 of file CEC_Reactive_SupplierControl.cpp.

References adapter_, ACE_Reactor::cancel_timer(), ACE_Event_Handler::reactor(), reactor_, and timer_id_.

{
  int r = 0;

#if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
  r = this->reactor_->cancel_timer (timer_id_);
#endif /* TAO_HAS_CORBA_MESSAGING */
  this->adapter_.reactor (0);
  return r;
}

void TAO_CEC_Reactive_SupplierControl::successful_transmission ( PortableServer::ServantBase * proxy  )  [virtual]

Allow others to inform us when a send or receive was successful.

Reimplemented from TAO_CEC_SupplierControl.

Definition at line 168 of file CEC_Reactive_SupplierControl.cpp.

{
#if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
  if (this->typed_event_channel_)
    {
      // Typed EC
      TAO_CEC_TypedEventChannel::ServantRetryMap::ENTRY* entry = 0;
      if (this->typed_event_channel_->
          get_servant_retry_map ().find (proxy, entry) == 0)
        {
          entry->int_id_ = 0;
        }
    }
  else
    {
#endif /* TAO_HAS_TYPED_EVENT_CHANNEL */

  // Un-typed EC
  TAO_CEC_EventChannel::ServantRetryMap::ENTRY* entry = 0;
  if (this->event_channel_->
      get_servant_retry_map ().find (proxy, entry) == 0)
    {
      entry->int_id_ = 0;
    }

#if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
    }
#endif /* TAO_HAS_TYPED_EVENT_CHANNEL */

}

void TAO_CEC_Reactive_SupplierControl::supplier_not_exist ( TAO_CEC_ProxyPullConsumer * proxy  )  [virtual]

Invoked by helper classes when they detect that a supplier does not exist (i.e. _non_existent() returns true and/or the CORBA::OBJECT_NOT_EXIST exception has been raised).

Reimplemented from TAO_CEC_SupplierControl.

Definition at line 334 of file CEC_Reactive_SupplierControl.cpp.

References TAO_CEC_ProxyPullConsumer::disconnect_pull_consumer().

{
  try
    {
      proxy->disconnect_pull_consumer ();
    }
  catch (const CORBA::Exception&)
    {
      // Ignore all exceptions..
    }
}

void TAO_CEC_Reactive_SupplierControl::supplier_not_exist ( TAO_CEC_ProxyPushConsumer * proxy  )  [virtual]

Invoked by helper classes when they detect that a supplier does not exist (i.e. _non_existent() returns true and/or the CORBA::OBJECT_NOT_EXIST exception has been raised).

Reimplemented from TAO_CEC_SupplierControl.

Definition at line 304 of file CEC_Reactive_SupplierControl.cpp.

References TAO_CEC_ProxyPushConsumer::disconnect_push_consumer().

{
  try
    {
      proxy->disconnect_push_consumer ();
    }
  catch (const CORBA::Exception&)
    {
      // Ignore all exceptions..
    }
}

void TAO_CEC_Reactive_SupplierControl::system_exception ( TAO_CEC_ProxyPullConsumer * proxy,
CORBA::SystemException &  
) [virtual]

Some system exception was raised while trying to push an event.

Reimplemented from TAO_CEC_SupplierControl.

Definition at line 348 of file CEC_Reactive_SupplierControl.cpp.

References TAO_CEC_ProxyPullConsumer::disconnect_pull_consumer().

{
  try
    {
      if (this->need_to_disconnect (proxy))
        {
          proxy->disconnect_pull_consumer ();
        }
    }
  catch (const CORBA::Exception&)
    {
      // Ignore all exceptions..
    }
}


Member Data Documentation

TAO_CEC_SupplierControl_Adapter TAO_CEC_Reactive_SupplierControl::adapter_ [private]

The Adapter for the reactor events.

Definition at line 145 of file CEC_Reactive_SupplierControl.h.

Referenced by shutdown().

TAO_CEC_EventChannel* TAO_CEC_Reactive_SupplierControl::event_channel_ [private]

The event channel.

Definition at line 148 of file CEC_Reactive_SupplierControl.h.

Referenced by query_suppliers().

CORBA::ORB_var TAO_CEC_Reactive_SupplierControl::orb_ [private]

The ORB.

Definition at line 156 of file CEC_Reactive_SupplierControl.h.

Referenced by activate().

CORBA::PolicyCurrent_var TAO_CEC_Reactive_SupplierControl::policy_current_ [private]

To control the timeout policy in the thread.

Definition at line 159 of file CEC_Reactive_SupplierControl.h.

Referenced by activate(), and handle_timeout().

CORBA::PolicyList TAO_CEC_Reactive_SupplierControl::policy_list_ [private]

Precomputed policy list used to set the timeout.

Definition at line 162 of file CEC_Reactive_SupplierControl.h.

Referenced by activate().

ACE_Time_Value TAO_CEC_Reactive_SupplierControl::rate_ [private]

The polling rate.

Definition at line 136 of file CEC_Reactive_SupplierControl.h.

ACE_Reactor* TAO_CEC_Reactive_SupplierControl::reactor_ [private]

The ORB reactor.

Definition at line 165 of file CEC_Reactive_SupplierControl.h.

Referenced by activate(), and shutdown().

unsigned int TAO_CEC_Reactive_SupplierControl::retries_ [private]

The number of retries per proxy until it is disconnected.

Definition at line 142 of file CEC_Reactive_SupplierControl.h.

ACE_Time_Value TAO_CEC_Reactive_SupplierControl::timeout_ [private]

The polling timeout.

Definition at line 139 of file CEC_Reactive_SupplierControl.h.

long TAO_CEC_Reactive_SupplierControl::timer_id_ [private]

The timer id.

Definition at line 169 of file CEC_Reactive_SupplierControl.h.

Referenced by activate(), and shutdown().


The documentation for this class was generated from the following files:
CEC_Reactive_SupplierControl.h
CEC_Reactive_SupplierControl.cpp
Generated on Tue Feb 2 17:45:01 2010 for TAO_CosEvent by  doxygen 1.4.7