#include <EC_Reactive_ConsumerControl.h>
Inheritance diagram for TAO_EC_Reactive_ConsumerControl:
Public Member Functions | |
TAO_EC_Reactive_ConsumerControl (const ACE_Time_Value &rate, const ACE_Time_Value &timeout, TAO_EC_Event_Channel_Base *event_channel, CORBA::ORB_ptr orb) | |
virtual | ~TAO_EC_Reactive_ConsumerControl (void) |
Destructor. | |
void | handle_timeout (const ACE_Time_Value &tv, const void *arg) |
Receive the timeout from the adapter. | |
virtual int | activate (void) |
virtual int | shutdown (void) |
virtual void | consumer_not_exist (TAO_EC_ProxyPushSupplier *proxy) |
virtual void | system_exception (TAO_EC_ProxyPushSupplier *proxy, CORBA::SystemException &) |
Private Member Functions | |
void | query_consumers (void) |
Private Attributes | |
ACE_Time_Value | rate_ |
The polling rate. | |
ACE_Time_Value | timeout_ |
The polling timeout. | |
TAO_EC_ConsumerControl_Adapter | adapter_ |
The Adapter for the reactor events. | |
TAO_EC_Event_Channel_Base * | event_channel_ |
The event channel. | |
CORBA::ORB_var | orb_ |
The ORB. | |
CORBA::PolicyCurrent_var | policy_current_ |
To control the timeout policy in the thread. | |
CORBA::PolicyList | policy_list_ |
Precomputed policy list to the set timeout. | |
ACE_Reactor * | reactor_ |
The ORB reactor. | |
long | timer_id_ |
The timer id. |
Defines the interface for the consumer control strategy. This strategy handles misbehaving or failing consumers.
Definition at line 71 of file EC_Reactive_ConsumerControl.h.
TAO_BEGIN_VERSIONED_NAMESPACE_DECL TAO_EC_Reactive_ConsumerControl::TAO_EC_Reactive_ConsumerControl | ( | const ACE_Time_Value & | rate, | |
const ACE_Time_Value & | timeout, | |||
TAO_EC_Event_Channel_Base * | event_channel, | |||
CORBA::ORB_ptr | orb | |||
) |
Constructor. It does not assume ownership of the event_channel parameter.
Definition at line 23 of file EC_Reactive_ConsumerControl.cpp.
00027 : rate_ (rate), 00028 timeout_ (timeout), 00029 adapter_ (this), 00030 event_channel_ (ec), 00031 orb_ (CORBA::ORB::_duplicate (orb)) 00032 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0 00033 , timer_id_ (-1) 00034 #endif /* TAO_HAS_CORBA_MESSAGING != 0*/ 00035 { 00036 this->reactor_ = 00037 this->orb_->orb_core ()->reactor (); 00038 }
TAO_EC_Reactive_ConsumerControl::~TAO_EC_Reactive_ConsumerControl | ( | void | ) | [virtual] |
int TAO_EC_Reactive_ConsumerControl::activate | ( | void | ) | [virtual] |
Activate any internal threads or timers used to poll the state of the consumers
Reimplemented from TAO_EC_ConsumerControl.
Definition at line 93 of file EC_Reactive_ConsumerControl.cpp.
References TAO_Pseudo_Var_T< T >::in(), orb_, policy_current_, policy_list_, reactor_, ACE_Reactor::schedule_timer(), timeout_, timer_id_, ACE_Time_Value::usec(), and ACE_Time_Value::zero.
00094 { 00095 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0 00096 try 00097 { 00098 // Get the PolicyCurrent object 00099 CORBA::Object_var tmp = 00100 this->orb_->resolve_initial_references ("PolicyCurrent"); 00101 00102 this->policy_current_ = 00103 CORBA::PolicyCurrent::_narrow (tmp.in ()); 00104 00105 // Timeout for polling state (default = 10 msec) 00106 TimeBase::TimeT timeout = timeout_.usec() * 10; 00107 CORBA::Any any; 00108 any <<= timeout; 00109 00110 this->policy_list_.length (1); 00111 this->policy_list_[0] = 00112 this->orb_->create_policy ( 00113 Messaging::RELATIVE_RT_TIMEOUT_POLICY_TYPE, 00114 any); 00115 00116 // Only schedule the timer, when the rate is not zero 00117 if (this->rate_ != ACE_Time_Value::zero) 00118 { 00119 // Schedule the timer after these policies has been set, because the 00120 // handle_timeout uses these policies, if done in front, the channel 00121 // can crash when the timeout expires before initiazation is ready. 00122 timer_id_ = this->reactor_->schedule_timer (&this->adapter_, 00123 0, 00124 this->rate_, 00125 this->rate_); 00126 if (timer_id_ == -1) 00127 return -1; 00128 } 00129 } 00130 catch (const CORBA::Exception&) 00131 { 00132 return -1; 00133 } 00134 #endif /* TAO_HAS_CORBA_MESSAGING */ 00135 00136 return 0; 00137 }
void TAO_EC_Reactive_ConsumerControl::consumer_not_exist | ( | TAO_EC_ProxyPushSupplier * | proxy | ) | [virtual] |
When pushing an event to the consumer a CORBA::OBJECT_NOT_EXIST exception was raised. The only interpretation is that the object has been destroyed. The strategy has to (at the very least), reclaim all the resources attached to that object.
Reimplemented from TAO_EC_ConsumerControl.
Definition at line 152 of file EC_Reactive_ConsumerControl.cpp.
References CORBA::Exception::_tao_print_exception(), and TAO_EC_ProxyPushSupplier::disconnect_push_supplier().
00154 { 00155 try 00156 { 00157 //ACE_DEBUG ((LM_DEBUG, 00158 // "EC_Reactive_ConsumerControl(%P|%t) - " 00159 // "Consumer %x does not exists\n", long(proxy))); 00160 proxy->disconnect_push_supplier (); 00161 } 00162 catch (const CORBA::Exception& ex) 00163 { 00164 ex._tao_print_exception ("Reactive_ConsumerControl::consumer_not_exist"); 00165 // Ignore all exceptions.. 00166 } 00167 }
void TAO_EC_Reactive_ConsumerControl::handle_timeout | ( | const ACE_Time_Value & | tv, | |
const void * | arg | |||
) |
Receive the timeout from the adapter.
Definition at line 52 of file EC_Reactive_ConsumerControl.cpp.
References policy_current_, and query_consumers().
Referenced by TAO_EC_ConsumerControl_Adapter::handle_timeout().
00055 { 00056 // NOTE, setting RELATIVE_RT_TIMEOUT_POLICY for the duration of 00057 // query_consumers () below has greater impact than desired. For 00058 // example, while we are pinging consumers here, a nested upcall, 00059 // which requires making remote calls may come into the ORB. Those 00060 // remote calls will be carried out with with 00061 // RELATIVE_RT_TIMEOUT_POLICY set here in effect. 00062 00063 // @@ TODO: should use Guard to set and reset policies. 00064 try 00065 { 00066 // Query the state of the Current object *before* we initiate 00067 // the iteration... 00068 CORBA::PolicyTypeSeq types; 00069 CORBA::PolicyList_var policies = 00070 this->policy_current_->get_policy_overrides (types); 00071 00072 // Change the timeout 00073 this->policy_current_->set_policy_overrides (this->policy_list_, 00074 CORBA::ADD_OVERRIDE); 00075 00076 // Query the state of the consumers... 00077 this->query_consumers (); 00078 00079 this->policy_current_->set_policy_overrides (policies.in (), 00080 CORBA::SET_OVERRIDE); 00081 for (CORBA::ULong i = 0; i != policies->length (); ++i) 00082 { 00083 policies[i]->destroy (); 00084 } 00085 } 00086 catch (const CORBA::Exception&) 00087 { 00088 // Ignore all exceptions 00089 } 00090 }
void TAO_EC_Reactive_ConsumerControl::query_consumers | ( | void | ) | [private] |
Check if the consumers still exists. It is a helper method for handle_timeout() to isolate the exceptions.
Definition at line 45 of file EC_Reactive_ConsumerControl.cpp.
References event_channel_, and TAO_EC_Event_Channel_Base::for_each_consumer().
Referenced by handle_timeout().
00046 { 00047 TAO_EC_Ping_Consumer worker (this); 00048 this->event_channel_->for_each_consumer (&worker); 00049 }
int TAO_EC_Reactive_ConsumerControl::shutdown | ( | void | ) | [virtual] |
Reimplemented from TAO_EC_ConsumerControl.
Definition at line 140 of file EC_Reactive_ConsumerControl.cpp.
References adapter_, ACE_Reactor::cancel_timer(), ACE_Event_Handler::reactor(), reactor_, and timer_id_.
00141 { 00142 int r = 0; 00143 00144 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0 00145 r = this->reactor_->cancel_timer (timer_id_); 00146 #endif /* TAO_HAS_CORBA_MESSAGING */ 00147 this->adapter_.reactor (0); 00148 return r; 00149 }
void TAO_EC_Reactive_ConsumerControl::system_exception | ( | TAO_EC_ProxyPushSupplier * | proxy, | |
CORBA::SystemException & | ||||
) | [virtual] |
Some system exception was raised while trying to contact the consumer
Reimplemented from TAO_EC_ConsumerControl.
Definition at line 170 of file EC_Reactive_ConsumerControl.cpp.
References TAO_EC_ProxyPushSupplier::disconnect_push_supplier().
00173 { 00174 try 00175 { 00176 // The current implementation is very strict, and kicks out a 00177 // client on the first system exception. We may 00178 // want to be more lenient in the future, for example, 00179 // this is TAO's minor code for a failed connection. 00180 // 00181 // if (CORBA::TRANSIENT::_narrow (&exception) != 0 00182 // && exception->minor () == 0x54410085) 00183 // return; 00184 00185 // Anything else is serious, including timeouts... 00186 proxy->disconnect_push_supplier (); 00187 } 00188 catch (const CORBA::Exception&) 00189 { 00190 // Ignore all exceptions.. 00191 } 00192 }
The Adapter for the reactor events.
Definition at line 108 of file EC_Reactive_ConsumerControl.h.
Referenced by shutdown().
The event channel.
Definition at line 111 of file EC_Reactive_ConsumerControl.h.
Referenced by query_consumers().
CORBA::PolicyCurrent_var TAO_EC_Reactive_ConsumerControl::policy_current_ [private] |
To control the timeout policy in the thread.
Definition at line 117 of file EC_Reactive_ConsumerControl.h.
Referenced by activate(), and handle_timeout().
CORBA::PolicyList TAO_EC_Reactive_ConsumerControl::policy_list_ [private] |
Precomputed policy list to the set timeout.
Definition at line 120 of file EC_Reactive_ConsumerControl.h.
Referenced by activate().
The ORB reactor.
Definition at line 123 of file EC_Reactive_ConsumerControl.h.
Referenced by activate(), and shutdown().
The polling timeout.
Definition at line 105 of file EC_Reactive_ConsumerControl.h.
Referenced by activate().
long TAO_EC_Reactive_ConsumerControl::timer_id_ [private] |
The timer id.
Definition at line 127 of file EC_Reactive_ConsumerControl.h.
Referenced by activate(), and shutdown().