#include <Consumer.h>
Inheritance diagram for TAO_Notify_Consumer:
Public Types | |
DISPATCH_SUCCESS | |
DISPATCH_RETRY | |
DISPATCH_DISCARD | |
DISPATCH_FAIL | |
enum | DispatchStatus { DISPATCH_SUCCESS, DISPATCH_RETRY, DISPATCH_DISCARD, DISPATCH_FAIL } |
Status returned from dispatch attempts. More... | |
Public Member Functions | |
TAO_Notify_Consumer (TAO_Notify_ProxySupplier *proxy) | |
Constructor. | |
virtual | ~TAO_Notify_Consumer () |
Destructor. | |
TAO_Notify_ProxySupplier * | proxy_supplier (void) |
Access Specific Proxy. | |
virtual TAO_Notify_Proxy * | proxy (void) |
Access Base Proxy. | |
void | deliver (TAO_Notify_Method_Request_Event *request) |
Dispatch Event to consumer. | |
virtual void | push (const CORBA::Any &event)=0 |
Push event to this consumer. | |
virtual void | push (const CosNotification::StructuredEvent &event)=0 |
Push event to this consumer. | |
virtual void | push (const CosNotification::EventBatch &event)=0 |
Push a batch of events to this consumer. | |
DispatchStatus | dispatch_batch (const CosNotification::EventBatch &batch) |
Dispatch the batch of events to the attached consumer. | |
void | dispatch_pending (void) |
Dispatch the pending events. | |
CORBA::Boolean | is_suspended (void) |
Is the connection suspended? | |
void | suspend (void) |
Suspend Connection. | |
void | resume (void) |
Resume Connection. | |
virtual void | shutdown (void) |
Shutdown the consumer. | |
virtual void | reconnect_from_consumer (TAO_Notify_Consumer *old_consumer)=0 |
virtual void | qos_changed (const TAO_Notify_QoSProperties &qos_properties) |
Override, Peer::qos_changed. | |
void | assume_pending_events (TAO_Notify_Consumer &rhs) |
Protected Types | |
typedef ACE_Unbounded_Queue< TAO_Notify_Method_Request_Event_Queueable * > | Request_Queue |
Protected Member Functions | |
DispatchStatus | dispatch_request (TAO_Notify_Method_Request_Event *request) |
virtual bool | dispatch_from_queue (Request_Queue &requests, ACE_Guard< TAO_SYNCH_MUTEX > &ace_mon) |
Attempt to dispatch event from a queue. | |
void | enqueue_request (TAO_Notify_Method_Request_Event *request) |
virtual bool | enqueue_if_necessary (TAO_Notify_Method_Request_Event *request) |
virtual void | dispatch_updates_i (const CosNotification::EventTypeSeq &added, const CosNotification::EventTypeSeq &removed) |
Implementation of Peer specific dispatch_updates. | |
TAO_SYNCH_MUTEX * | proxy_lock (void) |
Get the shared Proxy Lock. | |
virtual int | handle_timeout (const ACE_Time_Value &current_time, const void *act=0) |
void | schedule_timer (bool is_error=false) |
Schedule timer. | |
void | cancel_timer (void) |
Cancel timer. | |
Request_Queue & | pending_events () |
= Protected Data Members | |
Protected Attributes | |
TAO_Notify_ProxySupplier * | proxy_ |
The Proxy that we associate with. | |
CORBA::Boolean | is_suspended_ |
Suspended Flag. | |
CosNotifyComm::NotifyPublish_var | publish_ |
Interface that accepts offer_changes. | |
bool | have_not_yet_verified_publish_ |
const TAO_Notify_Property_Time & | pacing_ |
The Pacing Interval. | |
TAO_Notify_Property_Long | max_batch_size_ |
Max. batch size. | |
long | timer_id_ |
Timer Id. | |
TAO_Notify_Timer::Ptr | timer_ |
The Timer Manager that we use. | |
Private Attributes | |
ACE_Auto_Ptr< Request_Queue > | pending_events_ |
Events pending to be delivered. |
Definition at line 44 of file Consumer.h.
typedef ACE_Unbounded_Queue<TAO_Notify_Method_Request_Event_Queueable *> TAO_Notify_Consumer::Request_Queue [protected] |
Definition at line 113 of file Consumer.h.
Status returned from dispatch attempts.
Definition at line 51 of file Consumer.h.
00051 { 00052 DISPATCH_SUCCESS, 00053 DISPATCH_RETRY, // retry this message 00054 DISPATCH_DISCARD, // discard this message 00055 DISPATCH_FAIL}; // discard all messages and disconnect consumer
TAO_BEGIN_VERSIONED_NAMESPACE_DECL TAO_Notify_Consumer::TAO_Notify_Consumer | ( | TAO_Notify_ProxySupplier * | proxy | ) |
Constructor.
Definition at line 31 of file Consumer.cpp.
References ACE_NEW, pending_events(), pending_events_, TAO_Notify_Refcountable_Guard_T< T >::reset(), and timer_.
00032 : proxy_ (proxy) 00033 , is_suspended_ (0) 00034 , have_not_yet_verified_publish_ (true) 00035 , pacing_ (proxy->qos_properties_.pacing_interval ()) 00036 , max_batch_size_ (CosNotification::MaximumBatchSize, 0) 00037 , timer_id_ (-1) 00038 , timer_ (0) 00039 { 00040 Request_Queue* pending_events = 0; 00041 ACE_NEW (pending_events, TAO_Notify_Consumer::Request_Queue ()); 00042 this->pending_events_.reset( pending_events ); 00043 00044 this->timer_.reset( this->proxy ()->timer () ); 00045 }
TAO_Notify_Consumer::~TAO_Notify_Consumer | ( | ) | [virtual] |
Destructor.
Definition at line 47 of file Consumer.cpp.
References TAO_Notify_Refcountable_Guard_T< T >::reset(), and timer_.
00048 { 00049 if (this->timer_.isSet()) 00050 { 00051 this->cancel_timer (); 00052 this->timer_.reset (); 00053 } 00054 }
void TAO_Notify_Consumer::assume_pending_events | ( | TAO_Notify_Consumer & | rhs | ) |
Take the pending queue from the rhs, cancel its timer and schedule our timer. The caller should have locked the proxy lock before calling this method.
Definition at line 684 of file Consumer.cpp.
References cancel_timer(), ACE_Unbounded_Queue< T >::is_empty(), TAO_Notify_Refcountable_Guard_T< T >::isSet(), pending_events(), pending_events_, schedule_timer(), and timer_.
00685 { 00686 // No need to lock the this proxy's lock. It should have been locked 00687 // by the caller. 00688 00689 // If the original consumer has pending events 00690 if (!rhs.pending_events ().is_empty ()) 00691 { 00692 // We will take them away and cancel it's timer 00693 this->pending_events_.reset (rhs.pending_events_.release ()); 00694 if (rhs.timer_.isSet ()) 00695 { 00696 rhs.cancel_timer (); 00697 } 00698 00699 // Schedule a new timer for us, which will use the default 00700 // timer value (unless we have a valid pacing interval). 00701 this->schedule_timer (); 00702 } 00703 }
void TAO_Notify_Consumer::cancel_timer | ( | void | ) | [protected] |
Cancel timer.
Definition at line 617 of file Consumer.cpp.
References ACE_DEBUG, ACE_TEXT(), DEBUG_LEVEL, LM_DEBUG, timer_, and timer_id_.
Referenced by assume_pending_events().
00618 { 00619 if (this->timer_.isSet() && this->timer_id_ != -1) 00620 { 00621 if (DEBUG_LEVEL > 5) 00622 ACE_DEBUG ((LM_DEBUG, 00623 ACE_TEXT ("Consumer %d canceling dispatch timer.\n"), 00624 static_cast<int> (this->proxy ()->id ()) 00625 )); 00626 00627 this->timer_->cancel_timer (timer_id_); 00628 } 00629 this->timer_id_ = -1; 00630 }
void TAO_Notify_Consumer::deliver | ( | TAO_Notify_Method_Request_Event * | request | ) |
Dispatch Event to consumer.
Definition at line 144 of file Consumer.cpp.
References ACE_DEBUG, ACE_TEXT(), TAO_Notify_Method_Request_Event::complete(), DEBUG_LEVEL, TAO_Notify_ProxySupplier::destroy(), DISPATCH_DISCARD, DISPATCH_FAIL, dispatch_request(), DISPATCH_RETRY, DISPATCH_SUCCESS, enqueue_if_necessary(), enqueue_request(), LM_DEBUG, proxy_supplier(), schedule_timer(), and TAO_Notify_Method_Request_Event::sequence().
Referenced by TAO_Notify_Method_Request_Dispatch::execute_i().
00145 { 00146 // Increment reference counts (safely) to prevent this object and its proxy 00147 // from being deleted while the push is in progress. 00148 TAO_Notify_Proxy::Ptr proxy_guard (this->proxy ()); 00149 bool queued = enqueue_if_necessary (request); 00150 if (!queued) 00151 { 00152 DispatchStatus status = this->dispatch_request (request); 00153 switch (status) 00154 { 00155 case DISPATCH_SUCCESS: 00156 { 00157 request->complete (); 00158 break; 00159 } 00160 case DISPATCH_RETRY: 00161 { 00162 if (DEBUG_LEVEL > 1) 00163 ACE_DEBUG ((LM_DEBUG, 00164 ACE_TEXT ("Consumer %d enqueing event %d due ") 00165 ACE_TEXT ("to failed dispatch.\n"), 00166 static_cast<int> (this->proxy ()->id ()), 00167 request->sequence ())); 00168 this->enqueue_request (request); 00169 this->schedule_timer (true); 00170 break; 00171 } 00172 case DISPATCH_DISCARD: 00173 { 00174 if (DEBUG_LEVEL > 0) 00175 ACE_DEBUG ((LM_DEBUG, 00176 ACE_TEXT ("(%P|%t) Consumer %d: Error during " 00177 "direct dispatch. Discarding event:%d.\n"), 00178 static_cast<int> (this->proxy ()->id ()), 00179 request->sequence () 00180 )); 00181 request->complete (); 00182 break; 00183 } 00184 case DISPATCH_FAIL: 00185 { 00186 if (DEBUG_LEVEL > 0) 00187 ACE_DEBUG ((LM_DEBUG, 00188 ACE_TEXT ("(%P|%t) Consumer %d: Failed during " 00189 "direct dispatch :%d. Discarding event.\n"), 00190 static_cast<int> (this->proxy ()->id ()), 00191 request->sequence () 00192 )); 00193 request->complete (); 00194 try 00195 { 00196 this->proxy_supplier ()->destroy (); 00197 } 00198 catch (const CORBA::Exception&) 00199 { 00200 // todo is there something meaningful we can do here? 00201 ; 00202 } 00203 break; 00204 } 00205 } 00206 } 00207 }
TAO_Notify_Consumer::DispatchStatus TAO_Notify_Consumer::dispatch_batch | ( | const CosNotification::EventBatch & | batch | ) |
Dispatch the batch of events to the attached consumer.
Definition at line 344 of file Consumer.cpp.
References CORBA::SystemException::_info(), ACE_DEBUG, ACE_ERROR, ACE_TEXT(), ACE_String_Base< CHAR >::c_str(), DEBUG_LEVEL, DISPATCH_DISCARD, DISPATCH_FAIL, DISPATCH_RETRY, DISPATCH_SUCCESS, LM_ERROR, CORBA::OMGVMCID, push(), TAO_INVOCATION_SEND_REQUEST_MINOR_CODE, TAO_POA_DISCARDING, TAO_POA_HOLDING, and TAO::VMCID.
Referenced by TAO_Notify_SequencePushConsumer::dispatch_from_queue().
00345 { 00346 DispatchStatus result = DISPATCH_SUCCESS; 00347 try 00348 { 00349 this->push (batch); 00350 } 00351 catch (const CORBA::OBJECT_NOT_EXIST& ex) 00352 { 00353 if (DEBUG_LEVEL > 0) 00354 ACE_DEBUG ((LM_ERROR, 00355 ACE_TEXT ("(%P|%t) TAO_Notify_Consumer " 00356 "%d::dispatch_batch() %s\n"), 00357 static_cast<int> (this->proxy ()->id ()), 00358 ex._info ().c_str () 00359 )); 00360 result = DISPATCH_FAIL; 00361 } 00362 catch (const CORBA::TRANSIENT& ex) 00363 { 00364 if (DEBUG_LEVEL > 0) 00365 ACE_DEBUG ((LM_ERROR, 00366 ACE_TEXT ("(%P|%t) TAO_Notify_Consumer " 00367 "%d::dispatch_batch() Transient (minor=%d) %s\n"), 00368 static_cast<int> (this->proxy ()->id ()), 00369 ex.minor (), 00370 ex._info ().c_str () 00371 )); 00372 const CORBA::ULong BITS_5_THRU_12_MASK = 0x00000f80u; 00373 switch (ex.minor () & 0xfffff000u) 00374 { 00375 case CORBA::OMGVMCID: 00376 switch (ex.minor () & 0x00000fffu) 00377 { 00378 case 2: // No usable profile 00379 case 3: // Request cancelled 00380 case 4: // POA destroyed 00381 result = DISPATCH_FAIL; 00382 break; 00383 default: 00384 result = DISPATCH_DISCARD; 00385 } 00386 break; 00387 00388 case TAO::VMCID: 00389 default: 00390 switch (ex.minor () & BITS_5_THRU_12_MASK) 00391 { 00392 case TAO_INVOCATION_SEND_REQUEST_MINOR_CODE: 00393 result = DISPATCH_FAIL; 00394 break; 00395 case TAO_POA_DISCARDING: 00396 case TAO_POA_HOLDING: 00397 default: 00398 result = DISPATCH_RETRY; 00399 } break; 00400 } 00401 } 00402 catch (const CORBA::TIMEOUT& ex) 00403 { 00404 if (DEBUG_LEVEL > 0) 00405 ACE_DEBUG ((LM_ERROR, 00406 ACE_TEXT ("(%P|%t) TAO_Notify_Consumer " 00407 "%u::dispatch_batch() %s\n"), 00408 this->proxy ()->id (), 00409 ex._info().c_str () 00410 )); 00411 result = DISPATCH_FAIL; 00412 } 00413 catch (const CORBA::COMM_FAILURE& ex) 00414 { 00415 if (DEBUG_LEVEL > 0) 00416 ACE_DEBUG ((LM_ERROR, 00417 ACE_TEXT ("(%P|%t) TAO_Notify_Consumer " 00418 "%u::dispatch_batch() %s\n"), 00419 this->proxy ()->id (), 00420 ex._info().c_str () 
00421 )); 00422 result = DISPATCH_FAIL; 00423 } 00424 catch (const CORBA::SystemException& ex) 00425 { 00426 if (DEBUG_LEVEL > 0) 00427 { 00428 ACE_DEBUG ((LM_ERROR, 00429 ACE_TEXT ("(%P|%t) TAO_Notify_Consumer " 00430 "%d::dispatch_batch() SystemException %s\n"), 00431 static_cast<int> (this->proxy ()->id ()), 00432 ex._info ().c_str () 00433 )); 00434 } 00435 result = DISPATCH_DISCARD; 00436 } 00437 catch (const CORBA::Exception&) 00438 { 00439 ACE_ERROR ((LM_ERROR, 00440 ACE_TEXT ("(%P|%t) TAO_Notify_Consumer " 00441 "%d::dispatch_batch() Caught unexpected " 00442 "exception pushing batch to consumer.\n"), 00443 static_cast<int> (this->proxy ()->id ()) 00444 )); 00445 result = DISPATCH_DISCARD; 00446 } 00447 return result; 00448 }
bool TAO_Notify_Consumer::dispatch_from_queue | ( | Request_Queue & | requests, | |
ACE_Guard< TAO_SYNCH_MUTEX > & | ace_mon | |||
) | [protected, virtual] |
Attempt to dispatch event from a queue.
Called by dispatch_pending. Deliver one or more events to the Consumer. If delivery fails, events are left in the queue (or discarded depending on QoS parameters.) Undelivered, undiscarded requests are left at the front of the queue. Overridden in sequence consumer to dispatch as an EventBatch.
Definition at line 481 of file Consumer.cpp.
References ACE_DEBUG, ACE_TEXT(), ACE_Guard< ACE_LOCK >::acquire(), TAO_Notify_Method_Request_Event::complete(), DEBUG_LEVEL, ACE_Unbounded_Queue< T >::dequeue_head(), TAO_Notify_ProxySupplier::destroy(), DISPATCH_DISCARD, DISPATCH_FAIL, dispatch_request(), DISPATCH_RETRY, DISPATCH_SUCCESS, ACE_Unbounded_Queue< T >::enqueue_head(), LM_DEBUG, proxy_supplier(), ACE_Message_Block::release(), ACE_Guard< ACE_LOCK >::release(), and TAO_Notify_Method_Request_Event::sequence().
Referenced by dispatch_pending().
00482 { 00483 bool result = true; 00484 TAO_Notify_Method_Request_Event_Queueable * request; 00485 if (requests.dequeue_head (request) == 0) 00486 { 00487 ace_mon.release (); 00488 DispatchStatus status = this->dispatch_request (request); 00489 switch (status) 00490 { 00491 case DISPATCH_SUCCESS: 00492 { 00493 request->complete (); 00494 request->release (); 00495 result = true; 00496 ace_mon.acquire (); 00497 break; 00498 } 00499 case DISPATCH_RETRY: 00500 { 00501 if (DEBUG_LEVEL > 0) 00502 ACE_DEBUG ((LM_DEBUG, 00503 ACE_TEXT ("(%P|%t) Consumer %d: Will retry %d\n"), 00504 static_cast<int> (this->proxy ()->id ()), 00505 request->sequence () 00506 )); 00507 ace_mon.acquire (); 00508 requests.enqueue_head (request); // put the failed event back where it was 00509 result = false; 00510 break; 00511 } 00512 case DISPATCH_DISCARD: 00513 { 00514 if (DEBUG_LEVEL > 0) 00515 ACE_DEBUG ((LM_DEBUG, 00516 ACE_TEXT ("(%P|%t) Consumer %d: Error during " 00517 "dispatch. Discarding event:%d.\n"), 00518 static_cast<int> (this->proxy ()->id ()), 00519 request->sequence () 00520 )); 00521 request->complete (); 00522 ace_mon.acquire (); 00523 result = true; 00524 break; 00525 } 00526 case DISPATCH_FAIL: 00527 { 00528 if (DEBUG_LEVEL > 0) 00529 ACE_DEBUG ((LM_DEBUG, 00530 ACE_TEXT ("(%P|%t) Consumer %d: Failed. " 00531 "Discarding event %d.\n"), 00532 static_cast<int> (this->proxy ()->id ()), 00533 request->sequence () 00534 )); 00535 request->complete (); 00536 ace_mon.acquire (); 00537 while (requests.dequeue_head (request) == 0) 00538 { 00539 ace_mon.release (); 00540 request->complete (); 00541 ace_mon.acquire (); 00542 } 00543 ace_mon.release (); 00544 try 00545 { 00546 this->proxy_supplier ()->destroy (); 00547 } 00548 catch (const CORBA::Exception&) 00549 { 00550 // todo is there something reasonable to do here? 
00551 } 00552 ace_mon.acquire (); 00553 result = true; 00554 break; 00555 } 00556 default: 00557 { 00558 ace_mon.acquire (); 00559 result = false; 00560 break; 00561 } 00562 } 00563 } 00564 return result; 00565 }
void TAO_Notify_Consumer::dispatch_pending | ( | void | ) |
Dispatch the pending events.
Definition at line 451 of file Consumer.cpp.
References ACE_DEBUG, ACE_GUARD, ACE_TEXT(), DEBUG_LEVEL, dispatch_from_queue(), ACE_Unbounded_Queue< T >::is_empty(), LM_DEBUG, pending_events(), schedule_timer(), and TAO_SYNCH_MUTEX.
Referenced by handle_timeout(), and resume().
00452 { 00453 if (DEBUG_LEVEL > 5) 00454 ACE_DEBUG ((LM_DEBUG, 00455 ACE_TEXT ("Consumer %d dispatching pending events. Queue size: %d\n"), 00456 static_cast<int> (this->proxy ()->id ()), 00457 this->pending_events().size () 00458 )); 00459 00460 // lock ourselves in memory for the duration 00461 TAO_Notify_Consumer::Ptr self_grd (this); 00462 00463 // dispatch events until: 1) the queue is empty; 2) the proxy shuts down, or 3) the dispatch fails 00464 ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, *this->proxy_lock ()); 00465 bool ok = true; 00466 while (ok 00467 && !this->proxy_supplier ()->has_shutdown () 00468 && !this->pending_events().is_empty ()) 00469 { 00470 if (! dispatch_from_queue ( this->pending_events(), ace_mon)) 00471 { 00472 this->schedule_timer (true); 00473 ok = false; 00474 } 00475 } 00476 }
TAO_Notify_Consumer::DispatchStatus TAO_Notify_Consumer::dispatch_request | ( | TAO_Notify_Method_Request_Event * | request | ) | [protected] |
Definition at line 210 of file Consumer.cpp.
References CORBA::SystemException::_info(), ACE_DEBUG, ACE_ERROR, ACE_TEXT(), ACE_String_Base< CHAR >::c_str(), DEBUG_LEVEL, DISPATCH_DISCARD, DISPATCH_FAIL, DISPATCH_RETRY, DISPATCH_SUCCESS, TAO_Notify_Method_Request_Event::event(), LM_DEBUG, LM_ERROR, CORBA::OMGVMCID, TAO_Notify_Event::push(), TAO_Notify_Method_Request_Event::sequence(), TAO_Notify_Method_Request_Event::should_retry(), TAO_INVOCATION_SEND_REQUEST_MINOR_CODE, TAO_POA_DISCARDING, TAO_POA_HOLDING, and TAO::VMCID.
Referenced by deliver(), and dispatch_from_queue().
00211 { 00212 DispatchStatus result = DISPATCH_SUCCESS; 00213 try 00214 { 00215 request->event ()->push (this); 00216 if (DEBUG_LEVEL > 8) 00217 ACE_DEBUG ((LM_DEBUG, 00218 ACE_TEXT ("Consumer %d dispatched single event %d.\n"), 00219 static_cast<int> (this->proxy ()->id ()), 00220 request->sequence () 00221 )); 00222 } 00223 catch (const CORBA::OBJECT_NOT_EXIST& ex) 00224 { 00225 if (DEBUG_LEVEL > 0) 00226 { 00227 ACE_DEBUG ((LM_ERROR, 00228 ACE_TEXT ("(%P|%t) TAO_Notify_Consumer %d::push " 00229 "(request) %s\n"), 00230 static_cast<int> (this->proxy ()->id ()), 00231 ex._info ().c_str () 00232 )); 00233 } 00234 result = DISPATCH_FAIL; 00235 } 00236 catch (const CORBA::TRANSIENT& ex) 00237 { 00238 if (DEBUG_LEVEL > 0) 00239 ACE_DEBUG ((LM_ERROR, 00240 ACE_TEXT ("(%P|%t) TAO_Notify_Consumer %d::push " 00241 "(request) Transient (minor=%d) %s\n"), 00242 static_cast<int> (this->proxy ()->id ()), 00243 ex.minor (), 00244 ex._info ().c_str () 00245 )); 00246 const CORBA::ULong BITS_5_THRU_12_MASK = 0x00000f80u; 00247 switch (ex.minor () & 0xfffff000u) 00248 { 00249 case CORBA::OMGVMCID: 00250 switch (ex.minor () & 0x00000fffu) 00251 { 00252 case 2: // No usable profile 00253 case 3: // Request cancelled 00254 case 4: // POA destroyed 00255 result = DISPATCH_FAIL; 00256 break; 00257 default: 00258 result = DISPATCH_DISCARD; 00259 } 00260 break; 00261 00262 case TAO::VMCID: 00263 default: 00264 switch (ex.minor () & BITS_5_THRU_12_MASK) 00265 { 00266 case TAO_INVOCATION_SEND_REQUEST_MINOR_CODE: 00267 result = DISPATCH_FAIL; 00268 break; 00269 case TAO_POA_DISCARDING: 00270 case TAO_POA_HOLDING: 00271 default: 00272 result = DISPATCH_RETRY; 00273 } break; 00274 } 00275 } 00276 catch (const CORBA::TIMEOUT& ex) 00277 { 00278 if (DEBUG_LEVEL > 0) 00279 ACE_DEBUG ((LM_ERROR, 00280 ACE_TEXT ("(%P|%t) TAO_Notify_Consumer %u::push " 00281 "(request) %s\n"), 00282 this->proxy ()->id (), 00283 ex._info().c_str () 00284 )); 00285 result = DISPATCH_FAIL; 00286 } 00287 catch (const 
CORBA::COMM_FAILURE& ex) 00288 { 00289 if (DEBUG_LEVEL > 0) 00290 ACE_DEBUG ((LM_ERROR, 00291 ACE_TEXT ("(%P|%t) TAO_Notify_Consumer %u::push " 00292 "(request) %s\n"), 00293 this->proxy ()->id (), 00294 ex._info().c_str () 00295 )); 00296 result = DISPATCH_FAIL; 00297 } 00298 catch (const CORBA::SystemException& ex) 00299 { 00300 if (DEBUG_LEVEL > 0) 00301 { 00302 ACE_DEBUG ((LM_ERROR, 00303 ACE_TEXT ("(%P|%t) TAO_Notify_Consumer %d::push " 00304 "(request) SystemException %s\n"), 00305 static_cast<int> (this->proxy ()->id ()), 00306 ex._info ().c_str () 00307 )); 00308 } 00309 result = DISPATCH_DISCARD; 00310 } 00311 catch (const CORBA::Exception&) 00312 { 00313 ACE_ERROR ( (LM_ERROR, 00314 ACE_TEXT ("(%P|%t) TAO_Notify_Consumer %d::push " 00315 "(request) Caught unexpected exception " 00316 "pushing event to consumer.\n"), 00317 static_cast<int> (this->proxy ()->id ()) 00318 )); 00319 result = DISPATCH_DISCARD; 00320 } 00321 00322 // for persistent events that haven't timed out 00323 // convert "FAIL" & "DISCARD" to "RETRY" 00324 // for transient events, convert RETRY to DISCARD (hey, best_effort.) 00325 if (result == DISPATCH_FAIL || result == DISPATCH_DISCARD) 00326 { 00327 if (request->should_retry ()) 00328 { 00329 result = DISPATCH_RETRY; 00330 } 00331 } 00332 else if (result == DISPATCH_RETRY) 00333 { 00334 if (! request->should_retry ()) 00335 { 00336 result = DISPATCH_DISCARD; 00337 } 00338 } 00339 00340 return result; 00341 }
void TAO_Notify_Consumer::dispatch_updates_i | ( | const CosNotification::EventTypeSeq & | added, | |
const CosNotification::EventTypeSeq & | removed | |||
) | [protected, virtual] |
Implementation of Peer specific dispatch_updates.
Implements TAO_Notify_Peer.
Definition at line 659 of file Consumer.cpp.
References have_not_yet_verified_publish_, CORBA::is_nil(), and publish_.
00660 { 00661 if (this->have_not_yet_verified_publish_) 00662 { 00663 this->have_not_yet_verified_publish_ = false; // no need to check again 00664 if (! this->publish_->_is_a ("IDL:omg.org/CosNotifyComm/NotifyPublish:1.0")) 00665 this->publish_ = CosNotifyComm::NotifyPublish::_nil(); 00666 } 00667 if (! CORBA::is_nil (this->publish_.in ())) 00668 this->publish_->offer_change (added, removed); 00669 }
bool TAO_Notify_Consumer::enqueue_if_necessary | ( | TAO_Notify_Method_Request_Event * | request | ) | [protected, virtual] |
Add request to a queue if necessary. Overridden by sequence consumer to "always" put incoming events into the queue.
Reimplemented in TAO_Notify_SequencePushConsumer.
Definition at line 99 of file Consumer.cpp.
References ACE_DEBUG, ACE_GUARD_RETURN, ACE_NEW_THROW_EX, ACE_TEXT(), DEBUG_LEVEL, ACE_Unbounded_Queue< T >::enqueue_tail(), TAO_Notify_Method_Request_Event::event(), LM_DEBUG, pending_events(), TAO_Notify_Event::queueable_copy(), schedule_timer(), TAO_Notify_Method_Request_Event::sequence(), and TAO_SYNCH_MUTEX.
Referenced by deliver().
00100 { 00101 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, *this->proxy_lock (), false); 00102 if (! this->pending_events().is_empty ()) 00103 { 00104 if (DEBUG_LEVEL > 3) 00105 ACE_DEBUG ((LM_DEBUG, 00106 ACE_TEXT ("Consumer %d: enqueuing another event. %d\n"), 00107 static_cast<int> (this->proxy ()->id ()), 00108 request->sequence () 00109 )); 00110 TAO_Notify_Event::Ptr event ( 00111 request->event ()->queueable_copy ()); 00112 TAO_Notify_Method_Request_Event_Queueable * queue_entry; 00113 ACE_NEW_THROW_EX (queue_entry, 00114 TAO_Notify_Method_Request_Event_Queueable (*request, 00115 event), 00116 CORBA::NO_MEMORY ()); 00117 this->pending_events().enqueue_tail (queue_entry); 00118 this->schedule_timer (false); 00119 return true; 00120 } 00121 if (this->is_suspended_ == 1) 00122 { 00123 if (DEBUG_LEVEL > 3) 00124 ACE_DEBUG ((LM_DEBUG, 00125 ACE_TEXT ("Suspended Consumer %d enqueing event. %d\n"), 00126 static_cast<int> (this->proxy ()->id ()), 00127 request->sequence () 00128 )); 00129 TAO_Notify_Event::Ptr event ( 00130 request->event ()->queueable_copy ()); 00131 TAO_Notify_Method_Request_Event_Queueable * queue_entry; 00132 ACE_NEW_THROW_EX (queue_entry, 00133 TAO_Notify_Method_Request_Event_Queueable (*request, 00134 event), 00135 CORBA::NO_MEMORY ()); 00136 this->pending_events().enqueue_tail (queue_entry); 00137 this->schedule_timer (false); 00138 return true; 00139 } 00140 return false; 00141 }
void TAO_Notify_Consumer::enqueue_request | ( | TAO_Notify_Method_Request_Event * | request | ) | [protected] |
Definition at line 77 of file Consumer.cpp.
References ACE_DEBUG, ACE_GUARD, ACE_NEW_THROW_EX, ACE_TEXT(), DEBUG_LEVEL, ACE_Unbounded_Queue< T >::enqueue_tail(), TAO_Notify_Method_Request_Event::event(), LM_DEBUG, pending_events(), TAO_Notify_Event::queueable_copy(), TAO_Notify_Method_Request_Event::sequence(), and TAO_SYNCH_MUTEX.
Referenced by deliver(), and TAO_Notify_SequencePushConsumer::enqueue_if_necessary().
00079 { 00080 TAO_Notify_Event::Ptr event ( 00081 request->event ()->queueable_copy ()); 00082 00083 TAO_Notify_Method_Request_Event_Queueable * queue_entry; 00084 ACE_NEW_THROW_EX (queue_entry, 00085 TAO_Notify_Method_Request_Event_Queueable (*request, event), 00086 CORBA::NO_MEMORY ()); 00087 00088 if (DEBUG_LEVEL > 3) ACE_DEBUG ( (LM_DEBUG, 00089 ACE_TEXT ("Consumer %d: enqueue_request (%d) @%@.\n"), 00090 static_cast<int> (this->proxy ()->id ()), 00091 request->sequence (), 00092 request 00093 )); 00094 ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, *this->proxy_lock ()); 00095 this->pending_events().enqueue_tail (queue_entry); 00096 }
int TAO_Notify_Consumer::handle_timeout | ( | const ACE_Time_Value & | current_time, | |
const void * | act = 0 | |||
) | [protected, virtual] |
Reimplemented from ACE_Event_Handler.
Definition at line 633 of file Consumer.cpp.
References dispatch_pending(), and timer_id_.
00634 { 00635 TAO_Notify_Consumer::Ptr grd (this); 00636 this->timer_id_ = -1; // This must come first, because dispatch_pending may try to resched 00637 try 00638 { 00639 this->dispatch_pending (); 00640 } 00641 catch (...) 00642 { 00643 } 00644 00645 return 0; 00646 }
ACE_INLINE CORBA::Boolean TAO_Notify_Consumer::is_suspended | ( | void | ) |
Is the connection suspended?
Definition at line 16 of file Consumer.inl.
References is_suspended_.
00017 { 00018 return this->is_suspended_; 00019 }
TAO_BEGIN_VERSIONED_NAMESPACE_DECL ACE_INLINE TAO_Notify_Consumer::Request_Queue & TAO_Notify_Consumer::pending_events | ( | ) | [protected] |
Access the queue of events pending delivery. Asserts that the queue has been allocated before returning a reference to it.
Definition at line 9 of file Consumer.inl.
References ACE_ASSERT, and pending_events_.
Referenced by assume_pending_events(), dispatch_pending(), enqueue_if_necessary(), enqueue_request(), and TAO_Notify_Consumer().
00010 { 00011 ACE_ASSERT( pending_events_.get() != 0 ); 00012 return *pending_events_; 00013 }
TAO_Notify_Proxy * TAO_Notify_Consumer::proxy | ( | void | ) | [virtual] |
Access Base Proxy.
Implements TAO_Notify_Peer.
Definition at line 57 of file Consumer.cpp.
References proxy_supplier().
00058 { 00059 return this->proxy_supplier (); 00060 }
TAO_SYNCH_MUTEX * TAO_Notify_Consumer::proxy_lock | ( | void | ) | [protected] |
Get the shared Proxy Lock.
Definition at line 672 of file Consumer.cpp.
References TAO_Notify_Object::lock_, and proxy_.
TAO_Notify_ProxySupplier * TAO_Notify_Consumer::proxy_supplier | ( | void | ) |
Access Specific Proxy.
Definition at line 678 of file Consumer.cpp.
References proxy_.
Referenced by deliver(), TAO_Notify_SequencePushConsumer::dispatch_from_queue(), dispatch_from_queue(), and proxy().
00679 { 00680 return this->proxy_; 00681 }
virtual void TAO_Notify_Consumer::push | ( | const CosNotification::EventBatch & | event | ) | [pure virtual] |
Push a batch of events to this consumer.
Implemented in TAO_Notify_PushConsumer, TAO_Notify_SequencePushConsumer, and TAO_Notify_StructuredPushConsumer.
virtual void TAO_Notify_Consumer::push | ( | const CosNotification::StructuredEvent & | event | ) | [pure virtual] |
Push event to this consumer.
Implemented in TAO_Notify_PushConsumer, TAO_Notify_SequencePushConsumer, and TAO_Notify_StructuredPushConsumer.
virtual void TAO_Notify_Consumer::push | ( | const CORBA::Any & | event | ) | [pure virtual] |
Push event to this consumer.
Implemented in TAO_Notify_PushConsumer, TAO_Notify_SequencePushConsumer, and TAO_Notify_StructuredPushConsumer.
Referenced by dispatch_batch(), TAO_Notify_StructuredEvent_No_Copy::push(), and TAO_Notify_AnyEvent_No_Copy::push().
void TAO_Notify_Consumer::qos_changed | ( | const TAO_Notify_QoSProperties & | qos_properties | ) | [virtual] |
Override, Peer::qos_changed.
Reimplemented from TAO_Notify_Peer.
Definition at line 63 of file Consumer.cpp.
References max_batch_size_, and TAO_Notify_QoSProperties::maximum_batch_size().
00064 { 00065 this->max_batch_size_ = qos_properties.maximum_batch_size (); 00066 }
virtual void TAO_Notify_Consumer::reconnect_from_consumer | ( | TAO_Notify_Consumer * | old_consumer | ) | [pure virtual] |
On reconnect we need to move events from the old consumer to the new one.
Implemented in TAO_Notify_PushConsumer, TAO_Notify_SequencePushConsumer, and TAO_Notify_StructuredPushConsumer.
void TAO_Notify_Consumer::resume | ( | void | ) |
Resume Connection.
Definition at line 69 of file Consumer.cpp.
References dispatch_pending(), and is_suspended_.
Referenced by TAO_Notify_ProxySupplier_T< SERVANT_TYPE >::resume_connection().
00070 { 00071 this->is_suspended_ = 0; 00072 00073 this->dispatch_pending (); 00074 }
void TAO_Notify_Consumer::schedule_timer | ( | bool | is_error = false |
) | [protected] |
Schedule timer.
Definition at line 570 of file Consumer.cpp.
References ACE_ASSERT, ACE_DEBUG, ACE_ERROR, ACE_TEXT(), DEBUG_LEVEL, LM_DEBUG, LM_ERROR, timer_, timer_id_, and ACE_Time_Value::zero.
Referenced by assume_pending_events(), deliver(), dispatch_pending(), TAO_Notify_SequencePushConsumer::enqueue_if_necessary(), enqueue_if_necessary(), TAO_Notify_StructuredPushConsumer::reconnect_from_consumer(), TAO_Notify_SequencePushConsumer::reconnect_from_consumer(), and TAO_Notify_PushConsumer::reconnect_from_consumer().
00571 { 00572 if (this->timer_id_ != -1) 00573 { 00574 return; // We only want a single timeout scheduled. 00575 } 00576 // don't schedule timer if there's nothing that can be done 00577 if (this->is_suspended ()) 00578 { 00579 return; 00580 } 00581 00582 ACE_ASSERT (this->timer_.get() != 0); 00583 00584 // If we're scheduling the timer due to an error then we want to 00585 // use the retry timeout, otherwise we'll assume that the pacing 00586 // interval is sufficient for now. 00587 ACE_Time_Value tv (DEFAULT_RETRY_TIMEOUT); 00588 00589 if (! is_error) 00590 { 00591 if (this->pacing_.is_valid ()) 00592 { 00593 tv = ORBSVCS_Time::to_Time_Value (this->pacing_.value ()); 00594 } 00595 } 00596 00597 if (DEBUG_LEVEL > 5) 00598 { 00599 ACE_DEBUG ((LM_DEBUG, 00600 ACE_TEXT ("Consumer %d: scheduling pacing/retry for %dms.\n"), 00601 static_cast<int> (this->proxy ()->id ()), tv.msec ())); 00602 } 00603 00604 this->timer_id_ = 00605 this->timer_->schedule_timer (this, tv, ACE_Time_Value::zero); 00606 if (this->timer_id_ == -1) 00607 { 00608 ACE_ERROR ((LM_ERROR, 00609 ACE_TEXT ("TAO_Notify_Consumer %d::schedule_timer () " 00610 "Error scheduling timer.\n"), 00611 static_cast<int> (this->proxy ()->id ()) 00612 )); 00613 } 00614 }
void TAO_Notify_Consumer::shutdown | ( | void | ) | [virtual] |
Shutdown the consumer.
Reimplemented from TAO_Notify_Peer.
Definition at line 649 of file Consumer.cpp.
References TAO_Notify_Refcountable_Guard_T< T >::reset(), and timer_.
00650 { 00651 if (this->timer_.isSet ()) 00652 { 00653 this->cancel_timer (); 00654 this->timer_.reset (); 00655 } 00656 }
ACE_INLINE void TAO_Notify_Consumer::suspend | ( | void | ) |
Suspend Connection.
Definition at line 22 of file Consumer.inl.
References is_suspended_.
Referenced by TAO_Notify_ProxySupplier_T< SERVANT_TYPE >::suspend_connection().
00023 { 00024 this->is_suspended_ = 1; 00025 }
bool TAO_Notify_Consumer::have_not_yet_verified_publish_ [protected] |
CORBA::Boolean TAO_Notify_Consumer::is_suspended_ [protected] |
Suspended Flag.
Definition at line 165 of file Consumer.h.
Referenced by is_suspended(), resume(), and suspend().
Max. batch size.
Definition at line 175 of file Consumer.h.
Referenced by TAO_Notify_SequencePushConsumer::enqueue_if_necessary(), and qos_changed().
const TAO_Notify_Property_Time& TAO_Notify_Consumer::pacing_ [protected] |
The Pacing Interval.
Definition at line 172 of file Consumer.h.
Referenced by TAO_Notify_SequencePushConsumer::enqueue_if_necessary().
Events pending to be delivered.
Definition at line 186 of file Consumer.h.
Referenced by assume_pending_events(), pending_events(), and TAO_Notify_Consumer().
TAO_Notify_ProxySupplier* TAO_Notify_Consumer::proxy_ [protected] |
The Proxy that we associate with.
Definition at line 162 of file Consumer.h.
Referenced by proxy_lock(), and proxy_supplier().
CosNotifyComm::NotifyPublish_var TAO_Notify_Consumer::publish_ [protected] |
Interface that accepts offer_changes.
Definition at line 168 of file Consumer.h.
Referenced by dispatch_updates_i(), TAO_Notify_StructuredPushConsumer::init(), TAO_Notify_SequencePushConsumer::init(), and TAO_Notify_PushConsumer::init().
TAO_Notify_Timer::Ptr TAO_Notify_Consumer::timer_ [protected] |
The Timer Manager that we use.
Definition at line 181 of file Consumer.h.
Referenced by assume_pending_events(), cancel_timer(), schedule_timer(), shutdown(), TAO_Notify_Consumer(), and ~TAO_Notify_Consumer().
long TAO_Notify_Consumer::timer_id_ [protected] |
Timer Id.
Definition at line 178 of file Consumer.h.
Referenced by cancel_timer(), handle_timeout(), and schedule_timer().