Public Member Functions | Public Attributes | Protected Member Functions | Protected Attributes | Private Member Functions

ACE_Dynamic_Message_Queue< ACE_SYNCH_DECL > Class Template Reference

A derived class which adapts the ACE_Message_Queue class in order to maintain dynamic priorities for enqueued <ACE_Message_Blocks> and manage the queue order according to these dynamic priorities. More...

#include <Message_Queue_T.h>

Inheritance diagram for ACE_Dynamic_Message_Queue< ACE_SYNCH_DECL >:
Inheritance graph
[legend]
Collaboration diagram for ACE_Dynamic_Message_Queue< ACE_SYNCH_DECL >:
Collaboration graph
[legend]

List of all members.

Public Member Functions

 ACE_Dynamic_Message_Queue (ACE_Dynamic_Message_Strategy &message_strategy, size_t hwm=ACE_Message_Queue_Base::DEFAULT_HWM, size_t lwm=ACE_Message_Queue_Base::DEFAULT_LWM, ACE_Notification_Strategy *=0)
virtual ~ACE_Dynamic_Message_Queue (void)
 Close down the message queue and release all resources.
virtual int remove_messages (ACE_Message_Block *&list_head, ACE_Message_Block *&list_tail, u_int status_flags)
virtual int dequeue_head (ACE_Message_Block *&first_item, ACE_Time_Value *timeout=0)
virtual void dump (void) const
 Dump the state of the queue.
virtual int enqueue_tail (ACE_Message_Block *new_item, ACE_Time_Value *timeout=0)
virtual int enqueue_head (ACE_Message_Block *new_item, ACE_Time_Value *timeout=0)

Public Attributes

 ACE_ALLOC_HOOK_DECLARE
 Declare the dynamic allocation hooks.

Protected Member Functions

virtual int enqueue_i (ACE_Message_Block *new_item)
virtual int sublist_enqueue_i (ACE_Message_Block *new_item, const ACE_Time_Value &current_time, ACE_Message_Block *&sublist_head, ACE_Message_Block *&sublist_tail, ACE_Dynamic_Message_Strategy::Priority_Status status)
 Enqueue a message in priority order within a given priority status sublist.
virtual int dequeue_head_i (ACE_Message_Block *&first_item)
virtual int refresh_queue (const ACE_Time_Value &current_time)
virtual int refresh_pending_queue (const ACE_Time_Value &current_time)
virtual int refresh_late_queue (const ACE_Time_Value &current_time)

Protected Attributes

ACE_Message_Blockpending_head_
 Pointer to head of the pending messages.
ACE_Message_Blockpending_tail_
 Pointer to tail of the pending messages.
ACE_Message_Blocklate_head_
 Pointer to head of the late messages.
ACE_Message_Blocklate_tail_
 Pointer to tail of the late messages.
ACE_Message_Blockbeyond_late_head_
 Pointer to head of the beyond late messages.
ACE_Message_Blockbeyond_late_tail_
 Pointer to tail of the beyond late messages.
ACE_Dynamic_Message_Strategymessage_strategy_
 Pointer to a dynamic priority evaluation function.

Private Member Functions

void operator= (const ACE_Dynamic_Message_Queue< _ACE_SYNCH_MUTEX_T, _ACE_SYNCH_CONDITION_T > &)
 ACE_Dynamic_Message_Queue (const ACE_Dynamic_Message_Queue< _ACE_SYNCH_MUTEX_T, _ACE_SYNCH_CONDITION_T > &)
virtual int peek_dequeue_head (ACE_Message_Block *&first_item, ACE_Time_Value *timeout=0)
 Private method to hide public base class method: just calls base class method.

Detailed Description

template<ACE_SYNCH_DECL>
class ACE_Dynamic_Message_Queue< ACE_SYNCH_DECL >

A derived class which adapts the ACE_Message_Queue class in order to maintain dynamic priorities for enqueued <ACE_Message_Blocks> and manage the queue order according to these dynamic priorities.

