TAO_Notify_SequencePushConsumer Class Reference

#include <SequencePushConsumer.h>

Inheritance diagram for TAO_Notify_SequencePushConsumer:

Inheritance graph
[legend]
Collaboration diagram for TAO_Notify_SequencePushConsumer:

Collaboration graph
[legend]
List of all members.

Public Member Functions

 TAO_Notify_SequencePushConsumer (TAO_Notify_ProxySupplier *proxy)
virtual ~TAO_Notify_SequencePushConsumer ()
void init (CosNotifyComm::SequencePushConsumer_ptr push_consumer)
 Init the Consumer.

virtual bool enqueue_if_necessary (TAO_Notify_Method_Request_Event *request)
virtual bool dispatch_from_queue (Request_Queue &requests, ACE_Guard< TAO_SYNCH_MUTEX > &ace_mon)
 Attempt to dispatch event from a queue.

virtual void push (const CORBA::Any &event)
 Push to this consumer.

virtual void push (const CosNotification::StructuredEvent &event)
 Push to this consumer.

virtual void push (const CosNotification::EventBatch &event)
 Push a batch of events to this consumer.

virtual ACE_CString get_ior (void) const
 Retrieve the ior of this peer.

virtual void reconnect_from_consumer (TAO_Notify_Consumer *old_consumer)

Protected Attributes

CosNotifyComm::SequencePushConsumer_var push_consumer_
 The Consumer.


Private Member Functions

virtual void release (void)
 TAO_Notify_Destroy_Callback methods.


Constructor & Destructor Documentation

TAO_BEGIN_VERSIONED_NAMESPACE_DECL TAO_Notify_SequencePushConsumer::TAO_Notify_SequencePushConsumer (TAO_Notify_ProxySupplier *proxy)
 

Definition at line 25 of file SequencePushConsumer.cpp.

00026 : TAO_Notify_Consumer (proxy)
00027 {
00028 }

TAO_Notify_SequencePushConsumer::~TAO_Notify_SequencePushConsumer ()  [virtual]
 

Definition at line 30 of file SequencePushConsumer.cpp.

00031 {
00032 }


Member Function Documentation

bool TAO_Notify_SequencePushConsumer::dispatch_from_queue (Request_Queue &requests, ACE_Guard< TAO_SYNCH_MUTEX > &ace_mon)  [virtual]
 

Attempt to dispatch event from a queue.

Called by dispatch_pending. Deliver one or more events to the Consumer. If delivery fails, events are left in the queue (or discarded, depending on QoS parameters). Undelivered, undiscarded requests are left at the front of the queue. Overridden in sequence consumer to dispatch as an EventBatch.

Returns:
false if delivery failed and the request(s) cannot be discarded.

Reimplemented from TAO_Notify_Consumer.

Definition at line 57 of file SequencePushConsumer.cpp.

References ACE_ASSERT, ACE_CATCHANY, ACE_DEBUG, ACE_DECLARE_NEW_ENV, ACE_ENDTRY, ACE_ENV_SINGLE_ARG_PARAMETER, ACE_TEXT(), ACE_TRY, ACE_TRY_CHECK, ACE_Guard< ACE_LOCK >::acquire(), TAO_Notify_Method_Request_Event::complete(), TAO_Notify_Event::convert(), DEBUG_LEVEL, TAO_Notify_ProxySupplier::destroy(), TAO_Notify_Consumer::dispatch_batch(), TAO_Notify_Method_Request_Event::event(), CosNotification::EventBatch, TAO_Notify_PropertyBase_T< TYPE >::is_valid(), LM_DEBUG, TAO_Notify_Consumer::proxy_supplier(), ACE_Message_Block::release(), ACE_Guard< ACE_LOCK >::release(), TAO_Notify_Method_Request_Event::sequence(), TAO_Notify_Method_Request_Event::should_retry(), and TAO_Notify_PropertyBase_T< TYPE >::value().

