TAO_Notify_Consumer Class Reference

Abstract base class for wrapping consumer objects that connect to the EventChannel.

#include <Consumer.h>

Public Types

enum  DispatchStatus { DISPATCH_SUCCESS, DISPATCH_RETRY, DISPATCH_DISCARD, DISPATCH_FAIL }
 Status returned from dispatch attempts.

Public Member Functions

 TAO_Notify_Consumer (TAO_Notify_ProxySupplier *proxy)
 Constructor.
virtual ~TAO_Notify_Consumer ()
 Destructor.
TAO_Notify_ProxySupplier * proxy_supplier (void)
 Access Specific Proxy.
virtual TAO_Notify_Proxy * proxy (void)
 Access Base Proxy.
void deliver (TAO_Notify_Method_Request_Event *request)
 Dispatch Event to consumer.
virtual void push (const CORBA::Any &event)=0
 Push event to this consumer.
virtual void push (const CosNotification::StructuredEvent &event)=0
 Push event to this consumer.
virtual void push (const CosNotification::EventBatch &event)=0
 Push a batch of events to this consumer.
DispatchStatus dispatch_batch (const CosNotification::EventBatch &batch)
 Dispatch the batch of events to the attached consumer.
void dispatch_pending (void)
 Dispatch the pending events.
CORBA::Boolean is_suspended (void)
 Is the connection suspended?
void suspend (void)
 Suspend Connection.
void resume (void)
 Resume Connection.
virtual void shutdown (void)
 Shutdown the consumer.
virtual void reconnect_from_consumer (TAO_Notify_Consumer *old_consumer)=0
virtual void qos_changed (const TAO_Notify_QoSProperties &qos_properties)
 Override, Peer::qos_changed.
void assume_pending_events (TAO_Notify_Consumer &rhs)

Protected Types

typedef ACE_Unbounded_Queue< TAO_Notify_Method_Request_Event_Queueable * > Request_Queue

Protected Member Functions

DispatchStatus dispatch_request (TAO_Notify_Method_Request_Event *request)
virtual bool dispatch_from_queue (Request_Queue &requests, ACE_Guard< TAO_SYNCH_MUTEX > &ace_mon)
 Attempt to dispatch event from a queue.
void enqueue_request (TAO_Notify_Method_Request_Event *request)
virtual bool enqueue_if_necessary (TAO_Notify_Method_Request_Event *request)
virtual void dispatch_updates_i (const CosNotification::EventTypeSeq &added, const CosNotification::EventTypeSeq &removed)
 Implementation of Peer specific dispatch_updates.
TAO_SYNCH_MUTEX * proxy_lock (void)
 Get the shared Proxy Lock.
virtual int handle_timeout (const ACE_Time_Value &current_time, const void *act=0)
void schedule_timer (bool is_error=false)
 Schedule timer.
void cancel_timer (void)
 Cancel timer.
Request_Queue & pending_events ()
 Access the queue of pending events.

Protected Attributes

TAO_Notify_ProxySupplier * proxy_
 The Proxy that we associate with.
CORBA::Boolean is_suspended_
 Suspended Flag.
CosNotifyComm::NotifyPublish_var publish_
 Interface that accepts offer_changes.
bool have_not_yet_verified_publish_
const TAO_Notify_Property_Time & pacing_
 The Pacing Interval.
TAO_Notify_Property_Long max_batch_size_
 Max. batch size.
long timer_id_
 Timer Id.
TAO_Notify_Timer::Ptr timer_
 The Timer Manager that we use.

Private Attributes

ACE_Auto_Ptr< Request_Queue > pending_events_
 Events pending to be delivered.

Detailed Description

Abstract base class for wrapping consumer objects that connect to the EventChannel.

Definition at line 44 of file Consumer.h.


Member Typedef Documentation

typedef ACE_Unbounded_Queue<TAO_Notify_Method_Request_Event_Queueable *> TAO_Notify_Consumer::Request_Queue [protected]

Definition at line 113 of file Consumer.h.


Member Enumeration Documentation

enum TAO_Notify_Consumer::DispatchStatus

Status returned from dispatch attempts.

Enumerator:
DISPATCH_SUCCESS 
DISPATCH_RETRY 
DISPATCH_DISCARD 
DISPATCH_FAIL 

Definition at line 51 of file Consumer.h.

00051                       {
00052     DISPATCH_SUCCESS,
00053     DISPATCH_RETRY,   // retry this message
00054     DISPATCH_DISCARD, // discard this message
00055     DISPATCH_FAIL};   // discard all messages and disconnect consumer
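
The comments on the enumerators imply what a caller does next with each status. The following is a minimal sketch of that mapping, modelled on the switch in deliver(). The Action struct and the action_for function are hypothetical names used only for illustration; they are not part of TAO.

```cpp
#include <cassert>

// Mirrors TAO_Notify_Consumer::DispatchStatus from the listing above.
enum DispatchStatus { DISPATCH_SUCCESS, DISPATCH_RETRY, DISPATCH_DISCARD, DISPATCH_FAIL };

// Hypothetical summary of what the caller does next; not part of TAO.
struct Action {
  bool complete_request;    // mark the request as finished
  bool requeue;             // keep the event for a later retry
  bool disconnect_consumer; // destroy the proxy supplier
};

Action action_for (DispatchStatus status)
{
  switch (status)
    {
    case DISPATCH_SUCCESS: return Action {true, false, false};
    case DISPATCH_RETRY:   return Action {false, true, false};
    case DISPATCH_DISCARD: return Action {true, false, false};
    case DISPATCH_FAIL:    return Action {true, false, true};
    }
  assert (false && "unknown DispatchStatus");
  return Action {false, false, false};
}
```

In this sketch only DISPATCH_RETRY keeps the event alive, and only DISPATCH_FAIL disconnects the consumer.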


Constructor & Destructor Documentation

TAO_Notify_Consumer::TAO_Notify_Consumer ( TAO_Notify_ProxySupplier * proxy  ) 

Constructor.

Definition at line 31 of file Consumer.cpp.

References ACE_NEW, pending_events(), pending_events_, TAO_Notify_Refcountable_Guard_T< T >::reset(), and timer_.

00032 : proxy_ (proxy)
00033 , is_suspended_ (0)
00034 , have_not_yet_verified_publish_ (true)
00035 , pacing_ (proxy->qos_properties_.pacing_interval ())
00036 , max_batch_size_ (CosNotification::MaximumBatchSize, 0)
00037 , timer_id_ (-1)
00038 , timer_ (0)
00039 {
00040   Request_Queue* pending_events = 0;
00041   ACE_NEW (pending_events, TAO_Notify_Consumer::Request_Queue ());
00042   this->pending_events_.reset( pending_events );
00043 
00044   this->timer_.reset( this->proxy ()->timer () );
00045 }

TAO_Notify_Consumer::~TAO_Notify_Consumer (  )  [virtual]

Destructor.

Definition at line 47 of file Consumer.cpp.

References TAO_Notify_Refcountable_Guard_T< T >::reset(), and timer_.

00048 {
00049   if (this->timer_.isSet())
00050   {
00051     this->cancel_timer ();
00052     this->timer_.reset ();
00053   }
00054 }


Member Function Documentation

void TAO_Notify_Consumer::assume_pending_events ( TAO_Notify_Consumer & rhs  ) 

Take the pending queue from the rhs, cancel its timer, and schedule our timer. The caller should have locked the proxy lock before calling this method.

Definition at line 684 of file Consumer.cpp.

References cancel_timer(), ACE_Unbounded_Queue< T >::is_empty(), TAO_Notify_Refcountable_Guard_T< T >::isSet(), pending_events(), pending_events_, schedule_timer(), and timer_.

00685 {
00686   // No need to lock the this proxy's lock.  It should have been locked
00687   // by the caller.
00688 
00689   // If the original consumer has pending events
00690   if (!rhs.pending_events ().is_empty ())
00691     {
00692       // We will take them away and cancel it's timer
00693       this->pending_events_.reset (rhs.pending_events_.release ());
00694       if (rhs.timer_.isSet ())
00695         {
00696           rhs.cancel_timer ();
00697         }
00698 
00699       // Schedule a new timer for us, which will use the default
00700       // timer value (unless we have a valid pacing interval).
00701       this->schedule_timer ();
00702     }
00703 }
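
The ownership hand-off above can be sketched in isolation. This is a minimal illustration, assuming std::unique_ptr as a stand-in for ACE_Auto_Ptr and a std::deque of integers as a stand-in for Request_Queue. PendingOwner and its members are hypothetical names, not TAO types.

```cpp
#include <cassert>
#include <deque>
#include <memory>

// Hypothetical stand-in for a consumer that owns a queue of pending event ids.
struct PendingOwner {
  std::unique_ptr<std::deque<int>> pending {new std::deque<int>};
  bool timer_scheduled = false;

  // Take over rhs's queue when it holds events, mirroring the listing above.
  void assume_pending (PendingOwner & rhs)
  {
    assert (rhs.pending != nullptr);
    if (!rhs.pending->empty ())
      {
        pending = std::move (rhs.pending); // rhs.pending is now null
        rhs.timer_scheduled = false;       // cancel the old owner's timer
        timer_scheduled = true;            // schedule our own
      }
  }
};
```

As in the listing, the source object is left without a queue after a transfer, so in this sketch it should not be asked for its pending events again.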

void TAO_Notify_Consumer::cancel_timer ( void   )  [protected]

Cancel timer.

Definition at line 617 of file Consumer.cpp.

References ACE_DEBUG, ACE_TEXT(), DEBUG_LEVEL, LM_DEBUG, timer_, and timer_id_.

Referenced by assume_pending_events().

00618 {
00619   if (this->timer_.isSet() && this->timer_id_ != -1)
00620     {
00621       if (DEBUG_LEVEL  > 5)
00622         ACE_DEBUG ((LM_DEBUG,
00623                     ACE_TEXT ("Consumer %d canceling dispatch timer.\n"),
00624                     static_cast<int> (this->proxy ()->id ())
00625                     ));
00626 
00627       this->timer_->cancel_timer (timer_id_);
00628     }
00629   this->timer_id_ = -1;
00630 }

void TAO_Notify_Consumer::deliver ( TAO_Notify_Method_Request_Event * request  ) 

Dispatch Event to consumer.

Definition at line 144 of file Consumer.cpp.

References ACE_DEBUG, ACE_TEXT(), TAO_Notify_Method_Request_Event::complete(), DEBUG_LEVEL, TAO_Notify_ProxySupplier::destroy(), DISPATCH_DISCARD, DISPATCH_FAIL, dispatch_request(), DISPATCH_RETRY, DISPATCH_SUCCESS, enqueue_if_necessary(), enqueue_request(), LM_DEBUG, proxy_supplier(), schedule_timer(), and TAO_Notify_Method_Request_Event::sequence().

Referenced by TAO_Notify_Method_Request_Dispatch::execute_i().

00145 {
00146   // Increment reference counts (safely) to prevent this object and its proxy
00147   // from being deleted while the push is in progress.
00148   TAO_Notify_Proxy::Ptr proxy_guard (this->proxy ());
00149   bool queued = enqueue_if_necessary (request);
00150   if (!queued)
00151     {
00152       DispatchStatus status = this->dispatch_request (request);
00153       switch (status)
00154         {
00155         case DISPATCH_SUCCESS:
00156           {
00157             request->complete ();
00158             break;
00159           }
00160         case DISPATCH_RETRY:
00161           {
00162             if (DEBUG_LEVEL > 1)
00163               ACE_DEBUG ((LM_DEBUG,
00164                           ACE_TEXT ("Consumer %d enqueing event %d due ")
00165                           ACE_TEXT ("to failed dispatch.\n"),
00166                           static_cast<int> (this->proxy ()->id ()),
00167                           request->sequence ()));
00168             this->enqueue_request (request);
00169             this->schedule_timer (true);
00170             break;
00171           }
00172         case DISPATCH_DISCARD:
00173           {
00174             if (DEBUG_LEVEL  > 0)
00175               ACE_DEBUG ((LM_DEBUG,
00176                           ACE_TEXT ("(%P|%t) Consumer %d: Error during "
00177                                     "direct dispatch. Discarding event:%d.\n"),
00178                           static_cast<int> (this->proxy ()->id ()),
00179                           request->sequence ()
00180                           ));
00181             request->complete ();
00182             break;
00183           }
00184         case DISPATCH_FAIL:
00185           {
00186             if (DEBUG_LEVEL  > 0)
00187               ACE_DEBUG ((LM_DEBUG,
00188                           ACE_TEXT ("(%P|%t) Consumer %d: Failed during "
00189                                     "direct dispatch :%d. Discarding event.\n"),
00190                           static_cast<int> (this->proxy ()->id ()),
00191                           request->sequence ()
00192                           ));
00193             request->complete ();
00194             try
00195               {
00196                 this->proxy_supplier ()->destroy ();
00197               }
00198             catch (const CORBA::Exception&)
00199               {
00200                 // todo is there something meaningful we can do here?
00201                 ;
00202               }
00203             break;
00204           }
00205         }
00206     }
00207 }

TAO_Notify_Consumer::DispatchStatus TAO_Notify_Consumer::dispatch_batch ( const CosNotification::EventBatch & batch  ) 

Dispatch the batch of events to the attached consumer.

Definition at line 344 of file Consumer.cpp.

References CORBA::SystemException::_info(), ACE_DEBUG, ACE_ERROR, ACE_TEXT(), ACE_String_Base< CHAR >::c_str(), DEBUG_LEVEL, DISPATCH_DISCARD, DISPATCH_FAIL, DISPATCH_RETRY, DISPATCH_SUCCESS, LM_ERROR, CORBA::OMGVMCID, push(), TAO_INVOCATION_SEND_REQUEST_MINOR_CODE, TAO_POA_DISCARDING, TAO_POA_HOLDING, and TAO::VMCID.

Referenced by TAO_Notify_SequencePushConsumer::dispatch_from_queue().

00345 {
00346   DispatchStatus result = DISPATCH_SUCCESS;
00347   try
00348     {
00349       this->push (batch);
00350     }
00351   catch (const CORBA::OBJECT_NOT_EXIST& ex)
00352     {
00353       if (DEBUG_LEVEL  > 0)
00354         ACE_DEBUG ((LM_ERROR,
00355                     ACE_TEXT ("(%P|%t) TAO_Notify_Consumer "
00356                               "%d::dispatch_batch() %s\n"),
00357                     static_cast<int> (this->proxy ()->id ()),
00358                     ex._info ().c_str ()
00359                     ));
00360       result = DISPATCH_FAIL;
00361     }
00362   catch (const CORBA::TRANSIENT& ex)
00363     {
00364       if (DEBUG_LEVEL  > 0)
00365         ACE_DEBUG ((LM_ERROR,
00366                     ACE_TEXT ("(%P|%t) TAO_Notify_Consumer "
00367                               "%d::dispatch_batch() Transient (minor=%d) %s\n"),
00368                     static_cast<int> (this->proxy ()->id ()),
00369                     ex.minor (),
00370                     ex._info ().c_str ()
00371                     ));
00372       const CORBA::ULong BITS_5_THRU_12_MASK = 0x00000f80u;
00373       switch (ex.minor () & 0xfffff000u)
00374         {
00375         case CORBA::OMGVMCID:
00376           switch (ex.minor () & 0x00000fffu)
00377             {
00378             case 2: // No usable profile
00379             case 3: // Request cancelled
00380             case 4: // POA destroyed
00381               result = DISPATCH_FAIL;
00382               break;
00383             default:
00384               result = DISPATCH_DISCARD;
00385             }
00386           break;
00387 
00388         case TAO::VMCID:
00389         default:
00390           switch (ex.minor () & BITS_5_THRU_12_MASK)
00391             {
00392             case TAO_INVOCATION_SEND_REQUEST_MINOR_CODE:
00393               result = DISPATCH_FAIL;
00394               break;
00395             case TAO_POA_DISCARDING:
00396             case TAO_POA_HOLDING:
00397             default:
00398               result = DISPATCH_RETRY;
00399             } break;
00400         }
00401     }
00402   catch (const CORBA::TIMEOUT& ex)
00403     {
00404       if (DEBUG_LEVEL  > 0)
00405         ACE_DEBUG ((LM_ERROR,
00406                     ACE_TEXT ("(%P|%t) TAO_Notify_Consumer "
00407                               "%u::dispatch_batch() %s\n"),
00408                     this->proxy ()->id (),
00409                     ex._info().c_str ()
00410                     ));
00411       result = DISPATCH_FAIL;
00412     }
00413   catch (const CORBA::COMM_FAILURE& ex)
00414     {
00415       if (DEBUG_LEVEL  > 0)
00416         ACE_DEBUG ((LM_ERROR,
00417                     ACE_TEXT ("(%P|%t) TAO_Notify_Consumer "
00418                               "%u::dispatch_batch() %s\n"),
00419                     this->proxy ()->id (),
00420                     ex._info().c_str ()
00421                     ));
00422       result = DISPATCH_FAIL;
00423     }
00424   catch (const CORBA::SystemException& ex)
00425     {
00426       if (DEBUG_LEVEL  > 0)
00427         {
00428           ACE_DEBUG ((LM_ERROR,
00429                       ACE_TEXT ("(%P|%t) TAO_Notify_Consumer "
00430                                 "%d::dispatch_batch() SystemException %s\n"),
00431                       static_cast<int>  (this->proxy ()->id ()),
00432                       ex._info ().c_str ()
00433                       ));
00434         }
00435       result = DISPATCH_DISCARD;
00436     }
00437   catch (const CORBA::Exception&)
00438     {
00439       ACE_ERROR ((LM_ERROR,
00440                   ACE_TEXT ("(%P|%t) TAO_Notify_Consumer "
00441                             "%d::dispatch_batch() Caught unexpected "
00442                             "exception pushing batch to consumer.\n"),
00443                   static_cast<int> (this->proxy ()->id ())
00444                   ));
00445       result = DISPATCH_DISCARD;
00446     }
00447   return result;
00448 }
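
The CORBA::TRANSIENT handler above decides the outcome from the exception's minor code: the upper 20 bits select the vendor codeset and the remaining bits select the specific condition. The sketch below reproduces that decision as a free function so it can be read on its own. The name classify_transient is hypothetical, the OMG codeset value is an assumption, and SEND_REQUEST_MINOR_CODE is a placeholder rather than the real value of TAO_INVOCATION_SEND_REQUEST_MINOR_CODE, which should be taken from the TAO headers.

```cpp
#include <cassert>
#include <cstdint>

enum DispatchStatus { DISPATCH_SUCCESS, DISPATCH_RETRY, DISPATCH_DISCARD, DISPATCH_FAIL };

// Assumed value of the OMG vendor minor codeset id.
const std::uint32_t OMG_VMCID = 0x4f4d0000u;
// Placeholder for TAO_INVOCATION_SEND_REQUEST_MINOR_CODE (hypothetical value).
const std::uint32_t SEND_REQUEST_MINOR_CODE = 0x2u << 7;

DispatchStatus classify_transient (std::uint32_t minor)
{
  const std::uint32_t BITS_5_THRU_12_MASK = 0x00000f80u;
  if ((minor & 0xfffff000u) == OMG_VMCID)
    {
      switch (minor & 0x00000fffu)
        {
        case 2: // No usable profile
        case 3: // Request cancelled
        case 4: // POA destroyed
          return DISPATCH_FAIL;
        default:
          return DISPATCH_DISCARD;
        }
    }
  // TAO's own codeset and any unrecognised codeset share one branch.
  if ((minor & BITS_5_THRU_12_MASK) == SEND_REQUEST_MINOR_CODE)
    return DISPATCH_FAIL;
  return DISPATCH_RETRY;
}
```

Anything outside the OMG codeset that is not a send-request failure is treated as retryable, which matches the default labels in the listing.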

bool TAO_Notify_Consumer::dispatch_from_queue ( Request_Queue & requests,
ACE_Guard< TAO_SYNCH_MUTEX > &  ace_mon 
) [protected, virtual]

Attempt to dispatch event from a queue.

Called by dispatch_pending. Delivers one or more events to the Consumer. If delivery fails, events are left in the queue (or discarded, depending on QoS parameters). Undelivered, undiscarded requests are left at the front of the queue. Overridden in the sequence consumer to dispatch as an EventBatch.

Returns:
false if delivery failed and the request(s) cannot be discarded.

Definition at line 481 of file Consumer.cpp.

References ACE_DEBUG, ACE_TEXT(), ACE_Guard< ACE_LOCK >::acquire(), TAO_Notify_Method_Request_Event::complete(), DEBUG_LEVEL, ACE_Unbounded_Queue< T >::dequeue_head(), TAO_Notify_ProxySupplier::destroy(), DISPATCH_DISCARD, DISPATCH_FAIL, dispatch_request(), DISPATCH_RETRY, DISPATCH_SUCCESS, ACE_Unbounded_Queue< T >::enqueue_head(), LM_DEBUG, proxy_supplier(), ACE_Message_Block::release(), ACE_Guard< ACE_LOCK >::release(), and TAO_Notify_Method_Request_Event::sequence().

Referenced by dispatch_pending().

00482 {
00483   bool result = true;
00484   TAO_Notify_Method_Request_Event_Queueable * request;
00485   if (requests.dequeue_head (request) == 0)
00486     {
00487       ace_mon.release ();
00488       DispatchStatus status = this->dispatch_request (request);
00489       switch (status)
00490         {
00491         case DISPATCH_SUCCESS:
00492           {
00493             request->complete ();
00494             request->release ();
00495             result = true;
00496             ace_mon.acquire ();
00497             break;
00498           }
00499         case DISPATCH_RETRY:
00500           {
00501             if (DEBUG_LEVEL  > 0)
00502               ACE_DEBUG ((LM_DEBUG,
00503                           ACE_TEXT ("(%P|%t) Consumer %d: Will retry %d\n"),
00504                           static_cast<int> (this->proxy ()->id ()),
00505                           request->sequence ()
00506                           ));
00507             ace_mon.acquire ();
00508             requests.enqueue_head (request); // put the failed event back where it was
00509             result = false;
00510             break;
00511           }
00512         case DISPATCH_DISCARD:
00513           {
00514             if (DEBUG_LEVEL  > 0)
00515               ACE_DEBUG ((LM_DEBUG,
00516                           ACE_TEXT ("(%P|%t) Consumer %d: Error during "
00517                                     "dispatch. Discarding event:%d.\n"),
00518                           static_cast<int> (this->proxy ()->id ()),
00519                           request->sequence ()
00520                           ));
00521             request->complete ();
00522             ace_mon.acquire ();
00523             result = true;
00524             break;
00525           }
00526         case DISPATCH_FAIL:
00527           {
00528             if (DEBUG_LEVEL  > 0)
00529               ACE_DEBUG ((LM_DEBUG,
00530                           ACE_TEXT ("(%P|%t) Consumer %d: Failed. "
00531                                     "Discarding event %d.\n"),
00532                           static_cast<int> (this->proxy ()->id ()),
00533                           request->sequence ()
00534                           ));
00535             request->complete ();
00536             ace_mon.acquire ();
00537             while (requests.dequeue_head (request) == 0)
00538               {
00539                 ace_mon.release ();
00540                 request->complete ();
00541                 ace_mon.acquire ();
00542               }
00543             ace_mon.release ();
00544             try
00545               {
00546                 this->proxy_supplier ()->destroy ();
00547               }
00548             catch (const CORBA::Exception&)
00549               {
00550                 // todo is there something reasonable to do here?
00551               }
00552             ace_mon.acquire ();
00553             result = true;
00554             break;
00555           }
00556         default:
00557           {
00558             ace_mon.acquire ();
00559             result = false;
00560             break;
00561           }
00562         }
00563     }
00564   return result;
00565 }
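
Two details of the listing above are easy to miss: the guard is released around the dispatch call and re-acquired before the queue is touched again, and a request that must be retried goes back to the head of the queue. A minimal sketch of that pattern follows, assuming std::mutex and std::unique_lock in place of TAO_SYNCH_MUTEX and ACE_Guard, and integers in place of queued requests. The function dispatch_one and its send callback are hypothetical.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <mutex>

enum DispatchStatus { DISPATCH_SUCCESS, DISPATCH_RETRY, DISPATCH_DISCARD, DISPATCH_FAIL };

// Pop one event, dispatch it with the lock released, and put it back at the
// head on RETRY. `send` stands in for the remote push.
bool dispatch_one (std::deque<int> & queue,
                   std::unique_lock<std::mutex> & lock,
                   const std::function<DispatchStatus (int)> & send)
{
  assert (lock.owns_lock ());
  if (queue.empty ())
    return true;
  int event = queue.front ();
  queue.pop_front ();

  lock.unlock ();                 // do not hold the lock across the remote call
  DispatchStatus status = send (event);
  lock.lock ();

  switch (status)
    {
    case DISPATCH_RETRY:
      queue.push_front (event);   // failed event goes back where it was
      return false;
    case DISPATCH_FAIL:
      queue.clear ();             // drop everything that is still pending
      return true;
    default:
      return true;                // delivered or discarded
    }
}
```

The function returns with the lock held in every case, which is what the caller's loop relies on.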

void TAO_Notify_Consumer::dispatch_pending ( void   ) 

Dispatch the pending events.

Definition at line 451 of file Consumer.cpp.

References ACE_DEBUG, ACE_GUARD, ACE_TEXT(), DEBUG_LEVEL, dispatch_from_queue(), ACE_Unbounded_Queue< T >::is_empty(), LM_DEBUG, pending_events(), schedule_timer(), and TAO_SYNCH_MUTEX.

Referenced by handle_timeout(), and resume().

00452 {
00453   if (DEBUG_LEVEL  > 5)
00454     ACE_DEBUG ((LM_DEBUG,
00455                 ACE_TEXT ("Consumer %d dispatching pending events.  Queue size: %d\n"),
00456                 static_cast<int> (this->proxy ()->id ()),
00457                 this->pending_events().size ()
00458                 ));
00459 
00460   // lock ourselves in memory for the duration
00461   TAO_Notify_Consumer::Ptr self_grd (this);
00462 
00463   // dispatch events until: 1) the queue is empty; 2) the proxy shuts down, or 3) the dispatch fails
00464   ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, *this->proxy_lock ());
00465   bool ok = true;
00466   while (ok
00467          && !this->proxy_supplier ()->has_shutdown ()
00468          && !this->pending_events().is_empty ())
00469     {
00470       if (! dispatch_from_queue ( this->pending_events(), ace_mon))
00471         {
00472           this->schedule_timer (true);
00473           ok = false;
00474         }
00475     }
00476 }
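
The loop above stops on the first of three conditions: the queue is empty, the proxy has shut down, or a dispatch attempt fails. Only the last one schedules a retry timer. The sketch below isolates that control flow. drain_pending, shut_down and step are hypothetical names, and the return value stands in for the call to schedule_timer (true).

```cpp
#include <cassert>
#include <deque>
#include <functional>

// Drain `queue` until it is empty, `shut_down` reports true, or `step` fails.
// Returns true when a retry timer should be scheduled.
bool drain_pending (std::deque<int> & queue,
                    const std::function<bool ()> & shut_down,
                    const std::function<bool (std::deque<int> &)> & step)
{
  bool ok = true;
  bool reschedule = false;
  while (ok && !shut_down () && !queue.empty ())
    {
      if (!step (queue))
        {
          reschedule = true; // a failed dispatch asks for a later retry
          ok = false;
        }
    }
  return reschedule;
}
```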

TAO_Notify_Consumer::DispatchStatus TAO_Notify_Consumer::dispatch_request ( TAO_Notify_Method_Request_Event * request  )  [protected]

Definition at line 210 of file Consumer.cpp.

References CORBA::SystemException::_info(), ACE_DEBUG, ACE_ERROR, ACE_TEXT(), ACE_String_Base< CHAR >::c_str(), DEBUG_LEVEL, DISPATCH_DISCARD, DISPATCH_FAIL, DISPATCH_RETRY, DISPATCH_SUCCESS, TAO_Notify_Method_Request_Event::event(), LM_DEBUG, LM_ERROR, CORBA::OMGVMCID, TAO_Notify_Event::push(), TAO_Notify_Method_Request_Event::sequence(), TAO_Notify_Method_Request_Event::should_retry(), TAO_INVOCATION_SEND_REQUEST_MINOR_CODE, TAO_POA_DISCARDING, TAO_POA_HOLDING, and TAO::VMCID.

Referenced by deliver(), and dispatch_from_queue().

00211 {
00212   DispatchStatus result = DISPATCH_SUCCESS;
00213   try
00214     {
00215       request->event ()->push (this);
00216       if (DEBUG_LEVEL  > 8)
00217         ACE_DEBUG ((LM_DEBUG,
00218                     ACE_TEXT ("Consumer %d dispatched single event %d.\n"),
00219                     static_cast<int> (this->proxy ()->id ()),
00220                     request->sequence ()
00221                     ));
00222     }
00223   catch (const CORBA::OBJECT_NOT_EXIST& ex)
00224     {
00225       if (DEBUG_LEVEL  > 0)
00226         {
00227           ACE_DEBUG ((LM_ERROR,
00228                       ACE_TEXT ("(%P|%t) TAO_Notify_Consumer %d::push "
00229                                 "(request) %s\n"),
00230                       static_cast<int> (this->proxy ()->id ()),
00231                       ex._info ().c_str ()
00232                       ));
00233         }
00234       result = DISPATCH_FAIL;
00235     }
00236   catch (const CORBA::TRANSIENT& ex)
00237     {
00238       if (DEBUG_LEVEL  > 0)
00239         ACE_DEBUG ((LM_ERROR,
00240                     ACE_TEXT ("(%P|%t) TAO_Notify_Consumer %d::push "
00241                               "(request) Transient (minor=%d) %s\n"),
00242                     static_cast<int> (this->proxy ()->id ()),
00243                     ex.minor (),
00244                     ex._info ().c_str ()
00245                     ));
00246       const CORBA::ULong BITS_5_THRU_12_MASK = 0x00000f80u;
00247       switch (ex.minor () & 0xfffff000u)
00248         {
00249         case CORBA::OMGVMCID:
00250           switch (ex.minor () & 0x00000fffu)
00251             {
00252             case 2: // No usable profile
00253             case 3: // Request cancelled
00254             case 4: // POA destroyed
00255               result = DISPATCH_FAIL;
00256               break;
00257             default:
00258               result = DISPATCH_DISCARD;
00259             }
00260           break;
00261 
00262         case TAO::VMCID:
00263         default:
00264           switch (ex.minor () & BITS_5_THRU_12_MASK)
00265             {
00266             case TAO_INVOCATION_SEND_REQUEST_MINOR_CODE:
00267               result = DISPATCH_FAIL;
00268               break;
00269             case TAO_POA_DISCARDING:
00270             case TAO_POA_HOLDING:
00271             default:
00272               result = DISPATCH_RETRY;
00273             } break;
00274         }
00275     }
00276   catch (const CORBA::TIMEOUT& ex)
00277     {
00278       if (DEBUG_LEVEL  > 0)
00279         ACE_DEBUG ((LM_ERROR,
00280                     ACE_TEXT ("(%P|%t) TAO_Notify_Consumer %u::push "
00281                               "(request) %s\n"),
00282                     this->proxy ()->id (),
00283                     ex._info().c_str ()
00284                     ));
00285       result = DISPATCH_FAIL;
00286     }
00287   catch (const CORBA::COMM_FAILURE& ex)
00288     {
00289       if (DEBUG_LEVEL  > 0)
00290         ACE_DEBUG ((LM_ERROR,
00291                     ACE_TEXT ("(%P|%t) TAO_Notify_Consumer %u::push "
00292                               "(request) %s\n"),
00293                     this->proxy ()->id (),
00294                     ex._info().c_str ()
00295                     ));
00296       result = DISPATCH_FAIL;
00297     }
00298   catch (const CORBA::SystemException& ex)
00299     {
00300       if (DEBUG_LEVEL  > 0)
00301         {
00302           ACE_DEBUG ((LM_ERROR,
00303                       ACE_TEXT ("(%P|%t) TAO_Notify_Consumer %d::push "
00304                                 "(request) SystemException %s\n"),
00305                       static_cast<int> (this->proxy ()->id ()),
00306                       ex._info ().c_str ()
00307                       ));
00308         }
00309       result = DISPATCH_DISCARD;
00310     }
00311   catch (const CORBA::Exception&)
00312     {
00313       ACE_ERROR ( (LM_ERROR,
00314                    ACE_TEXT ("(%P|%t) TAO_Notify_Consumer %d::push "
00315                              "(request) Caught unexpected exception "
00316                              "pushing event to consumer.\n"),
00317                    static_cast<int> (this->proxy ()->id ())
00318                    ));
00319       result = DISPATCH_DISCARD;
00320     }
00321 
00322   // for persistent events that haven't timed out
00323   // convert "FAIL" & "DISCARD" to "RETRY"
00324   // for transient events, convert RETRY to DISCARD (hey, best_effort.)
00325   if (result == DISPATCH_FAIL || result == DISPATCH_DISCARD)
00326     {
00327       if (request->should_retry ())
00328         {
00329           result = DISPATCH_RETRY;
00330         }
00331     }
00332   else if (result == DISPATCH_RETRY)
00333     {
00334       if (! request->should_retry ())
00335         {
00336           result = DISPATCH_DISCARD;
00337         }
00338     }
00339 
00340   return result;
00341 }
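
The final block of the listing above rewrites the status according to whether the request should be retried. A minimal sketch of just that adjustment, with the hypothetical name adjust_for_retry and a plain bool standing in for request->should_retry ():

```cpp
#include <cassert>

enum DispatchStatus { DISPATCH_SUCCESS, DISPATCH_RETRY, DISPATCH_DISCARD, DISPATCH_FAIL };

// Retryable events are never dropped; non-retryable events are never
// queued for another attempt.
DispatchStatus adjust_for_retry (DispatchStatus result, bool should_retry)
{
  if (result == DISPATCH_FAIL || result == DISPATCH_DISCARD)
    return should_retry ? DISPATCH_RETRY : result;
  if (result == DISPATCH_RETRY)
    return should_retry ? DISPATCH_RETRY : DISPATCH_DISCARD;
  return result;
}
```

One consequence visible in this sketch: when the request is retryable, DISPATCH_FAIL is never returned, so the caller's disconnect path is not taken for that request.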

void TAO_Notify_Consumer::dispatch_updates_i ( const CosNotification::EventTypeSeq & added,
const CosNotification::EventTypeSeq & removed 
) [protected, virtual]

Implementation of Peer specific dispatch_updates.

Implements TAO_Notify_Peer.

Definition at line 659 of file Consumer.cpp.

References have_not_yet_verified_publish_, CORBA::is_nil(), and publish_.

00660 {
00661   if (this->have_not_yet_verified_publish_)
00662     {
00663       this->have_not_yet_verified_publish_ = false; // no need to check again
00664       if (! this->publish_->_is_a ("IDL:omg.org/CosNotifyComm/NotifyPublish:1.0"))
00665         this->publish_ = CosNotifyComm::NotifyPublish::_nil();
00666     }
00667   if (! CORBA::is_nil (this->publish_.in ()))
00668     this->publish_->offer_change (added, removed);
00669 }

bool TAO_Notify_Consumer::enqueue_if_necessary ( TAO_Notify_Method_Request_Event * request  )  [protected, virtual]

Add the request to a queue if necessary. Overridden by the sequence consumer to always put incoming events into the queue.

Returns:
true if the request has been enqueued; false if the request should be handled now.

Reimplemented in TAO_Notify_SequencePushConsumer.

Definition at line 99 of file Consumer.cpp.

References ACE_DEBUG, ACE_GUARD_RETURN, ACE_NEW_THROW_EX, ACE_TEXT(), DEBUG_LEVEL, ACE_Unbounded_Queue< T >::enqueue_tail(), TAO_Notify_Method_Request_Event::event(), LM_DEBUG, pending_events(), TAO_Notify_Event::queueable_copy(), schedule_timer(), TAO_Notify_Method_Request_Event::sequence(), and TAO_SYNCH_MUTEX.

Referenced by deliver().

00100 {
00101   ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, *this->proxy_lock (), false);
00102   if (! this->pending_events().is_empty ())
00103     {
00104       if (DEBUG_LEVEL > 3)
00105         ACE_DEBUG ((LM_DEBUG,
00106                     ACE_TEXT ("Consumer %d: enqueuing another event. %d\n"),
00107                     static_cast<int> (this->proxy ()->id ()),
00108                     request->sequence ()
00109                     ));
00110       TAO_Notify_Event::Ptr event (
00111         request->event ()->queueable_copy ());
00112       TAO_Notify_Method_Request_Event_Queueable * queue_entry;
00113       ACE_NEW_THROW_EX (queue_entry,
00114                         TAO_Notify_Method_Request_Event_Queueable (*request,
00115                                                                    event),
00116                         CORBA::NO_MEMORY ());
00117       this->pending_events().enqueue_tail (queue_entry);
00118       this->schedule_timer (false);
00119       return true;
00120     }
00121   if (this->is_suspended_ == 1)
00122     {
00123       if (DEBUG_LEVEL > 3)
00124         ACE_DEBUG ((LM_DEBUG,
00125                     ACE_TEXT ("Suspended Consumer %d enqueing event. %d\n"),
00126                     static_cast<int> (this->proxy ()->id ()),
00127                     request->sequence ()
00128                     ));
00129       TAO_Notify_Event::Ptr event (
00130         request->event ()->queueable_copy ());
00131       TAO_Notify_Method_Request_Event_Queueable * queue_entry;
00132       ACE_NEW_THROW_EX (queue_entry,
00133                         TAO_Notify_Method_Request_Event_Queueable (*request,
00134                                                                    event),
00135                         CORBA::NO_MEMORY ());
00136       this->pending_events().enqueue_tail (queue_entry);
00137       this->schedule_timer (false);
00138       return true;
00139     }
00140   return false;
00141 }
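
The two branches in the listing above enqueue the request in the same way, so the decision reduces to one condition: queue the event if earlier events are still pending (which preserves arrival order) or if the consumer is suspended. A minimal sketch of that rule follows, with integers in place of requests. ConsumerState is a hypothetical type, and the sketch leaves out the locking and timer scheduling that the real method performs.

```cpp
#include <cassert>
#include <deque>

// Hypothetical consumer state: a pending queue plus a suspended flag.
struct ConsumerState {
  std::deque<int> pending;
  bool suspended = false;
};

// Returns true when the event was queued, false when the caller should
// dispatch it immediately.
bool enqueue_if_necessary (ConsumerState & state, int event)
{
  if (!state.pending.empty () || state.suspended)
    {
      state.pending.push_back (event); // keep arrival order
      return true;
    }
  return false;
}
```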

void TAO_Notify_Consumer::enqueue_request ( TAO_Notify_Method_Request_Event * request  )  [protected]

Definition at line 77 of file Consumer.cpp.

References ACE_DEBUG, ACE_GUARD, ACE_NEW_THROW_EX, ACE_TEXT(), DEBUG_LEVEL, ACE_Unbounded_Queue< T >::enqueue_tail(), TAO_Notify_Method_Request_Event::event(), LM_DEBUG, pending_events(), TAO_Notify_Event::queueable_copy(), TAO_Notify_Method_Request_Event::sequence(), and TAO_SYNCH_MUTEX.

Referenced by deliver(), and TAO_Notify_SequencePushConsumer::enqueue_if_necessary().

00079 {
00080   TAO_Notify_Event::Ptr event (
00081     request->event ()->queueable_copy ());
00082 
00083   TAO_Notify_Method_Request_Event_Queueable * queue_entry;
00084   ACE_NEW_THROW_EX (queue_entry,
00085     TAO_Notify_Method_Request_Event_Queueable (*request, event),
00086     CORBA::NO_MEMORY ());
00087 
00088   if (DEBUG_LEVEL  > 3) ACE_DEBUG ( (LM_DEBUG,
00089     ACE_TEXT ("Consumer %d: enqueue_request (%d) @%@.\n"),
00090     static_cast<int> (this->proxy ()->id ()),
00091     request->sequence (),
00092     request
00093     ));
00094   ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, *this->proxy_lock ());
00095   this->pending_events().enqueue_tail (queue_entry);
00096 }

int TAO_Notify_Consumer::handle_timeout ( const ACE_Time_Value & current_time,
const void *  act = 0 
) [protected, virtual]

Reimplemented from ACE_Event_Handler.

Definition at line 633 of file Consumer.cpp.

References dispatch_pending(), and timer_id_.

00634 {
00635   TAO_Notify_Consumer::Ptr grd (this);
00636   this->timer_id_ = -1;  // This must come first, because dispatch_pending may try to resched
00637   try
00638     {
00639       this->dispatch_pending ();
00640     }
00641   catch (...)
00642     {
00643     }
00644 
00645   return 0;
00646 }

ACE_INLINE CORBA::Boolean TAO_Notify_Consumer::is_suspended ( void   ) 

Is the connection suspended?

Definition at line 16 of file Consumer.inl.

References is_suspended_.

00017 {
00018   return this->is_suspended_;
00019 }

ACE_INLINE TAO_Notify_Consumer::Request_Queue & TAO_Notify_Consumer::pending_events (  )  [protected]

Access the queue of pending events.

Definition at line 9 of file Consumer.inl.

References ACE_ASSERT, and pending_events_.

Referenced by assume_pending_events(), dispatch_pending(), enqueue_if_necessary(), enqueue_request(), and TAO_Notify_Consumer().

00010 {
00011   ACE_ASSERT( pending_events_.get() != 0 );
00012   return *pending_events_;
00013 }

TAO_Notify_Proxy * TAO_Notify_Consumer::proxy ( void   )  [virtual]

Access Base Proxy.

Implements TAO_Notify_Peer.

Definition at line 57 of file Consumer.cpp.

References proxy_supplier().

00058 {
00059   return this->proxy_supplier ();
00060 }

TAO_SYNCH_MUTEX * TAO_Notify_Consumer::proxy_lock ( void   )  [protected]

Get the shared Proxy Lock.

Definition at line 672 of file Consumer.cpp.

References TAO_Notify_Object::lock_, and proxy_.

00673 {
00674   return &this->proxy_->lock_;
00675 }

TAO_Notify_ProxySupplier * TAO_Notify_Consumer::proxy_supplier ( void   ) 

Access Specific Proxy.

Definition at line 678 of file Consumer.cpp.

References proxy_.

Referenced by deliver(), TAO_Notify_SequencePushConsumer::dispatch_from_queue(), dispatch_from_queue(), and proxy().

00679 {
00680   return this->proxy_;
00681 }

virtual void TAO_Notify_Consumer::push ( const CosNotification::EventBatch &  event  )  [pure virtual]

Push a batch of events to this consumer.

Implemented in TAO_Notify_PushConsumer, TAO_Notify_SequencePushConsumer, and TAO_Notify_StructuredPushConsumer.

virtual void TAO_Notify_Consumer::push ( const CosNotification::StructuredEvent &  event  )  [pure virtual]

Push event to this consumer.

Implemented in TAO_Notify_PushConsumer, TAO_Notify_SequencePushConsumer, and TAO_Notify_StructuredPushConsumer.

virtual void TAO_Notify_Consumer::push ( const CORBA::Any &  event  )  [pure virtual]

Push event to this consumer.

Implemented in TAO_Notify_PushConsumer, TAO_Notify_SequencePushConsumer, and TAO_Notify_StructuredPushConsumer.

Referenced by dispatch_batch(), TAO_Notify_StructuredEvent_No_Copy::push(), and TAO_Notify_AnyEvent_No_Copy::push().

void TAO_Notify_Consumer::qos_changed ( const TAO_Notify_QoSProperties &  qos_properties  )  [virtual]

Override, Peer::qos_changed.

Reimplemented from TAO_Notify_Peer.

Definition at line 63 of file Consumer.cpp.

References max_batch_size_, and TAO_Notify_QoSProperties::maximum_batch_size().

00064 {
00065   this->max_batch_size_ = qos_properties.maximum_batch_size ();
00066 }

virtual void TAO_Notify_Consumer::reconnect_from_consumer ( TAO_Notify_Consumer *  old_consumer  )  [pure virtual]

On reconnect, the events pending on the old consumer must be moved to the new one.

Implemented in TAO_Notify_PushConsumer, TAO_Notify_SequencePushConsumer, and TAO_Notify_StructuredPushConsumer.

void TAO_Notify_Consumer::resume ( void   ) 

Resume Connection.

Definition at line 69 of file Consumer.cpp.

References dispatch_pending(), and is_suspended_.

Referenced by TAO_Notify_ProxySupplier_T< SERVANT_TYPE >::resume_connection().

00070 {
00071   this->is_suspended_ = 0;
00072 
00073   this->dispatch_pending ();
00074 }
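
As the listing above shows, resume() clears the suspended flag and then immediately calls dispatch_pending() to flush whatever accumulated in the meantime. The sketch below models that interaction with a hypothetical `SuspendableSink` class. Note the assumption, not confirmed by this section, that dispatching is skipped while the flag is set; the body of dispatch_pending() is not shown here.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <string>
#include <vector>

// Hypothetical stand-in for a consumer; not a TAO type.
class SuspendableSink {
public:
  bool is_suspended() const { return is_suspended_; }
  void suspend() { is_suspended_ = true; }

  // Clear the flag, then flush whatever accumulated while suspended.
  void resume() {
    is_suspended_ = false;
    dispatch_pending();
  }

  // Queue the event, then try to dispatch it.
  void deliver(const std::string& event) {
    pending_.push_back(event);
    dispatch_pending();
  }

  std::size_t pending_count() const { return pending_.size(); }
  const std::vector<std::string>& delivered() const { return delivered_; }

private:
  // ASSUMPTION: dispatching does nothing while suspended.
  void dispatch_pending() {
    if (is_suspended_) return;
    while (!pending_.empty()) {
      delivered_.push_back(pending_.front());  // oldest event first
      pending_.pop_front();
    }
    assert(pending_.empty());  // everything queued has been handed over
  }

  bool is_suspended_ = false;
  std::deque<std::string> pending_;
  std::vector<std::string> delivered_;
};
```

Under these assumptions, events delivered while suspended stay queued in arrival order and are handed over in that same order on resume.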

void TAO_Notify_Consumer::schedule_timer ( bool  is_error = false  )  [protected]

Schedule timer.

Definition at line 570 of file Consumer.cpp.

References ACE_ASSERT, ACE_DEBUG, ACE_ERROR, ACE_TEXT(), DEBUG_LEVEL, LM_DEBUG, LM_ERROR, timer_, timer_id_, and ACE_Time_Value::zero.

Referenced by assume_pending_events(), deliver(), dispatch_pending(), TAO_Notify_SequencePushConsumer::enqueue_if_necessary(), enqueue_if_necessary(), TAO_Notify_StructuredPushConsumer::reconnect_from_consumer(), TAO_Notify_SequencePushConsumer::reconnect_from_consumer(), and TAO_Notify_PushConsumer::reconnect_from_consumer().

00571 {
00572   if (this->timer_id_ != -1)
00573     {
00574       return; // We only want a single timeout scheduled.
00575     }
00576   // don't schedule timer if there's nothing that can be done
00577   if (this->is_suspended ())
00578     {
00579       return;
00580     }
00581 
00582   ACE_ASSERT (this->timer_.get() != 0);
00583 
00584   // If we're scheduling the timer due to an error then we want to
00585   // use the retry timeout, otherwise we'll assume that the pacing
00586   // interval is sufficient for now.
00587   ACE_Time_Value tv (DEFAULT_RETRY_TIMEOUT);
00588 
00589   if (! is_error)
00590     {
00591       if (this->pacing_.is_valid ())
00592         {
00593           tv = ORBSVCS_Time::to_Time_Value (this->pacing_.value ());
00594         }
00595     }
00596 
00597   if (DEBUG_LEVEL  > 5)
00598     {
00599       ACE_DEBUG ((LM_DEBUG,
00600                   ACE_TEXT ("Consumer %d: scheduling pacing/retry for %dms.\n"),
00601                   static_cast<int> (this->proxy ()->id ()), tv.msec ()));
00602     }
00603 
00604   this->timer_id_ =
00605     this->timer_->schedule_timer (this, tv, ACE_Time_Value::zero);
00606   if (this->timer_id_ == -1)
00607     {
00608       ACE_ERROR ((LM_ERROR,
00609                   ACE_TEXT ("TAO_Notify_Consumer %d::schedule_timer () "
00610                             "Error scheduling timer.\n"),
00611                   static_cast<int> (this->proxy ()->id ())
00612                   ));
00613     }
00614 }
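
The decision logic in the listing above can be summarised as: do nothing if a timer is already outstanding or the consumer is suspended; otherwise use the retry timeout, unless the call is not error-driven and a valid pacing interval is set. The sketch below isolates that decision as a pure function using `std::chrono`. `plan_timer`, `PacingProperty`, `TimerPlan`, and the value of `kRetryTimeout` are hypothetical; the real DEFAULT_RETRY_TIMEOUT is defined in Consumer.cpp and its value is not shown in this section.

```cpp
#include <cassert>
#include <chrono>

using Millis = std::chrono::milliseconds;

// Placeholder value for illustration only; NOT the real DEFAULT_RETRY_TIMEOUT.
const Millis kRetryTimeout{10000};

// Hypothetical stand-in for a QoS property that may or may not be set.
struct PacingProperty {
  bool valid;
  Millis value;
};

// Result of the decision: whether to schedule, and with what delay.
struct TimerPlan {
  bool schedule;
  Millis delay;
};

TimerPlan plan_timer(long timer_id, bool suspended, bool is_error,
                     const PacingProperty& pacing) {
  // A valid pacing interval cannot be negative.
  assert(!pacing.valid || pacing.value >= Millis{0});

  // Only a single timeout may be outstanding at a time.
  if (timer_id != -1) return {false, Millis{0}};

  // Nothing can be delivered while suspended, so do not schedule.
  if (suspended) return {false, Millis{0}};

  // Start from the retry timeout; prefer the pacing interval only when
  // this is not an error-driven reschedule and pacing is actually set.
  Millis delay = kRetryTimeout;
  if (!is_error && pacing.valid) delay = pacing.value;
  return {true, delay};
}
```

Separating the decision from the act of scheduling makes the three guard conditions easy to check in isolation; the actual member function additionally logs the chosen delay and reports a scheduling failure.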

void TAO_Notify_Consumer::shutdown ( void   )  [virtual]

Shutdown the consumer.

Reimplemented from TAO_Notify_Peer.

Definition at line 649 of file Consumer.cpp.

References TAO_Notify_Refcountable_Guard_T< T >::reset(), and timer_.

00650 {
00651   if (this->timer_.isSet ())
00652     {
00653       this->cancel_timer ();
00654       this->timer_.reset ();
00655     }
00656 }

ACE_INLINE void TAO_Notify_Consumer::suspend ( void   ) 

Suspend Connection.

Definition at line 22 of file Consumer.inl.

References is_suspended_.

Referenced by TAO_Notify_ProxySupplier_T< SERVANT_TYPE >::suspend_connection().

00023 {
00024   this->is_suspended_ = 1;
00025 }


Member Data Documentation

bool TAO_Notify_Consumer::have_not_yet_verified_publish_ [protected]

Definition at line 169 of file Consumer.h.

Referenced by dispatch_updates_i().

CORBA::Boolean TAO_Notify_Consumer::is_suspended_ [protected]

Suspended Flag.

Definition at line 165 of file Consumer.h.

Referenced by is_suspended(), resume(), and suspend().

TAO_Notify_Property_Long TAO_Notify_Consumer::max_batch_size_ [protected]

Max. batch size.

Definition at line 175 of file Consumer.h.

Referenced by TAO_Notify_SequencePushConsumer::enqueue_if_necessary(), and qos_changed().

const TAO_Notify_Property_Time& TAO_Notify_Consumer::pacing_ [protected]

The Pacing Interval.

Definition at line 172 of file Consumer.h.

Referenced by TAO_Notify_SequencePushConsumer::enqueue_if_necessary().

ACE_Auto_Ptr< Request_Queue > TAO_Notify_Consumer::pending_events_ [private]

Events pending to be delivered.

Definition at line 186 of file Consumer.h.

Referenced by assume_pending_events(), pending_events(), and TAO_Notify_Consumer().

TAO_Notify_ProxySupplier* TAO_Notify_Consumer::proxy_ [protected]

The Proxy that we associate with.

Definition at line 162 of file Consumer.h.

Referenced by proxy_lock(), and proxy_supplier().

CosNotifyComm::NotifyPublish_var TAO_Notify_Consumer::publish_ [protected]

Interface that accepts offer_changes.

Definition at line 168 of file Consumer.h.

Referenced by dispatch_updates_i(), TAO_Notify_StructuredPushConsumer::init(), TAO_Notify_SequencePushConsumer::init(), and TAO_Notify_PushConsumer::init().

TAO_Notify_Timer::Ptr TAO_Notify_Consumer::timer_ [protected]

The Timer Manager that we use.

Definition at line 181 of file Consumer.h.

Referenced by assume_pending_events(), cancel_timer(), schedule_timer(), shutdown(), TAO_Notify_Consumer(), and ~TAO_Notify_Consumer().

long TAO_Notify_Consumer::timer_id_ [protected]

Timer Id.

Definition at line 178 of file Consumer.h.

Referenced by cancel_timer(), handle_timeout(), and schedule_timer().


The documentation for this class was generated from the following files:
Consumer.h
Consumer.inl
Consumer.cpp
Generated on Tue Feb 2 17:46:16 2010 for TAO_CosNotification by doxygen 1.4.7