The messages in the queue are managed so as to preserve a logical ordering with minimal overhead per enqueue and dequeue operation. For this reason, the actual order of messages in the linked list of the queue may differ from their priority order. As time passes, a message may change from pending status to late status, and eventually to beyond late status. To minimize reordering overhead under this design force, three separate boundaries are maintained within the linked list of messages. Messages are dequeued preferentially from the head of the pending portion, then the head of the late portion, and finally from the head of the beyond late portion. In this way, only the boundaries need to be maintained (which can be done efficiently, as aging messages maintain the same linked list order as they progress from one status to the next), with no reordering of the messages themselves, while providing correct priority ordered dequeueing semantics. Head and tail enqueue methods inherited from ACE_Message_Queue are made private to prevent out-of-order messages from confusing management of the various portions of the queue. Messages in the pending portion of the queue whose priority becomes late (according to the specific dynamic strategy) advance into the late portion of the queue. Messages in the late portion of the queue whose priority becomes later than can be represented advance to the beyond_late portion of the queue. These behaviors support a limited schedule overrun, with pending messages prioritized ahead of late messages, and late messages ahead of beyond late messages. These behaviors can be modified in derived classes by providing alternative definitions for the appropriate virtual methods. When filled with messages, the queue's linked list should look like: H T | | B - B - B - B - L - L - L - P - P - P - P - P | | | | | | BH BT LH LT PH PT Where the symbols are as follows: H = Head of the entire list T = Tail of the entire list B = Beyond late message BH = Beyond late messages Head BT = Beyond late messages Tail L = Late message LH = Late messages Head LT = Late messages Tail P = Pending message PH = Pending messages Head PT = Pending messages Tail Caveat: the virtual methods enqueue_tail, enqueue_head, and peek_dequeue_head have semantics for the static message queues that cannot be guaranteed for dynamic message queues. The peek_dequeue_head method just calls the base class method, while the two enqueue methods call the priority enqueue method. The order of messages in the dynamic queue is a function of message deadlines and how long they are in the queues. You can manipulate these in some cases to ensure the correct semantics, but that is not a very stable or portable approach (discouraged).

Definition at line 767 of file Message_Queue_T.h.


Constructor & Destructor Documentation

template<ACE_SYNCH_DECL >
ACE_Dynamic_Message_Queue< ACE_SYNCH_DECL >::ACE_Dynamic_Message_Queue ( ACE_Dynamic_Message_Strategy message_strategy,
size_t  hwm = ACE_Message_Queue_Base::DEFAULT_HWM,
size_t  lwm = ACE_Message_Queue_Base::DEFAULT_LWM,
ACE_Notification_Strategy ns = 0 
)

Definition at line 2027 of file Message_Queue_T.cpp.

  : ACE_Message_Queue<ACE_SYNCH_USE> (hwm, lwm, ns),
    pending_head_ (0),
    pending_tail_ (0),
    late_head_ (0),
    late_tail_ (0),
    beyond_late_head_ (0),
    beyond_late_tail_ (0),
    message_strategy_ (message_strategy)
{
  // Note, the ACE_Dynamic_Message_Queue assumes full responsibility
  // for the passed ACE_Dynamic_Message_Strategy object, and deletes
  // it in its own dtor
}

template<ACE_SYNCH_DECL >
ACE_Dynamic_Message_Queue< ACE_SYNCH_DECL >::~ACE_Dynamic_Message_Queue ( void   )  [virtual]

Close down the message queue and release all resources.

Definition at line 2048 of file Message_Queue_T.cpp.

{
  delete &this->message_strategy_;
}

template<ACE_SYNCH_DECL >
ACE_Dynamic_Message_Queue< ACE_SYNCH_DECL >::ACE_Dynamic_Message_Queue ( const ACE_Dynamic_Message_Queue< _ACE_SYNCH_MUTEX_T, _ACE_SYNCH_CONDITION_T > &   )  [private]

Member Function Documentation

template<ACE_SYNCH_DECL >
int ACE_Dynamic_Message_Queue< ACE_SYNCH_DECL >::dequeue_head ( ACE_Message_Block *&  first_item,
ACE_Time_Value timeout = 0 
) [virtual]

Dequeue and return the <ACE_Message_Block *> at the head of the queue. Returns -1 on failure, else the number of items still on the queue.

Reimplemented from ACE_Message_Queue< ACE_SYNCH_USE >.

Definition at line 2194 of file Message_Queue_T.cpp.

{
  ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head");

  ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);

  if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
    {
      errno = ESHUTDOWN;
      return -1;
    }

  int result;

  // get the current time
  ACE_Time_Value current_time = ACE_OS::gettimeofday ();

  // refresh priority status boundaries in the queue
  result = this->refresh_queue (current_time);
  if (result < 0)
    return result;

  // *now* it's appropriate to wait for an enqueued item
  result = this->wait_not_empty_cond (ace_mon, timeout);
  if (result == -1)
    return result;

  // call the internal dequeue method, which selects an item from the
  // highest priority status portion of the queue that has messages
  // enqueued.
  result = this->dequeue_head_i (first_item);

  return result;
}

