TAO_Notify_Buffering_Strategy Class Reference

Base Strategy to enqueue and dequeue items from a Message Queue. More...

#include <Buffering_Strategy.h>

Collaboration diagram for TAO_Notify_Buffering_Strategy:

Collaboration graph
[legend]
List of all members.

Public Member Functions

 TAO_Notify_Buffering_Strategy (TAO_Notify_Message_Queue &msg_queue, const TAO_Notify_AdminProperties::Ptr &admin_properties)
 ~TAO_Notify_Buffering_Strategy ()
void update_qos_properties (const TAO_Notify_QoSProperties &qos_properties)
int enqueue (TAO_Notify_Method_Request_Queueable *method_request)
int dequeue (TAO_Notify_Method_Request_Queueable *&method_request, const ACE_Time_Value *abstime)
void shutdown (void)
 Shutdown.


Private Member Functions

int queue (TAO_Notify_Method_Request_Queueable *method_request)
 Apply the Order Policy and queue. return -1 on error.

bool discard (TAO_Notify_Method_Request_Queueable *method_request)
 Discard as per the Discard Policy.


Private Attributes

TAO_Notify_Message_Queuemsg_queue_
 = Data Members The local Message Queue

TAO_Notify_AdminProperties::Ptr admin_properties_
 Reference to the properties per event channel.

ACE_SYNCH_MUTEXglobal_queue_lock_
 The shared global lock used by all the queues.

CORBA::Longglobal_queue_length_
 The global queue length - queue length accross all the queues.

const TAO_Notify_Property_Longmax_queue_length_
 The maximum events that can be queued overall.

TAO_Notify_Property_Short order_policy_
 Order of events in internal buffers.

TAO_Notify_Property_Short discard_policy_
 Policy to discard when buffers are full.

TAO_Notify_Property_Long max_events_per_consumer_
TAO_Notify_Property_Time blocking_policy_
TAO_SYNCH_CONDITIONglobal_not_full_
TAO_SYNCH_CONDITION local_not_full_
TAO_SYNCH_CONDITION local_not_empty_
 Condition that batch size reached.

bool shutdown_
 Flag to shutdown.


Detailed Description

Base Strategy to enqueue and dequeue items from a Message Queue.

Definition at line 45 of file Buffering_Strategy.h.


Constructor & Destructor Documentation

TAO_BEGIN_VERSIONED_NAMESPACE_DECL TAO_Notify_Buffering_Strategy::TAO_Notify_Buffering_Strategy TAO_Notify_Message_Queue msg_queue,
const TAO_Notify_AdminProperties::Ptr admin_properties
 

Definition at line 22 of file Buffering_Strategy.cpp.

References TAO_Notify_AdminProperties::Ptr, and TAO_Notify_Message_Queue.

00025 : msg_queue_ (msg_queue)
00026 , admin_properties_ (admin_properties)
00027 , global_queue_lock_ (admin_properties->global_queue_lock ())
00028 , global_queue_length_ (admin_properties->global_queue_length ())
00029 , max_queue_length_ (admin_properties->max_global_queue_length ())
00030 , order_policy_ (CosNotification::OrderPolicy, CosNotification::AnyOrder)
00031 , discard_policy_ (CosNotification::DiscardPolicy, CosNotification::AnyOrder)
00032 , max_events_per_consumer_ (CosNotification::MaxEventsPerConsumer)
00033 , blocking_policy_ (TAO_Notify_Extensions::BlockingPolicy)
00034 , global_not_full_ (admin_properties->global_queue_not_full())
00035 , local_not_full_ (global_queue_lock_)
00036 , local_not_empty_ (global_queue_lock_)
00037 , shutdown_ (false)
00038 {
00039 }

TAO_Notify_Buffering_Strategy::~TAO_Notify_Buffering_Strategy  ) 
 

Definition at line 41 of file Buffering_Strategy.cpp.

00042 {
00043 }


Member Function Documentation

int TAO_Notify_Buffering_Strategy::dequeue TAO_Notify_Method_Request_Queueable *&  method_request,
const ACE_Time_Value abstime
 

Dequeue batch. This method will block for abstime if non-zero or else blocks till an item is available. Return -1 on error or if nothing is available, else the number of items actually dequeued (1).

