TAO_Notify_SequencePushConsumer Class Reference

#include <SequencePushConsumer.h>

Inheritance diagram for TAO_Notify_SequencePushConsumer:

Inheritance graph
[legend]
Collaboration diagram for TAO_Notify_SequencePushConsumer:

Collaboration graph
[legend]
List of all members.

Public Member Functions

 TAO_Notify_SequencePushConsumer (TAO_Notify_ProxySupplier *proxy)
virtual ~TAO_Notify_SequencePushConsumer ()
void init (CosNotifyComm::SequencePushConsumer_ptr push_consumer)
 Init the Consumer.

virtual bool enqueue_if_necessary (TAO_Notify_Method_Request_Event *request)
virtual bool dispatch_from_queue (Request_Queue &requests, ACE_Guard< TAO_SYNCH_MUTEX > &ace_mon)
 Attempt to dispatch event from a queue.

virtual void push (const CORBA::Any &event)
 Push to this consumer.

virtual void push (const CosNotification::StructuredEvent &event)
 Push to this consumer.

virtual void push (const CosNotification::EventBatch &event)
 Push a batch of events to this consumer.

virtual ACE_CString get_ior (void) const
 Retrieve the ior of this peer.

virtual void reconnect_from_consumer (TAO_Notify_Consumer *old_consumer)

Protected Attributes

CosNotifyComm::SequencePushConsumer_var push_consumer_
 The Consumer.


Private Member Functions

virtual void release (void)
 TAO_Notify_Destroy_Callback methods.


Constructor & Destructor Documentation

TAO_BEGIN_VERSIONED_NAMESPACE_DECL TAO_Notify_SequencePushConsumer::TAO_Notify_SequencePushConsumer TAO_Notify_ProxySupplier proxy  ) 
 

Definition at line 28 of file SequencePushConsumer.cpp.

00029 : TAO_Notify_Consumer (proxy)
00030 {
00031 }

TAO_Notify_SequencePushConsumer::~TAO_Notify_SequencePushConsumer  )  [virtual]
 

Definition at line 33 of file SequencePushConsumer.cpp.

00034 {
00035 }


Member Function Documentation

bool TAO_Notify_SequencePushConsumer::dispatch_from_queue Request_Queue requests,
ACE_Guard< TAO_SYNCH_MUTEX > &  ace_mon
[virtual]
 

Attempt to dispatch event from a queue.

Called by dispatch_pending. Deliver one or more events to the Consumer. If delivery fails, events are left in the queue (or discarded depending on QoS parameters.) Undelivered, undiscarded requests are left at the front of the queue. Overridden in sequence consumer to dispatch as an EventBatch.

Returns:
false if delivery failed and the request(s) cannot be discarded.

Reimplemented from TAO_Notify_Consumer.

Definition at line 100 of file SequencePushConsumer.cpp.

References ACE_ASSERT, ACE_DEBUG, ACE_TEXT(), ACE_Guard< ACE_LOCK >::acquire(), TAO_Notify_Method_Request_Event::complete(), TAO_Notify_Event::convert(), DEBUG_LEVEL, TAO_Notify_ProxySupplier::destroy(), TAO_Notify_Consumer::dispatch_batch(), TAO_Notify_Method_Request_Event::event(), CosNotification::EventBatch, TAO_Notify_PropertyBase_T< TYPE >::is_valid(), LM_DEBUG, TAO_Notify_Consumer::proxy_supplier(), ACE_Message_Block::release(), ACE_Guard< ACE_LOCK >::release(), TAO_Notify_Method_Request_Event::sequence(), TAO_Notify_Method_Request_Event::should_retry(), and TAO_Notify_PropertyBase_T< TYPE >::value().