00058 {
00059   bool result = true;
00060   if (DEBUG_LEVEL > 0)
00061   {
00062     ACE_DEBUG ( (LM_DEBUG,
00063       ACE_TEXT ("(%P|%t) SequencePushConsumer dispatch queued requests. queue size:%u\n"),
00064       requests.size ()));
00065   }
00066 
00067   long queue_size = requests.size ();
00068   CORBA::Long max_batch_size = queue_size;
00069   if (this->max_batch_size_.is_valid () )
00070   {
00071     max_batch_size = this->max_batch_size_.value ();
00072   }
00073   CORBA::Long batch_size = queue_size;
00074   if (batch_size > max_batch_size)
00075   {
00076     batch_size = max_batch_size;
00077   }
00078   if (batch_size > 0)
00079   {
00080     CosNotification::EventBatch batch (batch_size);
00081     batch.length (batch_size);
00082 
00083     Request_Queue completed;
00084 
00085     CORBA::Long pos = 0;
00086     TAO_Notify_Method_Request_Event_Queueable * request = 0;
00087     while (pos < batch_size && requests.dequeue_head (request) == 0)
00088     {
00089       if (DEBUG_LEVEL > 0)
00090       {
00091         ACE_DEBUG ( (LM_DEBUG,
00092           ACE_TEXT ("(%P|%t) Sequence Dispatch Method_Request_Dispatch @%@\n"),
00093           request));
00094       }
00095 
00096       const TAO_Notify_Event * ev = request->event ();
00097       ev->convert (batch [pos]);
00098       ++pos;
00099 
00100       // note enqueue at head, use queue as stack.
00101       completed.enqueue_head (request);
00102     }
00103     batch.length (pos);
00104     ACE_ASSERT (pos > 0);
00105 
00106     ace_mon.release ();
00107     TAO_Notify_Consumer::DispatchStatus status =
00108       this->dispatch_batch (batch);
00109     ace_mon.acquire ();
00110     switch (status)
00111     {
00112     case DISPATCH_SUCCESS:
00113       {
00114         TAO_Notify_Method_Request_Event_Queueable * request = 0;
00115         while (completed.dequeue_head (request) == 0)
00116         {
00117           request->complete ();
00118           request->release ();
00119         }
00120         result = true;
00121         break;
00122       }
00123     case DISPATCH_FAIL:
00124       {
00125         TAO_Notify_Method_Request_Event_Queueable * request = 0;
00126         while (completed.dequeue_head (request) == 0)
00127         {
00128           if (request->should_retry ())
00129           {
00130             if (DEBUG_LEVEL > 0)
00131               ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Consumer %d: Will retry %d\n"),
00132                           static_cast <int> (this->proxy ()->id ()),
00133                           request->sequence ()));
00134             requests.enqueue_head (request);
00135             result = false;
00136           }
00137           else
00138           {
00139             if (DEBUG_LEVEL > 0)
00140               ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Consumer %d: Discarding %d\n"),
00141                           static_cast<int> (this->proxy ()->id ()),
00142                           request->sequence ()));
00143             request->complete ();
00144             request->release ();
00145           }
00146         }
00147         while (requests.dequeue_head (request) == 0)
00148         {
00149           if (request->should_retry ())
00150           {
00151             if (DEBUG_LEVEL > 0)
00152               ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Consumer %d: Will retry %d\n"),
00153                           static_cast<int> (this->proxy ()->id ()),
00154                           request->sequence ()));
00155             requests.enqueue_head (request);
00156             result = false;
00157           }
00158           else
00159           {
00160             if (DEBUG_LEVEL > 0)
00161               ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Consumer %d: Discarding %d\n"),
00162                           static_cast<int> (this->proxy ()->id ()),
00163                           request->sequence ()));
00164             request->complete ();
00165             request->release ();
00166           }
00167         }
00168         ace_mon.release();
00169         ACE_DECLARE_NEW_ENV;
00170         ACE_TRY
00171         {
00172           this->proxy_supplier ()->destroy (ACE_ENV_SINGLE_ARG_PARAMETER);
00173           ACE_TRY_CHECK;
00174         }
00175         ACE_CATCHANY
00176         {
00177           // todo is there something meaningful we can do here?
00178           ;
00179         }
00180         ACE_ENDTRY;
00181         ace_mon.acquire();
00182         break;
00183       }
00184     case DISPATCH_RETRY:
00185     case DISPATCH_DISCARD:
00186       {
00187         TAO_Notify_Method_Request_Event_Queueable *  request = 0;
00188         while (completed.dequeue_head (request) == 0)
00189         {
00190           if (request->should_retry ())
00191           {
00192             if (DEBUG_LEVEL > 0)
00193               ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Consumer %d: Will retry %d\n"),
00194                           static_cast<int> (this->proxy ()->id ()),
00195                           request->sequence ()));
00196             requests.enqueue_head (request);
00197             result = false;
00198           }
00199           else
00200           {
00201             if (DEBUG_LEVEL > 0)
00202               ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Consumer %d: Discarding %d\n"),
00203                           static_cast<int> (this->proxy ()->id ()),
00204                           request->sequence ()));
00205             request->complete ();
00206             request->release ();
00207           }
00208         }
00209         break;
00210       }
00211     default:
00212       {
00213         result = false;
00214         break;
00215       }
00216     }
00217   }
00218   return result;
00219 }

