#include <EC_Reactive_ConsumerControl.h>
Inheritance diagram for TAO_EC_Reactive_ConsumerControl:
Public Member Functions | |
TAO_EC_Reactive_ConsumerControl (const ACE_Time_Value &rate, const ACE_Time_Value &timeout, TAO_EC_Event_Channel_Base *event_channel, CORBA::ORB_ptr orb) | |
virtual | ~TAO_EC_Reactive_ConsumerControl (void) |
Destructor. | |
void | handle_timeout (const ACE_Time_Value &tv, const void *arg) |
Receive the timeout from the adapter. | |
virtual int | activate (void) |
virtual int | shutdown (void) |
virtual void | consumer_not_exist (TAO_EC_ProxyPushSupplier *proxy) |
virtual void | system_exception (TAO_EC_ProxyPushSupplier *proxy, CORBA::SystemException &) |
Private Member Functions | |
void | query_consumers () |
Private Attributes | |
ACE_Time_Value | rate_ |
The polling rate. | |
ACE_Time_Value | timeout_ |
The polling timeout. | |
TAO_EC_ConsumerControl_Adapter | adapter_ |
The Adapter for the reactor events. | |
TAO_EC_Event_Channel_Base * | event_channel_ |
The event channel. | |
CORBA::ORB_var | orb_ |
The ORB. | |
CORBA::PolicyCurrent_var | policy_current_ |
To control the timeout policy in the thread. | |
CORBA::PolicyList | policy_list_ |
Precomputed policy list to set the timeout. | |
ACE_Reactor * | reactor_ |
The ORB reactor. | |
long | timer_id_ |
The timer id. |
Defines the interface for the consumer control strategy. This strategy handles misbehaving or failing consumers.
Definition at line 71 of file EC_Reactive_ConsumerControl.h.
|
Constructor. It does not assume ownership of the event_channel parameter. Definition at line 23 of file EC_Reactive_ConsumerControl.cpp. References TAO_HAS_CORBA_MESSAGING.
00027 : rate_ (rate), 00028 timeout_ (timeout), 00029 adapter_ (this), 00030 event_channel_ (ec), 00031 orb_ (CORBA::ORB::_duplicate (orb)) 00032 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0 00033 , timer_id_ (-1) 00034 #endif /* TAO_HAS_CORBA_MESSAGING != 0*/ 00035 { 00036 this->reactor_ = 00037 this->orb_->orb_core ()->reactor (); 00038 } |
|
Destructor.
Definition at line 40 of file EC_Reactive_ConsumerControl.cpp.
00041 { 00042 } |
|
Activate any internal threads or timers used to poll the state of the consumers. Reimplemented from TAO_EC_ConsumerControl. Definition at line 105 of file EC_Reactive_ConsumerControl.cpp. References CORBA::PolicyCurrent::_narrow(), ACE_CATCHANY, ACE_ENDTRY, ACE_ENV_ARG_PARAMETER, ACE_TRY_CHECK, ACE_TRY_NEW_ENV, TAO::unbounded_object_reference_sequence< Policy, Policy_var >::length(), rate_, ACE_Reactor::schedule_timer(), and ACE_Time_Value::usec().
00106 { 00107 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0 00108 ACE_TRY_NEW_ENV 00109 { 00110 // Get the PolicyCurrent object 00111 CORBA::Object_var tmp = 00112 this->orb_->resolve_initial_references ("PolicyCurrent" 00113 ACE_ENV_ARG_PARAMETER); 00114 ACE_TRY_CHECK; 00115 00116 this->policy_current_ = 00117 CORBA::PolicyCurrent::_narrow (tmp.in () 00118 ACE_ENV_ARG_PARAMETER); 00119 ACE_TRY_CHECK; 00120 00121 // Timeout for polling state (default = 10 msec) 00122 TimeBase::TimeT timeout = timeout_.usec() * 10; 00123 CORBA::Any any; 00124 any <<= timeout; 00125 00126 this->policy_list_.length (1); 00127 this->policy_list_[0] = 00128 this->orb_->create_policy ( 00129 Messaging::RELATIVE_RT_TIMEOUT_POLICY_TYPE, 00130 any 00131 ACE_ENV_ARG_PARAMETER); 00132 ACE_TRY_CHECK; 00133 00134 // Only schedule the timer, when the rate is not zero 00135 if (this->rate_ != ACE_Time_Value::zero) 00136 { 00137 // Schedule the timer after these policies has been set, because the 00138 // handle_timeout uses these policies, if done in front, the channel 00139 // can crash when the timeout expires before initiazation is ready. 00140 timer_id_ = this->reactor_->schedule_timer (&this->adapter_, 00141 0, 00142 this->rate_, 00143 this->rate_); 00144 if (timer_id_ == -1) 00145 return -1; 00146 } 00147 } 00148 ACE_CATCHANY 00149 { 00150 return -1; 00151 } 00152 ACE_ENDTRY; 00153 #endif /* TAO_HAS_CORBA_MESSAGING */ 00154 00155 return 0; 00156 } |
|
When pushing an event to the consumer a CORBA::OBJECT_NOT_EXIST exception was raised. The only interpretation is that the object has been destroyed. The strategy has to (at the very least) reclaim all the resources attached to that object. Reimplemented from TAO_EC_ConsumerControl. Definition at line 171 of file EC_Reactive_ConsumerControl.cpp. References ACE_ANY_EXCEPTION, ACE_CATCHANY, ACE_ENDTRY, ACE_ENV_SINGLE_ARG_PARAMETER, ACE_PRINT_EXCEPTION, ACE_TRY, and ACE_TRY_CHECK.
00174 { 00175 ACE_TRY 00176 { 00177 //ACE_DEBUG ((LM_DEBUG, 00178 // "EC_Reactive_ConsumerControl(%P|%t) - " 00179 // "Consumer %x does not exists\n", long(proxy))); 00180 proxy->disconnect_push_supplier (ACE_ENV_SINGLE_ARG_PARAMETER); 00181 ACE_TRY_CHECK; 00182 } 00183 ACE_CATCHANY 00184 { 00185 ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, 00186 "Reactive_ConsumerControl::consumer_not_exist"); 00187 // Ignore all exceptions.. 00188 } 00189 ACE_ENDTRY; 00190 } |
|
Receive the timeout from the adapter.
Definition at line 55 of file EC_Reactive_ConsumerControl.cpp. References ACE_CATCHANY, ACE_ENDTRY, ACE_ENV_ARG_PARAMETER, ACE_ENV_SINGLE_ARG_PARAMETER, ACE_TRY_CHECK, ACE_TRY_NEW_ENV, and query_consumers(). Referenced by TAO_EC_ConsumerControl_Adapter::handle_timeout().
00058 { 00059 // NOTE, setting RELATIVE_RT_TIMEOUT_POLICY for the duration of 00060 // query_consumers () below has greater impact than desired. For 00061 // example, while we are pinging consumers here, a nested upcall, 00062 // which requires making remote calls may come into the ORB. Those 00063 // remote calls will be carried out with with 00064 // RELATIVE_RT_TIMEOUT_POLICY set here in effect. 00065 00066 // @@ TODO: should use Guard to set and reset policies. 00067 ACE_TRY_NEW_ENV 00068 { 00069 // Query the state of the Current object *before* we initiate 00070 // the iteration... 00071 CORBA::PolicyTypeSeq types; 00072 CORBA::PolicyList_var policies = 00073 this->policy_current_->get_policy_overrides (types 00074 ACE_ENV_ARG_PARAMETER); 00075 ACE_TRY_CHECK; 00076 00077 // Change the timeout 00078 this->policy_current_->set_policy_overrides (this->policy_list_, 00079 CORBA::ADD_OVERRIDE 00080 ACE_ENV_ARG_PARAMETER); 00081 ACE_TRY_CHECK; 00082 00083 // Query the state of the consumers... 00084 this->query_consumers (ACE_ENV_SINGLE_ARG_PARAMETER); 00085 ACE_TRY_CHECK; 00086 00087 this->policy_current_->set_policy_overrides (policies.in (), 00088 CORBA::SET_OVERRIDE 00089 ACE_ENV_ARG_PARAMETER); 00090 ACE_TRY_CHECK; 00091 for (CORBA::ULong i = 0; i != policies->length (); ++i) 00092 { 00093 policies[i]->destroy (ACE_ENV_SINGLE_ARG_PARAMETER); 00094 ACE_TRY_CHECK; 00095 } 00096 } 00097 ACE_CATCHANY 00098 { 00099 // Ignore all exceptions 00100 } 00101 ACE_ENDTRY; 00102 } |
|
Check if the consumers still exist. It is a helper method for handle_timeout() to isolate the exceptions. Definition at line 45 of file EC_Reactive_ConsumerControl.cpp. References ACE_CHECK, ACE_ENV_ARG_PARAMETER, and TAO_EC_Event_Channel_Base::for_each_consumer(). Referenced by handle_timeout().
00047 { 00048 TAO_EC_Ping_Consumer worker (this); 00049 this->event_channel_->for_each_consumer (&worker 00050 ACE_ENV_ARG_PARAMETER); 00051 ACE_CHECK; 00052 } |
|
Reimplemented from TAO_EC_ConsumerControl. Definition at line 159 of file EC_Reactive_ConsumerControl.cpp. References ACE_Reactor::cancel_timer(), and ACE_Event_Handler::reactor().
00160 { 00161 int r = 0; 00162 00163 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0 00164 r = this->reactor_->cancel_timer (timer_id_); 00165 #endif /* TAO_HAS_CORBA_MESSAGING */ 00166 this->adapter_.reactor (0); 00167 return r; 00168 } |
|
Some system exception was raised while trying to contact the consumer. Reimplemented from TAO_EC_ConsumerControl. Definition at line 193 of file EC_Reactive_ConsumerControl.cpp. References ACE_CATCHANY, ACE_ENDTRY, ACE_ENV_SINGLE_ARG_PARAMETER, ACE_TRY, ACE_TRY_CHECK, and TAO_EC_ProxyPushSupplier::disconnect_push_supplier().
00197 { 00198 ACE_TRY 00199 { 00200 // The current implementation is very strict, and kicks out a 00201 // client on the first system exception. We may 00202 // want to be more lenient in the future, for example, 00203 // this is TAO's minor code for a failed connection. 00204 // 00205 // if (CORBA::TRANSIENT::_narrow (&exception) != 0 00206 // && exception->minor () == 0x54410085) 00207 // return; 00208 00209 // Anything else is serious, including timeouts... 00210 proxy->disconnect_push_supplier (ACE_ENV_SINGLE_ARG_PARAMETER); 00211 ACE_TRY_CHECK; 00212 } 00213 ACE_CATCHANY 00214 { 00215 // Ignore all exceptions.. 00216 } 00217 ACE_ENDTRY; 00218 } |
|
The Adapter for the reactor events.
Definition at line 110 of file EC_Reactive_ConsumerControl.h. |
|
The event channel.
Definition at line 113 of file EC_Reactive_ConsumerControl.h. |
|
The ORB.
Definition at line 116 of file EC_Reactive_ConsumerControl.h. |
|
To control the timeout policy in the thread.
Definition at line 119 of file EC_Reactive_ConsumerControl.h. |
|
Precomputed policy list to set the timeout.
Definition at line 122 of file EC_Reactive_ConsumerControl.h. |
|
The polling rate.
Definition at line 104 of file EC_Reactive_ConsumerControl.h. Referenced by activate(). |
|
The ORB reactor.
Definition at line 125 of file EC_Reactive_ConsumerControl.h. |
|
The polling timeout.
Definition at line 107 of file EC_Reactive_ConsumerControl.h. |
|
The timer id.
Definition at line 129 of file EC_Reactive_ConsumerControl.h. |