template<ACE_SYNCH_DECL >
int ACE_Dynamic_Message_Queue< ACE_SYNCH_DECL >::dequeue_head_i ( ACE_Message_Block *&  first_item  )  [protected, virtual]

Dequeue and return the <ACE_Message_Block *> at the head of the logical queue. Attempts first to dequeue from the pending portion of the queue, or if that is empty from the late portion, or if that is empty from the beyond late portion, or if that is empty just sets the passed pointer to zero and returns -1.

Reimplemented from ACE_Message_Queue< ACE_SYNCH_USE >.

Definition at line 2490 of file Message_Queue_T.cpp.

{
  ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i");

  int result = 0;
  int last_in_subqueue = 0;

  // first, try to dequeue from the head of the pending list
  if (this->pending_head_)
    {
      first_item = this->pending_head_;

      if (0 == this->pending_head_->prev ())
        this->head_ = this->pending_head_->next ();
      else
        this->pending_head_->prev ()->next (this->pending_head_->next ());

      if (0 == this->pending_head_->next ())
        {
          this->tail_ = this->pending_head_->prev ();
          this->pending_head_ = 0;
          this->pending_tail_ = 0;
        }
      else
        {
          this->pending_head_->next ()->prev (this->pending_head_->prev ());
          this->pending_head_ = this->pending_head_->next ();
        }

      first_item->prev (0);
      first_item->next (0);
    }

  // Second, try to dequeue from the head of the late list
  else if (this->late_head_)
    {
      last_in_subqueue = this->late_head_ == this->late_tail_ ? 1 : 0;

      first_item = this->late_head_;

      if (0 == this->late_head_->prev ())
        this->head_ = this->late_head_->next ();
      else
        this->late_head_->prev ()->next (this->late_head_->next ());

      if (0 == this->late_head_->next ())
        this->tail_ = this->late_head_->prev ();
      else
        {
          this->late_head_->next ()->prev (this->late_head_->prev ());
          this->late_head_ = this->late_head_->next ();
        }

      if (last_in_subqueue)
        {
          this->late_head_ = 0;
          this->late_tail_ = 0;
        }

      first_item->prev (0);
      first_item->next (0);
    }
  // finally, try to dequeue from the head of the beyond late list
  else if (this->beyond_late_head_)
    {
      last_in_subqueue =
        (this->beyond_late_head_ == this->beyond_late_tail_) ? 1 : 0;

      first_item = this->beyond_late_head_;
      this->head_ = this->beyond_late_head_->next ();

      if (0 == this->beyond_late_head_->next ())
        {
          this->tail_ = this->beyond_late_head_->prev ();
        }
      else
        {
          this->beyond_late_head_->next ()->prev (this->beyond_late_head_->prev ());
          this->beyond_late_head_ = this->beyond_late_head_->next ();
        }

      if (last_in_subqueue)
        {
          this->beyond_late_head_ = 0;
          this->beyond_late_tail_ = 0;
        }

      first_item->prev (0);
      first_item->next (0);
    }
  else
    {
      // nothing to dequeue: set the pointer to zero and return an error code
      first_item = 0;
      result = -1;
    }

  if (result < 0)
    {
      return result;
    }

  size_t mb_bytes = 0;
  size_t mb_length = 0;
  first_item->total_size_and_length (mb_bytes,
                                     mb_length);
  // Subtract off all of the bytes associated with this message.
  this->cur_bytes_ -= mb_bytes;
  this->cur_length_ -= mb_length;
  --this->cur_count_;

  // Only signal enqueueing threads if we've fallen below the low
  // water mark.
  if (this->cur_bytes_ <= this->low_water_mark_
      && this->signal_enqueue_waiters () == -1)
    {
      return -1;
    }
  else
    {
      return ACE_Utils::truncate_cast<int> (this->cur_count_);
    }
}

template<ACE_SYNCH_DECL >
void ACE_Dynamic_Message_Queue< ACE_SYNCH_DECL >::dump ( void   )  const [virtual]

Dump the state of the queue.

Reimplemented from ACE_Message_Queue< ACE_SYNCH_USE >.

Definition at line 2234 of file Message_Queue_T.cpp.

