#include <EC_Reactive_ConsumerControl.h>
Inheritance diagram for TAO_EC_Reactive_ConsumerControl:


Public Member Functions | |
| TAO_EC_Reactive_ConsumerControl (const ACE_Time_Value &rate, const ACE_Time_Value &timeout, TAO_EC_Event_Channel_Base *event_channel, CORBA::ORB_ptr orb) | |
| virtual | ~TAO_EC_Reactive_ConsumerControl (void) |
| Destructor. | |
| void | handle_timeout (const ACE_Time_Value &tv, const void *arg) |
| Receive the timeout from the adapter. | |
| virtual int | activate (void) |
| virtual int | shutdown (void) |
| virtual void | consumer_not_exist (TAO_EC_ProxyPushSupplier *proxy) |
| virtual void | system_exception (TAO_EC_ProxyPushSupplier *proxy, CORBA::SystemException &) |
Private Member Functions | |
| void | query_consumers () |
Private Attributes | |
| ACE_Time_Value | rate_ |
| The polling rate. | |
| ACE_Time_Value | timeout_ |
| The polling timeout. | |
| TAO_EC_ConsumerControl_Adapter | adapter_ |
| The Adapter for the reactor events. | |
| TAO_EC_Event_Channel_Base * | event_channel_ |
| The event channel. | |
| CORBA::ORB_var | orb_ |
| The ORB. | |
| CORBA::PolicyCurrent_var | policy_current_ |
| To control the timeout policy in the thread. | |
| CORBA::PolicyList | policy_list_ |
| Precomputed policy list used to set the timeout. | |
| ACE_Reactor * | reactor_ |
| The ORB reactor. | |
| long | timer_id_ |
| The timer id. | |
Defines the interface for the consumer control strategy. This strategy handles misbehaving or failing consumers.
Definition at line 71 of file EC_Reactive_ConsumerControl.h.
|
||||||||||||||||||||
|
Constructor. It does not assume ownership of the event_channel parameter. Definition at line 23 of file EC_Reactive_ConsumerControl.cpp. References TAO_HAS_CORBA_MESSAGING.
00027 : rate_ (rate), 00028 timeout_ (timeout), 00029 adapter_ (this), 00030 event_channel_ (ec), 00031 orb_ (CORBA::ORB::_duplicate (orb)) 00032 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0 00033 , timer_id_ (-1) 00034 #endif /* TAO_HAS_CORBA_MESSAGING != 0*/ 00035 { 00036 this->reactor_ = 00037 this->orb_->orb_core ()->reactor (); 00038 } |
|
|
Destructor.
Definition at line 40 of file EC_Reactive_ConsumerControl.cpp.
00041 {
00042 }
|
|
|
Activate any internal threads or timers used to poll the state of the consumers. Reimplemented from TAO_EC_ConsumerControl. Definition at line 105 of file EC_Reactive_ConsumerControl.cpp. References CORBA::PolicyCurrent::_narrow(), ACE_CATCHANY, ACE_ENDTRY, ACE_ENV_ARG_PARAMETER, ACE_TRY_CHECK, ACE_TRY_NEW_ENV, TAO::unbounded_object_reference_sequence< Policy, Policy_var >::length(), rate_, ACE_Reactor::schedule_timer(), and ACE_Time_Value::usec().
00106 {
00107 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00108 ACE_TRY_NEW_ENV
00109 {
00110 // Get the PolicyCurrent object
00111 CORBA::Object_var tmp =
00112 this->orb_->resolve_initial_references ("PolicyCurrent"
00113 ACE_ENV_ARG_PARAMETER);
00114 ACE_TRY_CHECK;
00115
00116 this->policy_current_ =
00117 CORBA::PolicyCurrent::_narrow (tmp.in ()
00118 ACE_ENV_ARG_PARAMETER);
00119 ACE_TRY_CHECK;
00120
00121 // Timeout for polling state (default = 10 msec)
00122 TimeBase::TimeT timeout = timeout_.usec() * 10;
00123 CORBA::Any any;
00124 any <<= timeout;
00125
00126 this->policy_list_.length (1);
00127 this->policy_list_[0] =
00128 this->orb_->create_policy (
00129 Messaging::RELATIVE_RT_TIMEOUT_POLICY_TYPE,
00130 any
00131 ACE_ENV_ARG_PARAMETER);
00132 ACE_TRY_CHECK;
00133
00134 // Only schedule the timer, when the rate is not zero
00135 if (this->rate_ != ACE_Time_Value::zero)
00136 {
00137 // Schedule the timer after these policies have been set, because
00138 // handle_timeout uses these policies; if done beforehand, the channel
00139 // can crash when the timeout expires before initialization is ready.
00140 timer_id_ = this->reactor_->schedule_timer (&this->adapter_,
00141 0,
00142 this->rate_,
00143 this->rate_);
00144 if (timer_id_ == -1)
00145 return -1;
00146 }
00147 }
00148 ACE_CATCHANY
00149 {
00150 return -1;
00151 }
00152 ACE_ENDTRY;
00153 #endif /* TAO_HAS_CORBA_MESSAGING */
00154
00155 return 0;
00156 }
|
|
|
When pushing an event to the consumer a CORBA::OBJECT_NOT_EXIST exception was raised. The only interpretation is that the object has been destroyed. The strategy has to (at the very least), reclaim all the resources attached to that object. Reimplemented from TAO_EC_ConsumerControl. Definition at line 171 of file EC_Reactive_ConsumerControl.cpp. References ACE_ANY_EXCEPTION, ACE_CATCHANY, ACE_ENDTRY, ACE_ENV_SINGLE_ARG_PARAMETER, ACE_PRINT_EXCEPTION, ACE_TRY, and ACE_TRY_CHECK.
00174 {
00175 ACE_TRY
00176 {
00177 //ACE_DEBUG ((LM_DEBUG,
00178 // "EC_Reactive_ConsumerControl(%P|%t) - "
00179 // "Consumer %x does not exists\n", long(proxy)));
00180 proxy->disconnect_push_supplier (ACE_ENV_SINGLE_ARG_PARAMETER);
00181 ACE_TRY_CHECK;
00182 }
00183 ACE_CATCHANY
00184 {
00185 ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
00186 "Reactive_ConsumerControl::consumer_not_exist");
00187 // Ignore all exceptions..
00188 }
00189 ACE_ENDTRY;
00190 }
|
|
||||||||||||
|
Receive the timeout from the adapter.
Definition at line 55 of file EC_Reactive_ConsumerControl.cpp. References ACE_CATCHANY, ACE_ENDTRY, ACE_ENV_ARG_PARAMETER, ACE_ENV_SINGLE_ARG_PARAMETER, ACE_TRY_CHECK, ACE_TRY_NEW_ENV, and query_consumers(). Referenced by TAO_EC_ConsumerControl_Adapter::handle_timeout().
00058 {
00059 // NOTE, setting RELATIVE_RT_TIMEOUT_POLICY for the duration of
00060 // query_consumers () below has greater impact than desired. For
00061 // example, while we are pinging consumers here, a nested upcall,
00062 // which requires making remote calls may come into the ORB. Those
00063 // remote calls will be carried out with the
00064 // RELATIVE_RT_TIMEOUT_POLICY set here in effect.
00065
00066 // @@ TODO: should use Guard to set and reset policies.
00067 ACE_TRY_NEW_ENV
00068 {
00069 // Query the state of the Current object *before* we initiate
00070 // the iteration...
00071 CORBA::PolicyTypeSeq types;
00072 CORBA::PolicyList_var policies =
00073 this->policy_current_->get_policy_overrides (types
00074 ACE_ENV_ARG_PARAMETER);
00075 ACE_TRY_CHECK;
00076
00077 // Change the timeout
00078 this->policy_current_->set_policy_overrides (this->policy_list_,
00079 CORBA::ADD_OVERRIDE
00080 ACE_ENV_ARG_PARAMETER);
00081 ACE_TRY_CHECK;
00082
00083 // Query the state of the consumers...
00084 this->query_consumers (ACE_ENV_SINGLE_ARG_PARAMETER);
00085 ACE_TRY_CHECK;
00086
00087 this->policy_current_->set_policy_overrides (policies.in (),
00088 CORBA::SET_OVERRIDE
00089 ACE_ENV_ARG_PARAMETER);
00090 ACE_TRY_CHECK;
00091 for (CORBA::ULong i = 0; i != policies->length (); ++i)
00092 {
00093 policies[i]->destroy (ACE_ENV_SINGLE_ARG_PARAMETER);
00094 ACE_TRY_CHECK;
00095 }
00096 }
00097 ACE_CATCHANY
00098 {
00099 // Ignore all exceptions
00100 }
00101 ACE_ENDTRY;
00102 }
|
|
|
Check if the consumers still exist. It is a helper method for handle_timeout() to isolate the exceptions. Definition at line 45 of file EC_Reactive_ConsumerControl.cpp. References ACE_CHECK, ACE_ENV_ARG_PARAMETER, and TAO_EC_Event_Channel_Base::for_each_consumer(). Referenced by handle_timeout().
00047 {
00048 TAO_EC_Ping_Consumer worker (this);
00049 this->event_channel_->for_each_consumer (&worker
00050 ACE_ENV_ARG_PARAMETER);
00051 ACE_CHECK;
00052 }
|
|
|
Reimplemented from TAO_EC_ConsumerControl. Definition at line 159 of file EC_Reactive_ConsumerControl.cpp. References ACE_Reactor::cancel_timer(), and ACE_Event_Handler::reactor().
00160 {
00161 int r = 0;
00162
00163 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00164 r = this->reactor_->cancel_timer (timer_id_);
00165 #endif /* TAO_HAS_CORBA_MESSAGING */
00166 this->adapter_.reactor (0);
00167 return r;
00168 }
|
|
||||||||||||
|
Some system exception was raised while trying to contact the consumer. Reimplemented from TAO_EC_ConsumerControl. Definition at line 193 of file EC_Reactive_ConsumerControl.cpp. References ACE_CATCHANY, ACE_ENDTRY, ACE_ENV_SINGLE_ARG_PARAMETER, ACE_TRY, ACE_TRY_CHECK, and TAO_EC_ProxyPushSupplier::disconnect_push_supplier().
00197 {
00198 ACE_TRY
00199 {
00200 // The current implementation is very strict, and kicks out a
00201 // client on the first system exception. We may
00202 // want to be more lenient in the future, for example,
00203 // this is TAO's minor code for a failed connection.
00204 //
00205 // if (CORBA::TRANSIENT::_narrow (&exception) != 0
00206 // && exception->minor () == 0x54410085)
00207 // return;
00208
00209 // Anything else is serious, including timeouts...
00210 proxy->disconnect_push_supplier (ACE_ENV_SINGLE_ARG_PARAMETER);
00211 ACE_TRY_CHECK;
00212 }
00213 ACE_CATCHANY
00214 {
00215 // Ignore all exceptions..
00216 }
00217 ACE_ENDTRY;
00218 }
|
|
|
The Adapter for the reactor events.
Definition at line 110 of file EC_Reactive_ConsumerControl.h. |
|
|
The event channel.
Definition at line 113 of file EC_Reactive_ConsumerControl.h. |
|
|
The ORB.
Definition at line 116 of file EC_Reactive_ConsumerControl.h. |
|
|
To control the timeout policy in the thread.
Definition at line 119 of file EC_Reactive_ConsumerControl.h. |
|
|
Precomputed policy list used to set the timeout.
Definition at line 122 of file EC_Reactive_ConsumerControl.h. |
|
|
The polling rate.
Definition at line 104 of file EC_Reactive_ConsumerControl.h. Referenced by activate(). |
|
|
The ORB reactor.
Definition at line 125 of file EC_Reactive_ConsumerControl.h. |
|
|
The polling timeout.
Definition at line 107 of file EC_Reactive_ConsumerControl.h. |
|
|
The timer id.
Definition at line 129 of file EC_Reactive_ConsumerControl.h. |
1.3.6