TAO_Notify_Buffering_Strategy Class Reference

Base Strategy to enqueue and dequeue items from a Message Queue. More...

#include <Buffering_Strategy.h>

Public Member Functions

 TAO_Notify_Buffering_Strategy (TAO_Notify_Message_Queue &msg_queue, const TAO_Notify_AdminProperties::Ptr &admin_properties)
 ~TAO_Notify_Buffering_Strategy ()
void update_qos_properties (const TAO_Notify_QoSProperties &qos_properties)
int enqueue (TAO_Notify_Method_Request_Queueable *method_request)
int dequeue (TAO_Notify_Method_Request_Queueable *&method_request, const ACE_Time_Value *abstime)
void shutdown (void)
 Shutdown.
ACE_Time_Value oldest_event (void)
 Provide the time value of the oldest event in the queue.
void set_tracker (Tracker *tracker)
 Set the tracker object. This strategy does not own the tracker.

Private Member Functions

int queue (TAO_Notify_Method_Request_Queueable *method_request)
 Apply the Order Policy and queue. Returns -1 on error.
bool discard (TAO_Notify_Method_Request_Queueable *method_request)
 Discard as per the Discard Policy.

Private Attributes

TAO_Notify_Message_Queue & msg_queue_
 The local Message Queue.
TAO_Notify_AdminProperties::Ptr admin_properties_
 Reference to the properties per event channel.
TAO_SYNCH_MUTEX & global_queue_lock_
 The shared global lock used by all the queues.
CORBA::Long & global_queue_length_
 The global queue length - queue length across all the queues.
const TAO_Notify_Property_Long & max_queue_length_
 The maximum events that can be queued overall.
TAO_Notify_Property_Short order_policy_
 Order of events in internal buffers.
TAO_Notify_Property_Short discard_policy_
 Policy to discard when buffers are full.
TAO_Notify_Property_Long max_events_per_consumer_
TAO_Notify_Property_Time blocking_policy_
TAO_SYNCH_CONDITION & global_not_full_
TAO_SYNCH_CONDITION local_not_full_
TAO_SYNCH_CONDITION local_not_empty_
 Condition that batch size reached.
bool shutdown_
 Flag to shutdown.
Tracker * tracker_
 Optional queue tracker.

Classes

class  Tracker
 This interface allows tracking of the queue size. More...

Detailed Description

Base Strategy to enqueue and dequeue items from a Message Queue.

Definition at line 45 of file Buffering_Strategy.h.


Constructor & Destructor Documentation

TAO_Notify_Buffering_Strategy::TAO_Notify_Buffering_Strategy ( TAO_Notify_Message_Queue & msg_queue,
const TAO_Notify_AdminProperties::Ptr & admin_properties 
)

Definition at line 22 of file Buffering_Strategy.cpp.

00025 : msg_queue_ (msg_queue)
00026 , admin_properties_ (admin_properties)
00027 , global_queue_lock_ (admin_properties->global_queue_lock ())
00028 , global_queue_length_ (admin_properties->global_queue_length ())
00029 , max_queue_length_ (admin_properties->max_global_queue_length ())
00030 , order_policy_ (CosNotification::OrderPolicy, CosNotification::AnyOrder)
00031 , discard_policy_ (CosNotification::DiscardPolicy, CosNotification::AnyOrder)
00032 , max_events_per_consumer_ (CosNotification::MaxEventsPerConsumer)
00033 , blocking_policy_ (TAO_Notify_Extensions::BlockingPolicy)
00034 , global_not_full_ (admin_properties->global_queue_not_full())
00035 , local_not_full_ (global_queue_lock_)
00036 , local_not_empty_ (global_queue_lock_)
00037 , shutdown_ (false)
00038 , tracker_ (0)
00039 {
00040 }

TAO_Notify_Buffering_Strategy::~TAO_Notify_Buffering_Strategy (  ) 

Definition at line 42 of file Buffering_Strategy.cpp.

00043 {
00044 }


Member Function Documentation

int TAO_Notify_Buffering_Strategy::dequeue ( TAO_Notify_Method_Request_Queueable *&  method_request,
const ACE_Time_Value * abstime 
)

Dequeue a single item. If abstime is non-zero, this method blocks until that absolute time; otherwise it blocks until an item is available. Returns -1 on error or after shutdown, 0 if the wait timed out with nothing available, and otherwise the number of items actually dequeued (1).

Definition at line 177 of file Buffering_Strategy.cpp.

References ACE_GUARD_RETURN, global_not_full_, global_queue_length_, local_not_full_, TAO_SYNCH_MUTEX, tracker_, and TAO_Notify_Buffering_Strategy::Tracker::update_queue_count().

00178 {
00179   ACE_Message_Block *mb;
00180 
00181   ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->global_queue_lock_, -1);
00182 
00183   if ( this->shutdown_ )
00184     return -1;
00185 
00186   while (this->msg_queue_.message_count () == 0)
00187     {
00188       this->local_not_empty_.wait (abstime);
00189 
00190       if (this->shutdown_)
00191         return -1;
00192 
00193       if (errno == ETIME)
00194         return 0;
00195     }
00196 
00197   if (this->msg_queue_.dequeue (mb) == -1)
00198     return -1;
00199 
00200   if (this->tracker_ != 0)
00201     this->tracker_->update_queue_count (this->msg_queue_.message_count ());
00202 
00203   method_request = dynamic_cast<TAO_Notify_Method_Request_Queueable*>(mb);
00204 
00205   if (method_request == 0)
00206     return -1;
00207 
00208   --this->global_queue_length_;
00209   local_not_full_.signal();
00210   global_not_full_.signal();
00211 
00212   return 1;
00213 }
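
The return-code contract of dequeue() (-1 after shutdown, 0 on timeout, 1 on success) can be illustrated with a minimal standard-library sketch. This is not the TAO implementation: the class SketchQueue, its members, and the use of std::condition_variable in place of TAO_SYNCH_CONDITION are assumptions made for illustration. Unlike the listing above, which inspects errno after the wait, the sketch relies on the return value of wait_until().

```cpp
#include <chrono>
#include <condition_variable>
#include <deque>
#include <mutex>

// Hypothetical stand-in for the buffering strategy; not part of TAO.
class SketchQueue {
public:
  using TimePoint = std::chrono::steady_clock::time_point;

  // Returns 1 and fills `out` on success, 0 on timeout, -1 after shutdown.
  int dequeue(int& out, TimePoint deadline) {
    std::unique_lock<std::mutex> lock(mutex_);
    if (shutdown_) return -1;

    while (items_.empty()) {
      const bool timed_out =
          not_empty_.wait_until(lock, deadline) == std::cv_status::timeout;
      if (shutdown_) return -1;                  // woken by shutdown()
      if (timed_out && items_.empty()) return 0; // deadline passed, nothing queued
    }

    out = items_.front();
    items_.pop_front();
    return 1;
  }

  void enqueue(int value) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      items_.push_back(value);
    }
    not_empty_.notify_one();
  }

  // Sets the flag once and wakes every waiter so it can observe it.
  void shutdown() {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      shutdown_ = true;
    }
    not_empty_.notify_all();
  }

private:
  std::mutex mutex_;
  std::condition_variable not_empty_;
  std::deque<int> items_;
  bool shutdown_ = false;
};
```

A caller would typically treat 0 as "try again later" and -1 as "stop consuming", which is why the two cases are kept distinct.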

bool TAO_Notify_Buffering_Strategy::discard ( TAO_Notify_Method_Request_Queueable * method_request  )  [private]

Discard as per the Discard Policy.

Definition at line 259 of file Buffering_Strategy.cpp.

References ACE_DEBUG, CosNotification::AnyOrder, CosNotification::DeadlineOrder, ACE_Message_Queue<>::dequeue_deadline(), ACE_Message_Queue<>::dequeue_head(), ACE_Message_Queue<>::dequeue_prio(), discard_policy_, CosNotification::FifoOrder, CosNotification::LifoOrder, LM_DEBUG, ACE_Message_Block::msg_priority(), msg_queue_, CosNotification::PriorityOrder, ACE_Message_Block::release(), and TAO_debug_level.

Referenced by enqueue().

00260 {
00261   if (this->shutdown_)
00262     {
00263       return false;
00264     }
00265 
00266   ACE_Message_Block* mb = 0;
00267   int result = -1;
00268 
00269   if (this->discard_policy_.is_valid() == 0 ||
00270       this->discard_policy_ == CosNotification::AnyOrder ||
00271       this->discard_policy_ == CosNotification::FifoOrder)
00272     {
00273       result = this->msg_queue_.dequeue_head (mb);
00274     }
00275   else if (this->discard_policy_ == CosNotification::LifoOrder)
00276     {
00277       // The most current message is NOT the newest one in the queue. It's
00278       // the one we're about to add to the queue.
00279       result = -1;
00280     }
00281   else if (this->discard_policy_ == CosNotification::DeadlineOrder)
00282     {
00283       result = this->msg_queue_.dequeue_deadline (mb);
00284     }
00285   else if (this->discard_policy_ == CosNotification::PriorityOrder)
00286     {
00287       result = this->msg_queue_.dequeue_prio (mb);
00288       if (mb->msg_priority() >= method_request->msg_priority())
00289         {
00290           this->msg_queue_.enqueue_prio (mb);
00291           result = -1;
00292         }
00293     }
00294   else
00295     {
00296       if (TAO_debug_level > 0)
00297         ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - Invalid discard policy\n"));
00298       result = this->msg_queue_.dequeue_head (mb);
00299     }
00300 
00301   if (result != -1)
00302     {
00303       ACE_Message_Block::release (mb);
00304       return true;
00305     }
00306 
00307   return false;
00308 }
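
The policy dispatch in discard() can be sketched with standard containers. The types Event and DiscardPolicy and the function discard_one are hypothetical names, not TAO or ACE APIs. The sketch also assumes that the priority branch inspects the lowest-priority queued event, which is how the listing above appears to use dequeue_prio(); the deadline branch is omitted.

```cpp
#include <algorithm>
#include <deque>

// Hypothetical subset of the discard policies, for illustration only.
enum class DiscardPolicy { Fifo, Lifo, Priority };

struct Event {
  int id;
  int priority;
};

// Tries to make room for `incoming`.
// Returns true if a queued event was removed, false if nothing was discarded.
bool discard_one(std::deque<Event>& queue, DiscardPolicy policy,
                 const Event& incoming) {
  if (queue.empty()) return false;

  switch (policy) {
    case DiscardPolicy::Fifo:
      queue.pop_front();  // drop the oldest queued event
      return true;

    case DiscardPolicy::Lifo:
      // The newest event is the incoming one, so nothing queued is removed.
      return false;

    case DiscardPolicy::Priority: {
      auto lowest = std::min_element(
          queue.begin(), queue.end(),
          [](const Event& a, const Event& b) { return a.priority < b.priority; });
      // Keep the queued event unless the incoming one outranks it.
      if (lowest->priority >= incoming.priority) return false;
      queue.erase(lowest);
      return true;
    }
  }
  return false;
}
```

A false result tells the caller that the incoming event itself should be dropped, which matches how enqueue() uses the return value of discard().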

int TAO_Notify_Buffering_Strategy::enqueue ( TAO_Notify_Method_Request_Queueable * method_request  ) 

Enqueue according to the enqueuing strategy. Returns -1 on error, otherwise the number of items in the queue.

Definition at line 102 of file Buffering_Strategy.cpp.

References ACE_DEBUG, ACE_GUARD_RETURN, blocking_policy_, discard(), ACE_OS::gettimeofday(), global_not_full_, global_queue_length_, TAO_Notify_PropertyBase_T< TYPE >::is_valid(), LM_DEBUG, local_not_empty_, local_not_full_, max_events_per_consumer_, max_queue_length_, ACE_Message_Queue<>::message_count(), msg_queue_, TAO_SYNCH_MUTEX, tracker_, TAO_Notify_Buffering_Strategy::Tracker::update_queue_count(), and TAO_Notify_PropertyBase_T< TYPE >::value().

00103 {
00104   ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->global_queue_lock_, -1);
00105 
00106   if (this->shutdown_)
00107     return -1;
00108 
00109   bool discarded_existing = false;
00110 
00111   bool local_overflow = this->max_events_per_consumer_.is_valid() &&
00112     static_cast <CORBA::Long> (this->msg_queue_.message_count ()) >= this->max_events_per_consumer_.value();
00113 
00114   bool global_overflow = this->max_queue_length_.value () != 0 &&
00115     this->global_queue_length_ >= this->max_queue_length_.value ();
00116 
00117   while (local_overflow || global_overflow)
00118     {
00119       if (blocking_policy_.is_valid())
00120         {
00121           ACE_Time_Value timeout;
00122           ORBSVCS_Time::TimeT_to_Time_Value(timeout, blocking_policy_.value());
00123           // Condition variables take an absolute time
00124           timeout += ACE_OS::gettimeofday();
00125           if (local_overflow)
00126             {
00127               local_not_full_.wait(&timeout);
00128             }
00129           else
00130             {
00131               global_not_full_.wait(&timeout);
00132             }
00133           if (errno != ETIME)
00134             {
00135               local_overflow =
00136                 this->max_events_per_consumer_.is_valid() &&
00137                 static_cast <CORBA::Long> (this->msg_queue_.message_count ()) >= this->max_events_per_consumer_.value();
00138               global_overflow =
00139                 this->max_queue_length_.value () != 0 &&
00140                 this->global_queue_length_ >= this->max_queue_length_.value ();
00141               continue;
00142             }
00143         }
00144 
00145       discarded_existing = this->discard(method_request);
00146       if (discarded_existing)
00147         {
00148           --this->global_queue_length_;
00149           local_not_full_.signal();
00150           global_not_full_.signal();
00151         }
00152       break;
00153     }
00154 
00155   if (! (local_overflow || global_overflow) || discarded_existing)
00156     {
00157       if (this->queue (method_request) == -1)
00158         {
00159           ACE_DEBUG((LM_DEBUG,
00160                      "Notify (%P|%t) - Panic! failed to enqueue event\n"));
00161           return -1;
00162         }
00163 
00164       ++this->global_queue_length_;
00165 
00166       local_not_empty_.signal ();
00167     }
00168 
00169   size_t count = this->msg_queue_.message_count ();
00170   if (this->tracker_ != 0)
00171     this->tracker_->update_queue_count (count);
00172 
00173   return ACE_Utils::truncate_cast<int> (count);
00174 }
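
The two overflow tests that drive enqueue() can be isolated in a small sketch. Limits, Overflow, check_overflow and may_queue are hypothetical names introduced here; only the boolean logic is taken from the listing above, and the blocking wait is left out.

```cpp
#include <cstddef>

// Hypothetical mirror of the limits consulted by enqueue().
struct Limits {
  bool has_local_limit;   // corresponds to max_events_per_consumer_.is_valid()
  long max_per_consumer;  // per-consumer limit, used only if has_local_limit
  long max_global;        // 0 means "no global limit"
};

struct Overflow {
  bool local;
  bool global;
};

// Computes the same two flags as the listing: local and global overflow.
Overflow check_overflow(const Limits& limits, std::size_t local_count,
                        long global_count) {
  const bool local = limits.has_local_limit &&
                     static_cast<long>(local_count) >= limits.max_per_consumer;
  const bool global =
      limits.max_global != 0 && global_count >= limits.max_global;
  return Overflow{local, global};
}

// The new event is queued only if nothing overflowed, or if an existing
// event was discarded to make room for it.
bool may_queue(const Overflow& overflow, bool discarded_existing) {
  return !(overflow.local || overflow.global) || discarded_existing;
}
```

Note that a global limit of 0 disables the global check entirely, while the local check is disabled by an unset property rather than by a sentinel value.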

ACE_Time_Value TAO_Notify_Buffering_Strategy::oldest_event ( void   ) 

Provide the time value of the oldest event in the queue.

Definition at line 74 of file Buffering_Strategy.cpp.

References ACE_GUARD_RETURN, ACE_Time_Value::max_time, and TAO_SYNCH_MUTEX.

00075 {
00076   ACE_Time_Value tv (ACE_Time_Value::max_time);
00077   ACE_Message_Block* mb = 0;
00078 
00079   ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->global_queue_lock_, tv);
00080   TAO_Notify_Message_Queue::ITERATOR itr (this->msg_queue_);
00081   while(itr.next (mb))
00082     {
00083       TAO_Notify_Method_Request_Queueable* event =
00084         dynamic_cast<TAO_Notify_Method_Request_Queueable*> (mb);
00085       if (event != 0)
00086         {
00087           const ACE_Time_Value& etime = event->creation_time ();
00088           if (etime < tv)
00089             tv = etime;
00090         }
00091       itr.advance ();
00092     }
00093 
00094   return tv;
00095 }
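
The scan in oldest_event() is a minimum search that starts from the largest representable time, so an empty queue yields that maximum. A minimal sketch using std::chrono instead of ACE_Time_Value follows; the free function and the Clock alias are hypothetical and stand in for the member function and ACE types.

```cpp
#include <chrono>
#include <vector>

using Clock = std::chrono::steady_clock;

// Returns the earliest creation time, or time_point::max() if there is none.
Clock::time_point oldest_event(
    const std::vector<Clock::time_point>& creation_times) {
  Clock::time_point oldest = Clock::time_point::max();
  for (const auto& t : creation_times) {
    if (t < oldest) oldest = t;
  }
  return oldest;
}
```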

int TAO_Notify_Buffering_Strategy::queue ( TAO_Notify_Method_Request_Queueable * method_request  )  [private]

Apply the Order Policy and queue. Returns -1 on error.

Definition at line 223 of file Buffering_Strategy.cpp.

References ACE_DEBUG, CosNotification::AnyOrder, CosNotification::DeadlineOrder, ACE_Message_Queue<>::enqueue_deadline(), ACE_Message_Queue<>::enqueue_prio(), ACE_Message_Queue<>::enqueue_tail(), CosNotification::FifoOrder, LM_DEBUG, msg_queue_, order_policy_, CosNotification::PriorityOrder, TAO_debug_level, and TAO_Notify_PropertyBase_T< TYPE >::value().

00224 {
00225   if ( this->shutdown_ )
00226     return -1;
00227 
00228   CORBA::Short order = this->order_policy_.value();
00229 
00230   if (! this->order_policy_.is_valid() ||
00231       order == CosNotification::AnyOrder ||
00232       order == CosNotification::FifoOrder)
00233     {
00234       if (TAO_debug_level > 0)
00235         ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - enqueue in fifo order\n"));
00236       return this->msg_queue_.enqueue_tail (method_request);
00237     }
00238 
00239   if (order == CosNotification::PriorityOrder)
00240     {
00241       if (TAO_debug_level > 0)
00242         ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - enqueue in priority order\n"));
00243       return this->msg_queue_.enqueue_prio (method_request);
00244     }
00245 
00246   if (order == CosNotification::DeadlineOrder)
00247     {
00248       if (TAO_debug_level > 0)
00249         ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - enqueue in deadline order\n"));
00250       return this->msg_queue_.enqueue_deadline (method_request);
00251     }
00252 
00253   if (TAO_debug_level > 0)
00254     ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - Invalid order policy\n"));
00255   return this->msg_queue_.enqueue_tail (method_request);
00256 }
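
The effect of the order policy on insertion position can be sketched as follows. OrderPolicy, Event and queue_event are hypothetical names. The sketch assumes that priority order places higher priorities nearer the head and keeps arrival order among equal priorities; the exact behaviour of ACE's enqueue_prio() should be checked against the ACE documentation.

```cpp
#include <algorithm>
#include <deque>

// Hypothetical subset of the order policies, for illustration only.
enum class OrderPolicy { Fifo, Priority };

struct Event {
  int id;
  int priority;
};

// Inserts `e` at the position implied by the policy.
void queue_event(std::deque<Event>& queue, OrderPolicy policy, const Event& e) {
  if (policy == OrderPolicy::Priority) {
    // Insert before the first event with strictly lower priority, so equal
    // priorities stay in arrival order.
    auto pos = std::find_if(
        queue.begin(), queue.end(),
        [&e](const Event& queued) { return queued.priority < e.priority; });
    queue.insert(pos, e);
  } else {
    queue.push_back(e);  // FIFO: always append at the tail
  }
}
```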

void TAO_Notify_Buffering_Strategy::set_tracker ( Tracker * tracker  ) 

Set the tracker object. This strategy does not own the tracker.

Definition at line 216 of file Buffering_Strategy.cpp.

References tracker_.

00218 {
00219   this->tracker_ = tracker;
00220 }
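
Because the strategy does not own the tracker, the caller must keep the tracker alive for as long as it is installed. The sketch below shows that non-owning arrangement with a hypothetical CountingTracker and StrategySketch; only the method name update_queue_count comes from the reference above.

```cpp
#include <cstddef>

// Minimal stand-in for the nested Tracker interface.
class Tracker {
public:
  virtual ~Tracker() = default;
  virtual void update_queue_count(std::size_t count) = 0;
};

// Hypothetical tracker that records the last reported queue size.
class CountingTracker : public Tracker {
public:
  void update_queue_count(std::size_t count) override {
    last = count;
    ++calls;
  }
  std::size_t last = 0;
  int calls = 0;
};

// Hypothetical stand-in for the strategy: stores a raw, non-owning pointer.
class StrategySketch {
public:
  void set_tracker(Tracker* tracker) { tracker_ = tracker; }

  // Reports the queue size only if a tracker has been installed.
  void on_queue_changed(std::size_t count) {
    if (tracker_ != nullptr) tracker_->update_queue_count(count);
  }

private:
  Tracker* tracker_ = nullptr;  // not owned; never deleted here
};
```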

void TAO_Notify_Buffering_Strategy::shutdown ( void   ) 

Shutdown.

Definition at line 57 of file Buffering_Strategy.cpp.

References ACE_GUARD, global_not_full_, local_not_empty_, local_not_full_, shutdown_, and TAO_SYNCH_MUTEX.

00058 {
00059   ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->global_queue_lock_);
00060 
00061   if (this->shutdown_)
00062   {
00063     return;
00064   }
00065 
00066   this->shutdown_ = true;
00067 
00068   this->local_not_empty_.broadcast ();
00069   this->global_not_full_.broadcast();
00070   this->local_not_full_.broadcast();
00071 }

void TAO_Notify_Buffering_Strategy::update_qos_properties ( const TAO_Notify_QoSProperties & qos_properties  ) 

Update state with the following QoS properties: Order Policy, Discard Policy, MaxEventsPerConsumer, and Blocking Policy.

Definition at line 48 of file Buffering_Strategy.cpp.

00049 {
00050   this->order_policy_.set (qos_properties);
00051   this->discard_policy_.set (qos_properties);
00052   this->max_events_per_consumer_.set(qos_properties);
00053   this->blocking_policy_.set (qos_properties);
00054 }


Member Data Documentation

TAO_Notify_AdminProperties::Ptr TAO_Notify_Buffering_Strategy::admin_properties_ [private]

Reference to the properties per event channel.

Definition at line 100 of file Buffering_Strategy.h.

TAO_Notify_Property_Time TAO_Notify_Buffering_Strategy::blocking_policy_ [private]

Definition at line 118 of file Buffering_Strategy.h.

Referenced by enqueue().

TAO_Notify_Property_Short TAO_Notify_Buffering_Strategy::discard_policy_ [private]

Policy to discard when buffers are full.

Definition at line 115 of file Buffering_Strategy.h.

Referenced by discard().

TAO_SYNCH_CONDITION& TAO_Notify_Buffering_Strategy::global_not_full_ [private]

Definition at line 120 of file Buffering_Strategy.h.

Referenced by dequeue(), enqueue(), and shutdown().

CORBA::Long& TAO_Notify_Buffering_Strategy::global_queue_length_ [private]

The global queue length - queue length across all the queues.

Definition at line 106 of file Buffering_Strategy.h.

Referenced by dequeue(), and enqueue().

TAO_SYNCH_MUTEX& TAO_Notify_Buffering_Strategy::global_queue_lock_ [private]

The shared global lock used by all the queues.

Definition at line 103 of file Buffering_Strategy.h.

TAO_SYNCH_CONDITION TAO_Notify_Buffering_Strategy::local_not_empty_ [private]

Condition that batch size reached.

Definition at line 124 of file Buffering_Strategy.h.

Referenced by enqueue(), and shutdown().

TAO_SYNCH_CONDITION TAO_Notify_Buffering_Strategy::local_not_full_ [private]

Definition at line 121 of file Buffering_Strategy.h.

Referenced by dequeue(), enqueue(), and shutdown().

TAO_Notify_Property_Long TAO_Notify_Buffering_Strategy::max_events_per_consumer_ [private]

Definition at line 117 of file Buffering_Strategy.h.

Referenced by enqueue().

const TAO_Notify_Property_Long& TAO_Notify_Buffering_Strategy::max_queue_length_ [private]

The maximum events that can be queued overall.

Definition at line 109 of file Buffering_Strategy.h.

Referenced by enqueue().

TAO_Notify_Message_Queue& TAO_Notify_Buffering_Strategy::msg_queue_ [private]

The local Message Queue.

Definition at line 97 of file Buffering_Strategy.h.

Referenced by discard(), enqueue(), and queue().

TAO_Notify_Property_Short TAO_Notify_Buffering_Strategy::order_policy_ [private]

Order of events in internal buffers.

Definition at line 112 of file Buffering_Strategy.h.

Referenced by queue().

bool TAO_Notify_Buffering_Strategy::shutdown_ [private]

Flag to shutdown.

Definition at line 127 of file Buffering_Strategy.h.

Referenced by shutdown().

Tracker* TAO_Notify_Buffering_Strategy::tracker_ [private]

Optional queue tracker.

Definition at line 130 of file Buffering_Strategy.h.

Referenced by dequeue(), enqueue(), and set_tracker().


The documentation for this class was generated from the following files:
Buffering_Strategy.h
Buffering_Strategy.cpp

Generated on Tue Feb 2 17:46:16 2010 for TAO_CosNotification by  doxygen 1.4.7