{
#if defined (ACE_HAS_DUMP)
  ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dump");
  ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));

  ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("ACE_Message_Queue<ACE_SYNCH_USE> (base class):\n")));
  this->ACE_Message_Queue<ACE_SYNCH_USE>::dump ();

  ACE_DEBUG ((LM_DEBUG,
              ACE_TEXT ("pending_head_ = %u\n")
              ACE_TEXT ("pending_tail_ = %u\n")
              ACE_TEXT ("late_head_ = %u\n")
              ACE_TEXT ("late_tail_ = %u\n")
              ACE_TEXT ("beyond_late_head_ = %u\n")
              ACE_TEXT ("beyond_late_tail_ = %u\n"),
              this->pending_head_,
              this->pending_tail_,
              this->late_head_,
              this->late_tail_,
              this->beyond_late_head_,
              this->beyond_late_tail_));

  ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("message_strategy_ :\n")));
  message_strategy_.dump ();

  ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
#endif /* ACE_HAS_DUMP */
}

template<ACE_SYNCH_DECL >
int ACE_Dynamic_Message_Queue< ACE_SYNCH_DECL >::enqueue_head ( ACE_Message_Block new_item,
ACE_Time_Value timeout = 0 
) [virtual]

Just call priority enqueue method: head enqueue semantics for dynamic message queues are unstable: the message may or may not be where it was placed after the queue is refreshed prior to the next enqueue or dequeue operation.

Reimplemented from ACE_Message_Queue< ACE_SYNCH_USE >.

Definition at line 2869 of file Message_Queue_T.cpp.

{
  ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_head");
  return this->enqueue_prio (new_item, timeout);
}

template<ACE_SYNCH_DECL >
int ACE_Dynamic_Message_Queue< ACE_SYNCH_DECL >::enqueue_i ( ACE_Message_Block new_item  )  [protected, virtual]

Enqueue an <ACE_Message_Block *> in accordance with its priority. priority may be *dynamic* or *static* or a combination or *both* It calls the priority evaluation function passed into the Dynamic Message Queue constructor to update the priorities of all enqueued messages.

Reimplemented from ACE_Message_Queue< ACE_SYNCH_USE >.

Definition at line 2266 of file Message_Queue_T.cpp.

{
  ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_i");

  if (new_item == 0)
    {
      return -1;
    }

  int result = 0;

  // Get the current time.
  ACE_Time_Value current_time = ACE_OS::gettimeofday ();

  // Refresh priority status boundaries in the queue.

  result = this->refresh_queue (current_time);

  if (result < 0)
    {
      return result;
    }

  // Where we enqueue depends on the message's priority status.
  switch (message_strategy_.priority_status (*new_item,
                                             current_time))
    {
    case ACE_Dynamic_Message_Strategy::PENDING:
      if (this->pending_tail_ == 0)
        {
          // Check for simple case of an empty pending queue, where
          // all we need to do is insert <new_item> into the tail of
          // the queue.
          pending_head_ = new_item;
          pending_tail_ = pending_head_;
          return this->enqueue_tail_i (new_item);
        }
      else
        {
          // Enqueue the new message in priority order in the pending
          // sublist
          result = sublist_enqueue_i (new_item,
                                      current_time,
                                      this->pending_head_,
                                      this->pending_tail_,
                                      ACE_Dynamic_Message_Strategy::PENDING);
        }
      break;

    case ACE_Dynamic_Message_Strategy::LATE:
      if (this->late_tail_ == 0)
        {
          late_head_ = new_item;
          late_tail_ = late_head_;

          if (this->pending_head_ == 0)
            // Check for simple case of an empty pending queue,
            // where all we need to do is insert <new_item> into the
            // tail of the queue.
            return this->enqueue_tail_i (new_item);
          else if (this->beyond_late_tail_ == 0)
            // Check for simple case of an empty beyond late queue, where all
            // we need to do is insert <new_item> into the head of the queue.
            return this->enqueue_head_i (new_item);
          else
            {
              // Otherwise, we can just splice the new message in
              // between the pending and beyond late portions of the
              // queue.
              this->beyond_late_tail_->next (new_item);
              new_item->prev (this->beyond_late_tail_);
              this->pending_head_->prev (new_item);
              new_item->next (this->pending_head_);
            }
        }
      else
        {
          // Enqueue the new message in priority order in the late
          // sublist
          result = sublist_enqueue_i (new_item,
                                      current_time,
                                      this->late_head_,
                                      this->late_tail_,
                                      ACE_Dynamic_Message_Strategy::LATE);
        }
      break;

    case ACE_Dynamic_Message_Strategy::BEYOND_LATE:
      if (this->beyond_late_tail_ == 0)
        {
          // Check for simple case of an empty beyond late queue,
          // where all we need to do is insert <new_item> into the
          // head of the queue.
          beyond_late_head_ = new_item;
          beyond_late_tail_ = beyond_late_head_;
          return this->enqueue_head_i (new_item);
        }
      else
        {
          // all beyond late messages have the same (zero) priority,
          // so just put the new one at the end of the beyond late
          // messages
          if (this->beyond_late_tail_->next ())
            {
              this->beyond_late_tail_->next ()->prev (new_item);
            }
          else
            {
              this->tail_ = new_item;
            }

          new_item->next (this->beyond_late_tail_->next ());
          this->beyond_late_tail_->next (new_item);
          new_item->prev (this->beyond_late_tail_);
          this->beyond_late_tail_ = new_item;
        }

      break;

      // should never get here, but just in case...
    default:
      result = -1;
      break;
    }

  if (result < 0)
    {
      return result;
    }

  size_t mb_bytes = 0;
  size_t mb_length = 0;
  new_item->total_size_and_length (mb_bytes,
                                   mb_length);
  this->cur_bytes_ += mb_bytes;
  this->cur_length_ += mb_length;
  ++this->cur_count_;

  if (this->signal_dequeue_waiters () == -1)
    {
      return -1;
    }
  else
    {
      return ACE_Utils::truncate_cast<int> (this->cur_count_);
    }
}