00101 {
00102   bool result = true;
00103   if (DEBUG_LEVEL > 0)
00104   {
00105     ACE_DEBUG ( (LM_DEBUG,
00106       ACE_TEXT ("(%P|%t) SequencePushConsumer dispatch queued requests. queue size:%u\n"),
00107       requests.size ()));
00108   }
00109 
00110   CORBA::ULong queue_size = ACE_Utils::truncate_cast<CORBA::ULong> (requests.size ());
00111   CORBA::Long max_batch_size = queue_size;
00112   if (this->max_batch_size_.is_valid () )
00113   {
00114     max_batch_size = this->max_batch_size_.value ();
00115   }
00116   CORBA::Long batch_size = queue_size;
00117   if (batch_size > max_batch_size)
00118   {
00119     batch_size = max_batch_size;
00120   }
00121   if (batch_size > 0)
00122   {
00123     CosNotification::EventBatch batch (batch_size);
00124     batch.length (batch_size);
00125 
00126     Request_Queue completed;
00127 
00128     CORBA::Long pos = 0;
00129     TAO_Notify_Method_Request_Event_Queueable * request = 0;
00130     while (pos < batch_size && requests.dequeue_head (request) == 0)
00131     {
00132       if (DEBUG_LEVEL > 0)
00133       {
00134         ACE_DEBUG ( (LM_DEBUG,
00135           ACE_TEXT ("(%P|%t) Sequence Dispatch Method_Request_Dispatch @%@\n"),
00136           request));
00137       }
00138 
00139       const TAO_Notify_Event * ev = request->event ();
00140       ev->convert (batch [pos]);
00141       ++pos;
00142 
00143       // note enqueue at head, use queue as stack.
00144       completed.enqueue_head (request);
00145     }
00146     batch.length (pos);
00147     ACE_ASSERT (pos > 0);
00148 
00149     ace_mon.release ();
00150     TAO_Notify_Consumer::DispatchStatus status =
00151       this->dispatch_batch (batch);
00152     ace_mon.acquire ();
00153     switch (status)
00154     {
00155     case DISPATCH_SUCCESS:
00156       {
00157         TAO_Notify_Method_Request_Event_Queueable * request = 0;
00158         while (completed.dequeue_head (request) == 0)
00159         {
00160           request->complete ();
00161           request->release ();
00162         }
00163         result = true;
00164         break;
00165       }
00166     case DISPATCH_FAIL:
00167       {
00168         TAO_Notify_Method_Request_Event_Queueable * request = 0;
00169         while (completed.dequeue_head (request) == 0)
00170         {
00171           if (request->should_retry ())
00172           {
00173             if (DEBUG_LEVEL > 0)
00174               ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Consumer %d: Will retry %d\n"),
00175                           static_cast <int> (this->proxy ()->id ()),
00176                           request->sequence ()));
00177             requests.enqueue_head (request);
00178             result = false;
00179           }
00180           else
00181           {
00182             if (DEBUG_LEVEL > 0)
00183               ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Consumer %d: Discarding %d\n"),
00184                           static_cast<int> (this->proxy ()->id ()),
00185                           request->sequence ()));
00186             request->complete ();
00187             request->release ();
00188           }
00189         }
00190         while (requests.dequeue_head (request) == 0)
00191         {
00192           if (request->should_retry ())
00193           {
00194             if (DEBUG_LEVEL > 0)
00195               ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Consumer %d: Will retry %d\n"),
00196                           static_cast<int> (this->proxy ()->id ()),
00197                           request->sequence ()));
00198             requests.enqueue_head (request);
00199             result = false;
00200           }
00201           else
00202           {
00203             if (DEBUG_LEVEL > 0)
00204               ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Consumer %d: Discarding %d\n"),
00205                           static_cast<int> (this->proxy ()->id ()),
00206                           request->sequence ()));
00207             request->complete ();
00208             request->release ();
00209           }
00210         }
00211         ace_mon.release();
00212         try
00213         {
00214           this->proxy_supplier ()->destroy ();
00215         }
00216         catch (const CORBA::Exception&)
00217         {
00218           // todo is there something meaningful we can do here?
00219           ;
00220         }
00221         ace_mon.acquire();
00222         break;
00223       }
00224     case DISPATCH_RETRY:
00225     case DISPATCH_DISCARD:
00226       {
00227         TAO_Notify_Method_Request_Event_Queueable *  request = 0;
00228         while (completed.dequeue_head (request) == 0)
00229         {
00230           if (request->should_retry ())
00231           {
00232             if (DEBUG_LEVEL > 0)
00233               ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Consumer %d: Will retry %d\n"),
00234                           static_cast<int> (this->proxy ()->id ()),
00235                           request->sequence ()));
00236             requests.enqueue_head (request);
00237             result = false;
00238           }
00239           else
00240           {
00241             if (DEBUG_LEVEL > 0)
00242               ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Consumer %d: Discarding %d\n"),
00243                           static_cast<int> (this->proxy ()->id ()),
00244                           request->sequence ()));
00245             request->complete ();
00246             request->release ();
00247           }
00248         }
00249         break;
00250       }
00251     default:
00252       {
00253         result = false;
00254         break;
00255       }
00256     }
00257   }
00258   return result;
00259 }

