Base Strategy to enqueue and dequeue items from a Message Queue. More...
#include <Buffering_Strategy.h>

Classes | |
| class | Tracker |
| This interface allows tracking of the queue size. More... | |
Public Member Functions | |
| TAO_Notify_Buffering_Strategy (TAO_Notify_Message_Queue &msg_queue, const TAO_Notify_AdminProperties::Ptr &admin_properties) | |
| ~TAO_Notify_Buffering_Strategy () | |
| void | update_qos_properties (const TAO_Notify_QoSProperties &qos_properties) |
| int | enqueue (TAO_Notify_Method_Request_Queueable *method_request) |
| int | dequeue (TAO_Notify_Method_Request_Queueable *&method_request, const ACE_Time_Value *abstime) |
| void | shutdown (void) |
| Shutdown. | |
| ACE_Time_Value | oldest_event (void) |
| Provide the time value of the oldest event in the queue. | |
| void | set_tracker (Tracker *tracker) |
| Set the tracker object. This strategy does not own the tracker. | |
Private Member Functions | |
| int | queue (TAO_Notify_Method_Request_Queueable *method_request) |
| Apply the Order Policy and queue. return -1 on error. | |
| bool | discard (TAO_Notify_Method_Request_Queueable *method_request) |
| Discard as per the Discard Policy. | |
Private Attributes | |
| TAO_Notify_Message_Queue & | msg_queue_ |
| = Data Members | |
| TAO_Notify_AdminProperties::Ptr | admin_properties_ |
| Reference to the properties per event channel. | |
| TAO_SYNCH_MUTEX & | global_queue_lock_ |
| The shared global lock used by all the queues. | |
| CORBA::Long & | global_queue_length_ |
| The global queue length - queue length across all the queues. | |
| const TAO_Notify_Property_Long & | max_queue_length_ |
| The maximum events that can be queued overall. | |
| TAO_Notify_Property_Short | order_policy_ |
| Order of events in internal buffers. | |
| TAO_Notify_Property_Short | discard_policy_ |
| Policy to discard when buffers are full. | |
| TAO_Notify_Property_Long | max_events_per_consumer_ |
| TAO_Notify_Property_Time | blocking_policy_ |
| TAO_SYNCH_CONDITION & | global_not_full_ |
| TAO_SYNCH_CONDITION | local_not_full_ |
| TAO_SYNCH_CONDITION | local_not_empty_ |
| Condition that batch size reached. | |
| bool | shutdown_ |
| Flag to shutdown. | |
| Tracker * | tracker_ |
| Optional queue tracker. | |
Base Strategy to enqueue and dequeue items from a Message Queue.
Definition at line 45 of file Buffering_Strategy.h.
| TAO_Notify_Buffering_Strategy::TAO_Notify_Buffering_Strategy | ( | TAO_Notify_Message_Queue & | msg_queue, | |
| const TAO_Notify_AdminProperties::Ptr & | admin_properties | |||
| ) |
Definition at line 22 of file Buffering_Strategy.cpp.
: msg_queue_ (msg_queue)
, admin_properties_ (admin_properties)
  // Shared, channel-wide state is fetched from the admin properties:
  // the lock, the running queue length and the configured maximum length.
, global_queue_lock_ (admin_properties->global_queue_lock ())
, global_queue_length_ (admin_properties->global_queue_length ())
, max_queue_length_ (admin_properties->max_global_queue_length ())
  // QoS properties; they are (re)loaded later by update_qos_properties().
, order_policy_ (CosNotification::OrderPolicy, CosNotification::AnyOrder)
, discard_policy_ (CosNotification::DiscardPolicy, CosNotification::AnyOrder)
, max_events_per_consumer_ (CosNotification::MaxEventsPerConsumer)
, blocking_policy_ (TAO_Notify_Extensions::BlockingPolicy)
  // The global "not full" condition is shared; the two local conditions
  // are constructed on top of the same global queue lock.