template<ACE_SYNCH_DECL >
int ACE_Dynamic_Message_Queue< ACE_SYNCH_DECL >::enqueue_tail ( ACE_Message_Block new_item,
ACE_Time_Value timeout = 0 
) [virtual]

Just call priority enqueue method: tail enqueue semantics for dynamic message queues are unstable: the message may or may not be where it was placed after the queue is refreshed prior to the next enqueue or dequeue operation.

Reimplemented from ACE_Message_Queue< ACE_SYNCH_USE >.

Definition at line 2856 of file Message_Queue_T.cpp.

{
  ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_tail");
  return this->enqueue_prio (new_item, timeout);
}

template<ACE_SYNCH_DECL >
void ACE_Dynamic_Message_Queue< ACE_SYNCH_DECL >::operator= ( const ACE_Dynamic_Message_Queue< _ACE_SYNCH_MUTEX_T, _ACE_SYNCH_CONDITION_T > &   )  [private]
template<ACE_SYNCH_DECL >
int ACE_Dynamic_Message_Queue< ACE_SYNCH_DECL >::peek_dequeue_head ( ACE_Message_Block *&  first_item,
ACE_Time_Value timeout = 0 
) [private, virtual]

Private method to hide public base class method: just calls base class method.

Reimplemented from ACE_Message_Queue< ACE_SYNCH_USE >.

Definition at line 2845 of file Message_Queue_T.cpp.

{
  return ACE_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head (first_item,
                                                              timeout);
}

template<ACE_SYNCH_DECL >
int ACE_Dynamic_Message_Queue< ACE_SYNCH_DECL >::refresh_late_queue ( const ACE_Time_Value current_time  )  [protected, virtual]

Refresh the late queue using the strategy specific priority status function.

Definition at line 2760 of file Message_Queue_T.cpp.

{
  ACE_Dynamic_Message_Strategy::Priority_Status current_status;

  if (this->late_head_)
    {
      current_status = message_strategy_.priority_status (*this->late_head_,
                                                          current_time);
      switch (current_status)
        {
        case ACE_Dynamic_Message_Strategy::BEYOND_LATE:

          // make sure the head of the beyond late queue is set
          // (there may not have been any beyond late messages previously)
          this->beyond_late_head_ = this->head_;

          // advance through the beyond late messages in the late queue
          do
            {
              this->late_head_ = this->late_head_->next ();

              if (this->late_head_)
                current_status = message_strategy_.priority_status (*this->late_head_,
                                                                    current_time);
              else
                break;  // do while

            }
          while (current_status == ACE_Dynamic_Message_Strategy::BEYOND_LATE);

          if (this->late_head_)
            {
              // point tail of beyond late sublist to previous item
              this->beyond_late_tail_ = this->late_head_->prev ();

              if (current_status == ACE_Dynamic_Message_Strategy::PENDING)
                {
                  // there are no late messages left in the queue
                  this->late_head_ = 0;
                  this->late_tail_ = 0;
                }
              else if (current_status != ACE_Dynamic_Message_Strategy::LATE)
                // if we got here, something is *seriously* wrong with the queue
                ACE_ERROR_RETURN ((LM_ERROR,
                                   ACE_TEXT ("Unexpected message priority status [%d] (expected LATE)"),
                                   (int) current_status),
                                  -1);
            }
          else
            {
              // there are no late messages left in the queue
              this->beyond_late_tail_ = this->tail_;
              this->late_head_ = 0;
              this->late_tail_ = 0;
            }

          break;  // switch

        case ACE_Dynamic_Message_Strategy::LATE:
          // do nothing - the late queue is unchanged
          break; // switch

        case ACE_Dynamic_Message_Strategy::PENDING:
          // if we got here, something is *seriously* wrong with the queue
          ACE_ERROR_RETURN ((LM_ERROR,
                             ACE_TEXT ("Unexpected message priority status ")
                             ACE_TEXT ("[%d] (expected LATE or BEYOND_LATE)"),
                             (int) current_status),
                            -1);
        default:
          // if we got here, something is *seriously* wrong with the queue
          ACE_ERROR_RETURN ((LM_ERROR,
                             ACE_TEXT ("Unknown message priority status [%d]"),
                             (int) current_status),
                            -1);
        }
    }

  return 0;
}

