TAO_Notify_Buffering_Strategy Class Reference

Base Strategy to enqueue and dequeue items from a Message Queue.

#include <Buffering_Strategy.h>

Classes

class  Tracker
 This interface allows tracking of the queue size.

Public Member Functions

 TAO_Notify_Buffering_Strategy (TAO_Notify_Message_Queue &msg_queue, const TAO_Notify_AdminProperties::Ptr &admin_properties)
 ~TAO_Notify_Buffering_Strategy ()
void update_qos_properties (const TAO_Notify_QoSProperties &qos_properties)
int enqueue (TAO_Notify_Method_Request_Queueable *method_request)
int dequeue (TAO_Notify_Method_Request_Queueable *&method_request, const ACE_Time_Value *abstime)
void shutdown (void)
 Shutdown.
ACE_Time_Value oldest_event (void)
 Provide the time value of the oldest event in the queue.
void set_tracker (Tracker *tracker)
 Set the tracker object. This strategy does not own the tracker.

Private Member Functions

int queue (TAO_Notify_Method_Request_Queueable *method_request)
 Apply the Order Policy and queue. Returns -1 on error.
bool discard (TAO_Notify_Method_Request_Queueable *method_request)
 Discard as per the Discard Policy.

Private Attributes

TAO_Notify_Message_Queue & msg_queue_
 The local Message Queue.
TAO_Notify_AdminProperties::Ptr admin_properties_
 Reference to the properties per event channel.
TAO_SYNCH_MUTEX & global_queue_lock_
 The shared global lock used by all the queues.
CORBA::Long & global_queue_length_
 The global queue length - queue length across all the queues.
const TAO_Notify_Property_Long & max_queue_length_
 The maximum events that can be queued overall.
TAO_Notify_Property_Short order_policy_
 Order of events in internal buffers.
TAO_Notify_Property_Short discard_policy_
 Policy to discard when buffers are full.
TAO_Notify_Property_Long max_events_per_consumer_
TAO_Notify_Property_Time blocking_policy_
TAO_SYNCH_CONDITION & global_not_full_
TAO_SYNCH_CONDITION local_not_full_
TAO_SYNCH_CONDITION local_not_empty_
 Condition that batch size reached.
bool shutdown_
 Flag to shutdown.
Tracker * tracker_
 Optional queue tracker.

Detailed Description

Base Strategy to enqueue and dequeue items from a Message Queue.

Definition at line 45 of file Buffering_Strategy.h.


Constructor & Destructor Documentation

TAO_Notify_Buffering_Strategy::TAO_Notify_Buffering_Strategy ( TAO_Notify_Message_Queue &  msg_queue,
const TAO_Notify_AdminProperties::Ptr &  admin_properties
)

Definition at line 22 of file Buffering_Strategy.cpp.

TAO_Notify_Buffering_Strategy::~TAO_Notify_Buffering_Strategy (  ) 

Definition at line 42 of file Buffering_Strategy.cpp.

{
}


Member Function Documentation

int TAO_Notify_Buffering_Strategy::dequeue ( TAO_Notify_Method_Request_Queueable *&  method_request,
const ACE_Time_Value *  abstime
)

Dequeue an item. Blocks until the absolute time abstime if it is non-zero, otherwise until an item is available. Returns -1 on error or shutdown, 0 if the wait timed out, else the number of items actually dequeued (1).

Definition at line 225 of file Buffering_Strategy.cpp.

{
  ACE_Message_Block *mb;

  ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->global_queue_lock_, -1);

  if ( this->shutdown_ )
    return -1;

  while (this->msg_queue_.message_count () == 0)
    {
      this->local_not_empty_.wait (abstime);

      if (this->shutdown_)
        return -1;

      if (errno == ETIME)
        return 0;
    }

  if (this->msg_queue_.dequeue (mb) == -1)
    return -1;

  if (this->tracker_ != 0)
    {
      this->tracker_->update_queue_count (this->msg_queue_.message_count ());
    }

  method_request = dynamic_cast<TAO_Notify_Method_Request_Queueable*>(mb);

  if (method_request == 0)
    return -1;

  --this->global_queue_length_;
  local_not_full_.signal();
  global_not_full_.signal();

  return 1;
}
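
The listing above detects a timeout by inspecting errno after the wait. As an illustration only, the same three-way return convention (-1 after shutdown, 0 on timeout, 1 when an item was dequeued) can be sketched in standard C++, where the wait reports a timeout through its return value. SketchQueue and all of its members are hypothetical names, std::condition_variable stands in for TAO_SYNCH_CONDITION, and unlike the listing this sketch always requires a deadline.

```cpp
#include <chrono>
#include <condition_variable>
#include <deque>
#include <mutex>

// Hypothetical stand-in for the strategy's local queue; not the TAO class.
class SketchQueue {
public:
  using Clock = std::chrono::steady_clock;

  // Adds an item and wakes one waiting consumer.
  void push(int item) {
    {
      std::lock_guard<std::mutex> guard(lock_);
      items_.push_back(item);
    }
    not_empty_.notify_one();
  }

  // Sets the shutdown flag and wakes every waiting consumer.
  void shutdown() {
    {
      std::lock_guard<std::mutex> guard(lock_);
      shutdown_ = true;
    }
    not_empty_.notify_all();
  }

  // Returns 1 if an item was dequeued, 0 on timeout, -1 after shutdown.
  int dequeue(int& out, Clock::time_point deadline) {
    std::unique_lock<std::mutex> guard(lock_);
    // wait_until returns false if the deadline passed with the predicate unmet.
    bool ready = not_empty_.wait_until(guard, deadline, [this] {
      return shutdown_ || !items_.empty();
    });
    if (shutdown_) return -1;
    if (!ready) return 0;
    out = items_.front();
    items_.pop_front();
    return 1;
  }

private:
  std::mutex lock_;
  std::condition_variable not_empty_;
  std::deque<int> items_;
  bool shutdown_ = false;
};
```

A caller would pass something like SketchQueue::Clock::now() + std::chrono::milliseconds(50) as the deadline and branch on the three return values.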

bool TAO_Notify_Buffering_Strategy::discard ( TAO_Notify_Method_Request_Queueable *  method_request  )  [private]

Discard as per the Discard Policy.

Definition at line 316 of file Buffering_Strategy.cpp.

{
  if (this->shutdown_)
    {
      return false;
    }

  ACE_Message_Block* mb = 0;
  int result = -1;

  if (this->discard_policy_.is_valid() == 0 ||
      this->discard_policy_ == CosNotification::AnyOrder ||
      this->discard_policy_ == CosNotification::FifoOrder)
    {
      result = this->msg_queue_.dequeue_head (mb);
    }
  else if (this->discard_policy_ == CosNotification::LifoOrder)
    {
      // The most current message is NOT the newest one in the queue. It's
      // the one we're about to add to the queue.
      result = -1;
    }
  else if (this->discard_policy_ == CosNotification::DeadlineOrder)
    {
      result = this->msg_queue_.dequeue_deadline (mb);
    }
  else if (this->discard_policy_ == CosNotification::PriorityOrder)
    {
      result = this->msg_queue_.dequeue_prio (mb);
      if (mb->msg_priority() >= method_request->msg_priority())
        {
          this->msg_queue_.enqueue_prio (mb);
          result = -1;
        }
    }
  else
    {
      if (TAO_debug_level > 0)
        ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - Invalid discard policy\n"));
      result = this->msg_queue_.dequeue_head (mb);
    }

  if (result != -1)
    {
      ACE_Message_Block::release (mb);
      return true;
    }

  return false;
}
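
The branch structure of discard can be hard to follow in the listing, so here is a reduced sketch of three of its cases: FIFO removes the oldest queued event, LIFO removes nothing because the incoming event is itself the newest, and priority removes the lowest-priority queued event only when it ranks strictly below the incoming one. DiscardPolicy and discard_one are hypothetical names, the queue is modelled as a std::deque of priorities rather than ACE message blocks, and the deadline branch is omitted.

```cpp
#include <algorithm>
#include <deque>

// Hypothetical policy tags mirroring three branches of the listing.
enum class DiscardPolicy { Fifo, Lifo, Priority };

// Tries to make room for an incoming event with priority `incoming`.
// `queue` holds the priorities of queued events, oldest first.
// Returns true if an existing event was removed.
inline bool discard_one(std::deque<int>& queue, DiscardPolicy policy,
                        int incoming) {
  if (queue.empty()) return false;
  switch (policy) {
    case DiscardPolicy::Fifo:
      queue.pop_front();  // drop the oldest event
      return true;
    case DiscardPolicy::Lifo:
      // The incoming event is the newest one, so nothing queued is removed.
      return false;
    case DiscardPolicy::Priority: {
      // First lowest-priority event, i.e. the oldest among equals.
      auto lowest = std::min_element(queue.begin(), queue.end());
      if (*lowest >= incoming) return false;  // incoming does not outrank it
      queue.erase(lowest);
      return true;
    }
  }
  return false;
}
```

When the function returns false the caller has no free slot, which corresponds to the path in enqueue where the new event is rejected.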

int TAO_Notify_Buffering_Strategy::enqueue ( TAO_Notify_Method_Request_Queueable *  method_request  ) 

Enqueue according to the enqueuing strategy. Returns -1 on error, else the number of items in the queue.

Definition at line 138 of file Buffering_Strategy.cpp.

{
  ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->global_queue_lock_, -1);

  if (this->shutdown_)
    return -1;

  bool discarded_existing = false;

  bool local_overflow = this->max_events_per_consumer_.is_valid() &&
    static_cast <CORBA::Long> (this->msg_queue_.message_count ()) >= this->max_events_per_consumer_.value();

  bool global_overflow = this->max_queue_length_.value () != 0 &&
    this->global_queue_length_ >= this->max_queue_length_.value ();

  while (local_overflow || global_overflow)
    {
      if (blocking_policy_.is_valid())
        {
          ACE_Time_Value timeout;
          ORBSVCS_Time::TimeT_to_Time_Value(timeout, blocking_policy_.value());
          // Condition variables take an absolute time
          timeout += ACE_OS::gettimeofday();
          if (local_overflow)
            {
              local_not_full_.wait(&timeout);
            }
          else
            {
              global_not_full_.wait(&timeout);
            }
          if (errno != ETIME)
            {
              local_overflow =
                this->max_events_per_consumer_.is_valid() &&
                static_cast <CORBA::Long> (this->msg_queue_.message_count ()) >= this->max_events_per_consumer_.value();
              global_overflow =
                this->max_queue_length_.value () != 0 &&
                this->global_queue_length_ >= this->max_queue_length_.value ();
              continue;
            }
        }
      if (tracker_ != 0)
        {
          tracker_->count_queue_overflow (local_overflow, global_overflow);
        }

      discarded_existing = this->discard(method_request);
      if (discarded_existing)
        {
          --this->global_queue_length_;
          local_not_full_.signal();
          global_not_full_.signal();
        }
      break;
    }

  if (! (local_overflow || global_overflow) || discarded_existing)
    {
      if (this->queue (method_request) == -1)
        {
          ACE_DEBUG((LM_DEBUG,
                     "Notify (%P|%t) - Panic! failed to enqueue event\n"));
          return -1;
        }

      ++this->global_queue_length_;

      local_not_empty_.signal ();
    }
  else
    {
      ACE_DEBUG((LM_DEBUG,
                 "Notify (%P|%t) - Panic! did not attempt to enqueue event\n"));
      return -1;
    }

  size_t count = this->msg_queue_.message_count ();
  if (this->tracker_ != 0)
    {
      this->tracker_->update_queue_count (count);
    }

  return ACE_Utils::truncate_cast<int> (count);
}
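
The two overflow tests in enqueue are evaluated in two places in the listing (before the loop and again after a successful wait). As a sketch of that predicate in isolation: the local limit applies only when MaxEventsPerConsumer has been set, and a global limit of 0 means unlimited. Limits, Overflow and check_overflow are hypothetical names introduced for illustration.

```cpp
#include <cstddef>

// Hypothetical plain-data view of the two limits used by enqueue.
struct Limits {
  bool has_local_limit;  // mirrors max_events_per_consumer_.is_valid()
  long local_limit;      // mirrors max_events_per_consumer_.value()
  long global_limit;     // mirrors max_queue_length_.value(); 0 = unlimited
};

struct Overflow {
  bool local;
  bool global;
};

// Evaluates both overflow conditions for the current queue sizes.
inline Overflow check_overflow(const Limits& limits, std::size_t local_count,
                               long global_count) {
  Overflow result;
  result.local = limits.has_local_limit &&
                 static_cast<long>(local_count) >= limits.local_limit;
  result.global = limits.global_limit != 0 &&
                  global_count >= limits.global_limit;
  return result;
}
```

Factoring the check out this way is only a readability choice for the sketch; the listing itself repeats the two expressions inline.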

ACE_Time_Value TAO_Notify_Buffering_Strategy::oldest_event ( void   ) 

Provide the time value of the oldest event in the queue.

Definition at line 74 of file Buffering_Strategy.cpp.

{
  ACE_Time_Value tv (ACE_Time_Value::max_time);
  ACE_Message_Block* mb = 0;

  ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->global_queue_lock_, tv);
  TAO_Notify_Message_Queue::ITERATOR itr (this->msg_queue_);
  while(itr.next (mb))
    {
      TAO_Notify_Method_Request_Queueable* event =
        dynamic_cast<TAO_Notify_Method_Request_Queueable*> (mb);
      if (event != 0)
        {
          const ACE_Time_Value& etime = event->creation_time ();
          if (etime < tv)
            tv = etime;
        }
      itr.advance ();
    }

  return tv;
}
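
The scan above starts from the largest representable time, so an empty queue yields ACE_Time_Value::max_time. A minimal standard C++ sketch of the same idea, with a hypothetical function oldest_creation_time and std::chrono time points in place of ACE_Time_Value:

```cpp
#include <chrono>
#include <vector>

using TimePoint = std::chrono::steady_clock::time_point;

// Returns the earliest creation time, or TimePoint::max() for an empty input.
inline TimePoint oldest_creation_time(const std::vector<TimePoint>& times) {
  TimePoint oldest = TimePoint::max();  // sentinel: later than any real event
  for (const TimePoint& t : times) {
    if (t < oldest) oldest = t;
  }
  return oldest;
}
```

Callers can compare the result against TimePoint::max() to tell an empty queue apart from a real timestamp.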

int TAO_Notify_Buffering_Strategy::queue ( TAO_Notify_Method_Request_Queueable *  method_request  )  [private]

Apply the Order Policy and queue. Returns -1 on error.

Definition at line 280 of file Buffering_Strategy.cpp.

{
  if ( this->shutdown_ )
    return -1;

  CORBA::Short order = this->order_policy_.value();

  if (! this->order_policy_.is_valid() ||
      order == CosNotification::AnyOrder ||
      order == CosNotification::FifoOrder)
    {
      if (TAO_debug_level > 0)
        ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - enqueue in fifo order\n"));
      return this->msg_queue_.enqueue_tail (method_request);
    }

  if (order == CosNotification::PriorityOrder)
    {
      if (TAO_debug_level > 0)
        ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - enqueue in priority order\n"));
      return this->msg_queue_.enqueue_prio (method_request);
    }

  if (order == CosNotification::DeadlineOrder)
    {
      if (TAO_debug_level > 0)
        ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - enqueue in deadline order\n"));
      return this->msg_queue_.enqueue_deadline (method_request);
    }

  if (TAO_debug_level > 0)
    ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - Invalid order policy\n"));
  return this->msg_queue_.enqueue_tail (method_request);
}
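
To make the effect of the three order policies concrete, the following sketch inserts into a std::deque instead of an ACE message queue. It assumes that priority order places higher priority values nearer the head and that deadline order places earlier deadlines nearer the head, with ties keeping arrival order; those ordering details, along with the names Event, OrderPolicy and insert_ordered, are assumptions of the sketch and are not stated in the listing.

```cpp
#include <algorithm>
#include <deque>

// Hypothetical event record; `id` only identifies events in examples.
struct Event {
  int priority;
  long deadline;
  int id;
};

enum class OrderPolicy { Fifo, Priority, Deadline };

// Inserts `e` at the position implied by `policy`.
// The queue must have been built with the same policy throughout.
inline void insert_ordered(std::deque<Event>& q, const Event& e,
                           OrderPolicy policy) {
  switch (policy) {
    case OrderPolicy::Priority: {
      // Place after every event whose priority is >= e.priority.
      auto pos = std::upper_bound(
          q.begin(), q.end(), e,
          [](const Event& a, const Event& b) { return a.priority > b.priority; });
      q.insert(pos, e);
      return;
    }
    case OrderPolicy::Deadline: {
      // Place after every event whose deadline is <= e.deadline.
      auto pos = std::upper_bound(
          q.begin(), q.end(), e,
          [](const Event& a, const Event& b) { return a.deadline < b.deadline; });
      q.insert(pos, e);
      return;
    }
    case OrderPolicy::Fifo:
      q.push_back(e);  // plain arrival order
      return;
  }
}
```

As in the listing, anything that is not a recognised priority or deadline policy would fall back to appending at the tail.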

void TAO_Notify_Buffering_Strategy::set_tracker ( TAO_Notify_Buffering_Strategy::Tracker *  tracker  ) 

Set the tracker object. This strategy does not own the tracker.

Definition at line 266 of file Buffering_Strategy.cpp.

{
  if (this->tracker_ == 0)
    {
      this->tracker_ = tracker;
    }
  else if (this->tracker_ != tracker)
    {
      this->tracker_->register_child (tracker);
    }
}

void TAO_Notify_Buffering_Strategy::shutdown ( void   ) 

Shutdown.

Definition at line 57 of file Buffering_Strategy.cpp.

{
  ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->global_queue_lock_);

  if (this->shutdown_)
  {
    return;
  }

  this->shutdown_ = true;

  this->local_not_empty_.broadcast ();
  this->global_not_full_.broadcast();
  this->local_not_full_.broadcast();
}

void TAO_Notify_Buffering_Strategy::update_qos_properties ( const TAO_Notify_QoSProperties &  qos_properties  ) 

Update state with the following QoS properties: Order Policy, Discard Policy, MaxEventsPerConsumer, and Blocking Policy.

Definition at line 48 of file Buffering_Strategy.cpp.

{
  this->order_policy_.set (qos_properties);
  this->discard_policy_.set (qos_properties);
  this->max_events_per_consumer_.set(qos_properties);
  this->blocking_policy_.set (qos_properties);
}


Member Data Documentation

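TAO_Notify_AdminProperties::Ptr TAO_Notify_Buffering_Strategy::admin_properties_ [private]

Reference to the properties per event channel.

Definition at line 106 of file Buffering_Strategy.h.

TAO_Notify_Property_Time TAO_Notify_Buffering_Strategy::blocking_policy_ [private]

Definition at line 124 of file Buffering_Strategy.h.

TAO_Notify_Property_Short TAO_Notify_Buffering_Strategy::discard_policy_ [private]

Policy to discard when buffers are full.

Definition at line 121 of file Buffering_Strategy.h.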

TAO_SYNCH_CONDITION& TAO_Notify_Buffering_Strategy::global_not_full_ [private]

Definition at line 126 of file Buffering_Strategy.h.

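CORBA::Long& TAO_Notify_Buffering_Strategy::global_queue_length_ [private]

The global queue length - queue length across all the queues.

Definition at line 112 of file Buffering_Strategy.h.

TAO_SYNCH_MUTEX& TAO_Notify_Buffering_Strategy::global_queue_lock_ [private]

The shared global lock used by all the queues.

Definition at line 109 of file Buffering_Strategy.h.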

TAO_SYNCH_CONDITION TAO_Notify_Buffering_Strategy::local_not_empty_ [private]

Condition that batch size reached.

Definition at line 130 of file Buffering_Strategy.h.

TAO_SYNCH_CONDITION TAO_Notify_Buffering_Strategy::local_not_full_ [private]

Definition at line 127 of file Buffering_Strategy.h.

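TAO_Notify_Property_Long TAO_Notify_Buffering_Strategy::max_events_per_consumer_ [private]

Definition at line 123 of file Buffering_Strategy.h.

const TAO_Notify_Property_Long& TAO_Notify_Buffering_Strategy::max_queue_length_ [private]

The maximum events that can be queued overall.

Definition at line 115 of file Buffering_Strategy.h.

TAO_Notify_Message_Queue& TAO_Notify_Buffering_Strategy::msg_queue_ [private]

The local Message Queue.

Definition at line 103 of file Buffering_Strategy.h.

TAO_Notify_Property_Short TAO_Notify_Buffering_Strategy::order_policy_ [private]

Order of events in internal buffers.

Definition at line 118 of file Buffering_Strategy.h.

bool TAO_Notify_Buffering_Strategy::shutdown_ [private]

Flag to shutdown.

Definition at line 133 of file Buffering_Strategy.h.

Tracker* TAO_Notify_Buffering_Strategy::tracker_ [private]

Optional queue tracker.

Definition at line 136 of file Buffering_Strategy.h.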


The documentation for this class was generated from the following files:
 Buffering_Strategy.h
 Buffering_Strategy.cpp