Definition at line 143 of file Buffering_Strategy.cpp.

References ACE_GUARD_RETURN, ACE_SYNCH_MUTEX, ACE_Message_Queue<>::dequeue(), ETIME, global_not_full_, local_not_empty_, local_not_full_, ACE_Message_Queue<>::message_count(), and shutdown_.

00144 {
00145   ACE_Message_Block *mb;
00146 
00147   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->global_queue_lock_, -1);
00148 
00149   if ( this->shutdown_ )
00150     return -1;
00151 
00152   while (this->msg_queue_.message_count () == 0)
00153     {
00154       this->local_not_empty_.wait (abstime);
00155 
00156       if (this->shutdown_)
00157         return -1;
00158 
00159       if (errno == ETIME)
00160         return 0;
00161     }
00162 
00163   if (this->msg_queue_.dequeue (mb) == -1)
00164     return -1;
00165 
00166   method_request = dynamic_cast<TAO_Notify_Method_Request_Queueable*>(mb);
00167 
00168   if (method_request == 0)
00169     return -1;
00170 
00171   --this->global_queue_length_;
00172   local_not_full_.signal();
00173   global_not_full_.signal();
00174 
00175   return 1;
00176 }

bool TAO_Notify_Buffering_Strategy::discard TAO_Notify_Method_Request_Queueable method_request  )  [private]
 

Discard as per the Discard Policy.

Definition at line 215 of file Buffering_Strategy.cpp.

References ACE_DEBUG, ACE_Message_Queue<>::dequeue_deadline(), ACE_Message_Queue<>::dequeue_head(), ACE_Message_Queue<>::dequeue_prio(), discard_policy_, ACE_Message_Queue<>::enqueue_prio(), TAO_Notify_PropertyBase_T< TYPE >::is_valid(), LM_DEBUG, ACE_Message_Block::msg_priority(), ACE_Message_Block::release(), shutdown_, and TAO_debug_level.

Referenced by enqueue().

00216 {
00217   if (this->shutdown_)
00218     {
00219       return false;
00220     }
00221 
00222   ACE_Message_Block* mb = 0;
00223   int result = -1;
00224 
00225   if (this->discard_policy_.is_valid() == 0 ||
00226       this->discard_policy_ == CosNotification::AnyOrder ||
00227       this->discard_policy_ == CosNotification::FifoOrder)
00228     {
00229       result = this->msg_queue_.dequeue_head (mb);
00230     }
00231   else if (this->discard_policy_ == CosNotification::LifoOrder)
00232     {
00233       // The most current message is NOT the newest one in the queue. It's
00234       // the one we're about to add to the queue.
00235       result = -1;
00236     }
00237   else if (this->discard_policy_ == CosNotification::DeadlineOrder)
00238     {
00239       result = this->msg_queue_.dequeue_deadline (mb);
00240     }
00241   else if (this->discard_policy_ == CosNotification::PriorityOrder)
00242     {
00243       result = this->msg_queue_.dequeue_prio (mb);
00244       if (mb->msg_priority() >= method_request->msg_priority())
00245         {
00246           this->msg_queue_.enqueue_prio (mb);
00247           result = -1;
00248         }
00249     }
00250   else
00251     {
00252       if (TAO_debug_level > 0)
00253         ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - Invalid discard policy\n"));
00254       result = this->msg_queue_.dequeue_head (mb);
00255     }
00256 
00257   if (result != -1)
00258     {
00259       ACE_Message_Block::release (mb);
00260       return true;
00261     }
00262 
00263   return false;
00264 }

int TAO_Notify_Buffering_Strategy::enqueue TAO_Notify_Method_Request_Queueable method_request  ) 
 

Enqueue according the enqueing strategy. Return -1 on error else the number of items in the queue.

Definition at line 73 of file Buffering_Strategy.cpp.

References ACE_DEBUG, ACE_GUARD_RETURN, ACE_SYNCH_MUTEX, blocking_policy_, discard(), ETIME, ACE_OS::gettimeofday(), global_not_full_, TAO_Notify_PropertyBase_T< TYPE >::is_valid(), LM_DEBUG, local_not_empty_, local_not_full_, max_events_per_consumer_, max_queue_length_, ACE_Message_Queue<>::message_count(), queue(), shutdown_, and TAO_Notify_PropertyBase_T< TYPE >::value().

00074 {
00075   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->global_queue_lock_, -1);
00076 
00077   if (this->shutdown_)
00078     return -1;
00079 
00080   bool discarded_existing = false;
00081 
00082   bool local_overflow = this->max_events_per_consumer_.is_valid() &&
00083     static_cast <CORBA::Long> (this->msg_queue_.message_count ()) >= this->max_events_per_consumer_.value();
00084 
00085   bool global_overflow = this->max_queue_length_.value () != 0 &&
00086     this->global_queue_length_ >= this->max_queue_length_.value ();
00087 
00088   while (local_overflow || global_overflow)
00089     {
00090       if (blocking_policy_.is_valid())
00091         {
00092           ACE_Time_Value timeout;
00093           ORBSVCS_Time::TimeT_to_Time_Value(timeout, blocking_policy_.value());
00094           // Condition variables take an absolute time
00095           timeout += ACE_OS::gettimeofday();
00096           if (local_overflow)
00097             {
00098               local_not_full_.wait(&timeout);
00099             }
00100           else
00101             {
00102               global_not_full_.wait(&timeout);
00103             }
00104           if (errno != ETIME)
00105             {
00106               local_overflow =
00107                 this->max_events_per_consumer_.is_valid() &&
00108                 static_cast <CORBA::Long> (this->msg_queue_.message_count ()) >= this->max_events_per_consumer_.value();
00109               global_overflow =
00110                 this->max_queue_length_.value () != 0 &&
00111                 this->global_queue_length_ >= this->max_queue_length_.value ();
00112               continue;
00113             }
00114         }
00115 
00116       discarded_existing = this->discard(method_request);
00117       if (discarded_existing)
00118         {
00119           --this->global_queue_length_;
00120           local_not_full_.signal();
00121           global_not_full_.signal();
00122         }
00123       break;
00124     }
00125 
00126   if (! (local_overflow || global_overflow) || discarded_existing)
00127     {
00128       if (this->queue (method_request) == -1)
00129         {
00130           ACE_DEBUG((LM_DEBUG,
00131                      "Notify (%P|%t) - Panic! failed to enqueue event\n"));
00132           return -1;
00133         }
00134 
00135       ++this->global_queue_length_;
00136 
00137       local_not_empty_.signal ();
00138     }
00139   return this->msg_queue_.message_count ();
00140 }

int TAO_Notify_Buffering_Strategy::queue TAO_Notify_Method_Request_Queueable method_request  )  [private]
 

Apply the Order Policy and queue. return -1 on error.

Definition at line 179 of file Buffering_Strategy.cpp.

References ACE_DEBUG, ACE_Message_Queue<>::enqueue_deadline(), ACE_Message_Queue<>::enqueue_prio(), ACE_Message_Queue<>::enqueue_tail(), TAO_Notify_PropertyBase_T< TYPE >::is_valid(), LM_DEBUG, order_policy_, shutdown_, TAO_debug_level, and TAO_Notify_PropertyBase_T< TYPE >::value().

Referenced by enqueue().

00180 {
00181   if ( this->shutdown_ )
00182     return -1;
00183 
00184   CORBA::Short order = this->order_policy_.value();
00185 
00186   if (! this->order_policy_.is_valid() ||
00187       order == CosNotification::AnyOrder ||
00188       order == CosNotification::FifoOrder)
00189     {
00190       if (TAO_debug_level > 0)
00191         ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - enqueue in fifo order\n"));
00192       return this->msg_queue_.enqueue_tail (method_request);
00193     }
00194 
00195   if (order == CosNotification::PriorityOrder)
00196     {
00197       if (TAO_debug_level > 0)
00198         ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - enqueue in priority order\n"));
00199       return this->msg_queue_.enqueue_prio (method_request);
00200     }
00201 
00202   if (order == CosNotification::DeadlineOrder)
00203     {
00204       if (TAO_debug_level > 0)
00205         ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - enqueue in deadline order\n"));
00206       return this->msg_queue_.enqueue_deadline (method_request);
00207     }
00208 
00209   if (TAO_debug_level > 0)
00210     ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - Invalid order policy\n"));
00211   return this->msg_queue_.enqueue_tail (method_request);
00212 }

void TAO_Notify_Buffering_Strategy::shutdown void   ) 
 

Shutdown.

Definition at line 56 of file Buffering_Strategy.cpp.

References ACE_GUARD, ACE_SYNCH_MUTEX, global_not_full_, local_not_empty_, local_not_full_, and shutdown_.

00057 {
00058   ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->global_queue_lock_);
00059 
00060   if (this->shutdown_)
00061   {
00062     return;
00063   }
00064 
00065   this->shutdown_ = true;
00066 
00067   this->local_not_empty_.broadcast ();
00068   this->global_not_full_.broadcast();
00069   this->local_not_full_.broadcast();
00070 }

void TAO_Notify_Buffering_Strategy::update_qos_properties const TAO_Notify_QoSProperties qos_properties  ) 
 

Update state with the following QoS Properties: Order Policy Discard Policy MaxEventsPerConsumer

Definition at line 47 of file Buffering_Strategy.cpp.

00048 {
00049   this->order_policy_.set (qos_properties);
00050   this->discard_policy_.set (qos_properties);
00051   this->max_events_per_consumer_.set(qos_properties);
00052   this->blocking_policy_.set (qos_properties);
00053 }


Member Data Documentation

TAO_Notify_AdminProperties::Ptr TAO_Notify_Buffering_Strategy::admin_properties_ [private]
 

Reference to the properties per event channel.

Definition at line 86 of file Buffering_Strategy.h.

TAO_Notify_Property_Time TAO_Notify_Buffering_Strategy::blocking_policy_ [private]
 

Definition at line 104 of file Buffering_Strategy.h.

Referenced by enqueue().

TAO_Notify_Property_Short TAO_Notify_Buffering_Strategy::discard_policy_ [private]
 

Policy to discard when buffers are full.

Definition at line 101 of file Buffering_Strategy.h.

Referenced by discard().

TAO_SYNCH_CONDITION& TAO_Notify_Buffering_Strategy::global_not_full_ [private]
 

Definition at line 107 of file Buffering_Strategy.h.

Referenced by dequeue(), enqueue(), and shutdown().

CORBA::Long& TAO_Notify_Buffering_Strategy::global_queue_length_ [private]
 

The global queue length - queue length accross all the queues.

Definition at line 92 of file Buffering_Strategy.h.

ACE_SYNCH_MUTEX& TAO_Notify_Buffering_Strategy::global_queue_lock_ [private]
 

The shared global lock used by all the queues.

Definition at line 89 of file Buffering_Strategy.h.

TAO_SYNCH_CONDITION TAO_Notify_Buffering_Strategy::local_not_empty_ [private]
 

Condition that batch size reached.

Definition at line 111 of file Buffering_Strategy.h.

Referenced by dequeue(), enqueue(), and shutdown().

TAO_SYNCH_CONDITION TAO_Notify_Buffering_Strategy::local_not_full_ [private]
 

Definition at line 108 of file Buffering_Strategy.h.

Referenced by dequeue(), enqueue(), and shutdown().

TAO_Notify_Property_Long TAO_Notify_Buffering_Strategy::max_events_per_consumer_ [private]
 

Definition at line 103 of file Buffering_Strategy.h.

Referenced by enqueue().

const TAO_Notify_Property_Long& TAO_Notify_Buffering_Strategy::max_queue_length_ [private]
 

The maximum events that can be queued overall.

Definition at line 95 of file Buffering_Strategy.h.

Referenced by enqueue().

TAO_Notify_Message_Queue& TAO_Notify_Buffering_Strategy::msg_queue_ [private]
 

= Data Members The local Message Queue

Definition at line 83 of file Buffering_Strategy.h.

TAO_Notify_Property_Short TAO_Notify_Buffering_Strategy::order_policy_ [private]
 

Order of events in internal buffers.

Definition at line 98 of file Buffering_Strategy.h.

Referenced by queue().

bool TAO_Notify_Buffering_Strategy::shutdown_ [private]
 

Flag to shutdown.

Definition at line 114 of file Buffering_Strategy.h.

Referenced by dequeue(), discard(), enqueue(), queue(), and shutdown().


The documentation for this class was generated from the following files:
Generated on Thu Nov 9 13:31:40 2006 for TAO_CosNotification by doxygen 1.3.6