bool TAO_Notify_SequencePushConsumer::enqueue_if_necessary TAO_Notify_Method_Request_Event request  )  [virtual]
 

Add request to a queue if necessary. for Sequence it's always necessary.

Reimplemented from TAO_Notify_Consumer.

Definition at line 262 of file SequencePushConsumer.cpp.

References ACE_DEBUG, DEBUG_LEVEL, TAO_Notify_Consumer::dispatch_pending(), TAO_Notify_Consumer::enqueue_request(), TAO_Notify_PropertyBase_T< TYPE >::is_valid(), LM_DEBUG, TAO_Notify_Consumer::pending_events(), TAO_Notify_Consumer::schedule_timer(), and TAO_Notify_PropertyBase_T< TYPE >::value().

00264 {
00265   if (DEBUG_LEVEL > 0)
00266     ACE_DEBUG ( (LM_DEBUG, "SequencePushConsumer enqueing event.\n"));
00267   this->enqueue_request (request);
00268 
00269   size_t mbs = static_cast<size_t>(this->max_batch_size_.value());
00270 
00271   if (this->pending_events().size() >= mbs || this->pacing_.is_valid () == 0)
00272   {
00273     this->dispatch_pending ();
00274   }
00275   else
00276   {
00277     schedule_timer (false);
00278   }
00279   return true;
00280 }

ACE_CString TAO_Notify_SequencePushConsumer::get_ior void   )  const [virtual]
 

Retrieve the ior of this peer.

Implements TAO_Notify_Peer.

Definition at line 310 of file SequencePushConsumer.cpp.

References ACE_CString, TAO_Notify_Properties::instance(), and TAO_Notify_Properties::orb().

00311 {
00312   ACE_CString result;
00313   CORBA::ORB_var orb = TAO_Notify_PROPERTIES::instance ()->orb ();
00314   try
00315   {
00316     CORBA::String_var ior = orb->object_to_string (this->push_consumer_.in ());
00317     result = static_cast<const char*> (ior.in ());
00318   }
00319   catch (const CORBA::Exception&)
00320   {
00321     result.fast_clear();
00322   }
00323   return result;
00324 }

void TAO_Notify_SequencePushConsumer::init CosNotifyComm::SequencePushConsumer_ptr  push_consumer  ) 
 

Init the Consumer.

Definition at line 38 of file SequencePushConsumer.cpp.

References ACE_ASSERT, ACE_DEBUG, TAO_Notify_Properties::dispatching_orb(), TAO_Notify_Properties::instance(), CORBA::is_nil(), LM_DEBUG, TAO_Notify_Properties::orb(), TAO_Notify_Properties::separate_dispatching_orb(), and TAO_debug_level.

Referenced by TAO_Notify_SequenceProxyPushSupplier::connect_sequence_push_consumer(), and reconnect_from_consumer().