bool TAO_Notify_SequencePushConsumer::enqueue_if_necessary (TAO_Notify_Method_Request_Event *request)  [virtual]
 

Add request to a queue if necessary. For Sequence it's always necessary.

Reimplemented from TAO_Notify_Consumer.

Definition at line 222 of file SequencePushConsumer.cpp.

References ACE_CHECK_RETURN, ACE_DEBUG, ACE_ENV_ARG_PARAMETER, ACE_ENV_SINGLE_ARG_PARAMETER, DEBUG_LEVEL, TAO_Notify_Consumer::dispatch_pending(), TAO_Notify_Consumer::enqueue_request(), TAO_Notify_PropertyBase_T< TYPE >::is_valid(), LM_DEBUG, TAO_Notify_Consumer::pending_events(), TAO_Notify_Consumer::schedule_timer(), and TAO_Notify_PropertyBase_T< TYPE >::value().

00225 {
00226   if (DEBUG_LEVEL > 0)
00227     ACE_DEBUG ( (LM_DEBUG, "SequencePushConsumer enqueing event.\n"));
00228   this->enqueue_request (request ACE_ENV_ARG_PARAMETER);
00229   ACE_CHECK_RETURN (false);
00230 
00231   size_t mbs = static_cast<size_t>(this->max_batch_size_.value());
00232 
00233   if (this->pending_events().size() >= mbs || this->pacing_.is_valid () == 0)
00234   {
00235     this->dispatch_pending (ACE_ENV_SINGLE_ARG_PARAMETER);
00236     ACE_CHECK_RETURN (false);
00237   }
00238   else
00239   {
00240     schedule_timer (false);
00241   }
00242   return true;
00243 }

ACE_CString TAO_Notify_SequencePushConsumer::get_ior (void)  const [virtual]
 

Retrieve the ior of this peer.

Implements TAO_Notify_Peer.

Definition at line 267 of file SequencePushConsumer.cpp.

References ACE_CATCHANY, ACE_CString, ACE_DECLARE_NEW_CORBA_ENV, ACE_ENDTRY, ACE_ENV_ARG_PARAMETER, ACE_TRY, ACE_TRY_CHECK, and TAO_Singleton< TYPE, ACE_LOCK >::instance().

00268 {
00269   ACE_CString result;
00270   CORBA::ORB_var orb = TAO_Notify_PROPERTIES::instance ()->orb ();
00271   ACE_DECLARE_NEW_CORBA_ENV;
00272   ACE_TRY
00273   {
00274     CORBA::String_var ior = orb->object_to_string (this->push_consumer_.in () ACE_ENV_ARG_PARAMETER);
00275     ACE_TRY_CHECK;
00276     result = static_cast<const char*> (ior.in ());
00277   }
00278   ACE_CATCHANY
00279   {
00280     result.fast_clear();
00281   }
00282   ACE_ENDTRY;
00283   return result;
00284 }

