#include <Consumer.h>
Inheritance diagram for TAO_Notify_Consumer:
Public Types | |
enum | DispatchStatus { DISPATCH_SUCCESS, DISPATCH_RETRY, DISPATCH_DISCARD, DISPATCH_FAIL } |
Status returned from dispatch attempts. More... | |
Public Member Functions | |
TAO_Notify_Consumer (TAO_Notify_ProxySupplier *proxy) | |
Constructor. | |
virtual | ~TAO_Notify_Consumer () |
Destructor. | |
TAO_Notify_ProxySupplier * | proxy_supplier (void) |
Access Specific Proxy. | |
virtual TAO_Notify_Proxy * | proxy (void) |
Access Base Proxy. | |
void | deliver (TAO_Notify_Method_Request_Event *request) |
Dispatch Event to consumer. | |
virtual void | push (const CORBA::Any &event)=0 |
Push to this consumer. | |
virtual void | push (const CosNotification::StructuredEvent &event)=0 |
Push to this consumer. | |
virtual void | push (const CosNotification::EventBatch &event)=0 |
Push a batch of events to this consumer. | |
DispatchStatus | dispatch_batch (const CosNotification::EventBatch &batch) |
Dispatch the batch of events to the attached consumer. | |
void | dispatch_pending () |
Dispatch the pending events. | |
CORBA::Boolean | is_suspended (void) |
Is the connection suspended? | |
void | suspend () |
Suspend Connection. | |
void | resume () |
Resume Connection. | |
virtual void | shutdown () |
Shutdown the consumer. | |
virtual void | reconnect_from_consumer (TAO_Notify_Consumer *old_consumer)=0 |
virtual void | qos_changed (const TAO_Notify_QoSProperties &qos_properties) |
Override, Peer::qos_changed. | |
Protected Types | |
typedef ACE_Unbounded_Queue< TAO_Notify_Method_Request_Event_Queueable * > | Request_Queue |
Protected Member Functions | |
DispatchStatus | dispatch_request (TAO_Notify_Method_Request_Event *request) |
virtual bool | dispatch_from_queue (Request_Queue &requests, ACE_Guard< TAO_SYNCH_MUTEX > &ace_mon) |
Attempt to dispatch event from a queue. | |
void | enqueue_request (TAO_Notify_Method_Request_Event *request) |
virtual bool | enqueue_if_necessary (TAO_Notify_Method_Request_Event *request) |
virtual void | dispatch_updates_i (const CosNotification::EventTypeSeq &added, const CosNotification::EventTypeSeq &removed) |
Implementation of Peer specific dispatch_updates. | |
TAO_SYNCH_MUTEX * | proxy_lock (void) |
Get the shared Proxy Lock. | |
virtual int | handle_timeout (const ACE_Time_Value &current_time, const void *act=0) |
void | schedule_timer (bool is_error=false) |
Schedule timer. | |
void | cancel_timer (void) |
Cancel timer. | |
Request_Queue & | pending_events () |
= Protected Data Members | |
Protected Attributes | |
TAO_Notify_ProxySupplier * | proxy_ |
The Proxy that we associate with. | |
CORBA::Boolean | is_suspended_ |
Suspended Flag. | |
CosNotifyComm::NotifyPublish_var | publish_ |
Interface that accepts offer_changes. | |
const TAO_Notify_Property_Time & | pacing_ |
The Pacing Interval. | |
TAO_Notify_Property_Long | max_batch_size_ |
Max. batch size. | |
long | timer_id_ |
Timer Id. | |
TAO_Notify_Timer::Ptr | timer_ |
The Timer Manager that we use. | |
Private Attributes | |
ACE_Auto_Ptr< Request_Queue > | pending_events_ |
Events pending to be delivered. |
Definition at line 44 of file Consumer.h.
|
Definition at line 110 of file Consumer.h. Referenced by dispatch_from_queue(), and TAO_Notify_Consumer(). |
|
Status returned from dispatch attempts.
Definition at line 51 of file Consumer.h. Referenced by deliver(), dispatch_batch(), dispatch_from_queue(), and dispatch_request().
00051 { 00052 DISPATCH_SUCCESS, 00053 DISPATCH_RETRY, // retry this message 00054 DISPATCH_DISCARD, // discard this message 00055 DISPATCH_FAIL}; // discard all messages and disconnect consumer |
|
Constructor.
Definition at line 31 of file Consumer.cpp. References ACE_NEW, pending_events(), pending_events_, Request_Queue, and ACE_Auto_Basic_Ptr< X >::reset().
00032 : proxy_ (proxy) 00033 , is_suspended_ (0) 00034 , pacing_ (proxy->qos_properties_.pacing_interval ()) 00035 , max_batch_size_ (CosNotification::MaximumBatchSize, 0) 00036 , timer_id_ (-1) 00037 , timer_ (0) 00038 { 00039 Request_Queue* pending_events = 0; 00040 ACE_NEW (pending_events, TAO_Notify_Consumer::Request_Queue ()); 00041 this->pending_events_.reset( pending_events ); 00042 00043 this->timer_.reset( this->proxy ()->timer () ); 00044 } |
|
Destructor.
Definition at line 46 of file Consumer.cpp. References cancel_timer().
00047 { 00048 if (this->timer_.isSet()) 00049 { 00050 this->cancel_timer (); 00051 this->timer_.reset (); 00052 } 00053 } |
|
Cancel timer.
Definition at line 638 of file Consumer.cpp. References ACE_DEBUG, ACE_TEXT(), DEBUG_LEVEL, and LM_DEBUG. Referenced by shutdown(), and ~TAO_Notify_Consumer().
00639 { 00640 if (this->timer_.isSet() && this->timer_id_ != -1) 00641 { 00642 if (DEBUG_LEVEL > 5) 00643 ACE_DEBUG ((LM_DEBUG, 00644 ACE_TEXT ("Consumer %d canceling dispatch timer.\n"), 00645 static_cast<int> (this->proxy ()->id ()) 00646 )); 00647 00648 this->timer_->cancel_timer (timer_id_); 00649 } 00650 this->timer_id_ = -1; 00651 } |
|
Dispatch Event to consumer.
Definition at line 150 of file Consumer.cpp. References ACE_CATCHANY, ACE_CHECK, ACE_DEBUG, ACE_DECLARE_NEW_ENV, ACE_ENDTRY, ACE_ENV_ARG_PARAMETER, ACE_ENV_SINGLE_ARG_PARAMETER, ACE_TEXT(), ACE_TRY, ACE_TRY_CHECK, DEBUG_LEVEL, TAO_Notify_ProxySupplier::destroy(), DISPATCH_DISCARD, DISPATCH_FAIL, dispatch_request(), DISPATCH_RETRY, DISPATCH_SUCCESS, DispatchStatus, enqueue_if_necessary(), enqueue_request(), LM_DEBUG, proxy_supplier(), and schedule_timer(). Referenced by TAO_Notify_Method_Request_Dispatch::execute_i().
00152 { 00153 // Increment reference counts (safely) to prevent this object and its proxy 00154 // from being deleted while the push is in progress. 00155 TAO_Notify_Proxy::Ptr proxy_guard (this->proxy ()); 00156 bool queued = enqueue_if_necessary (request ACE_ENV_ARG_PARAMETER); 00157 ACE_CHECK; 00158 if (!queued) 00159 { 00160 DispatchStatus status = this->dispatch_request (request); 00161 switch (status) 00162 { 00163 case DISPATCH_SUCCESS: 00164 { 00165 request->complete (); 00166 break; 00167 } 00168 case DISPATCH_RETRY: 00169 { 00170 if (DEBUG_LEVEL > 1) 00171 ACE_DEBUG ((LM_DEBUG, 00172 ACE_TEXT ("Consumer %d enqueing event %d due ") 00173 ACE_TEXT ("to failed dispatch.\n"), 00174 static_cast<int> (this->proxy ()->id ()), 00175 request->sequence ())); 00176 this->enqueue_request (request ACE_ENV_ARG_PARAMETER); 00177 ACE_CHECK; 00178 this->schedule_timer (true); 00179 break; 00180 } 00181 case DISPATCH_DISCARD: 00182 { 00183 if (DEBUG_LEVEL > 0) 00184 ACE_DEBUG ((LM_DEBUG, 00185 ACE_TEXT ("(%P|%t) Consumer %d: Error during " 00186 "direct dispatch. Discarding event:%d.\n"), 00187 static_cast<int> (this->proxy ()->id ()), 00188 request->sequence () 00189 )); 00190 request->complete (); 00191 break; 00192 } 00193 case DISPATCH_FAIL: 00194 { 00195 if (DEBUG_LEVEL > 0) 00196 ACE_DEBUG ((LM_DEBUG, 00197 ACE_TEXT ("(%P|%t) Consumer %d: Failed during " 00198 "direct dispatch :%d. Discarding event.\n"), 00199 static_cast<int> (this->proxy ()->id ()), 00200 request->sequence () 00201 )); 00202 request->complete (); 00203 ACE_DECLARE_NEW_ENV; 00204 ACE_TRY 00205 { 00206 this->proxy_supplier ()->destroy (ACE_ENV_SINGLE_ARG_PARAMETER); 00207 ACE_TRY_CHECK; 00208 } 00209 ACE_CATCHANY 00210 { 00211 // todo is there something meaningful we can do here? 00212 ; 00213 } 00214 ACE_ENDTRY; 00215 break; 00216 } 00217 } 00218 } 00219 } |
|
Dispatch the batch of events to the attached consumer.
Definition at line 359 of file Consumer.cpp. References ACE_CATCH, ACE_CATCHANY, ACE_DEBUG, ACE_DECLARE_NEW_ENV, ACE_ENDTRY, ACE_ENV_ARG_PARAMETER, ACE_ERROR, ACE_TEXT(), ACE_TRY, ACE_TRY_CHECK, DEBUG_LEVEL, DISPATCH_DISCARD, DISPATCH_FAIL, DISPATCH_RETRY, DISPATCH_SUCCESS, DispatchStatus, CosNotification::EventBatch, LM_ERROR, push(), TAO_INVOCATION_SEND_REQUEST_MINOR_CODE, TAO_POA_DISCARDING, and TAO_POA_HOLDING. Referenced by TAO_Notify_SequencePushConsumer::dispatch_from_queue().
00360 { 00361 DispatchStatus result = DISPATCH_SUCCESS; 00362 ACE_DECLARE_NEW_ENV; 00363 ACE_TRY 00364 { 00365 this->push (batch ACE_ENV_ARG_PARAMETER); 00366 ACE_TRY_CHECK; 00367 } 00368 ACE_CATCH (CORBA::OBJECT_NOT_EXIST, ex) 00369 { 00370 if (DEBUG_LEVEL > 0) 00371 ACE_DEBUG ((LM_ERROR, 00372 ACE_TEXT ("(%P|%t) TAO_Notify_Consumer " 00373 "%d::dispatch_batch() %s\n"), 00374 static_cast<int> (this->proxy ()->id ()), 00375 ex._info ().c_str () 00376 )); 00377 result = DISPATCH_FAIL; 00378 } 00379 ACE_CATCH (CORBA::TRANSIENT, ex) 00380 { 00381 if (DEBUG_LEVEL > 0) 00382 ACE_DEBUG ((LM_ERROR, 00383 ACE_TEXT ("(%P|%t) TAO_Notify_Consumer " 00384 "%d::dispatch_batch() Transient (minor=%d) %s\n"), 00385 static_cast<int> (this->proxy ()->id ()), 00386 ex.minor (), 00387 ex._info ().c_str () 00388 )); 00389 const CORBA::ULong BITS_5_THRU_12_MASK = 0x00000f80u; 00390 switch (ex.minor () & 0xfffff000u) 00391 { 00392 case CORBA::OMGVMCID: 00393 switch (ex.minor () & 0x00000fffu) 00394 { 00395 case 2: // No usable profile 00396 case 3: // Request cancelled 00397 case 4: // POA destroyed 00398 result = DISPATCH_FAIL; 00399 break; 00400 default: 00401 result = DISPATCH_DISCARD; 00402 } 00403 break; 00404 00405 case TAO::VMCID: 00406 default: 00407 switch (ex.minor () & BITS_5_THRU_12_MASK) 00408 { 00409 case TAO_INVOCATION_SEND_REQUEST_MINOR_CODE: 00410 result = DISPATCH_FAIL; 00411 break; 00412 case TAO_POA_DISCARDING: 00413 case TAO_POA_HOLDING: 00414 default: 00415 result = DISPATCH_RETRY; 00416 } break; 00417 } 00418 } 00419 ACE_CATCH (CORBA::TIMEOUT, ex) 00420 { 00421 if (DEBUG_LEVEL > 0) 00422 ACE_DEBUG ((LM_ERROR, 00423 ACE_TEXT ("(%P|%t) TAO_Notify_Consumer " 00424 "%u::dispatch_batch() %s\n"), 00425 this->proxy ()->id (), 00426 ex._info().c_str () 00427 )); 00428 result = DISPATCH_FAIL; 00429 } 00430 ACE_CATCH (CORBA::COMM_FAILURE, ex) 00431 { 00432 if (DEBUG_LEVEL > 0) 00433 ACE_DEBUG ((LM_ERROR, 00434 ACE_TEXT ("(%P|%t) TAO_Notify_Consumer " 00435 
"%u::dispatch_batch() %s\n"), 00436 this->proxy ()->id (), 00437 ex._info().c_str () 00438 )); 00439 result = DISPATCH_FAIL; 00440 } 00441 ACE_CATCH (CORBA::SystemException, ex) 00442 { 00443 if (DEBUG_LEVEL > 0) 00444 { 00445 ACE_DEBUG ((LM_ERROR, 00446 ACE_TEXT ("(%P|%t) TAO_Notify_Consumer " 00447 "%d::dispatch_batch() SystemException %s\n"), 00448 static_cast<int> (this->proxy ()->id ()), 00449 ex._info ().c_str () 00450 )); 00451 } 00452 result = DISPATCH_DISCARD; 00453 } 00454 ACE_CATCHANY 00455 { 00456 ACE_ERROR ((LM_ERROR, 00457 ACE_TEXT ("(%P|%t) TAO_Notify_Consumer " 00458 "%d::dispatch_batch() Caught unexpected " 00459 "exception pushing batch to consumer.\n"), 00460 static_cast<int> (this->proxy ()->id ()) 00461 )); 00462 result = DISPATCH_DISCARD; 00463 } 00464 ACE_ENDTRY; 00465 return result; 00466 } |
|
Attempt to dispatch event from a queue. Called by dispatch_pending. Deliver one or more events to the Consumer. If delivery fails, events are left in the queue (or discarded depending on QoS parameters.) Undelivered, undiscarded requests are left at the front of the queue. Overridden in sequence consumer to dispatch as an EventBatch.
Reimplemented in TAO_Notify_SequencePushConsumer. Definition at line 499 of file Consumer.cpp. References ACE_CATCHANY, ACE_DEBUG, ACE_DECLARE_NEW_ENV, ACE_ENDTRY, ACE_ENV_SINGLE_ARG_PARAMETER, ACE_TEXT(), ACE_TRY, ACE_TRY_CHECK, ACE_Guard< ACE_LOCK >::acquire(), TAO_Notify_Method_Request_Event::complete(), DEBUG_LEVEL, ACE_Unbounded_Queue< T >::dequeue_head(), TAO_Notify_ProxySupplier::destroy(), DISPATCH_DISCARD, DISPATCH_FAIL, dispatch_request(), DISPATCH_RETRY, DISPATCH_SUCCESS, DispatchStatus, ACE_Unbounded_Queue< T >::enqueue_head(), LM_DEBUG, proxy_supplier(), ACE_Message_Block::release(), ACE_Guard< ACE_LOCK >::release(), Request_Queue, and TAO_Notify_Method_Request_Event::sequence(). Referenced by dispatch_pending().
00500 { 00501 bool result = true; 00502 TAO_Notify_Method_Request_Event_Queueable * request; 00503 if (requests.dequeue_head (request) == 0) 00504 { 00505 ace_mon.release (); 00506 DispatchStatus status = this->dispatch_request (request); 00507 switch (status) 00508 { 00509 case DISPATCH_SUCCESS: 00510 { 00511 request->complete (); 00512 request->release (); 00513 result = true; 00514 ace_mon.acquire (); 00515 break; 00516 } 00517 case DISPATCH_RETRY: 00518 { 00519 if (DEBUG_LEVEL > 0) 00520 ACE_DEBUG ((LM_DEBUG, 00521 ACE_TEXT ("(%P|%t) Consumer %d: Will retry %d\n"), 00522 static_cast<int> (this->proxy ()->id ()), 00523 request->sequence () 00524 )); 00525 ace_mon.acquire (); 00526 requests.enqueue_head (request); // put the failed event back where it was 00527 result = false; 00528 break; 00529 } 00530 case DISPATCH_DISCARD: 00531 { 00532 if (DEBUG_LEVEL > 0) 00533 ACE_DEBUG ((LM_DEBUG, 00534 ACE_TEXT ("(%P|%t) Consumer %d: Error during " 00535 "dispatch. Discarding event:%d.\n"), 00536 static_cast<int> (this->proxy ()->id ()), 00537 request->sequence () 00538 )); 00539 request->complete (); 00540 ace_mon.acquire (); 00541 result = true; 00542 break; 00543 } 00544 case DISPATCH_FAIL: 00545 { 00546 if (DEBUG_LEVEL > 0) 00547 ACE_DEBUG ((LM_DEBUG, 00548 ACE_TEXT ("(%P|%t) Consumer %d: Failed. " 00549 "Discarding event %d.\n"), 00550 static_cast<int> (this->proxy ()->id ()), 00551 request->sequence () 00552 )); 00553 request->complete (); 00554 ace_mon.acquire (); 00555 while (requests.dequeue_head (request) == 0) 00556 { 00557 ace_mon.release (); 00558 request->complete (); 00559 ace_mon.acquire (); 00560 } 00561 ace_mon.release (); 00562 ACE_DECLARE_NEW_ENV; 00563 ACE_TRY 00564 { 00565 this->proxy_supplier ()->destroy (ACE_ENV_SINGLE_ARG_PARAMETER); 00566 ACE_TRY_CHECK; 00567 } 00568 ACE_CATCHANY 00569 { 00570 // todo is there something reasonable to do here? 
00571 } 00572 ACE_ENDTRY; 00573 ace_mon.acquire (); 00574 result = true; 00575 break; 00576 } 00577 default: 00578 { 00579 ace_mon.acquire (); 00580 result = false; 00581 break; 00582 } 00583 } 00584 } 00585 return result; 00586 } |
|
Dispatch the pending events.
Definition at line 469 of file Consumer.cpp. References ACE_DEBUG, ACE_GUARD, ACE_TEXT(), DEBUG_LEVEL, dispatch_from_queue(), TAO_Notify_Object::has_shutdown(), ACE_Unbounded_Queue< T >::is_empty(), LM_DEBUG, pending_events(), proxy_supplier(), schedule_timer(), ACE_Unbounded_Queue< T >::size(), and TAO_SYNCH_MUTEX. Referenced by TAO_Notify_SequencePushConsumer::enqueue_if_necessary(), handle_timeout(), and resume().
00470 { 00471 if (DEBUG_LEVEL > 5) 00472 ACE_DEBUG ((LM_DEBUG, 00473 ACE_TEXT ("Consumer %d dispatching pending events. Queue size: %d\n"), 00474 static_cast<int> (this->proxy ()->id ()), 00475 this->pending_events().size () 00476 )); 00477 00478 // lock ourselves in memory for the duration 00479 TAO_Notify_Consumer::Ptr self_grd (this); 00480 00481 // dispatch events until: 1) the queue is empty; 2) the proxy shuts down, or 3) the dispatch fails 00482 ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, *this->proxy_lock ()); 00483 bool ok = true; 00484 while (ok 00485 && !this->proxy_supplier ()->has_shutdown () 00486 && !this->pending_events().is_empty ()) 00487 { 00488 if (! dispatch_from_queue ( this->pending_events(), ace_mon)) 00489 { 00490 this->schedule_timer (true); 00491 ok = false; 00492 } 00493 } 00494 } |
|
Definition at line 222 of file Consumer.cpp. References ACE_CATCH, ACE_CATCHANY, ACE_DEBUG, ACE_DECLARE_NEW_ENV, ACE_ENDTRY, ACE_ENV_ARG_PARAMETER, ACE_ERROR, ACE_TEXT(), ACE_TRY, ACE_TRY_CHECK, DEBUG_LEVEL, DISPATCH_DISCARD, DISPATCH_FAIL, DISPATCH_RETRY, DISPATCH_SUCCESS, DispatchStatus, TAO_Notify_Method_Request_Event::event(), LM_DEBUG, LM_ERROR, TAO_Notify_Event::push(), TAO_Notify_Method_Request_Event::sequence(), TAO_Notify_Method_Request_Event::should_retry(), TAO_INVOCATION_SEND_REQUEST_MINOR_CODE, TAO_POA_DISCARDING, and TAO_POA_HOLDING. Referenced by deliver(), and dispatch_from_queue().
00223 { 00224 DispatchStatus result = DISPATCH_SUCCESS; 00225 ACE_DECLARE_NEW_ENV; 00226 ACE_TRY 00227 { 00228 request->event ()->push (this ACE_ENV_ARG_PARAMETER); 00229 ACE_TRY_CHECK; 00230 if (DEBUG_LEVEL > 8) 00231 ACE_DEBUG ((LM_DEBUG, 00232 ACE_TEXT ("Consumer %d dispatched single event %d.\n"), 00233 static_cast<int> (this->proxy ()->id ()), 00234 request->sequence () 00235 )); 00236 } 00237 ACE_CATCH (CORBA::OBJECT_NOT_EXIST, ex) 00238 { 00239 if (DEBUG_LEVEL > 0) 00240 { 00241 ACE_DEBUG ((LM_ERROR, 00242 ACE_TEXT ("(%P|%t) TAO_Notify_Consumer %d::push " 00243 "(request) %s\n"), 00244 static_cast<int> (this->proxy ()->id ()), 00245 ex._info ().c_str () 00246 )); 00247 } 00248 result = DISPATCH_FAIL; 00249 } 00250 ACE_CATCH (CORBA::TRANSIENT, ex) 00251 { 00252 if (DEBUG_LEVEL > 0) 00253 ACE_DEBUG ((LM_ERROR, 00254 ACE_TEXT ("(%P|%t) TAO_Notify_Consumer %d::push " 00255 "(request) Transient (minor=%d) %s\n"), 00256 static_cast<int> (this->proxy ()->id ()), 00257 ex.minor (), 00258 ex._info ().c_str () 00259 )); 00260 const CORBA::ULong BITS_5_THRU_12_MASK = 0x00000f80u; 00261 switch (ex.minor () & 0xfffff000u) 00262 { 00263 case CORBA::OMGVMCID: 00264 switch (ex.minor () & 0x00000fffu) 00265 { 00266 case 2: // No usable profile 00267 case 3: // Request cancelled 00268 case 4: // POA destroyed 00269 result = DISPATCH_FAIL; 00270 break; 00271 default: 00272 result = DISPATCH_DISCARD; 00273 } 00274 break; 00275 00276 case TAO::VMCID: 00277 default: 00278 switch (ex.minor () & BITS_5_THRU_12_MASK) 00279 { 00280 case TAO_INVOCATION_SEND_REQUEST_MINOR_CODE: 00281 result = DISPATCH_FAIL; 00282 break; 00283 case TAO_POA_DISCARDING: 00284 case TAO_POA_HOLDING: 00285 default: 00286 result = DISPATCH_RETRY; 00287 } break; 00288 } 00289 } 00290 ACE_CATCH (CORBA::TIMEOUT, ex) 00291 { 00292 if (DEBUG_LEVEL > 0) 00293 ACE_DEBUG ((LM_ERROR, 00294 ACE_TEXT ("(%P|%t) TAO_Notify_Consumer %u::push " 00295 "(request) %s\n"), 00296 this->proxy ()->id (), 00297 ex._info().c_str () 
00298 )); 00299 result = DISPATCH_FAIL; 00300 } 00301 ACE_CATCH (CORBA::COMM_FAILURE, ex) 00302 { 00303 if (DEBUG_LEVEL > 0) 00304 ACE_DEBUG ((LM_ERROR, 00305 ACE_TEXT ("(%P|%t) TAO_Notify_Consumer %u::push " 00306 "(request) %s\n"), 00307 this->proxy ()->id (), 00308 ex._info().c_str () 00309 )); 00310 result = DISPATCH_FAIL; 00311 } 00312 ACE_CATCH (CORBA::SystemException, ex) 00313 { 00314 if (DEBUG_LEVEL > 0) 00315 { 00316 ACE_DEBUG ((LM_ERROR, 00317 ACE_TEXT ("(%P|%t) TAO_Notify_Consumer %d::push " 00318 "(request) SystemException %s\n"), 00319 static_cast<int> (this->proxy ()->id ()), 00320 ex._info ().c_str () 00321 )); 00322 } 00323 result = DISPATCH_DISCARD; 00324 } 00325 ACE_CATCHANY 00326 { 00327 ACE_ERROR ( (LM_ERROR, 00328 ACE_TEXT ("(%P|%t) TAO_Notify_Consumer %d::push " 00329 "(request) Caught unexpected exception " 00330 "pushing event to consumer.\n"), 00331 static_cast<int> (this->proxy ()->id ()) 00332 )); 00333 result = DISPATCH_DISCARD; 00334 } 00335 ACE_ENDTRY; 00336 00337 // for persistent events that haven't timed out 00338 // convert "FAIL" & "DISCARD" to "RETRY" 00339 // for transient events, convert RETRY to DISCARD (hey, best_effort.) 00340 if (result == DISPATCH_FAIL || result == DISPATCH_DISCARD) 00341 { 00342 if (request->should_retry ()) 00343 { 00344 result = DISPATCH_RETRY; 00345 } 00346 } 00347 else if (result == DISPATCH_RETRY) 00348 { 00349 if (! request->should_retry ()) 00350 { 00351 result = DISPATCH_DISCARD; 00352 } 00353 } 00354 00355 return result; 00356 } |
|
Implementation of Peer specific dispatch_updates.
Implements TAO_Notify_Peer. Definition at line 683 of file Consumer.cpp. References ACE_ENV_ARG_PARAMETER, CosNotification::EventTypeSeq, CORBA::is_nil(), and publish_.
00685 { 00686 if (!CORBA::is_nil (this->publish_.in ())) 00687 this->publish_->offer_change (added, removed ACE_ENV_ARG_PARAMETER); 00688 } |
|
Add request to a queue if necessary. Overridden by sequence consumer to "always" put incoming events into the queue.
Reimplemented in TAO_Notify_SequencePushConsumer. Definition at line 101 of file Consumer.cpp. References ACE_CHECK_RETURN, ACE_DEBUG, ACE_ENV_SINGLE_ARG_PARAMETER, ACE_GUARD_RETURN, ACE_NEW_THROW_EX, ACE_TEXT(), DEBUG_LEVEL, ACE_Unbounded_Queue< T >::enqueue_tail(), ACE_Unbounded_Queue< T >::is_empty(), is_suspended_, LM_DEBUG, pending_events(), schedule_timer(), and TAO_SYNCH_MUTEX. Referenced by deliver().
00102 { 00103 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, *this->proxy_lock (), false); 00104 if (! this->pending_events().is_empty ()) 00105 { 00106 if (DEBUG_LEVEL > 3) 00107 ACE_DEBUG ((LM_DEBUG, 00108 ACE_TEXT ("Consumer %d: enqueuing another event. %d\n"), 00109 static_cast<int> (this->proxy ()->id ()), 00110 request->sequence () 00111 )); 00112 TAO_Notify_Event::Ptr event ( 00113 request->event ()->queueable_copy (ACE_ENV_SINGLE_ARG_PARAMETER)); 00114 ACE_CHECK_RETURN (false); 00115 TAO_Notify_Method_Request_Event_Queueable * queue_entry; 00116 ACE_NEW_THROW_EX (queue_entry, 00117 TAO_Notify_Method_Request_Event_Queueable (*request, 00118 event), 00119 CORBA::NO_MEMORY ()); 00120 ACE_CHECK_RETURN (false); 00121 this->pending_events().enqueue_tail (queue_entry); 00122 this->schedule_timer (false); 00123 return true; 00124 } 00125 if (this->is_suspended_ == 1) 00126 { 00127 if (DEBUG_LEVEL > 3) 00128 ACE_DEBUG ((LM_DEBUG, 00129 ACE_TEXT ("Suspended Consumer %d enqueing event. %d\n"), 00130 static_cast<int> (this->proxy ()->id ()), 00131 request->sequence () 00132 )); 00133 TAO_Notify_Event::Ptr event ( 00134 request->event ()->queueable_copy (ACE_ENV_SINGLE_ARG_PARAMETER)); 00135 ACE_CHECK_RETURN (false); 00136 TAO_Notify_Method_Request_Event_Queueable * queue_entry; 00137 ACE_NEW_THROW_EX (queue_entry, 00138 TAO_Notify_Method_Request_Event_Queueable (*request, 00139 event), 00140 CORBA::NO_MEMORY ()); 00141 ACE_CHECK_RETURN (false); 00142 this->pending_events().enqueue_tail (queue_entry); 00143 this->schedule_timer (false); 00144 return true; 00145 } 00146 return false; 00147 } |
|
Definition at line 76 of file Consumer.cpp. References ACE_CHECK, ACE_DEBUG, ACE_ENV_SINGLE_ARG_PARAMETER, ACE_GUARD, ACE_NEW_THROW_EX, ACE_TEXT(), DEBUG_LEVEL, ACE_Unbounded_Queue< T >::enqueue_tail(), LM_DEBUG, pending_events(), TAO_Notify_Method_Request_Event::sequence(), and TAO_SYNCH_MUTEX. Referenced by deliver(), and TAO_Notify_SequencePushConsumer::enqueue_if_necessary().
00079 { 00080 TAO_Notify_Event::Ptr event ( 00081 request->event ()->queueable_copy (ACE_ENV_SINGLE_ARG_PARAMETER)); 00082 ACE_CHECK; 00083 00084 TAO_Notify_Method_Request_Event_Queueable * queue_entry; 00085 ACE_NEW_THROW_EX (queue_entry, 00086 TAO_Notify_Method_Request_Event_Queueable (*request, event), 00087 CORBA::NO_MEMORY ()); 00088 ACE_CHECK; 00089 00090 if (DEBUG_LEVEL > 3) ACE_DEBUG ( (LM_DEBUG, 00091 ACE_TEXT ("Consumer %d: enqueue_request (%d) @%@.\n"), 00092 static_cast<int> (this->proxy ()->id ()), 00093 request->sequence (), 00094 request 00095 )); 00096 ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, *this->proxy_lock ()); 00097 this->pending_events().enqueue_tail (queue_entry); 00098 } |
|
Reimplemented from ACE_Event_Handler. Definition at line 654 of file Consumer.cpp. References ACE_CATCHALL, ACE_DECLARE_NEW_ENV, ACE_ENDTRY, ACE_ENV_SINGLE_ARG_PARAMETER, ACE_TRY, ACE_TRY_CHECK, and dispatch_pending().
00655 { 00656 TAO_Notify_Consumer::Ptr grd (this); 00657 this->timer_id_ = -1; // This must come first, because dispatch_pending may try to resched 00658 ACE_DECLARE_NEW_ENV; 00659 ACE_TRY 00660 { 00661 this->dispatch_pending (ACE_ENV_SINGLE_ARG_PARAMETER); 00662 ACE_TRY_CHECK; 00663 } 00664 ACE_CATCHALL 00665 { 00666 } 00667 ACE_ENDTRY; 00668 00669 return 0; 00670 } |
|
Is the connection suspended?
Definition at line 16 of file Consumer.inl. References is_suspended_. Referenced by schedule_timer().
00017 { 00018 return this->is_suspended_; 00019 } |
|
= Protected Data Members
Definition at line 9 of file Consumer.inl. References ACE_ASSERT, ACE_Auto_Basic_Ptr< X >::get(), and pending_events_. Referenced by dispatch_pending(), TAO_Notify_SequencePushConsumer::enqueue_if_necessary(), enqueue_if_necessary(), enqueue_request(), and TAO_Notify_Consumer().
00010 { 00011 ACE_ASSERT( pending_events_.get() != 0 ); 00012 return *pending_events_; 00013 } |
|
Access Base Proxy.
Implements TAO_Notify_Peer. Definition at line 56 of file Consumer.cpp. References proxy_supplier().
00057 { 00058 return this->proxy_supplier (); 00059 } |
|
Get the shared Proxy Lock.
Definition at line 691 of file Consumer.cpp. References TAO_Notify_Object::lock_.
|
|
Access Specific Proxy.
Definition at line 697 of file Consumer.cpp. Referenced by deliver(), TAO_Notify_SequencePushConsumer::dispatch_from_queue(), dispatch_from_queue(), dispatch_pending(), and proxy().
00698 { 00699 return this->proxy_; 00700 } |
|
Push a batch of events to this consumer.
Implemented in TAO_Notify_PushConsumer, TAO_Notify_SequencePushConsumer, and TAO_Notify_StructuredPushConsumer. |
|
Push to this consumer.
Implemented in TAO_Notify_PushConsumer, TAO_Notify_SequencePushConsumer, and TAO_Notify_StructuredPushConsumer. |
|
Push to this consumer.
Implemented in TAO_Notify_PushConsumer, TAO_Notify_SequencePushConsumer, and TAO_Notify_StructuredPushConsumer. Referenced by dispatch_batch(). |
|
Override, Peer::qos_changed.
Reimplemented from TAO_Notify_Peer. Definition at line 62 of file Consumer.cpp. References max_batch_size_, and TAO_Notify_QoSProperties::maximum_batch_size().
00063 { 00064 this->max_batch_size_ = qos_properties.maximum_batch_size (); 00065 } |
|
On reconnect, we need to move events from the old consumer to the new one. Implemented in TAO_Notify_PushConsumer, TAO_Notify_SequencePushConsumer, and TAO_Notify_StructuredPushConsumer. |
|
Resume Connection.
Definition at line 68 of file Consumer.cpp. References ACE_ENV_SINGLE_ARG_PARAMETER, dispatch_pending(), and is_suspended_.
00069 { 00070 this->is_suspended_ = 0; 00071 00072 this->dispatch_pending (ACE_ENV_SINGLE_ARG_PARAMETER); 00073 } |
|
Schedule timer.
Definition at line 591 of file Consumer.cpp. References ACE_ASSERT, ACE_DEBUG, ACE_ERROR, ACE_TEXT(), DEBUG_LEVEL, DEFAULT_RETRY_TIMEOUT, is_suspended(), TAO_Notify_PropertyBase_T< TYPE >::is_valid(), LM_DEBUG, LM_ERROR, ACE_Time_Value::msec(), and pacing_. Referenced by deliver(), dispatch_pending(), TAO_Notify_SequencePushConsumer::enqueue_if_necessary(), enqueue_if_necessary(), TAO_Notify_StructuredPushConsumer::reconnect_from_consumer(), TAO_Notify_SequencePushConsumer::reconnect_from_consumer(), and TAO_Notify_PushConsumer::reconnect_from_consumer().
00592 { 00593 if (this->timer_id_ != -1) 00594 { 00595 return; // We only want a single timeout scheduled. 00596 } 00597 // don't schedule timer if there's nothing that can be done 00598 if (this->is_suspended ()) 00599 { 00600 return; 00601 } 00602 00603 ACE_ASSERT (this->timer_.get() != 0); 00604 00605 // If we're scheduling the timer due to an error then we want to 00606 // use the retry timeout, otherwise we'll assume that the pacing 00607 // interval is sufficient for now. 00608 ACE_Time_Value tv (DEFAULT_RETRY_TIMEOUT); 00609 00610 if (! is_error) 00611 { 00612 if (this->pacing_.is_valid ()) 00613 { 00614 tv = ORBSVCS_Time::to_Time_Value (this->pacing_.value ()); 00615 } 00616 } 00617 00618 if (DEBUG_LEVEL > 5) 00619 { 00620 ACE_DEBUG ((LM_DEBUG, 00621 ACE_TEXT ("Consumer %d: scheduling pacing/retry for %dms.\n"), 00622 static_cast<int> (this->proxy ()->id ()), tv.msec ())); 00623 } 00624 00625 this->timer_id_ = 00626 this->timer_->schedule_timer (this, tv, ACE_Time_Value::zero); 00627 if (this->timer_id_ == -1) 00628 { 00629 ACE_ERROR ((LM_ERROR, 00630 ACE_TEXT ("TAO_Notify_Consumer %d::schedule_timer () " 00631 "Error scheduling timer.\n"), 00632 static_cast<int> (this->proxy ()->id ()) 00633 )); 00634 } 00635 } |
|
Shutdown the consumer.
Reimplemented from TAO_Notify_Peer. Definition at line 673 of file Consumer.cpp. References cancel_timer().
00674 { 00675 if (this->timer_.isSet ()) 00676 { 00677 this->cancel_timer (); 00678 this->timer_.reset (); 00679 } 00680 } |
|
Suspend Connection.
|
|
Suspended Flag.
Definition at line 164 of file Consumer.h. Referenced by enqueue_if_necessary(), is_suspended(), and resume(). |
|
Max. batch size.
Definition at line 173 of file Consumer.h. Referenced by qos_changed(). |
|
The Pacing Interval.
Definition at line 170 of file Consumer.h. Referenced by schedule_timer(). |
|
Events pending to be delivered.
Definition at line 184 of file Consumer.h. Referenced by pending_events(), and TAO_Notify_Consumer(). |
|
The Proxy that we associate with.
Definition at line 161 of file Consumer.h. |
|
Interface that accepts offer_changes.
Definition at line 167 of file Consumer.h. Referenced by dispatch_updates_i(). |
|
The Timer Manager that we use.
Definition at line 179 of file Consumer.h. |
|
Timer Id.
Definition at line 176 of file Consumer.h. |