00039 {
00040   // Initialize only once
00041   ACE_ASSERT( CORBA::is_nil (this->push_consumer_.in()) );
00042 
00043   if (CORBA::is_nil (push_consumer))
00044   {
00045     throw CORBA::BAD_PARAM();
00046   }
00047 
00048   if (!TAO_Notify_PROPERTIES::instance()->separate_dispatching_orb ())
00049     {
00050       this->push_consumer_ = CosNotifyComm::SequencePushConsumer::_duplicate (push_consumer);
00051       this->publish_ = CosNotifyComm::NotifyPublish::_duplicate (push_consumer);
00052     }
00053   else
00054     {
00055       // "Port" consumer's object reference from receiving ORB to dispatching ORB.
00056       CORBA::String_var temp =
00057         TAO_Notify_PROPERTIES::instance()->orb()->object_to_string(push_consumer);
00058 
00059       CORBA::Object_var obj =
00060         TAO_Notify_PROPERTIES::instance()->dispatching_orb()->string_to_object(temp.in());
00061 
00062       try
00063         {
00064           CosNotifyComm::SequencePushConsumer_var new_push_consumer =
00065             CosNotifyComm::SequencePushConsumer::_unchecked_narrow(obj.in());
00066 
00067           this->push_consumer_ = CosNotifyComm::SequencePushConsumer::_duplicate (new_push_consumer.in());
00068           this->publish_ = CosNotifyComm::NotifyPublish::_duplicate (new_push_consumer.in());
00069 
00070           //--cj verify dispatching ORB
00071           if (TAO_debug_level >= 10)
00072             {
00073               ACE_DEBUG ((LM_DEBUG,
00074                           "(%P|%t) Sequence push init dispatching ORB id is %s.\n",
00075                           obj->_stubobj()->orb_core()->orbid()));
00076             }
00077           //--cj end
00078         }
00079       catch (const CORBA::TRANSIENT& ex)
00080         {
00081           ex._tao_print_exception (
00082             "Got a TRANSIENT in NS_SequencePushConsumer::init");
00083           ACE_DEBUG ((LM_DEBUG, "(%P|%t) got it for NS_SequencePushConsumer %@\n", this));
00084         }
00085       catch (const CORBA::Exception&)
00086         {
00087           // _narrow failed
00088         }
00089     }
00090 }

void TAO_Notify_SequencePushConsumer::push const CosNotification::EventBatch event  )  [virtual]
 

Push a batch of events to this consumer.

Implements TAO_Notify_Consumer.

Definition at line 297 of file SequencePushConsumer.cpp.

References ACE_DEBUG, CosNotification::EventBatch, LM_DEBUG, and TAO_debug_level.

00298 {
00299   //--cj verify dispatching ORB
00300   if (TAO_debug_level >= 10) {
00301     ACE_DEBUG ((LM_DEBUG, "(%P|%t) Sequence push dispatching ORB id is %s.\n",
00302                 this->push_consumer_->_stubobj()->orb_core()->orbid()));
00303   }
00304   //--cj end
00305 
00306   this->push_consumer_->push_structured_events (event_batch);
00307 }

void TAO_Notify_SequencePushConsumer::push const CosNotification::StructuredEvent event  )  [virtual]
 

Push to this consumer.

Implements TAO_Notify_Consumer.

Definition at line 290 of file SequencePushConsumer.cpp.

00291 {
00292   //NOP
00293 }

void TAO_Notify_SequencePushConsumer::push const CORBA::Any &  event  )  [virtual]
 

Push to this consumer.

Implements TAO_Notify_Consumer.

Definition at line 284 of file SequencePushConsumer.cpp.

00285 {
00286   //NOP
00287 }

void TAO_Notify_SequencePushConsumer::reconnect_from_consumer TAO_Notify_Consumer old_consumer  )  [virtual]
 

on reconnect we need to move events from the old consumer to the new one

Implements TAO_Notify_Consumer.

Definition at line 327 of file SequencePushConsumer.cpp.

References ACE_ASSERT, init(), push_consumer_, and TAO_Notify_Consumer::schedule_timer().

00328 {
00329   TAO_Notify_SequencePushConsumer* tmp = dynamic_cast<TAO_Notify_SequencePushConsumer *> (old_consumer);
00330   ACE_ASSERT(tmp != 0);
00331   this->init(tmp->push_consumer_.in());
00332   this->schedule_timer(false);
00333 }

void TAO_Notify_SequencePushConsumer::release void   )  [private, virtual]
 

TAO_Notify_Destroy_Callback methods.

Implements TAO_Notify_Peer.

Definition at line 93 of file SequencePushConsumer.cpp.

00094 {
00095   delete this;
00096   //@@ inform factory
00097 }


Member Data Documentation

CosNotifyComm::SequencePushConsumer_var TAO_Notify_SequencePushConsumer::push_consumer_ [protected]
 

The Consumer.

Definition at line 82 of file SequencePushConsumer.h.

Referenced by reconnect_from_consumer().


The documentation for this class was generated from the following files:
Generated on Sun Jan 27 15:51:17 2008 for TAO_CosNotification by doxygen 1.3.6