#include <ECG_Reactive_ConsumerEC_Control.h>
Inheritance diagram for TAO_ECG_Reactive_ConsumerEC_Control:
Public Member Functions | |
TAO_ECG_Reactive_ConsumerEC_Control (const ACE_Time_Value &rate, const ACE_Time_Value &timeout, TAO_EC_Gateway_IIOP *gateway, CORBA::ORB_ptr orb) | |
virtual | ~TAO_ECG_Reactive_ConsumerEC_Control (void) |
Destructor. | |
void | handle_timeout (const ACE_Time_Value &tv, const void *arg) |
Receive the timeout from the adapter. | |
virtual int | activate (void) |
virtual int | shutdown (void) |
virtual void | event_channel_not_exist (TAO_EC_Gateway_IIOP *gateway) |
virtual void | system_exception (TAO_EC_Gateway_IIOP *gateway, CORBA::SystemException &) |
Private Member Functions | |
void | query_eventchannel () |
Private Attributes | |
ACE_Time_Value | rate_ |
The polling rate. | |
ACE_Time_Value | timeout_ |
The polling timeout. | |
TAO_ECG_Reactive_ConsumerEC_Control_Adapter | adapter_ |
The Adapter for the reactor events. | |
TAO_EC_Gateway_IIOP * | gateway_ |
The gateway. | |
CORBA::ORB_var | orb_ |
The ORB. | |
CORBA::PolicyCurrent_var | policy_current_ |
To control the timeout policy in the thread. | |
CORBA::PolicyList | policy_list_ |
Precomputed policy list to the set timeout. | |
ACE_Reactor * | reactor_ |
The ORB reactor. | |
long | timer_id_ |
The timer id. |
Defines the interface for the consumer event channel control strategy. This strategy handles misbehaving or failing consumer event channels.
Definition at line 66 of file ECG_Reactive_ConsumerEC_Control.h.
|
Constructor. It does not assume ownership of the event_channel parameter. Definition at line 15 of file ECG_Reactive_ConsumerEC_Control.cpp. References TAO_HAS_CORBA_MESSAGING.
00019 : rate_ (rate), 00020 timeout_ (timeout), 00021 adapter_ (this), 00022 gateway_ (gateway), 00023 orb_ (CORBA::ORB::_duplicate (orb)) 00024 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0 00025 , timer_id_ (-1) 00026 #endif /* TAO_HAS_CORBA_MESSAGING != 0*/ 00027 { 00028 this->reactor_ = 00029 this->orb_->orb_core ()->reactor (); 00030 } |
|
Destructor.
Definition at line 32 of file ECG_Reactive_ConsumerEC_Control.cpp.
00033 { 00034 } |
|
Activate any internal threads or timers used to poll the state of the event channel. Reimplemented from TAO_ECG_ConsumerEC_Control. Definition at line 124 of file ECG_Reactive_ConsumerEC_Control.cpp. References CORBA::PolicyCurrent::_narrow(), ACE_CATCHANY, ACE_ENDTRY, ACE_ENV_ARG_PARAMETER, ACE_TRY_CHECK, ACE_TRY_NEW_ENV, TAO::unbounded_object_reference_sequence< Policy, Policy_var >::length(), ACE_Reactor::schedule_timer(), and ACE_Time_Value::usec().
00125 { 00126 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0 00127 ACE_TRY_NEW_ENV 00128 { 00129 // Get the PolicyCurrent object 00130 CORBA::Object_var tmp = 00131 this->orb_->resolve_initial_references ("PolicyCurrent" 00132 ACE_ENV_ARG_PARAMETER); 00133 ACE_TRY_CHECK; 00134 00135 this->policy_current_ = 00136 CORBA::PolicyCurrent::_narrow (tmp.in () 00137 ACE_ENV_ARG_PARAMETER); 00138 ACE_TRY_CHECK; 00139 00140 // Timeout for polling state (default = 10 msec) 00141 TimeBase::TimeT timeout = timeout_.usec() * 10; 00142 CORBA::Any any; 00143 any <<= timeout; 00144 00145 this->policy_list_.length (1); 00146 this->policy_list_[0] = 00147 this->orb_->create_policy ( 00148 Messaging::RELATIVE_RT_TIMEOUT_POLICY_TYPE, 00149 any 00150 ACE_ENV_ARG_PARAMETER); 00151 ACE_TRY_CHECK; 00152 00153 // Only schedule the timer, when the rate is not zero 00154 if (this->rate_ != ACE_Time_Value::zero) 00155 { 00156 // Schedule the timer after these policies has been set, because the 00157 // handle_timeout uses these policies, if done in front, the channel 00158 // can crash when the timeout expires before initiazation is ready. 00159 timer_id_ = this->reactor_->schedule_timer (&this->adapter_, 00160 0, 00161 this->rate_, 00162 this->rate_); 00163 if (timer_id_ == -1) 00164 return -1; 00165 } 00166 } 00167 ACE_CATCHANY 00168 { 00169 return -1; 00170 } 00171 ACE_ENDTRY; 00172 #endif /* TAO_HAS_CORBA_MESSAGING */ 00173 00174 return 0; 00175 } |
|
When pushing an event to the event channel a CORBA::OBJECT_NOT_EXIST exception was raised. The only interpretation is that the object has been destroyed. The strategy has to (at the very least), reclaim all the resources attached to that object. Reimplemented from TAO_ECG_ConsumerEC_Control. Definition at line 191 of file ECG_Reactive_ConsumerEC_Control.cpp. References ACE_ANY_EXCEPTION, ACE_CATCHANY, ACE_DEBUG, ACE_ENDTRY, ACE_ENV_SINGLE_ARG_PARAMETER, ACE_PRINT_EXCEPTION, ACE_TRY, ACE_TRY_CHECK, and LM_DEBUG. Referenced by query_eventchannel().
00194 { 00195 ACE_TRY 00196 { 00197 ACE_DEBUG ((LM_DEBUG, 00198 "EC_Reactive_ConsumerControl(%P|%t) - " 00199 "channel %x does not exists\n")); 00200 gateway->cleanup_consumer_ec (); 00201 00202 gateway->cleanup_consumer_proxies (ACE_ENV_SINGLE_ARG_PARAMETER); 00203 ACE_TRY_CHECK; 00204 00205 } 00206 ACE_CATCHANY 00207 { 00208 ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, 00209 "TAO_EC_Reactive_ConsumerControl::event_channel_not_exist"); 00210 // Ignore all exceptions.. 00211 } 00212 ACE_ENDTRY; 00213 } |
|
Receive the timeout from the adapter.
Definition at line 74 of file ECG_Reactive_ConsumerEC_Control.cpp. References ACE_CATCHANY, ACE_ENDTRY, ACE_ENV_ARG_PARAMETER, ACE_ENV_SINGLE_ARG_PARAMETER, ACE_TRY_CHECK, ACE_TRY_NEW_ENV, and query_eventchannel(). Referenced by TAO_ECG_Reactive_ConsumerEC_Control_Adapter::handle_timeout().
00077 { 00078 // NOTE, setting RELATIVE_RT_TIMEOUT_POLICY for the duration of 00079 // query_eventchannel () below has greater impact than desired. For 00080 // example, while we are pinging ec here, a nested upcall, 00081 // which requires making remote calls may come into the ORB. Those 00082 // remote calls will be carried out with with 00083 // RELATIVE_RT_TIMEOUT_POLICY set here in effect. 00084 00085 // @@ TODO: should use Guard to set and reset policies. 00086 ACE_TRY_NEW_ENV 00087 { 00088 // Query the state of the Current object *before* we initiate 00089 // the iteration... 00090 CORBA::PolicyTypeSeq types; 00091 CORBA::PolicyList_var policies = 00092 this->policy_current_->get_policy_overrides (types 00093 ACE_ENV_ARG_PARAMETER); 00094 ACE_TRY_CHECK; 00095 00096 // Change the timeout 00097 this->policy_current_->set_policy_overrides (this->policy_list_, 00098 CORBA::ADD_OVERRIDE 00099 ACE_ENV_ARG_PARAMETER); 00100 ACE_TRY_CHECK; 00101 00102 // Query the state of the consumers... 00103 this->query_eventchannel (ACE_ENV_SINGLE_ARG_PARAMETER); 00104 ACE_TRY_CHECK; 00105 00106 this->policy_current_->set_policy_overrides (policies.in (), 00107 CORBA::SET_OVERRIDE 00108 ACE_ENV_ARG_PARAMETER); 00109 ACE_TRY_CHECK; 00110 for (CORBA::ULong i = 0; i != policies->length (); ++i) 00111 { 00112 policies[i]->destroy (ACE_ENV_SINGLE_ARG_PARAMETER); 00113 ACE_TRY_CHECK; 00114 } 00115 } 00116 ACE_CATCHANY 00117 { 00118 // Ignore all exceptions 00119 } 00120 ACE_ENDTRY; 00121 } |
|
Check if the consumers still exists. It is a helper method for handle_timeout() to isolate the exceptions. Definition at line 37 of file ECG_Reactive_ConsumerEC_Control.cpp. References ACE_CATCH, ACE_CATCHANY, ACE_ENDTRY, ACE_ENV_ARG_PARAMETER, ACE_TRY, ACE_TRY_CHECK, TAO_EC_Gateway_IIOP::consumer_ec_non_existent(), event_channel_not_exist(), and gateway_. Referenced by handle_timeout().
00039 { 00040 ACE_TRY 00041 { 00042 CORBA::Boolean disconnected; 00043 CORBA::Boolean non_existent = 00044 gateway_->consumer_ec_non_existent (disconnected 00045 ACE_ENV_ARG_PARAMETER); 00046 ACE_TRY_CHECK; 00047 if (non_existent && !disconnected) 00048 { 00049 this->event_channel_not_exist (gateway_ ACE_ENV_ARG_PARAMETER); 00050 ACE_TRY_CHECK; 00051 } 00052 } 00053 ACE_CATCH (CORBA::OBJECT_NOT_EXIST, ex) 00054 { 00055 this->event_channel_not_exist (gateway_ ACE_ENV_ARG_PARAMETER); 00056 ACE_TRY_CHECK; 00057 } 00058 ACE_CATCH (CORBA::TRANSIENT, transient) 00059 { 00060 // This is TAO's minor code for a failed connection, we may 00061 // want to be more lenient in the future.. 00062 // if (transient.minor () == 0x54410085) 00063 this->event_channel_not_exist (gateway_ ACE_ENV_ARG_PARAMETER); 00064 ACE_TRY_CHECK; 00065 } 00066 ACE_CATCHANY 00067 { 00068 // Ignore all exceptions 00069 } 00070 ACE_ENDTRY; 00071 } |
|
Shutdown any internal threads or timers used to poll the state of the event channel. Reimplemented from TAO_ECG_ConsumerEC_Control. Definition at line 178 of file ECG_Reactive_ConsumerEC_Control.cpp. References ACE_Reactor::cancel_timer(), and ACE_Event_Handler::reactor().
00179 { 00180 int r = 0; 00181 00182 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0 00183 r = this->reactor_->cancel_timer (timer_id_); 00184 #endif /* TAO_HAS_CORBA_MESSAGING */ 00185 00186 this->adapter_.reactor (0); 00187 return r; 00188 } |
|
Some system exception was raised while trying to contact the event channel Reimplemented from TAO_ECG_ConsumerEC_Control. Definition at line 216 of file ECG_Reactive_ConsumerEC_Control.cpp. References ACE_CATCHANY, ACE_ENDTRY, ACE_ENV_SINGLE_ARG_PARAMETER, ACE_TRY, ACE_TRY_CHECK, TAO_EC_Gateway_IIOP::cleanup_consumer_ec(), and TAO_EC_Gateway_IIOP::cleanup_consumer_proxies().
00220 { 00221 ACE_TRY 00222 { 00223 gateway->cleanup_consumer_ec (); 00224 00225 gateway->cleanup_consumer_proxies (ACE_ENV_SINGLE_ARG_PARAMETER); 00226 ACE_TRY_CHECK; 00227 00228 } 00229 ACE_CATCHANY 00230 { 00231 // Ignore all exceptions.. 00232 } 00233 ACE_ENDTRY; 00234 } |
|
The Adapter for the reactor events.
Definition at line 106 of file ECG_Reactive_ConsumerEC_Control.h. |
|
The gateway.
Definition at line 109 of file ECG_Reactive_ConsumerEC_Control.h. Referenced by query_eventchannel(). |
|
The ORB.
Definition at line 112 of file ECG_Reactive_ConsumerEC_Control.h. |
|
To control the timeout policy in the thread.
Definition at line 115 of file ECG_Reactive_ConsumerEC_Control.h. |
|
Precomputed policy list to the set timeout.
Definition at line 118 of file ECG_Reactive_ConsumerEC_Control.h. |
|
The polling rate.
Definition at line 100 of file ECG_Reactive_ConsumerEC_Control.h. |
|
The ORB reactor.
Definition at line 121 of file ECG_Reactive_ConsumerEC_Control.h. |
|
The polling timeout.
Definition at line 103 of file ECG_Reactive_ConsumerEC_Control.h. |
|
The timer id.
Definition at line 125 of file ECG_Reactive_ConsumerEC_Control.h. |