TAO_Notify_Consumer Class Reference

Abstract base class for wrapping consumer objects that connect to the EventChannel.

#include <Consumer.h>


Public Types

enum  DispatchStatus { DISPATCH_SUCCESS, DISPATCH_RETRY, DISPATCH_DISCARD, DISPATCH_FAIL }
 Status returned from dispatch attempts.


Public Member Functions

 TAO_Notify_Consumer (TAO_Notify_ProxySupplier *proxy)
 Constructor.

virtual ~TAO_Notify_Consumer ()
 Destructor.

TAO_Notify_ProxySupplier * proxy_supplier (void)
 Access Specific Proxy.

virtual TAO_Notify_Proxy * proxy (void)
 Access Base Proxy.

void deliver (TAO_Notify_Method_Request_Event *request)
 Dispatch Event to consumer.

virtual void push (const CORBA::Any &event)=0
 Push to this consumer.

virtual void push (const CosNotification::StructuredEvent &event)=0
 Push to this consumer.

virtual void push (const CosNotification::EventBatch &event)=0
 Push a batch of events to this consumer.

DispatchStatus dispatch_batch (const CosNotification::EventBatch &batch)
 Dispatch the batch of events to the attached consumer.

void dispatch_pending ()
 Dispatch the pending events.

CORBA::Boolean is_suspended (void)
 Is the connection suspended?

void suspend ()
 Suspend Connection.

void resume ()
 Resume Connection.

virtual void shutdown ()
 Shutdown the consumer.

virtual void reconnect_from_consumer (TAO_Notify_Consumer *old_consumer)=0
virtual void qos_changed (const TAO_Notify_QoSProperties &qos_properties)
 Override, Peer::qos_changed.


Protected Types

typedef ACE_Unbounded_Queue< TAO_Notify_Method_Request_Event_Queueable * > Request_Queue

Protected Member Functions

DispatchStatus dispatch_request (TAO_Notify_Method_Request_Event *request)
virtual bool dispatch_from_queue (Request_Queue &requests, ACE_Guard< TAO_SYNCH_MUTEX > &ace_mon)
 Attempt to dispatch event from a queue.

void enqueue_request (TAO_Notify_Method_Request_Event *request)
virtual bool enqueue_if_necessary (TAO_Notify_Method_Request_Event *request)
virtual void dispatch_updates_i (const CosNotification::EventTypeSeq &added, const CosNotification::EventTypeSeq &removed)
 Implementation of Peer specific dispatch_updates.

TAO_SYNCH_MUTEX * proxy_lock (void)
 Get the shared Proxy Lock.

virtual int handle_timeout (const ACE_Time_Value &current_time, const void *act=0)
void schedule_timer (bool is_error=false)
 Schedule timer.

void cancel_timer (void)
 Cancel timer.

Request_Queue & pending_events ()
 Access the queue of events pending delivery.


Protected Attributes

TAO_Notify_ProxySupplier * proxy_
 The Proxy that we associate with.

CORBA::Boolean is_suspended_
 Suspended Flag.

CosNotifyComm::NotifyPublish_var publish_
 Interface that accepts offer_changes.

const TAO_Notify_Property_Time & pacing_
 The Pacing Interval.

TAO_Notify_Property_Long max_batch_size_
 Max. batch size.

long timer_id_
 Timer Id.

TAO_Notify_Timer::Ptr timer_
 The Timer Manager that we use.


Private Attributes

ACE_Auto_Ptr< Request_Queue > pending_events_
 Events pending to be delivered.


Detailed Description

Abstract base class for wrapping consumer objects that connect to the EventChannel.

Definition at line 44 of file Consumer.h.


Member Typedef Documentation

typedef ACE_Unbounded_Queue<TAO_Notify_Method_Request_Event_Queueable *> TAO_Notify_Consumer::Request_Queue [protected]
 

Definition at line 110 of file Consumer.h.

Referenced by dispatch_from_queue(), and TAO_Notify_Consumer().


Member Enumeration Documentation

enum TAO_Notify_Consumer::DispatchStatus
 

Status returned from dispatch attempts.

Enumeration values:
DISPATCH_SUCCESS 
DISPATCH_RETRY 
DISPATCH_DISCARD 
DISPATCH_FAIL 

Definition at line 51 of file Consumer.h.

Referenced by deliver(), dispatch_batch(), dispatch_from_queue(), and dispatch_request().

00051                       {
00052     DISPATCH_SUCCESS,
00053     DISPATCH_RETRY,   // retry this message
00054     DISPATCH_DISCARD, // discard this message
00055     DISPATCH_FAIL};   // discard all messages and disconnect consumer
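
The comments on the enum values describe what happens to a request after each outcome. As a minimal sketch of that mapping (not TAO code: `Action` and `action_for` are hypothetical names introduced here, and the actions are read off the enum comments and the switch in deliver()):

```cpp
#include <cassert>

// Mirrors TAO_Notify_Consumer::DispatchStatus (values copied from the listing).
enum DispatchStatus { DISPATCH_SUCCESS, DISPATCH_RETRY, DISPATCH_DISCARD, DISPATCH_FAIL };

// Hypothetical summary of what the caller does with the request afterwards.
enum class Action
{
  Complete,             // event delivered; mark the request complete
  RequeueAndRetry,      // keep the event and try again later
  DropEvent,            // complete the request without delivering it
  DropAllAndDisconnect  // discard all messages and disconnect the consumer
};

inline Action action_for (DispatchStatus status)
{
  switch (status)
    {
    case DISPATCH_SUCCESS: return Action::Complete;
    case DISPATCH_RETRY:   return Action::RequeueAndRetry;
    case DISPATCH_DISCARD: return Action::DropEvent;
    case DISPATCH_FAIL:    return Action::DropAllAndDisconnect;
    }
  assert (false && "unknown DispatchStatus");
  return Action::DropEvent;
}
```

Keeping the mapping in one function makes it easy to see that only DISPATCH_RETRY leaves the event alive.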


Constructor & Destructor Documentation

TAO_Notify_Consumer::TAO_Notify_Consumer (TAO_Notify_ProxySupplier * proxy)
 

Constructor.

Definition at line 31 of file Consumer.cpp.

References ACE_NEW, pending_events(), pending_events_, Request_Queue, and ACE_Auto_Basic_Ptr< X >::reset().

00032 : proxy_ (proxy)
00033 , is_suspended_ (0)
00034 , pacing_ (proxy->qos_properties_.pacing_interval ())
00035 , max_batch_size_ (CosNotification::MaximumBatchSize, 0)
00036 , timer_id_ (-1)
00037 , timer_ (0)
00038 {
00039   Request_Queue* pending_events = 0;
00040   ACE_NEW (pending_events, TAO_Notify_Consumer::Request_Queue ());
00041   this->pending_events_.reset( pending_events );
00042 
00043   this->timer_.reset( this->proxy ()->timer () );
00044 }

TAO_Notify_Consumer::~TAO_Notify_Consumer () [virtual]
 

Destructor.

Definition at line 46 of file Consumer.cpp.

References cancel_timer().

00047 {
00048   if (this->timer_.isSet())
00049   {
00050     this->cancel_timer ();
00051     this->timer_.reset ();
00052   }
00053 }


Member Function Documentation

void TAO_Notify_Consumer::cancel_timer (void) [protected]
 

Cancel timer.

Definition at line 638 of file Consumer.cpp.

References ACE_DEBUG, ACE_TEXT(), DEBUG_LEVEL, and LM_DEBUG.

Referenced by shutdown(), and ~TAO_Notify_Consumer().

00639 {
00640   if (this->timer_.isSet() && this->timer_id_ != -1)
00641     {
00642       if (DEBUG_LEVEL  > 5)
00643         ACE_DEBUG ((LM_DEBUG,
00644                     ACE_TEXT ("Consumer %d canceling dispatch timer.\n"),
00645                     static_cast<int> (this->proxy ()->id ())
00646                     ));
00647 
00648       this->timer_->cancel_timer (timer_id_);
00649     }
00650   this->timer_id_ = -1;
00651 }

void TAO_Notify_Consumer::deliver (TAO_Notify_Method_Request_Event * request)
 

Dispatch Event to consumer.

Definition at line 150 of file Consumer.cpp.

References ACE_CATCHANY, ACE_CHECK, ACE_DEBUG, ACE_DECLARE_NEW_ENV, ACE_ENDTRY, ACE_ENV_ARG_PARAMETER, ACE_ENV_SINGLE_ARG_PARAMETER, ACE_TEXT(), ACE_TRY, ACE_TRY_CHECK, DEBUG_LEVEL, TAO_Notify_ProxySupplier::destroy(), DISPATCH_DISCARD, DISPATCH_FAIL, dispatch_request(), DISPATCH_RETRY, DISPATCH_SUCCESS, DispatchStatus, enqueue_if_necessary(), enqueue_request(), LM_DEBUG, proxy_supplier(), and schedule_timer().

Referenced by TAO_Notify_Method_Request_Dispatch::execute_i().

00152 {
00153   // Increment reference counts (safely) to prevent this object and its proxy
00154   // from being deleted while the push is in progress.
00155   TAO_Notify_Proxy::Ptr proxy_guard (this->proxy ());
00156   bool queued = enqueue_if_necessary (request ACE_ENV_ARG_PARAMETER);
00157   ACE_CHECK;
00158   if (!queued)
00159     {
00160       DispatchStatus status = this->dispatch_request (request);
00161       switch (status)
00162         {
00163         case DISPATCH_SUCCESS:
00164           {
00165             request->complete ();
00166             break;
00167           }
00168         case DISPATCH_RETRY:
00169           {
00170             if (DEBUG_LEVEL > 1)
00171               ACE_DEBUG ((LM_DEBUG,
00172                           ACE_TEXT ("Consumer %d enqueing event %d due ")
00173                           ACE_TEXT ("to failed dispatch.\n"),
00174                           static_cast<int> (this->proxy ()->id ()),
00175                           request->sequence ()));
00176             this->enqueue_request (request ACE_ENV_ARG_PARAMETER);
00177             ACE_CHECK;
00178             this->schedule_timer (true);
00179             break;
00180           }
00181         case DISPATCH_DISCARD:
00182           {
00183             if (DEBUG_LEVEL  > 0)
00184               ACE_DEBUG ((LM_DEBUG,
00185                           ACE_TEXT ("(%P|%t) Consumer %d: Error during "
00186                                     "direct dispatch. Discarding event:%d.\n"),
00187                           static_cast<int> (this->proxy ()->id ()),
00188                           request->sequence ()
00189                           ));
00190             request->complete ();
00191             break;
00192           }
00193         case DISPATCH_FAIL:
00194           {
00195             if (DEBUG_LEVEL  > 0)
00196               ACE_DEBUG ((LM_DEBUG,
00197                           ACE_TEXT ("(%P|%t) Consumer %d: Failed during "
00198                                     "direct dispatch :%d. Discarding event.\n"),
00199                           static_cast<int> (this->proxy ()->id ()),
00200                           request->sequence ()
00201                           ));
00202             request->complete ();
00203             ACE_DECLARE_NEW_ENV;
00204             ACE_TRY
00205               {
00206                 this->proxy_supplier ()->destroy (ACE_ENV_SINGLE_ARG_PARAMETER);
00207                 ACE_TRY_CHECK;
00208               }
00209             ACE_CATCHANY
00210               {
00211                 // todo is there something meaningful we can do here?
00212                 ;
00213               }
00214             ACE_ENDTRY;
00215             break;
00216           }
00217         }
00218     }
00219 }

TAO_Notify_Consumer::DispatchStatus TAO_Notify_Consumer::dispatch_batch (const CosNotification::EventBatch & batch)
 

Dispatch the batch of events to the attached consumer.

Definition at line 359 of file Consumer.cpp.

References ACE_CATCH, ACE_CATCHANY, ACE_DEBUG, ACE_DECLARE_NEW_ENV, ACE_ENDTRY, ACE_ENV_ARG_PARAMETER, ACE_ERROR, ACE_TEXT(), ACE_TRY, ACE_TRY_CHECK, DEBUG_LEVEL, DISPATCH_DISCARD, DISPATCH_FAIL, DISPATCH_RETRY, DISPATCH_SUCCESS, DispatchStatus, CosNotification::EventBatch, LM_ERROR, push(), TAO_INVOCATION_SEND_REQUEST_MINOR_CODE, TAO_POA_DISCARDING, and TAO_POA_HOLDING.

Referenced by TAO_Notify_SequencePushConsumer::dispatch_from_queue().

00360 {
00361   DispatchStatus result = DISPATCH_SUCCESS;
00362   ACE_DECLARE_NEW_ENV;
00363   ACE_TRY
00364     {
00365       this->push (batch ACE_ENV_ARG_PARAMETER);
00366       ACE_TRY_CHECK;
00367     }
00368   ACE_CATCH (CORBA::OBJECT_NOT_EXIST, ex)
00369     {
00370       if (DEBUG_LEVEL  > 0)
00371         ACE_DEBUG ((LM_ERROR,
00372                     ACE_TEXT ("(%P|%t) TAO_Notify_Consumer "
00373                               "%d::dispatch_batch() %s\n"),
00374                     static_cast<int> (this->proxy ()->id ()),
00375                     ex._info ().c_str ()
00376                     ));
00377       result = DISPATCH_FAIL;
00378     }
00379   ACE_CATCH (CORBA::TRANSIENT, ex)
00380     {
00381       if (DEBUG_LEVEL  > 0)
00382         ACE_DEBUG ((LM_ERROR,
00383                     ACE_TEXT ("(%P|%t) TAO_Notify_Consumer "
00384                               "%d::dispatch_batch() Transient (minor=%d) %s\n"),
00385                     static_cast<int> (this->proxy ()->id ()),
00386                     ex.minor (),
00387                     ex._info ().c_str ()
00388                     ));
00389       const CORBA::ULong BITS_5_THRU_12_MASK = 0x00000f80u;
00390       switch (ex.minor () & 0xfffff000u)
00391         {
00392         case CORBA::OMGVMCID:
00393           switch (ex.minor () & 0x00000fffu)
00394             {
00395             case 2: // No usable profile
00396             case 3: // Request cancelled
00397             case 4: // POA destroyed
00398               result = DISPATCH_FAIL;
00399               break;
00400             default:
00401               result = DISPATCH_DISCARD;
00402             }
00403           break;
00404 
00405         case TAO::VMCID:
00406         default:
00407           switch (ex.minor () & BITS_5_THRU_12_MASK)
00408             {
00409             case TAO_INVOCATION_SEND_REQUEST_MINOR_CODE:
00410               result = DISPATCH_FAIL;
00411               break;
00412             case TAO_POA_DISCARDING:
00413             case TAO_POA_HOLDING:
00414             default:
00415               result = DISPATCH_RETRY;
00416             } break;
00417         }
00418     }
00419   ACE_CATCH (CORBA::TIMEOUT, ex)
00420     {
00421       if (DEBUG_LEVEL  > 0)
00422         ACE_DEBUG ((LM_ERROR,
00423                     ACE_TEXT ("(%P|%t) TAO_Notify_Consumer "
00424                               "%u::dispatch_batch() %s\n"),
00425                     this->proxy ()->id (),
00426                     ex._info().c_str ()
00427                     ));
00428       result = DISPATCH_FAIL;
00429     }
00430   ACE_CATCH (CORBA::COMM_FAILURE, ex)
00431     {
00432       if (DEBUG_LEVEL  > 0)
00433         ACE_DEBUG ((LM_ERROR,
00434                     ACE_TEXT ("(%P|%t) TAO_Notify_Consumer "
00435                               "%u::dispatch_batch() %s\n"),
00436                     this->proxy ()->id (),
00437                     ex._info().c_str ()
00438                     ));
00439       result = DISPATCH_FAIL;
00440     }
00441   ACE_CATCH (CORBA::SystemException, ex)
00442     {
00443       if (DEBUG_LEVEL  > 0)
00444         {
00445           ACE_DEBUG ((LM_ERROR,
00446                       ACE_TEXT ("(%P|%t) TAO_Notify_Consumer "
00447                                 "%d::dispatch_batch() SystemException %s\n"),
00448                       static_cast<int>  (this->proxy ()->id ()),
00449                       ex._info ().c_str ()
00450                       ));
00451         }
00452       result = DISPATCH_DISCARD;
00453     }
00454   ACE_CATCHANY
00455     {
00456       ACE_ERROR ((LM_ERROR,
00457                   ACE_TEXT ("(%P|%t) TAO_Notify_Consumer "
00458                             "%d::dispatch_batch() Caught unexpected "
00459                             "exception pushing batch to consumer.\n"),
00460                   static_cast<int> (this->proxy ()->id ())
00461                   ));
00462       result = DISPATCH_DISCARD;
00463     }
00464   ACE_ENDTRY;
00465   return result;
00466 }
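
The CORBA::TRANSIENT handler above decides the status from the exception's minor code: the upper 20 bits select the vendor, and the remaining bits select the specific condition. The following is a sketch of that classification only, under stated assumptions: `classify_transient` is a hypothetical helper, and `SEND_REQUEST_CODE` is a placeholder whose numeric value is assumed for illustration rather than taken from the TAO headers.

```cpp
#include <cassert>
#include <cstdint>

enum DispatchStatus { DISPATCH_SUCCESS, DISPATCH_RETRY, DISPATCH_DISCARD, DISPATCH_FAIL };

const std::uint32_t VMCID_MASK    = 0xfffff000u; // upper 20 bits: vendor minor code ID
const std::uint32_t OMG_VMCID     = 0x4f4d0000u; // OMG standard minor codes
const std::uint32_t LOCATION_MASK = 0x00000f80u; // same value as BITS_5_THRU_12_MASK

// Placeholder for TAO_INVOCATION_SEND_REQUEST_MINOR_CODE.
// The value is an assumption made for this sketch only.
const std::uint32_t SEND_REQUEST_CODE = 0x14u << 7;

inline DispatchStatus classify_transient (std::uint32_t minor)
{
  // Sanity check: the placeholder must fit inside the mask it is compared under.
  assert ((SEND_REQUEST_CODE & LOCATION_MASK) == SEND_REQUEST_CODE);

  if ((minor & VMCID_MASK) == OMG_VMCID)
    {
      switch (minor & 0x00000fffu)
        {
        case 2: // No usable profile
        case 3: // Request cancelled
        case 4: // POA destroyed
          return DISPATCH_FAIL;
        default:
          return DISPATCH_DISCARD;
        }
    }

  // Any other vendor ID, including TAO's own, is handled here.
  if ((minor & LOCATION_MASK) == SEND_REQUEST_CODE)
    return DISPATCH_FAIL;
  return DISPATCH_RETRY; // POA discarding, POA holding, and everything else
}
```

Note that unrecognized OMG codes are discarded, while unrecognized codes from any other vendor are retried, which matches the two `default:` branches in the listing.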

bool TAO_Notify_Consumer::dispatch_from_queue (Request_Queue & requests, ACE_Guard< TAO_SYNCH_MUTEX > & ace_mon) [protected, virtual]
 

Attempt to dispatch event from a queue.

Called by dispatch_pending(). Delivers one or more events to the consumer. If delivery fails, the events are either left in the queue or discarded, depending on QoS parameters. Undelivered, undiscarded requests are left at the front of the queue. Overridden in the sequence consumer to dispatch as an EventBatch.

Returns:
false if delivery failed and the request(s) cannot be discarded.

Reimplemented in TAO_Notify_SequencePushConsumer.

Definition at line 499 of file Consumer.cpp.

References ACE_CATCHANY, ACE_DEBUG, ACE_DECLARE_NEW_ENV, ACE_ENDTRY, ACE_ENV_SINGLE_ARG_PARAMETER, ACE_TEXT(), ACE_TRY, ACE_TRY_CHECK, ACE_Guard< ACE_LOCK >::acquire(), TAO_Notify_Method_Request_Event::complete(), DEBUG_LEVEL, ACE_Unbounded_Queue< T >::dequeue_head(), TAO_Notify_ProxySupplier::destroy(), DISPATCH_DISCARD, DISPATCH_FAIL, dispatch_request(), DISPATCH_RETRY, DISPATCH_SUCCESS, DispatchStatus, ACE_Unbounded_Queue< T >::enqueue_head(), LM_DEBUG, proxy_supplier(), ACE_Message_Block::release(), ACE_Guard< ACE_LOCK >::release(), Request_Queue, and TAO_Notify_Method_Request_Event::sequence().

Referenced by dispatch_pending().

00500 {
00501   bool result = true;
00502   TAO_Notify_Method_Request_Event_Queueable * request;
00503   if (requests.dequeue_head (request) == 0)
00504     {
00505       ace_mon.release ();
00506       DispatchStatus status = this->dispatch_request (request);
00507       switch (status)
00508         {
00509         case DISPATCH_SUCCESS:
00510           {
00511             request->complete ();
00512             request->release ();
00513             result = true;
00514             ace_mon.acquire ();
00515             break;
00516           }
00517         case DISPATCH_RETRY:
00518           {
00519             if (DEBUG_LEVEL  > 0)
00520               ACE_DEBUG ((LM_DEBUG,
00521                           ACE_TEXT ("(%P|%t) Consumer %d: Will retry %d\n"),
00522                           static_cast<int> (this->proxy ()->id ()),
00523                           request->sequence ()
00524                           ));
00525             ace_mon.acquire ();
00526             requests.enqueue_head (request); // put the failed event back where it was
00527             result = false;
00528             break;
00529           }
00530         case DISPATCH_DISCARD:
00531           {
00532             if (DEBUG_LEVEL  > 0)
00533               ACE_DEBUG ((LM_DEBUG,
00534                           ACE_TEXT ("(%P|%t) Consumer %d: Error during "
00535                                     "dispatch. Discarding event:%d.\n"),
00536                           static_cast<int> (this->proxy ()->id ()),
00537                           request->sequence ()
00538                           ));
00539             request->complete ();
00540             ace_mon.acquire ();
00541             result = true;
00542             break;
00543           }
00544         case DISPATCH_FAIL:
00545           {
00546             if (DEBUG_LEVEL  > 0)
00547               ACE_DEBUG ((LM_DEBUG,
00548                           ACE_TEXT ("(%P|%t) Consumer %d: Failed. "
00549                                     "Discarding event %d.\n"),
00550                           static_cast<int> (this->proxy ()->id ()),
00551                           request->sequence ()
00552                           ));
00553             request->complete ();
00554             ace_mon.acquire ();
00555             while (requests.dequeue_head (request) == 0)
00556               {
00557                 ace_mon.release ();
00558                 request->complete ();
00559                 ace_mon.acquire ();
00560               }
00561             ace_mon.release ();
00562             ACE_DECLARE_NEW_ENV;
00563             ACE_TRY
00564               {
00565                 this->proxy_supplier ()->destroy (ACE_ENV_SINGLE_ARG_PARAMETER);
00566                 ACE_TRY_CHECK;
00567               }
00568             ACE_CATCHANY
00569               {
00570                 // todo is there something reasonable to do here?
00571               }
00572             ACE_ENDTRY;
00573             ace_mon.acquire ();
00574             result = true;
00575             break;
00576           }
00577         default:
00578           {
00579             ace_mon.acquire ();
00580             result = false;
00581             break;
00582           }
00583         }
00584     }
00585   return result;
00586 }
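
The listing above releases the proxy lock around the remote call and reacquires it before touching the queue again. A minimal sketch of that pattern with standard-library types follows; it is an illustration rather than the TAO implementation. `Event`, `Dispatcher`, and `dispatch_head` are hypothetical names, `std::deque` and `std::unique_lock` stand in for ACE_Unbounded_Queue and ACE_Guard, and the proxy destruction done on DISPATCH_FAIL is omitted.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <mutex>

enum DispatchStatus { DISPATCH_SUCCESS, DISPATCH_RETRY, DISPATCH_DISCARD, DISPATCH_FAIL };

// Hypothetical stand-ins: an event is an int, and `dispatch` plays the role
// of dispatch_request ().
using Event = int;
using Dispatcher = std::function<DispatchStatus (Event)>;

// Returns false only when the head event must be retried later.
// `lock` must be held on entry and is held again on return.
inline bool dispatch_head (std::deque<Event> &queue,
                           std::unique_lock<std::mutex> &lock,
                           const Dispatcher &dispatch)
{
  assert (lock.owns_lock ());
  if (queue.empty ())
    return true;

  Event event = queue.front ();
  queue.pop_front ();

  lock.unlock ();                 // do not hold the lock across the call
  DispatchStatus status = dispatch (event);
  lock.lock ();

  switch (status)
    {
    case DISPATCH_RETRY:
      queue.push_front (event);   // put the failed event back where it was
      return false;
    case DISPATCH_FAIL:
      queue.clear ();             // drop everything still pending
      return true;
    case DISPATCH_SUCCESS:
    case DISPATCH_DISCARD:
      return true;
    }
  return false;
}
```

Because the lock is dropped during the call, other threads may append to the queue in the meantime; pushing the failed event back at the front keeps it ahead of anything added while the call was in flight.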

void TAO_Notify_Consumer::dispatch_pending ()
 

Dispatch the pending events.

Definition at line 469 of file Consumer.cpp.

References ACE_DEBUG, ACE_GUARD, ACE_TEXT(), DEBUG_LEVEL, dispatch_from_queue(), TAO_Notify_Object::has_shutdown(), ACE_Unbounded_Queue< T >::is_empty(), LM_DEBUG, pending_events(), proxy_supplier(), schedule_timer(), ACE_Unbounded_Queue< T >::size(), and TAO_SYNCH_MUTEX.

Referenced by TAO_Notify_SequencePushConsumer::enqueue_if_necessary(), handle_timeout(), and resume().

00470 {
00471   if (DEBUG_LEVEL  > 5)
00472     ACE_DEBUG ((LM_DEBUG,
00473                 ACE_TEXT ("Consumer %d dispatching pending events.  Queue size: %d\n"),
00474                 static_cast<int> (this->proxy ()->id ()),
00475                 this->pending_events().size ()
00476                 ));
00477 
00478   // lock ourselves in memory for the duration
00479   TAO_Notify_Consumer::Ptr self_grd (this);
00480 
00481   // dispatch events until: 1) the queue is empty; 2) the proxy shuts down, or 3) the dispatch fails
00482   ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, *this->proxy_lock ());
00483   bool ok = true;
00484   while (ok
00485          && !this->proxy_supplier ()->has_shutdown ()
00486          && !this->pending_events().is_empty ())
00487     {
00488       if (! dispatch_from_queue ( this->pending_events(), ace_mon))
00489         {
00490           this->schedule_timer (true);
00491           ok = false;
00492         }
00493     }
00494 }
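
The loop above stops on the first of three conditions: the queue is empty, the proxy has shut down, or a dispatch fails. A simplified sketch of that control flow, without locking, might look like this. `PendingState` and `drain` are hypothetical names, and the `retry_scheduled` flag marks the point where the real code calls schedule_timer (true).

```cpp
#include <cassert>
#include <deque>
#include <functional>

// Hypothetical stand-in for the consumer state used by dispatch_pending ().
struct PendingState
{
  std::deque<int> queue;        // pending events
  bool has_shutdown = false;    // proxy has shut down
  bool retry_scheduled = false; // set where the real code schedules a retry timer
};

// `dispatch_one` plays the role of dispatch_from_queue (): it removes and
// delivers the head event, and returns false if that event must be retried.
inline void drain (PendingState &state,
                   const std::function<bool (std::deque<int> &)> &dispatch_one)
{
  assert (dispatch_one != nullptr);
  bool ok = true;
  while (ok && !state.has_shutdown && !state.queue.empty ())
    {
      if (!dispatch_one (state.queue))
        {
          state.retry_scheduled = true;
          ok = false;
        }
    }
}
```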

TAO_Notify_Consumer::DispatchStatus TAO_Notify_Consumer::dispatch_request (TAO_Notify_Method_Request_Event * request) [protected]
 

Definition at line 222 of file Consumer.cpp.

References ACE_CATCH, ACE_CATCHANY, ACE_DEBUG, ACE_DECLARE_NEW_ENV, ACE_ENDTRY, ACE_ENV_ARG_PARAMETER, ACE_ERROR, ACE_TEXT(), ACE_TRY, ACE_TRY_CHECK, DEBUG_LEVEL, DISPATCH_DISCARD, DISPATCH_FAIL, DISPATCH_RETRY, DISPATCH_SUCCESS, DispatchStatus, TAO_Notify_Method_Request_Event::event(), LM_DEBUG, LM_ERROR, TAO_Notify_Event::push(), TAO_Notify_Method_Request_Event::sequence(), TAO_Notify_Method_Request_Event::should_retry(), TAO_INVOCATION_SEND_REQUEST_MINOR_CODE, TAO_POA_DISCARDING, and TAO_POA_HOLDING.

Referenced by deliver(), and dispatch_from_queue().

00223 {
00224   DispatchStatus result = DISPATCH_SUCCESS;
00225   ACE_DECLARE_NEW_ENV;
00226   ACE_TRY
00227     {
00228       request->event ()->push (this ACE_ENV_ARG_PARAMETER);
00229       ACE_TRY_CHECK;
00230       if (DEBUG_LEVEL  > 8)
00231         ACE_DEBUG ((LM_DEBUG,
00232                     ACE_TEXT ("Consumer %d dispatched single event %d.\n"),
00233                     static_cast<int> (this->proxy ()->id ()),
00234                     request->sequence ()
00235                     ));
00236     }
00237   ACE_CATCH (CORBA::OBJECT_NOT_EXIST, ex)
00238     {
00239       if (DEBUG_LEVEL  > 0)
00240         {
00241           ACE_DEBUG ((LM_ERROR,
00242                       ACE_TEXT ("(%P|%t) TAO_Notify_Consumer %d::push "
00243                                 "(request) %s\n"),
00244                       static_cast<int> (this->proxy ()->id ()),
00245                       ex._info ().c_str ()
00246                       ));
00247         }
00248       result = DISPATCH_FAIL;
00249     }
00250   ACE_CATCH (CORBA::TRANSIENT, ex)
00251     {
00252       if (DEBUG_LEVEL  > 0)
00253         ACE_DEBUG ((LM_ERROR,
00254                     ACE_TEXT ("(%P|%t) TAO_Notify_Consumer %d::push "
00255                               "(request) Transient (minor=%d) %s\n"),
00256                     static_cast<int> (this->proxy ()->id ()),
00257                     ex.minor (),
00258                     ex._info ().c_str ()
00259                     ));
00260       const CORBA::ULong BITS_5_THRU_12_MASK = 0x00000f80u;
00261       switch (ex.minor () & 0xfffff000u)
00262         {
00263         case CORBA::OMGVMCID:
00264           switch (ex.minor () & 0x00000fffu)
00265             {
00266             case 2: // No usable profile
00267             case 3: // Request cancelled
00268             case 4: // POA destroyed
00269               result = DISPATCH_FAIL;
00270               break;
00271             default:
00272               result = DISPATCH_DISCARD;
00273             }
00274           break;
00275 
00276         case TAO::VMCID:
00277         default:
00278           switch (ex.minor () & BITS_5_THRU_12_MASK)
00279             {
00280             case TAO_INVOCATION_SEND_REQUEST_MINOR_CODE:
00281               result = DISPATCH_FAIL;
00282               break;
00283             case TAO_POA_DISCARDING:
00284             case TAO_POA_HOLDING:
00285             default:
00286               result = DISPATCH_RETRY;
00287             } break;
00288         }
00289     }
00290   ACE_CATCH (CORBA::TIMEOUT, ex)
00291     {
00292       if (DEBUG_LEVEL  > 0)
00293         ACE_DEBUG ((LM_ERROR,
00294                     ACE_TEXT ("(%P|%t) TAO_Notify_Consumer %u::push "
00295                               "(request) %s\n"),
00296                     this->proxy ()->id (),
00297                     ex._info().c_str ()
00298                     ));
00299       result = DISPATCH_FAIL;
00300     }
00301   ACE_CATCH (CORBA::COMM_FAILURE, ex)
00302     {
00303       if (DEBUG_LEVEL  > 0)
00304         ACE_DEBUG ((LM_ERROR,
00305                     ACE_TEXT ("(%P|%t) TAO_Notify_Consumer %u::push "
00306                               "(request) %s\n"),
00307                     this->proxy ()->id (),
00308                     ex._info().c_str ()
00309                     ));
00310       result = DISPATCH_FAIL;
00311     }
00312   ACE_CATCH (CORBA::SystemException, ex)
00313     {
00314       if (DEBUG_LEVEL  > 0)
00315         {
00316           ACE_DEBUG ((LM_ERROR,
00317                       ACE_TEXT ("(%P|%t) TAO_Notify_Consumer %d::push "
00318                                 "(request) SystemException %s\n"),
00319                       static_cast<int> (this->proxy ()->id ()),
00320                       ex._info ().c_str ()
00321                       ));
00322         }
00323       result = DISPATCH_DISCARD;
00324     }
00325   ACE_CATCHANY
00326     {
00327       ACE_ERROR ( (LM_ERROR,
00328                    ACE_TEXT ("(%P|%t) TAO_Notify_Consumer %d::push "
00329                              "(request) Caught unexpected exception "
00330                              "pushing event to consumer.\n"),
00331                    static_cast<int> (this->proxy ()->id ())
00332                    ));
00333       result = DISPATCH_DISCARD;
00334     }
00335   ACE_ENDTRY;
00336 
00337   // for persistent events that haven't timed out
00338   // convert "FAIL" & "DISCARD" to "RETRY"
00339   // for transient events, convert RETRY to DISCARD (hey, best_effort.)
00340   if (result == DISPATCH_FAIL || result == DISPATCH_DISCARD)
00341     {
00342       if (request->should_retry ())
00343         {
00344           result = DISPATCH_RETRY;
00345         }
00346     }
00347   else if (result == DISPATCH_RETRY)
00348     {
00349       if (! request->should_retry ())
00350         {
00351           result = DISPATCH_DISCARD;
00352         }
00353     }
00354 
00355   return result;
00356 }
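
The final block of dispatch_request() adjusts the raw status according to whether the request should be retried. That adjustment can be sketched as a pure function; `apply_retry_policy` is a hypothetical name, and the `should_retry` parameter stands in for the result of TAO_Notify_Method_Request_Event::should_retry().

```cpp
#include <cassert>

enum DispatchStatus { DISPATCH_SUCCESS, DISPATCH_RETRY, DISPATCH_DISCARD, DISPATCH_FAIL };

inline DispatchStatus apply_retry_policy (DispatchStatus raw, bool should_retry)
{
  assert (raw >= DISPATCH_SUCCESS && raw <= DISPATCH_FAIL);

  // Requests that should be retried: FAIL and DISCARD become RETRY.
  if (raw == DISPATCH_FAIL || raw == DISPATCH_DISCARD)
    return should_retry ? DISPATCH_RETRY : raw;

  // Requests that should not be retried: RETRY becomes DISCARD.
  if (raw == DISPATCH_RETRY)
    return should_retry ? DISPATCH_RETRY : DISPATCH_DISCARD;

  return raw; // DISPATCH_SUCCESS is never changed
}
```

One consequence visible in the sketch: when `should_retry` is true, the function can only return DISPATCH_SUCCESS or DISPATCH_RETRY.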

void TAO_Notify_Consumer::dispatch_updates_i (const CosNotification::EventTypeSeq & added, const CosNotification::EventTypeSeq & removed) [protected, virtual]
 

Implementation of Peer specific dispatch_updates.

Implements TAO_Notify_Peer.

Definition at line 683 of file Consumer.cpp.

References ACE_ENV_ARG_PARAMETER, CosNotification::EventTypeSeq, CORBA::is_nil(), and publish_.

00685 {
00686   if (!CORBA::is_nil (this->publish_.in ()))
00687     this->publish_->offer_change (added, removed ACE_ENV_ARG_PARAMETER);
00688 }

bool TAO_Notify_Consumer::enqueue_if_necessary (TAO_Notify_Method_Request_Event * request) [protected, virtual]
 

Add request to a queue if necessary. Overridden by sequence consumer to "always" put incoming events into the queue.

Returns:
true if the request has been enqueued; false if the request should be handled now.

Reimplemented in TAO_Notify_SequencePushConsumer.

Definition at line 101 of file Consumer.cpp.

References ACE_CHECK_RETURN, ACE_DEBUG, ACE_ENV_SINGLE_ARG_PARAMETER, ACE_GUARD_RETURN, ACE_NEW_THROW_EX, ACE_TEXT(), DEBUG_LEVEL, ACE_Unbounded_Queue< T >::enqueue_tail(), ACE_Unbounded_Queue< T >::is_empty(), is_suspended_, LM_DEBUG, pending_events(), schedule_timer(), and TAO_SYNCH_MUTEX.

Referenced by deliver().

00102 {
00103   ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, *this->proxy_lock (), false);
00104   if (! this->pending_events().is_empty ())
00105     {
00106       if (DEBUG_LEVEL > 3)
00107         ACE_DEBUG ((LM_DEBUG,
00108                     ACE_TEXT ("Consumer %d: enqueuing another event. %d\n"),
00109                     static_cast<int> (this->proxy ()->id ()),
00110                     request->sequence ()
00111                     ));
00112       TAO_Notify_Event::Ptr event (
00113         request->event ()->queueable_copy (ACE_ENV_SINGLE_ARG_PARAMETER));
00114       ACE_CHECK_RETURN (false);
00115       TAO_Notify_Method_Request_Event_Queueable * queue_entry;
00116       ACE_NEW_THROW_EX (queue_entry,
00117                         TAO_Notify_Method_Request_Event_Queueable (*request,
00118                                                                    event),
00119                         CORBA::NO_MEMORY ());
00120       ACE_CHECK_RETURN (false);
00121       this->pending_events().enqueue_tail (queue_entry);
00122       this->schedule_timer (false);
00123       return true;
00124     }
00125   if (this->is_suspended_ == 1)
00126     {
00127       if (DEBUG_LEVEL > 3)
00128         ACE_DEBUG ((LM_DEBUG,
00129                     ACE_TEXT ("Suspended Consumer %d enqueing event. %d\n"),
00130                     static_cast<int> (this->proxy ()->id ()),
00131                     request->sequence ()
00132                     ));
00133       TAO_Notify_Event::Ptr event (
00134         request->event ()->queueable_copy (ACE_ENV_SINGLE_ARG_PARAMETER));
00135       ACE_CHECK_RETURN (false);
00136       TAO_Notify_Method_Request_Event_Queueable * queue_entry;
00137       ACE_NEW_THROW_EX (queue_entry,
00138                         TAO_Notify_Method_Request_Event_Queueable (*request,
00139                                                                    event),
00140                         CORBA::NO_MEMORY ());
00141       ACE_CHECK_RETURN (false);
00142       this->pending_events().enqueue_tail (queue_entry);
00143       this->schedule_timer (false);
00144       return true;
00145     }
00146   return false;
00147 }
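
The two branches in the listing above have identical bodies, so the decision reduces to a single condition: queue the event if events are already pending or if the connection is suspended. A minimal sketch of that decision, assuming a hypothetical `ConsumerState` struct in place of the real members and leaving out the locking, the queueable copy, and the timer scheduling:

```cpp
#include <cassert>
#include <deque>

// Hypothetical stand-in for the members consulted by enqueue_if_necessary ().
struct ConsumerState
{
  std::deque<int> pending;   // stand-in for pending_events ()
  bool is_suspended = false; // stand-in for is_suspended_
};

// Returns true if the event was queued, false if the caller should
// dispatch it immediately.
inline bool enqueue_if_necessary (ConsumerState &state, int event)
{
  if (!state.pending.empty () || state.is_suspended)
    {
      state.pending.push_back (event); // always appended at the tail
      return true;
    }
  return false;
}
```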

void TAO_Notify_Consumer::enqueue_request (TAO_Notify_Method_Request_Event * request) [protected]
 

Definition at line 76 of file Consumer.cpp.

References ACE_CHECK, ACE_DEBUG, ACE_ENV_SINGLE_ARG_PARAMETER, ACE_GUARD, ACE_NEW_THROW_EX, ACE_TEXT(), DEBUG_LEVEL, ACE_Unbounded_Queue< T >::enqueue_tail(), LM_DEBUG, pending_events(), TAO_Notify_Method_Request_Event::sequence(), and TAO_SYNCH_MUTEX.

Referenced by deliver(), and TAO_Notify_SequencePushConsumer::enqueue_if_necessary().

00079 {
00080   TAO_Notify_Event::Ptr event (
00081     request->event ()->queueable_copy (ACE_ENV_SINGLE_ARG_PARAMETER));
00082   ACE_CHECK;
00083 
00084   TAO_Notify_Method_Request_Event_Queueable * queue_entry;
00085   ACE_NEW_THROW_EX (queue_entry,
00086     TAO_Notify_Method_Request_Event_Queueable (*request, event),
00087     CORBA::NO_MEMORY ());
00088   ACE_CHECK;
00089 
00090   if (DEBUG_LEVEL  > 3) ACE_DEBUG ( (LM_DEBUG,
00091     ACE_TEXT ("Consumer %d: enqueue_request (%d) @%@.\n"),
00092     static_cast<int> (this->proxy ()->id ()),
00093     request->sequence (),
00094     request
00095     ));
00096   ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, *this->proxy_lock ());
00097   this->pending_events().enqueue_tail (queue_entry);
00098 }

int TAO_Notify_Consumer::handle_timeout (const ACE_Time_Value & current_time, const void * act = 0) [protected, virtual]
 

Reimplemented from ACE_Event_Handler.

Definition at line 654 of file Consumer.cpp.

References ACE_CATCHALL, ACE_DECLARE_NEW_ENV, ACE_ENDTRY, ACE_ENV_SINGLE_ARG_PARAMETER, ACE_TRY, ACE_TRY_CHECK, and dispatch_pending().

00655 {
00656   TAO_Notify_Consumer::Ptr grd (this);
00657   this->timer_id_ = -1;  // This must come first, because dispatch_pending may try to resched
00658   ACE_DECLARE_NEW_ENV;
00659   ACE_TRY
00660     {
00661       this->dispatch_pending (ACE_ENV_SINGLE_ARG_PARAMETER);
00662       ACE_TRY_CHECK;
00663     }
00664   ACE_CATCHALL
00665     {
00666     }
00667   ACE_ENDTRY;
00668 
00669   return 0;
00670 }
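
The comment in the listing notes that timer_id_ must be reset before dispatch_pending() runs, because dispatching may try to reschedule. The sketch below illustrates why the order matters. It rests on an assumption that is not confirmed by this page: that scheduling does nothing while a timer id is still recorded. `TimerState` and its members are hypothetical.

```cpp
#include <cassert>

// Hypothetical stand-in for the timer bookkeeping; -1 means "no timer pending".
struct TimerState
{
  long timer_id = -1;
  long next_id = 1;
  int dispatch_calls = 0;
  bool more_work = false; // when true, dispatching asks for another timer

  // Assumption: scheduling is a no-op while a timer is already recorded.
  void schedule_timer ()
  {
    if (timer_id == -1)
      timer_id = next_id++;
  }

  void dispatch_pending ()
  {
    ++dispatch_calls;
    if (more_work)
      schedule_timer ();
  }

  int handle_timeout ()
  {
    assert (timer_id != -1); // sketch-only precondition: a timer was scheduled
    timer_id = -1;           // must come first so dispatch_pending can reschedule
    dispatch_pending ();
    return 0;
  }
};
```

If the reset were moved after the dispatch_pending() call in this sketch, the reschedule request would be ignored and then the id cleared, leaving pending events with no timer to deliver them.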

ACE_INLINE CORBA::Boolean TAO_Notify_Consumer::is_suspended (void)
 

Is the connection suspended?

Definition at line 16 of file Consumer.inl.

References is_suspended_.

Referenced by schedule_timer().

00017 {
00018   return this->is_suspended_;
00019 }

ACE_INLINE TAO_Notify_Consumer::Request_Queue & TAO_Notify_Consumer::pending_events () [protected]
 

Access the queue of events pending delivery.

Definition at line 9 of file Consumer.inl.

References ACE_ASSERT, ACE_Auto_Basic_Ptr< X >::get(), and pending_events_.

Referenced by dispatch_pending(), TAO_Notify_SequencePushConsumer::enqueue_if_necessary(), enqueue_if_necessary(), enqueue_request(), and TAO_Notify_Consumer().

00010 {
00011   ACE_ASSERT( pending_events_.get() != 0 );
00012   return *pending_events_;
00013 }
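
The accessor asserts that the owning pointer is set before dereferencing it. A minimal sketch of the same idiom, assuming `std::unique_ptr` in place of ACE_Auto_Ptr and a `std::deque<int>` in place of Request_Queue; `QueueOwner` is a hypothetical name.

```cpp
#include <cassert>
#include <deque>
#include <memory>

// Hypothetical owner of a heap-allocated queue; not the TAO class.
class QueueOwner {
public:
  QueueOwner() : pending_(new std::deque<int>()) {}

  // Check the pointer, then hand out a reference to the queue it owns.
  std::deque<int>& pending() {
    assert(pending_.get() != nullptr);
    return *pending_;
  }

private:
  std::unique_ptr<std::deque<int>> pending_;
};
```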

TAO_Notify_Proxy * TAO_Notify_Consumer::proxy (void)  [virtual]
 

Access Base Proxy.

Implements TAO_Notify_Peer.

Definition at line 56 of file Consumer.cpp.

References proxy_supplier().

00057 {
00058   return this->proxy_supplier ();
00059 }

TAO_SYNCH_MUTEX * TAO_Notify_Consumer::proxy_lock (void)  [protected]
 

Get the shared Proxy Lock.

Definition at line 691 of file Consumer.cpp.

References TAO_Notify_Object::lock_.

00692 {
00693   return &this->proxy_->lock_;
00694 }

TAO_Notify_ProxySupplier * TAO_Notify_Consumer::proxy_supplier (void)
 

Access Specific Proxy.

Definition at line 697 of file Consumer.cpp.

Referenced by deliver(), TAO_Notify_SequencePushConsumer::dispatch_from_queue(), dispatch_from_queue(), dispatch_pending(), and proxy().

00698 {
00699   return this->proxy_;
00700 }

virtual void TAO_Notify_Consumer::push (const CosNotification::EventBatch & event)  [pure virtual]
 

Push a batch of events to this consumer.

Implemented in TAO_Notify_PushConsumer, TAO_Notify_SequencePushConsumer, and TAO_Notify_StructuredPushConsumer.

virtual void TAO_Notify_Consumer::push (const CosNotification::StructuredEvent & event)  [pure virtual]
 

Push to this consumer.

Implemented in TAO_Notify_PushConsumer, TAO_Notify_SequencePushConsumer, and TAO_Notify_StructuredPushConsumer.

virtual void TAO_Notify_Consumer::push (const CORBA::Any & event)  [pure virtual]
 

Push to this consumer.

Implemented in TAO_Notify_PushConsumer, TAO_Notify_SequencePushConsumer, and TAO_Notify_StructuredPushConsumer.

Referenced by dispatch_batch().

void TAO_Notify_Consumer::qos_changed (const TAO_Notify_QoSProperties & qos_properties)  [virtual]
 

Override, Peer::qos_changed.

Reimplemented from TAO_Notify_Peer.

Definition at line 62 of file Consumer.cpp.

References max_batch_size_, and TAO_Notify_QoSProperties::maximum_batch_size().

00063 {
00064   this->max_batch_size_ = qos_properties.maximum_batch_size ();
00065 }

virtual void TAO_Notify_Consumer::reconnect_from_consumer (TAO_Notify_Consumer * old_consumer)  [pure virtual]
 

On reconnect, events must be moved from the old consumer to the new one.

Implemented in TAO_Notify_PushConsumer, TAO_Notify_SequencePushConsumer, and TAO_Notify_StructuredPushConsumer.

void TAO_Notify_Consumer::resume ()
 

Resume Connection.

Definition at line 68 of file Consumer.cpp.

References ACE_ENV_SINGLE_ARG_PARAMETER, dispatch_pending(), and is_suspended_.

00069 {
00070   this->is_suspended_ = 0;
00071 
00072   this->dispatch_pending (ACE_ENV_SINGLE_ARG_PARAMETER);
00073 }
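
resume() clears the suspended flag and then immediately tries to deliver whatever accumulated in the meantime. A small sketch of that sequence follows. `ResumeSketch` is a hypothetical stand-in, and the body of its `dispatch_pending()` is an assumption made for illustration (drain the queue in order unless suspended), not the TAO implementation.

```cpp
#include <cassert>
#include <deque>
#include <vector>

// Hypothetical stand-in for the consumer; not the TAO class.
struct ResumeSketch {
  bool is_suspended = true;    // starts suspended for the example
  std::deque<int> pending;     // events queued while suspended
  std::vector<int> delivered;  // events handed to the consumer

  // Assumed behavior: deliver nothing while suspended, otherwise drain in order.
  void dispatch_pending() {
    if (is_suspended) return;
    while (!pending.empty()) {
      delivered.push_back(pending.front());
      pending.pop_front();
    }
  }

  // Mirrors the listing: clear the flag first, then dispatch.
  void resume() {
    is_suspended = false;
    dispatch_pending();
  }
};
```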

void TAO_Notify_Consumer::schedule_timer (bool is_error = false)  [protected]
 

Schedule timer.

Definition at line 591 of file Consumer.cpp.

References ACE_ASSERT, ACE_DEBUG, ACE_ERROR, ACE_TEXT(), DEBUG_LEVEL, DEFAULT_RETRY_TIMEOUT, is_suspended(), TAO_Notify_PropertyBase_T< TYPE >::is_valid(), LM_DEBUG, LM_ERROR, ACE_Time_Value::msec(), and pacing_.

Referenced by deliver(), dispatch_pending(), TAO_Notify_SequencePushConsumer::enqueue_if_necessary(), enqueue_if_necessary(), TAO_Notify_StructuredPushConsumer::reconnect_from_consumer(), TAO_Notify_SequencePushConsumer::reconnect_from_consumer(), and TAO_Notify_PushConsumer::reconnect_from_consumer().

00592 {
00593   if (this->timer_id_ != -1)
00594     {
00595       return; // We only want a single timeout scheduled.
00596     }
00597   // don't schedule timer if there's nothing that can be done
00598   if (this->is_suspended ())
00599     {
00600       return;
00601     }
00602 
00603   ACE_ASSERT (this->timer_.get() != 0);
00604 
00605   // If we're scheduling the timer due to an error then we want to
00606   // use the retry timeout, otherwise we'll assume that the pacing
00607   // interval is sufficient for now.
00608   ACE_Time_Value tv (DEFAULT_RETRY_TIMEOUT);
00609 
00610   if (! is_error)
00611     {
00612       if (this->pacing_.is_valid ())
00613         {
00614           tv = ORBSVCS_Time::to_Time_Value (this->pacing_.value ());
00615         }
00616     }
00617 
00618   if (DEBUG_LEVEL  > 5)
00619     {
00620       ACE_DEBUG ((LM_DEBUG,
00621                   ACE_TEXT ("Consumer %d: scheduling pacing/retry for %dms.\n"),
00622                   static_cast<int> (this->proxy ()->id ()), tv.msec ()));
00623     }
00624 
00625   this->timer_id_ =
00626     this->timer_->schedule_timer (this, tv, ACE_Time_Value::zero);
00627   if (this->timer_id_ == -1)
00628     {
00629       ACE_ERROR ((LM_ERROR,
00630                   ACE_TEXT ("TAO_Notify_Consumer %d::schedule_timer () "
00631                             "Error scheduling timer.\n"),
00632                   static_cast<int> (this->proxy ()->id ())
00633                   ));
00634     }
00635 }
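
The decision logic in schedule_timer() can be isolated from the timer machinery: do nothing if a timer is already pending or the consumer is suspended; otherwise use the retry timeout on error, or the pacing interval when one is set. A sketch under stated assumptions: `TimerDecision`, `decide`, and `kRetryTimeoutMs` are hypothetical names, and the 10000 ms value is a placeholder because the real DEFAULT_RETRY_TIMEOUT is not shown here.

```cpp
#include <cassert>

// Placeholder value for illustration only; not the real DEFAULT_RETRY_TIMEOUT.
constexpr long kRetryTimeoutMs = 10000;

// Outcome of the scheduling decision.
struct TimerDecision {
  bool schedule;  // false when no timer should be scheduled
  long delay_ms;  // meaningful only when schedule is true
};

// Hypothetical helper mirroring the branch structure of schedule_timer().
TimerDecision decide(long timer_id, bool suspended, bool is_error,
                     bool pacing_valid, long pacing_ms) {
  if (timer_id != -1) return TimerDecision{false, 0};  // only one timeout at a time
  if (suspended) return TimerDecision{false, 0};       // nothing can be delivered

  long delay = kRetryTimeoutMs;  // default: retry timeout
  if (!is_error && pacing_valid) {
    delay = pacing_ms;           // normal case: pacing interval
  }
  return TimerDecision{true, delay};
}
```

Keeping the decision separate from the call that arms the timer makes each branch easy to check in isolation.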

void TAO_Notify_Consumer::shutdown ()  [virtual]
 

Shutdown the consumer.

Reimplemented from TAO_Notify_Peer.

Definition at line 673 of file Consumer.cpp.

References cancel_timer().

00674 {
00675   if (this->timer_.isSet ())
00676     {
00677       this->cancel_timer ();
00678       this->timer_.reset ();
00679     }
00680 }
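
shutdown() cancels any outstanding timer before releasing the timer reference, and does nothing if the reference is already empty. A hedged sketch of that order of operations: `FakeTimer` and `ShutdownSketch` are hypothetical, `std::shared_ptr` stands in for TAO_Notify_Timer::Ptr, and the body of `cancel_timer()` is an assumption because the real one is not shown here.

```cpp
#include <cassert>
#include <memory>

// Hypothetical timer that only counts cancellations.
struct FakeTimer {
  int cancelled = 0;
  void cancel(long /*timer_id*/) { ++cancelled; }
};

// Hypothetical stand-in for the consumer; not the TAO class.
struct ShutdownSketch {
  std::shared_ptr<FakeTimer> timer;
  long timer_id = -1;

  // Assumed behavior: cancel the pending timeout, if any, and forget its id.
  void cancel_timer() {
    if (timer && timer_id != -1) {
      timer->cancel(timer_id);
      timer_id = -1;
    }
  }

  // Mirrors the listing: cancel first, then drop the timer reference.
  void shutdown() {
    if (timer) {
      cancel_timer();
      timer.reset();
    }
  }
};
```

Because the reference is dropped on the first call, a second shutdown() is a no-op.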

void TAO_Notify_Consumer::suspend ()
 

Suspend Connection.


Member Data Documentation

CORBA::Boolean TAO_Notify_Consumer::is_suspended_ [protected]
 

Suspended Flag.

Definition at line 164 of file Consumer.h.

Referenced by enqueue_if_necessary(), is_suspended(), and resume().

TAO_Notify_Property_Long TAO_Notify_Consumer::max_batch_size_ [protected]
 

Max. batch size.

Definition at line 173 of file Consumer.h.

Referenced by qos_changed().

const TAO_Notify_Property_Time& TAO_Notify_Consumer::pacing_ [protected]
 

The Pacing Interval.

Definition at line 170 of file Consumer.h.

Referenced by schedule_timer().

ACE_Auto_Ptr< Request_Queue > TAO_Notify_Consumer::pending_events_ [private]
 

Events pending to be delivered.

Definition at line 184 of file Consumer.h.

Referenced by pending_events(), and TAO_Notify_Consumer().

TAO_Notify_ProxySupplier* TAO_Notify_Consumer::proxy_ [protected]
 

The Proxy that we associate with.

Definition at line 161 of file Consumer.h.

CosNotifyComm::NotifyPublish_var TAO_Notify_Consumer::publish_ [protected]
 

Interface that accepts offer_changes.

Definition at line 167 of file Consumer.h.

Referenced by dispatch_updates_i().

TAO_Notify_Timer::Ptr TAO_Notify_Consumer::timer_ [protected]
 

The Timer Manager that we use.

Definition at line 179 of file Consumer.h.

long TAO_Notify_Consumer::timer_id_ [protected]
 

Timer Id.

Definition at line 176 of file Consumer.h.


The documentation for this class was generated from the following files:
Consumer.h
Consumer.inl
Consumer.cpp
Generated on Thu Nov 9 13:31:43 2006 for TAO_CosNotification by doxygen 1.3.6