template<ACE_SYNCH_DECL >
int ACE_Dynamic_Message_Queue< ACE_SYNCH_DECL >::refresh_pending_queue ( const ACE_Time_Value current_time  )  [protected, virtual]

Refresh the pending queue using the strategy specific priority status function.

Definition at line 2637 of file Message_Queue_T.cpp.

{
  ACE_Dynamic_Message_Strategy::Priority_Status current_status;

  // refresh priority status boundaries in the queue
  if (this->pending_head_)
    {
      current_status = message_strategy_.priority_status (*this->pending_head_,
                                                          current_time);
      switch (current_status)
        {
        case ACE_Dynamic_Message_Strategy::BEYOND_LATE:
          // Make sure the head of the beyond late queue is set (there
          // may not have been any beyond late messages previously)
          this->beyond_late_head_ = this->head_;

          // Zero out the late queue pointers, and set them only if
          // there turn out to be late messages in the pending sublist
          this->late_head_ = 0;
          this->late_tail_ = 0;

          // Advance through the beyond late messages in the pending queue
          do
            {
              this->pending_head_ = this->pending_head_->next ();

              if (this->pending_head_)
                current_status = message_strategy_.priority_status (*this->pending_head_,
                                                                    current_time);
              else
                break;  // do while

            }
          while (current_status == ACE_Dynamic_Message_Strategy::BEYOND_LATE);

          if (this->pending_head_)
            {
              // point tail of beyond late sublist to previous item
              this->beyond_late_tail_ = this->pending_head_->prev ();

              if (current_status == ACE_Dynamic_Message_Strategy::PENDING)
                // there are no late messages left in the queue
                break; // switch
              else if (current_status != ACE_Dynamic_Message_Strategy::LATE)
                {
                  // if we got here, something is *seriously* wrong with the queue
                  ACE_ERROR_RETURN ((LM_ERROR,
                                     ACE_TEXT ("Unexpected message priority status [%d] (expected LATE)"),
                                     (int) current_status),
                                    -1);
                }
              /* FALLTHRU */
            }
          else
            {
              // There are no pending or late messages left in the
              // queue.
              this->beyond_late_tail_ = this->tail_;
              this->pending_head_ = 0;
              this->pending_tail_ = 0;
              break; // switch
            }

        case ACE_Dynamic_Message_Strategy::LATE:
          // Make sure the head of the late queue is set (there may
          // not have been any late messages previously, or they may
          // have all become beyond late).
          if (this->late_head_ == 0)
            this->late_head_ = this->pending_head_;

          // advance through the beyond late messages in the pending queue
          do
            {
              this->pending_head_ = this->pending_head_->next ();

              if (this->pending_head_)
                current_status = message_strategy_.priority_status (*this->pending_head_,
                                                                    current_time);
              else
                break;  // do while

            }
          while (current_status == ACE_Dynamic_Message_Strategy::LATE);

          if (this->pending_head_)
            {
              if (current_status != ACE_Dynamic_Message_Strategy::PENDING)
                // if we got here, something is *seriously* wrong with the queue
                ACE_ERROR_RETURN((LM_ERROR,
                                  ACE_TEXT ("Unexpected message priority status [%d] (expected PENDING)"),
                                  (int) current_status),
                                 -1);

              // Point tail of late sublist to previous item
              this->late_tail_ = this->pending_head_->prev ();
            }
          else
            {
              // there are no pending messages left in the queue
              this->late_tail_ = this->tail_;
              this->pending_head_ = 0;
              this->pending_tail_ = 0;
            }

          break; // switch
        case ACE_Dynamic_Message_Strategy::PENDING:
          // do nothing - the pending queue is unchanged
          break; // switch
        default:
          // if we got here, something is *seriously* wrong with the queue
          ACE_ERROR_RETURN((LM_ERROR,
                            ACE_TEXT ("Unknown message priority status [%d]"),
                            (int) current_status),
                           -1);
        }
    }
  return 0;
}

