Base Strategy to enqueue and dequeue items from a Message Queue. More...
#include <Buffering_Strategy.h>
Classes | |
class | Tracker |
This interface allows tracking of the queue size. More... | |
Public Member Functions | |
TAO_Notify_Buffering_Strategy (TAO_Notify_Message_Queue &msg_queue, const TAO_Notify_AdminProperties::Ptr &admin_properties) | |
~TAO_Notify_Buffering_Strategy () | |
void | update_qos_properties (const TAO_Notify_QoSProperties &qos_properties) |
int | enqueue (TAO_Notify_Method_Request_Queueable *method_request) |
int | dequeue (TAO_Notify_Method_Request_Queueable *&method_request, const ACE_Time_Value *abstime) |
void | shutdown (void) |
Shutdown. | |
ACE_Time_Value | oldest_event (void) |
Provide the time value of the oldest event in the queue. | |
void | set_tracker (Tracker *tracker) |
Set the tracker object. This strategy does not own the tracker. | |
Private Member Functions | |
int | queue (TAO_Notify_Method_Request_Queueable *method_request) |
Apply the Order Policy and queue. return -1 on error. | |
bool | discard (TAO_Notify_Method_Request_Queueable *method_request) |
Discard as per the Discard Policy. | |
Private Attributes | |
TAO_Notify_Message_Queue & | msg_queue_ |
= Data Members | |
TAO_Notify_AdminProperties::Ptr | admin_properties_ |
Reference to the properties per event channel. | |
TAO_SYNCH_MUTEX & | global_queue_lock_ |
The shared global lock used by all the queues. | |
CORBA::Long & | global_queue_length_ |
The global queue length - queue length across all the queues. | |
const TAO_Notify_Property_Long & | max_queue_length_ |
The maximum events that can be queued overall. | |
TAO_Notify_Property_Short | order_policy_ |
Order of events in internal buffers. | |
TAO_Notify_Property_Short | discard_policy_ |
Policy to discard when buffers are full. | |
TAO_Notify_Property_Long | max_events_per_consumer_ |
TAO_Notify_Property_Time | blocking_policy_ |
TAO_SYNCH_CONDITION & | global_not_full_ |
TAO_SYNCH_CONDITION | local_not_full_ |
TAO_SYNCH_CONDITION | local_not_empty_ |
Condition that batch size reached. | |
bool | shutdown_ |
Flag to shutdown. | |
Tracker * | tracker_ |
Optional queue tracker. |
Base Strategy to enqueue and dequeue items from a Message Queue.
Definition at line 45 of file Buffering_Strategy.h.
TAO_Notify_Buffering_Strategy::TAO_Notify_Buffering_Strategy | ( | TAO_Notify_Message_Queue & | msg_queue, | |
const TAO_Notify_AdminProperties::Ptr & | admin_properties | |||
) |
Definition at line 22 of file Buffering_Strategy.cpp.
// Member initialization for the constructor:
//  - msg_queue_ is bound to the caller-supplied queue and admin_properties_
//    stores the supplied properties pointer.
//  - global_queue_lock_, global_queue_length_, max_queue_length_ and
//    global_not_full_ are references obtained from admin_properties.
//  - order_policy_ and discard_policy_ are both constructed with
//    CosNotification::AnyOrder as their second argument (presumably the
//    initial value -- confirm against TAO_Notify_Property_Short).
//  - local_not_full_ and local_not_empty_ are both constructed on
//    global_queue_lock_, i.e. they share the same mutex as global_not_full_'s
//    users in this class.
//  - The strategy starts with shutdown_ == false and no tracker installed.
// The constructor body itself is empty.
: msg_queue_ (msg_queue) , admin_properties_ (admin_properties) , global_queue_lock_ (admin_properties->global_queue_lock ()) , global_queue_length_ (admin_properties->global_queue_length ()) , max_queue_length_ (admin_properties->max_global_queue_length ()) , order_policy_ (CosNotification::OrderPolicy, CosNotification::AnyOrder) , discard_policy_ (CosNotification::DiscardPolicy, CosNotification::AnyOrder) , max_events_per_consumer_ (CosNotification::MaxEventsPerConsumer) , blocking_policy_ (TAO_Notify_Extensions::BlockingPolicy) , global_not_full_ (admin_properties->global_queue_not_full()) , local_not_full_ (global_queue_lock_) , local_not_empty_ (global_queue_lock_) , shutdown_ (false) , tracker_ (0) { }
TAO_Notify_Buffering_Strategy::~TAO_Notify_Buffering_Strategy | ( | ) |
Definition at line 42 of file Buffering_Strategy.cpp.
// Empty body: this destructor performs no explicit cleanup and does not
// delete tracker_ (set_tracker documents that the tracker is not owned).
{ }
int TAO_Notify_Buffering_Strategy::dequeue | ( | TAO_Notify_Method_Request_Queueable *& | method_request, | |
const ACE_Time_Value * | abstime | |||
) |
Dequeue one item. This method blocks until abstime if it is non-zero, or else blocks until an item is available. Returns -1 on error or shutdown, 0 if the wait timed out with nothing available, else the number of items actually dequeued (1).
Definition at line 225 of file Buffering_Strategy.cpp.
{
  // Remove one request from the local queue, waiting while it is empty.
  //
  // method_request: receives the dequeued request on success; set to 0 if the
  //                 dequeued block is not a TAO_Notify_Method_Request_Queueable.
  // abstime:        forwarded unchanged to the condition wait (0 means wait
  //                 without a timeout).
  //
  // Returns 1 when a request was dequeued, 0 when the wait timed out while the
  // queue was still empty, and -1 if the lock could not be acquired, the
  // strategy is shut down, the underlying dequeue failed, or the dequeued
  // block has an unexpected type.
  ACE_Message_Block *mb = 0;

  ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->global_queue_lock_, -1);

  if (this->shutdown_)
    return -1;

  while (this->msg_queue_.message_count () == 0)
    {
      // errno is only meaningful when wait() reports a failure; a successful
      // wake-up does not reset it, so a stale ETIME left on this thread must
      // not be mistaken for a timeout.
      const int wait_result = this->local_not_empty_.wait (abstime);
      const bool timed_out = (wait_result == -1 && errno == ETIME);

      // shutdown() broadcasts on this condition; it takes priority over a
      // timeout that may have happened at the same moment.
      if (this->shutdown_)
        return -1;

      if (timed_out)
        return 0;
    }

  if (this->msg_queue_.dequeue (mb) == -1)
    return -1;

  if (this->tracker_ != 0)
    {
      this->tracker_->update_queue_count (this->msg_queue_.message_count ());
    }

  // The message has left the queue at this point, so account for it and wake
  // blocked producers before inspecting its type. Doing this only after a
  // successful cast would leave global_queue_length_ permanently too high if
  // the cast ever failed.
  --this->global_queue_length_;
  local_not_full_.signal ();
  global_not_full_.signal ();

  method_request = dynamic_cast<TAO_Notify_Method_Request_Queueable*> (mb);
  if (method_request == 0)
    return -1;

  return 1;
}
bool TAO_Notify_Buffering_Strategy::discard | ( | TAO_Notify_Method_Request_Queueable * | method_request | ) | [private] |
Discard as per the Discard Policy.
Definition at line 316 of file Buffering_Strategy.cpp.
{
  // Try to make room for method_request by removing one already-queued
  // message, chosen according to discard_policy_.
  //
  // method_request: the request about to be queued; only its priority is
  //                 consulted (PriorityOrder policy).
  //
  // Returns true when a queued message was removed and released, false when
  // nothing was removed (shutdown, LifoOrder, the dequeue failed, or -- for
  // PriorityOrder -- the candidate's priority is not lower than that of
  // method_request).
  if (this->shutdown_)
    {
      return false;
    }

  ACE_Message_Block* mb = 0;
  int result = -1;

  if (this->discard_policy_.is_valid() == 0 ||
      this->discard_policy_ == CosNotification::AnyOrder ||
      this->discard_policy_ == CosNotification::FifoOrder)
    {
      result = this->msg_queue_.dequeue_head (mb);
    }
  else if (this->discard_policy_ == CosNotification::LifoOrder)
    {
      // The most current message is NOT the newest one in the queue. It's
      // the one we're about to add to the queue.
      result = -1;
    }
  else if (this->discard_policy_ == CosNotification::DeadlineOrder)
    {
      result = this->msg_queue_.dequeue_deadline (mb);
    }
  else if (this->discard_policy_ == CosNotification::PriorityOrder)
    {
      result = this->msg_queue_.dequeue_prio (mb);

      // Only look at the candidate if the dequeue actually produced one. On a
      // global overflow this local queue may be empty, in which case
      // dequeue_prio fails and mb is still null.
      if (result != -1 && mb != 0
          && mb->msg_priority() >= method_request->msg_priority())
        {
          // The candidate is at least as important as the incoming request:
          // put it back and report that nothing was discarded.
          this->msg_queue_.enqueue_prio (mb);
          result = -1;
        }
    }
  else
    {
      if (TAO_debug_level > 0)
        ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - Invalid discard policy\n"));
      result = this->msg_queue_.dequeue_head (mb);
    }

  if (result != -1)
    {
      ACE_Message_Block::release (mb);
      return true;
    }

  return false;
}
int TAO_Notify_Buffering_Strategy::enqueue | ( | TAO_Notify_Method_Request_Queueable * | method_request | ) |
Enqueue according to the enqueuing strategy. Returns -1 on error, else the number of items in the queue.
Definition at line 138 of file Buffering_Strategy.cpp.
{
  // Queue method_request, first handling local/global overflow.
  //
  // While either limit is exceeded: if a blocking policy is set, wait (with
  // that policy's timeout) for space; if the wait times out, or no blocking
  // policy is set, report the overflow to the tracker and try to discard one
  // queued message instead.
  //
  // Returns the local queue's message count after queuing, or -1 if the lock
  // could not be acquired, the strategy is shut down, no room could be made,
  // or the underlying queue operation failed.
  ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->global_queue_lock_, -1);

  if (this->shutdown_)
    return -1;

  bool discarded_existing = false;

  bool local_overflow = this->max_events_per_consumer_.is_valid()
    && static_cast <CORBA::Long> (this->msg_queue_.message_count ()) >= this->max_events_per_consumer_.value();

  bool global_overflow = this->max_queue_length_.value () != 0
    && this->global_queue_length_ >= this->max_queue_length_.value ();

  while (local_overflow || global_overflow)
    {
      if (blocking_policy_.is_valid())
        {
          ACE_Time_Value timeout;
          ORBSVCS_Time::TimeT_to_Time_Value(timeout, blocking_policy_.value());
          // Condition variables take an absolute time
          timeout += ACE_OS::gettimeofday();

          int wait_result = 0;
          if (local_overflow)
            {
              wait_result = local_not_full_.wait(&timeout);
            }
          else
            {
              wait_result = global_not_full_.wait(&timeout);
            }

          // errno is only meaningful when wait() reports a failure; a stale
          // ETIME from an earlier call on this thread must not turn a normal
          // wake-up into a timeout (which would discard an event needlessly).
          const bool timed_out = (wait_result == -1 && errno == ETIME);

          // shutdown() broadcasts on both not-full conditions. Bail out right
          // away instead of going back to sleep for another full timeout;
          // the later queue()/discard() calls would refuse the request anyway.
          if (this->shutdown_)
            return -1;

          if (!timed_out)
            {
              // Woken up (or interrupted): re-evaluate both limits and retry.
              local_overflow =
                this->max_events_per_consumer_.is_valid()
                && static_cast <CORBA::Long> (this->msg_queue_.message_count ()) >= this->max_events_per_consumer_.value();
              global_overflow =
                this->max_queue_length_.value () != 0
                && this->global_queue_length_ >= this->max_queue_length_.value ();
              continue;
            }
        }

      if (tracker_ != 0)
        {
          tracker_->count_queue_overflow (local_overflow, global_overflow);
        }

      discarded_existing = this->discard(method_request);
      if (discarded_existing)
        {
          // One queued message was dropped: account for it and wake any
          // producers blocked on either limit.
          --this->global_queue_length_;
          local_not_full_.signal();
          global_not_full_.signal();
        }
      break;
    }

  if (! (local_overflow || global_overflow) || discarded_existing)
    {
      if (this->queue (method_request) == -1)
        {
          ACE_DEBUG((LM_DEBUG,
                     "Notify (%P|%t) - Panic! failed to enqueue event\n"));
          return -1;
        }

      ++this->global_queue_length_;

      local_not_empty_.signal ();
    }
  else
    {
      ACE_DEBUG((LM_DEBUG,
                 "Notify (%P|%t) - Panic! did not attempt to enqueue event\n"));
      return -1;
    }

  size_t count = this->msg_queue_.message_count ();
  if (this->tracker_ != 0)
    {
      this->tracker_->update_queue_count (count);
    }

  return ACE_Utils::truncate_cast<int> (count);
}
ACE_Time_Value TAO_Notify_Buffering_Strategy::oldest_event | ( | void | ) |
Provide the time value of the oldest event in the queue.
Definition at line 74 of file Buffering_Strategy.cpp.
{
  // Scan the local queue under the global lock and return the smallest
  // creation time found among the queued requests. If the lock cannot be
  // acquired, or no request is queued, ACE_Time_Value::max_time is returned.
  ACE_Time_Value oldest (ACE_Time_Value::max_time);

  ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->global_queue_lock_, oldest);

  ACE_Message_Block* block = 0;
  for (TAO_Notify_Message_Queue::ITERATOR it (this->msg_queue_);
       it.next (block);
       it.advance ())
    {
      TAO_Notify_Method_Request_Queueable* request =
        dynamic_cast<TAO_Notify_Method_Request_Queueable*> (block);

      // Blocks of any other type carry no creation time; skip them.
      if (request == 0)
        continue;

      const ACE_Time_Value& created = request->creation_time ();
      if (created < oldest)
        oldest = created;
    }

  return oldest;
}
int TAO_Notify_Buffering_Strategy::queue | ( | TAO_Notify_Method_Request_Queueable * | method_request | ) | [private] |
Apply the Order Policy and queue. return -1 on error.
Definition at line 280 of file Buffering_Strategy.cpp.
{
  // Insert method_request into the local queue at the position dictated by
  // order_policy_. Returns -1 if the strategy is shut down; otherwise returns
  // whatever the underlying enqueue call returns.
  if (this->shutdown_)
    {
      return -1;
    }

  const CORBA::Short policy = this->order_policy_.value();

  // An unset policy, AnyOrder and FifoOrder all append at the tail.
  const bool append_at_tail =
    ! this->order_policy_.is_valid()
    || policy == CosNotification::AnyOrder
    || policy == CosNotification::FifoOrder;

  int status = -1;

  if (append_at_tail)
    {
      if (TAO_debug_level > 0)
        {
          ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - enqueue in fifo order\n"));
        }
      status = this->msg_queue_.enqueue_tail (method_request);
    }
  else if (policy == CosNotification::PriorityOrder)
    {
      if (TAO_debug_level > 0)
        {
          ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - enqueue in priority order\n"));
        }
      status = this->msg_queue_.enqueue_prio (method_request);
    }
  else if (policy == CosNotification::DeadlineOrder)
    {
      if (TAO_debug_level > 0)
        {
          ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - enqueue in deadline order\n"));
        }
      status = this->msg_queue_.enqueue_deadline (method_request);
    }
  else
    {
      // Unrecognized policy value: log it and fall back to tail insertion.
      if (TAO_debug_level > 0)
        {
          ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - Invalid order policy\n"));
        }
      status = this->msg_queue_.enqueue_tail (method_request);
    }

  return status;
}
void TAO_Notify_Buffering_Strategy::set_tracker | ( | TAO_Notify_Buffering_Strategy::Tracker * | tracker | ) |
Set the tracker object. This strategy does not own the tracker.
Definition at line 266 of file Buffering_Strategy.cpp.
{
  // Install tracker if none is set yet. If a different tracker is already
  // installed, hand the new one to it via register_child instead of replacing
  // it. Passing the tracker that is already installed does nothing.
  if (this->tracker_ == 0)
    {
      this->tracker_ = tracker;
      return;
    }

  if (this->tracker_ == tracker)
    {
      return;
    }

  this->tracker_->register_child (tracker);
}
void TAO_Notify_Buffering_Strategy::shutdown | ( | void | ) |
Shutdown.
Definition at line 57 of file Buffering_Strategy.cpp.
{
  // Mark the strategy as shut down and wake every thread waiting on any of
  // the three conditions. Only the first call has an effect; later calls
  // find the flag already set and do nothing.
  ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->global_queue_lock_);

  if (!this->shutdown_)
    {
      this->shutdown_ = true;

      this->local_not_empty_.broadcast ();
      this->global_not_full_.broadcast();
      this->local_not_full_.broadcast();
    }
}
void TAO_Notify_Buffering_Strategy::update_qos_properties | ( | const TAO_Notify_QoSProperties & | qos_properties | ) |
Update state with the following QoS Properties: Order Policy, Discard Policy, MaxEventsPerConsumer, and Blocking Policy.
Definition at line 48 of file Buffering_Strategy.cpp.
{
  // Refresh each locally held policy from the supplied QoS property set:
  // ordering, discard, per-consumer event limit, and blocking timeout.
  order_policy_.set (qos_properties);
  discard_policy_.set (qos_properties);
  max_events_per_consumer_.set(qos_properties);
  blocking_policy_.set (qos_properties);
}
Reference to the properties per event channel.
Definition at line 106 of file Buffering_Strategy.h.
Definition at line 124 of file Buffering_Strategy.h.
Policy to discard when buffers are full.
Definition at line 121 of file Buffering_Strategy.h.
TAO_SYNCH_CONDITION& TAO_Notify_Buffering_Strategy::global_not_full_ [private] |
Definition at line 126 of file Buffering_Strategy.h.
The global queue length - queue length across all the queues.
Definition at line 112 of file Buffering_Strategy.h.
TAO_SYNCH_MUTEX& TAO_Notify_Buffering_Strategy::global_queue_lock_ [private] |
The shared global lock used by all the queues.
Definition at line 109 of file Buffering_Strategy.h.
TAO_SYNCH_CONDITION TAO_Notify_Buffering_Strategy::local_not_empty_ [private] |
Condition that batch size reached.
Definition at line 130 of file Buffering_Strategy.h.
TAO_SYNCH_CONDITION TAO_Notify_Buffering_Strategy::local_not_full_ [private] |
Definition at line 127 of file Buffering_Strategy.h.
Definition at line 123 of file Buffering_Strategy.h.
const TAO_Notify_Property_Long& TAO_Notify_Buffering_Strategy::max_queue_length_ [private] |
The maximum events that can be queued overall.
Definition at line 115 of file Buffering_Strategy.h.
Order of events in internal buffers.
Definition at line 118 of file Buffering_Strategy.h.
bool TAO_Notify_Buffering_Strategy::shutdown_ [private] |
Flag to shutdown.
Definition at line 133 of file Buffering_Strategy.h.
Tracker* TAO_Notify_Buffering_Strategy::tracker_ [private] |
Optional queue tracker.
Definition at line 136 of file Buffering_Strategy.h.