, global_not_full_ (admin_properties->global_queue_not_full())
, local_not_full_ (global_queue_lock_)
, local_not_empty_ (global_queue_lock_)
, shutdown_ (false)
, tracker_ (0)
{
}
| TAO_Notify_Buffering_Strategy::~TAO_Notify_Buffering_Strategy | ( | ) |
Definition at line 42 of file Buffering_Strategy.cpp.
{
// Intentionally empty: the message queue, global lock and global condition
// are held by reference, and the tracker is not owned by this strategy.
}
| int TAO_Notify_Buffering_Strategy::dequeue | ( | TAO_Notify_Method_Request_Queueable *& | method_request, | |
| const ACE_Time_Value * | abstime | |||
| ) |
Dequeue batch. This method blocks until the absolute time abstime if it is non-zero, or else blocks until an item is available. Returns -1 on error or shutdown, 0 if the wait timed out, else the number of items actually dequeued (1).
Definition at line 225 of file Buffering_Strategy.cpp.
{
  // Dequeue a single request from the local message queue.
  //
  // method_request  out: the dequeued request, or 0 if the dequeued block
  //                 was not a TAO_Notify_Method_Request_Queueable.
  // abstime         absolute deadline handed to the condition wait;
  //                 0 means wait without a timeout.
  //
  // Returns 1 when a request was dequeued, 0 when the wait timed out and
  // -1 on error or when the strategy has been shut down.

  // Start from a known value so a failed dequeue never leaves a wild pointer.
  ACE_Message_Block *mb = 0;

  ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->global_queue_lock_, -1);

  if (this->shutdown_)
    return -1;

  while (this->msg_queue_.message_count () == 0)
    {
      // errno is only meaningful when wait() itself reports a failure;
      // a stale ETIME left by an earlier call must not be mistaken for
      // a timeout of this wait.
      const int wait_result = this->local_not_empty_.wait (abstime);

      if (this->shutdown_)
        return -1;

      if (wait_result == -1 && errno == ETIME)
        return 0;
    }

  if (this->msg_queue_.dequeue (mb) == -1)
    return -1;

  // The block has left the queue whatever its dynamic type turns out to be,
  // so the shared length and the waiting producers are updated before the
  // type check; otherwise a failed cast would leave the count too high.
  --this->global_queue_length_;
  this->local_not_full_.signal ();
  this->global_not_full_.signal ();

  if (this->tracker_ != 0)
    {
      this->tracker_->update_queue_count (this->msg_queue_.message_count ());
    }

  method_request = dynamic_cast<TAO_Notify_Method_Request_Queueable*> (mb);

  if (method_request == 0)
    {
      // Nobody else holds this block any more; release it instead of
      // leaking it. The static release() tolerates a null pointer.
      ACE_Message_Block::release (mb);
      return -1;
    }

  return 1;
}
| bool TAO_Notify_Buffering_Strategy::discard | ( | TAO_Notify_Method_Request_Queueable * | method_request | ) | [private] |
Discard as per the Discard Policy.
Definition at line 316 of file Buffering_Strategy.cpp.
{
  // Try to make room in the queue by removing one already-queued message,
  // chosen according to the discard policy.
  //
  // method_request  the request about to be enqueued; under PriorityOrder
  //                 its priority decides whether a queued message may be
  //                 dropped in its favour.
  //
  // Returns true when a queued message was removed and released, false when
  // nothing was discarded (including when the strategy is shut down).
  if (this->shutdown_)
    {
      return false;
    }

  ACE_Message_Block* mb = 0;
  int result = -1;

  if (this->discard_policy_.is_valid() == 0 ||
      this->discard_policy_ == CosNotification::AnyOrder ||
      this->discard_policy_ == CosNotification::FifoOrder)
    {
      result = this->msg_queue_.dequeue_head (mb);
    }
  else if (this->discard_policy_ == CosNotification::LifoOrder)
    {
      // The most current message is NOT the newest one in the queue. It's
      // the one we're about to add to the queue.
      result = -1;
    }
  else if (this->discard_policy_ == CosNotification::DeadlineOrder)
    {
      result = this->msg_queue_.dequeue_deadline (mb);
    }
  else if (this->discard_policy_ == CosNotification::PriorityOrder)
    {
      result = this->msg_queue_.dequeue_prio (mb);

      // Only inspect the dequeued block when the dequeue succeeded; on
      // failure mb is still null and must not be dereferenced.
      if (result != -1 &&
          mb->msg_priority () >= method_request->msg_priority ())
        {
          // The queued message is at least as important as the incoming
          // one: put it back and report that nothing was discarded.
          this->msg_queue_.enqueue_prio (mb);
          result = -1;
        }
    }
  else
    {
      if (TAO_debug_level > 0)
        ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - Invalid discard policy\n"));
      result = this->msg_queue_.dequeue_head (mb);
    }

  if (result != -1)
    {
      ACE_Message_Block::release (mb);
      return true;
    }

  return false;
}
| int TAO_Notify_Buffering_Strategy::enqueue | ( | TAO_Notify_Method_Request_Queueable * | method_request | ) |
Enqueue according to the enqueuing strategy. Return -1 on error, else the number of items in the queue.
Definition at line 138 of file Buffering_Strategy.cpp.
{
  // Enqueue a request, honouring the per-consumer and global queue limits.
  //
  // While either limit is exceeded and a blocking policy is set, the caller
  // waits (up to the blocking timeout) for room; on timeout, or when no
  // blocking policy is set, one queued message is discarded according to
  // the discard policy to make room.
  //
  // Returns the number of messages in the local queue after enqueueing, or
  // -1 on error, on shutdown, or when no room could be made.
  ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->global_queue_lock_, -1);

  if (this->shutdown_)
    return -1;

  bool discarded_existing = false;

  bool local_overflow = this->max_events_per_consumer_.is_valid() &&
    static_cast <CORBA::Long> (this->msg_queue_.message_count ()) >= this->max_events_per_consumer_.value();

  bool global_overflow = this->max_queue_length_.value () != 0 &&
    this->global_queue_length_ >= this->max_queue_length_.value ();

  while (local_overflow || global_overflow)
    {
      if (blocking_policy_.is_valid())
        {
          ACE_Time_Value timeout;
          ORBSVCS_Time::TimeT_to_Time_Value(timeout, blocking_policy_.value());
          // Condition variables take an absolute time
          timeout += ACE_OS::gettimeofday();

          // Keep the wait result: errno is only meaningful when wait()
          // reports a failure, so a stale ETIME from an earlier call must
          // not be read as a timeout of this wait.
          const int wait_result = local_overflow
            ? local_not_full_.wait (&timeout)
            : global_not_full_.wait (&timeout);

          // shutdown() broadcasts both "not full" conditions to wake blocked
          // producers; leave right away instead of sleeping for another
          // full blocking period.
          if (this->shutdown_)
            return -1;

          const bool timed_out = (wait_result == -1 && errno == ETIME);
          if (!timed_out)
            {
              // Woken up (or failed for another reason): re-evaluate both
              // limits and try again.
              local_overflow =
                this->max_events_per_consumer_.is_valid() &&
                static_cast <CORBA::Long> (this->msg_queue_.message_count ()) >= this->max_events_per_consumer_.value();
              global_overflow =
                this->max_queue_length_.value () != 0 &&
                this->global_queue_length_ >= this->max_queue_length_.value ();
              continue;
            }
        }

      // Either there is no blocking policy or the wait timed out: record
      // the overflow and try to discard one queued message.
      if (tracker_ != 0)
        {
          tracker_->count_queue_overflow (local_overflow, global_overflow);
        }

      discarded_existing = this->discard(method_request);
      if (discarded_existing)
        {
          --this->global_queue_length_;
          local_not_full_.signal();
          global_not_full_.signal();
        }
      break;
    }

  if (! (local_overflow || global_overflow) || discarded_existing)
    {
      if (this->queue (method_request) == -1)
        {
          ACE_DEBUG((LM_DEBUG,
                     "Notify (%P|%t) - Panic! failed to enqueue event\n"));
          return -1;
        }

      ++this->global_queue_length_;

      local_not_empty_.signal ();
    }
  else
    {
      ACE_DEBUG((LM_DEBUG,
                 "Notify (%P|%t) - Panic! did not attempt to enqueue event\n"));
      return -1;
    }

  size_t count = this->msg_queue_.message_count ();
  if (this->tracker_ != 0)
    {
      this->tracker_->update_queue_count (count);
    }

  return ACE_Utils::truncate_cast<int> (count);
}
| ACE_Time_Value TAO_Notify_Buffering_Strategy::oldest_event | ( | void | ) |
Provide the time value of the oldest event in the queue.
Definition at line 74 of file Buffering_Strategy.cpp.
{
  // Scan the local queue under the global lock and return the earliest
  // creation time found. If the queue holds no method requests (or the
  // lock cannot be acquired) ACE_Time_Value::max_time is returned.
  ACE_Time_Value oldest (ACE_Time_Value::max_time);
  ACE_Message_Block* block = 0;

  ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->global_queue_lock_, oldest);

  for (TAO_Notify_Message_Queue::ITERATOR cursor (this->msg_queue_);
       cursor.next (block);
       cursor.advance ())
    {
      TAO_Notify_Method_Request_Queueable* request =
        dynamic_cast<TAO_Notify_Method_Request_Queueable*> (block);

      // Blocks that are not method requests carry no creation time here.
      if (request == 0)
        continue;

      const ACE_Time_Value& created = request->creation_time ();
      if (created < oldest)
        oldest = created;
    }

  return oldest;
}
| int TAO_Notify_Buffering_Strategy::queue | ( | TAO_Notify_Method_Request_Queueable * | method_request | ) | [private] |
Apply the Order Policy and queue. return -1 on error.
Definition at line 280 of file Buffering_Strategy.cpp.
{
  // Insert the request into the local queue at the position dictated by
  // the order policy. Returns the queue's enqueue result, or -1 when the
  // strategy has been shut down.
  if (this->shutdown_)
    return -1;

  CORBA::Short order = this->order_policy_.value();

  // An unset policy, AnyOrder and FifoOrder all mean "append at the tail".
  const bool fifo_order =
    ! this->order_policy_.is_valid() ||
    order == CosNotification::AnyOrder ||
    order == CosNotification::FifoOrder;

  if (fifo_order)
    {
      if (TAO_debug_level > 0)
        ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - enqueue in fifo order\n"));
      return this->msg_queue_.enqueue_tail (method_request);
    }
  else if (order == CosNotification::PriorityOrder)
    {
      if (TAO_debug_level > 0)
        ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - enqueue in priority order\n"));
      return this->msg_queue_.enqueue_prio (method_request);
    }
  else if (order == CosNotification::DeadlineOrder)
    {
      if (TAO_debug_level > 0)
        ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - enqueue in deadline order\n"));
      return this->msg_queue_.enqueue_deadline (method_request);
    }
  else
    {
      // Unknown policy value: report it and fall back to tail insertion.
      if (TAO_debug_level > 0)
        ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - Invalid order policy\n"));
      return this->msg_queue_.enqueue_tail (method_request);
    }
}
| void TAO_Notify_Buffering_Strategy::set_tracker | ( | TAO_Notify_Buffering_Strategy::Tracker * | tracker | ) |
Set the tracker object. This strategy does not own the tracker.
Definition at line 266 of file Buffering_Strategy.cpp.
{
  // Install the tracker, or chain it to the one already installed.
  // Passing the tracker that is already installed changes nothing.
  if (this->tracker_ == tracker)
    return;

  if (this->tracker_ != 0)
    {
      // A different tracker is already in place: register the new one
      // as its child rather than replacing it.
      this->tracker_->register_child (tracker);
      return;
    }

  this->tracker_ = tracker;
}
| void TAO_Notify_Buffering_Strategy::shutdown | ( | void | ) |
Shutdown.
Definition at line 57 of file Buffering_Strategy.cpp.
{
  // Mark the strategy as shut down and wake every thread blocked on one of
  // its conditions. Calling it again after the first time has no effect.
  ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->global_queue_lock_);

  if (!this->shutdown_)
    {
      this->shutdown_ = true;

      // Wake consumers first, then producers on the global and local limits.
      this->local_not_empty_.broadcast ();
      this->global_not_full_.broadcast();
      this->local_not_full_.broadcast();
    }
}
| void TAO_Notify_Buffering_Strategy::update_qos_properties | ( | const TAO_Notify_QoSProperties & | qos_properties | ) |
Update state with the following QoS Properties: Order Policy, Discard Policy, MaxEventsPerConsumer and Blocking Policy.
Definition at line 48 of file Buffering_Strategy.cpp.
{
  // Refresh each locally held QoS property from the supplied set:
  // ordering, discarding, per-consumer limit and blocking timeout.
  order_policy_.set (qos_properties);
  discard_policy_.set (qos_properties);
  max_events_per_consumer_.set (qos_properties);
  blocking_policy_.set (qos_properties);
}
Reference to the properties per event channel.
Definition at line 106 of file Buffering_Strategy.h.
Definition at line 124 of file Buffering_Strategy.h.
Policy to discard when buffers are full.
Definition at line 121 of file Buffering_Strategy.h.
TAO_SYNCH_CONDITION& TAO_Notify_Buffering_Strategy::global_not_full_ [private] |
Definition at line 126 of file Buffering_Strategy.h.
The global queue length - queue length across all the queues.
Definition at line 112 of file Buffering_Strategy.h.
TAO_SYNCH_MUTEX& TAO_Notify_Buffering_Strategy::global_queue_lock_ [private] |
The shared global lock used by all the queues.
Definition at line 109 of file Buffering_Strategy.h.
TAO_SYNCH_CONDITION TAO_Notify_Buffering_Strategy::local_not_empty_ [private] |
Condition that batch size reached.
Definition at line 130 of file Buffering_Strategy.h.
TAO_SYNCH_CONDITION TAO_Notify_Buffering_Strategy::local_not_full_ [private] |
Definition at line 127 of file Buffering_Strategy.h.
Definition at line 123 of file Buffering_Strategy.h.
const TAO_Notify_Property_Long& TAO_Notify_Buffering_Strategy::max_queue_length_ [private] |
The maximum events that can be queued overall.
Definition at line 115 of file Buffering_Strategy.h.
Order of events in internal buffers.
Definition at line 118 of file Buffering_Strategy.h.
bool TAO_Notify_Buffering_Strategy::shutdown_ [private] |
Flag to shutdown.
Definition at line 133 of file Buffering_Strategy.h.
Tracker* TAO_Notify_Buffering_Strategy::tracker_ [private] |
Optional queue tracker.
Definition at line 136 of file Buffering_Strategy.h.
1.7.0