template<ACE_SYNCH_DECL >
int ACE_Dynamic_Message_Queue< ACE_SYNCH_DECL >::refresh_queue ( const ACE_Time_Value current_time  )  [protected, virtual]

Refresh the queue using the strategy specific priority status function.

Definition at line 2621 of file Message_Queue_T.cpp.

{
  int result;

  result = refresh_pending_queue (current_time);

  if (result != -1)
    result = refresh_late_queue (current_time);

  return result;
}

template<ACE_SYNCH_DECL >
int ACE_Dynamic_Message_Queue< ACE_SYNCH_DECL >::remove_messages ( ACE_Message_Block *&  list_head,
ACE_Message_Block *&  list_tail,
u_int  status_flags 
) [virtual]

Detach all messages with status given in the passed flags from the queue and return them by setting passed head and tail pointers to the linked list they comprise. This method is intended primarily as a means of periodically harvesting messages that have missed their deadlines, but is available in its most general form. All messages are returned in priority order, from head to tail, as of the time this method was called.

Definition at line 2054 of file Message_Queue_T.cpp.

{
  // start with an empty list
  list_head = 0;
  list_tail = 0;

  // Get the current time
  ACE_Time_Value current_time = ACE_OS::gettimeofday ();

  // Refresh priority status boundaries in the queue.
  int result = this->refresh_queue (current_time);
  if (result < 0)
    return result;

  if (ACE_BIT_ENABLED (status_flags,
                       (u_int) ACE_Dynamic_Message_Strategy::PENDING)
      && this->pending_head_
      && this->pending_tail_)
    {
      // patch up pointers for the new tail of the queue
      if (this->pending_head_->prev ())
        {
          this->tail_ = this->pending_head_->prev ();
          this->pending_head_->prev ()->next (0);
        }
      else
        {
          // the list has become empty
          this->head_ = 0;
          this->tail_ = 0;
        }

      // point to the head and tail of the list
      list_head = this->pending_head_;
      list_tail = this->pending_tail_;

      // cut the pending messages out of the queue entirely
      this->pending_head_->prev (0);
      this->pending_head_ = 0;
      this->pending_tail_ = 0;
    }

  if (ACE_BIT_ENABLED (status_flags,
                       (u_int) ACE_Dynamic_Message_Strategy::LATE)
      && this->late_head_
      && this->late_tail_)
    {
      // Patch up pointers for the (possibly) new head and tail of the
      // queue.
      if (this->late_tail_->next ())
        this->late_tail_->next ()->prev (this->late_head_->prev ());
      else
        this->tail_ = this->late_head_->prev ();

      if (this->late_head_->prev ())
        this->late_head_->prev ()->next (this->late_tail_->next ());
      else
        this->head_ = this->late_tail_->next ();

      // put late messages behind pending messages (if any) being returned
      this->late_head_->prev (list_tail);
      if (list_tail)
        list_tail->next (this->late_head_);
      else
        list_head = this->late_head_;

      list_tail = this->late_tail_;

      this->late_tail_->next (0);
      this->late_head_ = 0;
      this->late_tail_ = 0;
    }

  if (ACE_BIT_ENABLED (status_flags,
      (u_int) ACE_Dynamic_Message_Strategy::BEYOND_LATE)
      && this->beyond_late_head_
      && this->beyond_late_tail_)
    {
      // Patch up pointers for the new tail of the queue
      if (this->beyond_late_tail_->next ())
        {
          this->head_ = this->beyond_late_tail_->next ();
          this->beyond_late_tail_->next ()->prev (0);
        }
      else
        {
          // the list has become empty
          this->head_ = 0;
          this->tail_ = 0;
        }

      // Put beyond late messages at the end of the list being
      // returned.
      if (list_tail)
        {
          this->beyond_late_head_->prev (list_tail);
          list_tail->next (this->beyond_late_head_);
        }
      else
        list_head = this->beyond_late_head_;

      list_tail = this->beyond_late_tail_;

      this->beyond_late_tail_->next (0);
      this->beyond_late_head_ = 0;
      this->beyond_late_tail_ = 0;
    }

  // Decrement message and size counts for removed messages.
  ACE_Message_Block *temp1;

  for (temp1 = list_head;
       temp1 != 0;
       temp1 = temp1->next ())
    {
      --this->cur_count_;

      size_t mb_bytes = 0;
      size_t mb_length = 0;
      temp1->total_size_and_length (mb_bytes,
                                    mb_length);
      // Subtract off all of the bytes associated with this message.
      this->cur_bytes_ -= mb_bytes;
      this->cur_length_ -= mb_length;
    }

  return result;
}