void TAO_Notify_SequencePushConsumer::init (CosNotifyComm::SequencePushConsumer_ptr push_consumer)
 

Init the Consumer.

Definition at line 35 of file SequencePushConsumer.cpp.

References ACE_ASSERT, ACE_THROW, and CORBA::is_nil().

Referenced by TAO_Notify_SequenceProxyPushSupplier::connect_sequence_push_consumer(), and reconnect_from_consumer().

00036 {
00037   // Initialize only once
00038   ACE_ASSERT( CORBA::is_nil (this->push_consumer_.in()) );
00039 
00040   if (CORBA::is_nil (push_consumer))
00041   {
00042     ACE_THROW (CORBA::BAD_PARAM());
00043   }
00044 
00045   this->push_consumer_ = CosNotifyComm::SequencePushConsumer::_duplicate (push_consumer);
00046   this->publish_ = CosNotifyComm::NotifyPublish::_duplicate (push_consumer);
00047 }

void TAO_Notify_SequencePushConsumer::push (const CosNotification::EventBatch &event)  [virtual]
 

Push a batch of events to this consumer.

Implements TAO_Notify_Consumer.

Definition at line 260 of file SequencePushConsumer.cpp.

References ACE_CHECK, ACE_ENV_ARG_PARAMETER, and CosNotification::EventBatch.

00261 {
00262   this->push_consumer_->push_structured_events (event_batch ACE_ENV_ARG_PARAMETER);
00263   ACE_CHECK;
00264 }

void TAO_Notify_SequencePushConsumer::push (const CosNotification::StructuredEvent &event)  [virtual]
 

Push to this consumer.

Implements TAO_Notify_Consumer.

Definition at line 253 of file SequencePushConsumer.cpp.

00254 {
00255   //NOP
00256 }

void TAO_Notify_SequencePushConsumer::push (const CORBA::Any &event)  [virtual]
 

Push to this consumer.

Implements TAO_Notify_Consumer.

Definition at line 247 of file SequencePushConsumer.cpp.

00248 {
00249   //NOP
00250 }

void TAO_Notify_SequencePushConsumer::reconnect_from_consumer (TAO_Notify_Consumer *old_consumer)  [virtual]
 

On reconnect we need to move events from the old consumer to the new one.

Implements TAO_Notify_Consumer.

Definition at line 287 of file SequencePushConsumer.cpp.

References ACE_ASSERT, ACE_CHECK, ACE_ENV_ARG_PARAMETER, init(), push_consumer_, and TAO_Notify_Consumer::schedule_timer().

00289 {
00290   TAO_Notify_SequencePushConsumer* tmp = dynamic_cast<TAO_Notify_SequencePushConsumer *> (old_consumer);
00291   ACE_ASSERT(tmp != 0);
00292   this->init(tmp->push_consumer_.in() ACE_ENV_ARG_PARAMETER);
00293   ACE_CHECK;
00294   this->schedule_timer(false);
00295 }

void TAO_Notify_SequencePushConsumer::release (void)  [private, virtual]
 

TAO_Notify_Destroy_Callback methods.

Implements TAO_Notify_Peer.

Definition at line 50 of file SequencePushConsumer.cpp.

00051 {
00052   delete this;
00053   //@@ inform factory
00054 }


Member Data Documentation

CosNotifyComm::SequencePushConsumer_var TAO_Notify_SequencePushConsumer::push_consumer_ [protected]
 

The Consumer.

Definition at line 84 of file SequencePushConsumer.h.

Referenced by reconnect_from_consumer().


The documentation for this class was generated from the following files:
SequencePushConsumer.h
SequencePushConsumer.cpp
Generated on Thu Nov 9 13:33:14 2006 for TAO_CosNotification by doxygen 1.3.6