template<ACE_SYNCH_DECL >
int ACE_Dynamic_Message_Queue< ACE_SYNCH_DECL >::sublist_enqueue_i ( ACE_Message_Block new_item,
const ACE_Time_Value current_time,
ACE_Message_Block *&  sublist_head,
ACE_Message_Block *&  sublist_tail,
ACE_Dynamic_Message_Strategy::Priority_Status  status 
) [protected, virtual]

Enqueue a message in priority order within a given priority status sublist.

Definition at line 2421 of file Message_Queue_T.cpp.

{
  int result = 0;
  ACE_Message_Block *current_item = 0;

  // Find message after which to enqueue new item, based on message
  // priority and priority status.
  for (current_item = sublist_tail;
       current_item;
       current_item = current_item->prev ())
    {
      if (message_strategy_.priority_status (*current_item, current_time) == status)
        {
          if (current_item->msg_priority () >= new_item->msg_priority ())
            break;
        }
      else
        {
          sublist_head = new_item;
          break;
        }
    }

  if (current_item == 0)
    {
      // If the new message has highest priority of any, put it at the
      // head of the list (and sublist).
      new_item->prev (0);
      new_item->next (this->head_);
      if (this->head_ != 0)
        this->head_->prev (new_item);
      else
        {
          this->tail_ = new_item;
          sublist_tail = new_item;
        }
      this->head_ = new_item;
      sublist_head = new_item;
    }
  else
    {
      // insert the new item into the list
      new_item->next (current_item->next ());
      new_item->prev (current_item);

      if (current_item->next ())
        current_item->next ()->prev (new_item);
      else
        this->tail_ = new_item;

      current_item->next (new_item);

      // If the new item has lowest priority of any in the sublist,
      // move the tail pointer of the sublist back to the new item
      if (current_item == sublist_tail)
        sublist_tail = new_item;
    }

  return result;
}


Member Data Documentation

template<ACE_SYNCH_DECL >
ACE_Dynamic_Message_Queue< ACE_SYNCH_DECL >::ACE_ALLOC_HOOK_DECLARE

Declare the dynamic allocation hooks.

Reimplemented from ACE_Message_Queue< ACE_SYNCH_USE >.

Definition at line 823 of file Message_Queue_T.h.

template<ACE_SYNCH_DECL >
ACE_Message_Block* ACE_Dynamic_Message_Queue< ACE_SYNCH_DECL >::beyond_late_head_ [protected]

Pointer to head of the beyond late messages.

Definition at line 877 of file Message_Queue_T.h.

template<ACE_SYNCH_DECL >
ACE_Message_Block* ACE_Dynamic_Message_Queue< ACE_SYNCH_DECL >::beyond_late_tail_ [protected]

Pointer to tail of the beyond late messages.

Definition at line 880 of file Message_Queue_T.h.

template<ACE_SYNCH_DECL >
ACE_Message_Block* ACE_Dynamic_Message_Queue< ACE_SYNCH_DECL >::late_head_ [protected]

Pointer to head of the late messages.

Definition at line 871 of file Message_Queue_T.h.

template<ACE_SYNCH_DECL >
ACE_Message_Block* ACE_Dynamic_Message_Queue< ACE_SYNCH_DECL >::late_tail_ [protected]

Pointer to tail of the late messages.

Definition at line 874 of file Message_Queue_T.h.

template<ACE_SYNCH_DECL >
ACE_Dynamic_Message_Strategy& ACE_Dynamic_Message_Queue< ACE_SYNCH_DECL >::message_strategy_ [protected]

Pointer to a dynamic priority evaluation function.

Definition at line 883 of file Message_Queue_T.h.

template<ACE_SYNCH_DECL >
ACE_Message_Block* ACE_Dynamic_Message_Queue< ACE_SYNCH_DECL >::pending_head_ [protected]

Pointer to head of the pending messages.

Definition at line 865 of file Message_Queue_T.h.

template<ACE_SYNCH_DECL >
ACE_Message_Block* ACE_Dynamic_Message_Queue< ACE_SYNCH_DECL >::pending_tail_ [protected]

Pointer to tail of the pending messages.

Definition at line 868 of file Message_Queue_T.h.


The documentation for this class was generated from the following files:
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines