A derived class which adapts the ACE_Message_Queue class in order to maintain dynamic priorities for enqueued <ACE_Message_Blocks> and manage the queue order according to these dynamic priorities.
#include <Message_Queue_T.h>
Public Member Functions | |
ACE_Dynamic_Message_Queue (ACE_Dynamic_Message_Strategy &message_strategy, size_t hwm=ACE_Message_Queue_Base::DEFAULT_HWM, size_t lwm=ACE_Message_Queue_Base::DEFAULT_LWM, ACE_Notification_Strategy *=0) | |
virtual | ~ACE_Dynamic_Message_Queue (void) |
Close down the message queue and release all resources. | |
virtual int | remove_messages (ACE_Message_Block *&list_head, ACE_Message_Block *&list_tail, u_int status_flags) |
virtual int | dequeue_head (ACE_Message_Block *&first_item, ACE_Time_Value *timeout=0) |
virtual void | dump (void) const |
Dump the state of the queue. | |
virtual int | enqueue_tail (ACE_Message_Block *new_item, ACE_Time_Value *timeout=0) |
virtual int | enqueue_head (ACE_Message_Block *new_item, ACE_Time_Value *timeout=0) |
Public Attributes | |
ACE_ALLOC_HOOK_DECLARE | |
Declare the dynamic allocation hooks. | |
Protected Member Functions | |
virtual int | enqueue_i (ACE_Message_Block *new_item) |
virtual int | sublist_enqueue_i (ACE_Message_Block *new_item, const ACE_Time_Value &current_time, ACE_Message_Block *&sublist_head, ACE_Message_Block *&sublist_tail, ACE_Dynamic_Message_Strategy::Priority_Status status) |
Enqueue a message in priority order within a given priority status sublist. | |
virtual int | dequeue_head_i (ACE_Message_Block *&first_item) |
virtual int | refresh_queue (const ACE_Time_Value &current_time) |
virtual int | refresh_pending_queue (const ACE_Time_Value &current_time) |
virtual int | refresh_late_queue (const ACE_Time_Value &current_time) |
Protected Attributes | |
ACE_Message_Block * | pending_head_ |
Pointer to head of the pending messages. | |
ACE_Message_Block * | pending_tail_ |
Pointer to tail of the pending messages. | |
ACE_Message_Block * | late_head_ |
Pointer to head of the late messages. | |
ACE_Message_Block * | late_tail_ |
Pointer to tail of the late messages. | |
ACE_Message_Block * | beyond_late_head_ |
Pointer to head of the beyond late messages. | |
ACE_Message_Block * | beyond_late_tail_ |
Pointer to tail of the beyond late messages. | |
ACE_Dynamic_Message_Strategy & | message_strategy_ |
Reference to the dynamic message strategy object used to evaluate message priorities. | |
Private Member Functions | |
void | operator= (const ACE_Dynamic_Message_Queue< _ACE_SYNCH_MUTEX_T, _ACE_SYNCH_CONDITION_T > &) |
ACE_Dynamic_Message_Queue (const ACE_Dynamic_Message_Queue< _ACE_SYNCH_MUTEX_T, _ACE_SYNCH_CONDITION_T > &) | |
virtual int | peek_dequeue_head (ACE_Message_Block *&first_item, ACE_Time_Value *timeout=0) |
Private method to hide public base class method: just calls base class method. |
A derived class which adapts the ACE_Message_Queue class in order to maintain dynamic priorities for enqueued <ACE_Message_Blocks> and manage the queue order according to these dynamic priorities.
The messages in the queue are managed so as to preserve a logical ordering with minimal overhead per enqueue and dequeue operation. For this reason, the actual order of messages in the linked list of the queue may differ from their priority order. As time passes, a message may change from pending status to late status, and eventually to beyond late status. To minimize reordering overhead, three separate boundaries are maintained within the linked list of messages. Messages are dequeued preferentially from the head of the pending portion, then from the head of the late portion, and finally from the head of the beyond late portion. In this way, only the boundaries need to be maintained, with no reordering of the messages themselves, while still providing correct priority-ordered dequeueing semantics. Maintaining the boundaries is efficient because aging messages keep the same linked list order as they progress from one status to the next.
The head and tail enqueue methods inherited from ACE_Message_Queue are overridden to call the priority enqueue method, which prevents out-of-order messages from confusing management of the various portions of the queue. Messages in the pending portion of the queue whose priority becomes late (according to the specific dynamic strategy) advance into the late portion of the queue. Messages in the late portion of the queue whose priority becomes later than can be represented advance to the beyond late portion of the queue.
These behaviors support a limited schedule overrun, with pending messages prioritized ahead of late messages, and late messages ahead of beyond late messages. They can be modified in derived classes by providing alternative definitions for the appropriate virtual methods.
When filled with messages, the queue's linked list should look like:
    H                                           T
    |                                           |
    B - B - B - B - L - L - L - P - P - P - P - P
    |           |   |       |   |               |
    BH          BT  LH      LT  PH              PT
Where the symbols are as follows:
    H  = Head of the entire list
    T  = Tail of the entire list
    B  = Beyond late message
    BH = Beyond late messages Head
    BT = Beyond late messages Tail
    L  = Late message
    LH = Late messages Head
    LT = Late messages Tail
    P  = Pending message
    PH = Pending messages Head
    PT = Pending messages Tail
Caveat: the virtual methods enqueue_tail, enqueue_head, and peek_dequeue_head have semantics for the static message queues that cannot be guaranteed for dynamic message queues. The peek_dequeue_head method just calls the base class method, while the two enqueue methods call the priority enqueue method. The order of messages in the dynamic queue is a function of message deadlines and how long they are in the queues. You can manipulate these in some cases to ensure the correct semantics, but that is not a very stable or portable approach (discouraged).
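The dequeue preference described above can be sketched without ACE. The following is a minimal model, not the library's implementation: `Msg`, `classify`, `ThreePartQueue`, and the `max_late` threshold are hypothetical names, and the classification rule (on time, overdue by at most `max_late`, overdue by more) is an assumption chosen only to make the three statuses concrete. It ignores priority ordering within a sublist and does not age messages after they are enqueued.

```cpp
#include <cassert>
#include <deque>
#include <initializer_list>
#include <string>

// Hypothetical stand-ins for illustration only; these are not ACE types.
enum class Status { Pending, Late, BeyondLate };

struct Msg {
  std::string name;
  long deadline;  // absolute time by which the message should be dequeued
};

// Assumed rule: on time -> Pending, overdue by at most max_late -> Late,
// overdue by more than max_late -> BeyondLate.
Status classify(const Msg &m, long now, long max_late) {
  if (now <= m.deadline) return Status::Pending;
  if (now - m.deadline <= max_late) return Status::Late;
  return Status::BeyondLate;
}

// Three sublists, drained in the order given in the class description.
struct ThreePartQueue {
  std::deque<Msg> pending, late, beyond_late;

  void enqueue(const Msg &m, long now, long max_late) {
    assert(max_late >= 0);
    switch (classify(m, now, max_late)) {
      case Status::Pending:    pending.push_back(m);     break;
      case Status::Late:       late.push_back(m);        break;
      case Status::BeyondLate: beyond_late.push_back(m); break;
    }
  }

  // Pending first, then late, then beyond late.
  // Returns false when all three sublists are empty.
  bool dequeue(Msg &out) {
    for (std::deque<Msg> *q : {&pending, &late, &beyond_late}) {
      if (!q->empty()) {
        out = q->front();
        q->pop_front();
        return true;
      }
    }
    return false;
  }
};
```

Enqueuing a beyond late, a late, and a pending message in that order and then draining the model yields the pending message first and the beyond late message last, regardless of arrival order.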
Definition at line 767 of file Message_Queue_T.h.
ACE_Dynamic_Message_Queue< ACE_SYNCH_DECL >::ACE_Dynamic_Message_Queue (ACE_Dynamic_Message_Strategy &message_strategy, size_t hwm = ACE_Message_Queue_Base::DEFAULT_HWM, size_t lwm = ACE_Message_Queue_Base::DEFAULT_LWM, ACE_Notification_Strategy *ns = 0)
Definition at line 2027 of file Message_Queue_T.cpp.
  : ACE_Message_Queue<ACE_SYNCH_USE> (hwm, lwm, ns),
    pending_head_ (0),
    pending_tail_ (0),
    late_head_ (0),
    late_tail_ (0),
    beyond_late_head_ (0),
    beyond_late_tail_ (0),
    message_strategy_ (message_strategy)
{
  // Note, the ACE_Dynamic_Message_Queue assumes full responsibility
  // for the passed ACE_Dynamic_Message_Strategy object, and deletes
  // it in its own dtor
}
ACE_Dynamic_Message_Queue< ACE_SYNCH_DECL >::~ACE_Dynamic_Message_Queue (void) [virtual]
Close down the message queue and release all resources.
Definition at line 2048 of file Message_Queue_T.cpp.
{
  delete &this->message_strategy_;
}
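Because the destructor deletes the strategy through a reference, the strategy passed to the constructor has to be heap-allocated and must not be deleted by the caller. The sketch below models that ownership rule with hypothetical `Strategy` and `OwningQueue` classes (they are not the ACE classes); the `live` counter exists only so the effect can be observed.

```cpp
#include <cassert>

// Hypothetical stand-ins for illustration only; not the ACE classes.
struct Strategy {
  static int live;  // number of Strategy objects currently alive
  Strategy() { ++live; }
  virtual ~Strategy() { --live; }
};
int Strategy::live = 0;

class OwningQueue {
public:
  explicit OwningQueue(Strategy &s) : strategy_(s) {}
  // Mirrors `delete &this->message_strategy_`: the queue owns the strategy.
  ~OwningQueue() { delete &strategy_; }
  OwningQueue(const OwningQueue &) = delete;
  OwningQueue &operator=(const OwningQueue &) = delete;

private:
  Strategy &strategy_;
};

// Allocates a strategy on the heap, hands it to a queue, and reports how
// many strategies are still alive once the queue has gone out of scope.
int live_after_scope() {
  {
    Strategy *s = new Strategy;  // must come from `new`
    OwningQueue q(*s);
    assert(Strategy::live == 1);
  }  // q's destructor deletes *s here
  return Strategy::live;
}
```

Passing a stack-allocated or shared strategy to such a queue would lead to an invalid or double delete, which is why the constructor comment stresses that the queue assumes full responsibility for the object.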
ACE_Dynamic_Message_Queue< ACE_SYNCH_DECL >::ACE_Dynamic_Message_Queue (const ACE_Dynamic_Message_Queue< _ACE_SYNCH_MUTEX_T, _ACE_SYNCH_CONDITION_T > &) [private]
int ACE_Dynamic_Message_Queue< ACE_SYNCH_DECL >::dequeue_head (ACE_Message_Block *&first_item, ACE_Time_Value *timeout = 0) [virtual]
Dequeue and return the <ACE_Message_Block *> at the head of the queue. Returns -1 on failure, else the number of items still on the queue.
Reimplemented from ACE_Message_Queue< ACE_SYNCH_USE >.
Definition at line 2194 of file Message_Queue_T.cpp.
{
  ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head");

  ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);

  if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
  {
    errno = ESHUTDOWN;
    return -1;
  }

  int result;

  // get the current time
  ACE_Time_Value current_time = ACE_OS::gettimeofday ();

  // refresh priority status boundaries in the queue
  result = this->refresh_queue (current_time);
  if (result < 0)
    return result;

  // *now* it's appropriate to wait for an enqueued item
  result = this->wait_not_empty_cond (ace_mon, timeout);
  if (result == -1)
    return result;

  // call the internal dequeue method, which selects an item from the
  // highest priority status portion of the queue that has messages
  // enqueued.
  result = this->dequeue_head_i (first_item);

  return result;
}
int ACE_Dynamic_Message_Queue< ACE_SYNCH_DECL >::dequeue_head_i (ACE_Message_Block *&first_item) [protected, virtual]
Dequeue and return the <ACE_Message_Block *> at the head of the logical queue. Attempts first to dequeue from the pending portion of the queue, or if that is empty from the late portion, or if that is empty from the beyond late portion, or if that is empty just sets the passed pointer to zero and returns -1.
Reimplemented from ACE_Message_Queue< ACE_SYNCH_USE >.
Definition at line 2490 of file Message_Queue_T.cpp.
{
  ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i");

  int result = 0;
  int last_in_subqueue = 0;

  // first, try to dequeue from the head of the pending list
  if (this->pending_head_)
  {
    first_item = this->pending_head_;

    if (0 == this->pending_head_->prev ())
      this->head_ = this->pending_head_->next ();
    else
      this->pending_head_->prev ()->next (this->pending_head_->next ());

    if (0 == this->pending_head_->next ())
    {
      this->tail_ = this->pending_head_->prev ();
      this->pending_head_ = 0;
      this->pending_tail_ = 0;
    }
    else
    {
      this->pending_head_->next ()->prev (this->pending_head_->prev ());
      this->pending_head_ = this->pending_head_->next ();
    }

    first_item->prev (0);
    first_item->next (0);
  }
  // Second, try to dequeue from the head of the late list
  else if (this->late_head_)
  {
    last_in_subqueue = this->late_head_ == this->late_tail_ ? 1 : 0;
    first_item = this->late_head_;

    if (0 == this->late_head_->prev ())
      this->head_ = this->late_head_->next ();
    else
      this->late_head_->prev ()->next (this->late_head_->next ());

    if (0 == this->late_head_->next ())
      this->tail_ = this->late_head_->prev ();
    else
    {
      this->late_head_->next ()->prev (this->late_head_->prev ());
      this->late_head_ = this->late_head_->next ();
    }

    if (last_in_subqueue)
    {
      this->late_head_ = 0;
      this->late_tail_ = 0;
    }

    first_item->prev (0);
    first_item->next (0);
  }
  // finally, try to dequeue from the head of the beyond late list
  else if (this->beyond_late_head_)
  {
    last_in_subqueue =
      (this->beyond_late_head_ == this->beyond_late_tail_) ? 1 : 0;
    first_item = this->beyond_late_head_;
    this->head_ = this->beyond_late_head_->next ();

    if (0 == this->beyond_late_head_->next ())
    {
      this->tail_ = this->beyond_late_head_->prev ();
    }
    else
    {
      this->beyond_late_head_->next ()->prev (this->beyond_late_head_->prev ());
      this->beyond_late_head_ = this->beyond_late_head_->next ();
    }

    if (last_in_subqueue)
    {
      this->beyond_late_head_ = 0;
      this->beyond_late_tail_ = 0;
    }

    first_item->prev (0);
    first_item->next (0);
  }
  else
  {
    // nothing to dequeue: set the pointer to zero and return an error code
    first_item = 0;
    result = -1;
  }

  if (result < 0)
  {
    return result;
  }

  size_t mb_bytes = 0;
  size_t mb_length = 0;
  first_item->total_size_and_length (mb_bytes, mb_length);
  // Subtract off all of the bytes associated with this message.
  this->cur_bytes_ -= mb_bytes;
  this->cur_length_ -= mb_length;
  --this->cur_count_;

  // Only signal enqueueing threads if we've fallen below the low
  // water mark.
  if (this->cur_bytes_ <= this->low_water_mark_
      && this->signal_enqueue_waiters () == -1)
  {
    return -1;
  }
  else
  {
    return ACE_Utils::truncate_cast<int> (this->cur_count_);
  }
}
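Each branch of the listing above performs the same two steps: detach the sublist's head node from the enclosing doubly linked list, then repair the sublist's own head and tail pointers. A minimal sketch of that pattern follows, using hypothetical `Node`, `List`, and `Sublist` types in place of `ACE_Message_Block` and the queue's member pointers. It is a simplified model and leaves out the byte and count bookkeeping.

```cpp
#include <cassert>

// Hypothetical stand-in for the prev/next links of a message block.
struct Node {
  explicit Node(int v) : value(v) {}
  int value;
  Node *prev = nullptr;
  Node *next = nullptr;
};

struct List {
  Node *head = nullptr;
  Node *tail = nullptr;

  void push_back(Node *n) {
    n->prev = tail;
    n->next = nullptr;
    if (tail) tail->next = n; else head = n;
    tail = n;
  }

  // Detach n, repairing its neighbours and the list's head/tail as needed.
  void unlink(Node *n) {
    assert(n != nullptr);
    if (n->prev) n->prev->next = n->next; else head = n->next;
    if (n->next) n->next->prev = n->prev; else tail = n->prev;
    n->prev = nullptr;
    n->next = nullptr;
  }
};

// A sublist is a contiguous run of nodes inside a List, tracked by its ends.
struct Sublist {
  Node *head = nullptr;
  Node *tail = nullptr;
};

// Remove and return the first node of a sublist; nullptr if it is empty.
Node *pop_sublist_head(List &list, Sublist &sub) {
  Node *n = sub.head;
  if (!n) return nullptr;
  if (sub.head == sub.tail)
    sub.head = sub.tail = nullptr;  // last node of this sublist
  else
    sub.head = n->next;             // read the link before unlinking
  list.unlink(n);
  return n;
}
```

Factoring the unlink step out this way is a design choice of the sketch; the listing above instead repeats the pointer manipulation inline for each of the three sublists.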
void ACE_Dynamic_Message_Queue< ACE_SYNCH_DECL >::dump (void) const [virtual]
Dump the state of the queue.
Reimplemented from ACE_Message_Queue< ACE_SYNCH_USE >.
Definition at line 2234 of file Message_Queue_T.cpp.
{
#if defined (ACE_HAS_DUMP)
  ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dump");

  ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));

  ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("ACE_Message_Queue<ACE_SYNCH_USE> (base class):\n")));
  this->ACE_Message_Queue<ACE_SYNCH_USE>::dump ();

  ACE_DEBUG ((LM_DEBUG,
              ACE_TEXT ("pending_head_ = %u\n")
              ACE_TEXT ("pending_tail_ = %u\n")
              ACE_TEXT ("late_head_ = %u\n")
              ACE_TEXT ("late_tail_ = %u\n")
              ACE_TEXT ("beyond_late_head_ = %u\n")
              ACE_TEXT ("beyond_late_tail_ = %u\n"),
              this->pending_head_,
              this->pending_tail_,
              this->late_head_,
              this->late_tail_,
              this->beyond_late_head_,
              this->beyond_late_tail_));

  ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("message_strategy_ :\n")));
  message_strategy_.dump ();

  ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
#endif /* ACE_HAS_DUMP */
}
int ACE_Dynamic_Message_Queue< ACE_SYNCH_DECL >::enqueue_head (ACE_Message_Block *new_item, ACE_Time_Value *timeout = 0) [virtual]
Just calls the priority enqueue method. Head enqueue semantics for dynamic message queues are unstable: the message may or may not be where it was placed after the queue is refreshed prior to the next enqueue or dequeue operation.
Reimplemented from ACE_Message_Queue< ACE_SYNCH_USE >.
Definition at line 2869 of file Message_Queue_T.cpp.
{
  ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_head");
  return this->enqueue_prio (new_item, timeout);
}
int ACE_Dynamic_Message_Queue< ACE_SYNCH_DECL >::enqueue_i (ACE_Message_Block *new_item) [protected, virtual]
Enqueue an <ACE_Message_Block *> in accordance with its priority. The priority may be *dynamic*, *static*, or a combination of *both*. It calls the priority evaluation function passed into the Dynamic Message Queue constructor to update the priorities of all enqueued messages.
Reimplemented from ACE_Message_Queue< ACE_SYNCH_USE >.
Definition at line 2266 of file Message_Queue_T.cpp.
{
  ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_i");

  if (new_item == 0)
  {
    return -1;
  }

  int result = 0;

  // Get the current time.
  ACE_Time_Value current_time = ACE_OS::gettimeofday ();

  // Refresh priority status boundaries in the queue.
  result = this->refresh_queue (current_time);
  if (result < 0)
  {
    return result;
  }

  // Where we enqueue depends on the message's priority status.
  switch (message_strategy_.priority_status (*new_item, current_time))
  {
  case ACE_Dynamic_Message_Strategy::PENDING:
    if (this->pending_tail_ == 0)
    {
      // Check for simple case of an empty pending queue, where
      // all we need to do is insert <new_item> into the tail of
      // the queue.
      pending_head_ = new_item;
      pending_tail_ = pending_head_;
      return this->enqueue_tail_i (new_item);
    }
    else
    {
      // Enqueue the new message in priority order in the pending
      // sublist
      result = sublist_enqueue_i (new_item,
                                  current_time,
                                  this->pending_head_,
                                  this->pending_tail_,
                                  ACE_Dynamic_Message_Strategy::PENDING);
    }
    break;

  case ACE_Dynamic_Message_Strategy::LATE:
    if (this->late_tail_ == 0)
    {
      late_head_ = new_item;
      late_tail_ = late_head_;

      if (this->pending_head_ == 0)
        // Check for simple case of an empty pending queue,
        // where all we need to do is insert <new_item> into the
        // tail of the queue.
        return this->enqueue_tail_i (new_item);
      else if (this->beyond_late_tail_ == 0)
        // Check for simple case of an empty beyond late queue, where all
        // we need to do is insert <new_item> into the head of the queue.
        return this->enqueue_head_i (new_item);
      else
      {
        // Otherwise, we can just splice the new message in
        // between the pending and beyond late portions of the
        // queue.
        this->beyond_late_tail_->next (new_item);
        new_item->prev (this->beyond_late_tail_);
        this->pending_head_->prev (new_item);
        new_item->next (this->pending_head_);
      }
    }
    else
    {
      // Enqueue the new message in priority order in the late
      // sublist
      result = sublist_enqueue_i (new_item,
                                  current_time,
                                  this->late_head_,
                                  this->late_tail_,
                                  ACE_Dynamic_Message_Strategy::LATE);
    }
    break;

  case ACE_Dynamic_Message_Strategy::BEYOND_LATE:
    if (this->beyond_late_tail_ == 0)
    {
      // Check for simple case of an empty beyond late queue,
      // where all we need to do is insert <new_item> into the
      // head of the queue.
      beyond_late_head_ = new_item;
      beyond_late_tail_ = beyond_late_head_;
      return this->enqueue_head_i (new_item);
    }
    else
    {
      // all beyond late messages have the same (zero) priority,
      // so just put the new one at the end of the beyond late
      // messages
      if (this->beyond_late_tail_->next ())
      {
        this->beyond_late_tail_->next ()->prev (new_item);
      }
      else
      {
        this->tail_ = new_item;
      }

      new_item->next (this->beyond_late_tail_->next ());
      this->beyond_late_tail_->next (new_item);
      new_item->prev (this->beyond_late_tail_);
      this->beyond_late_tail_ = new_item;
    }
    break;

  // should never get here, but just in case...
  default:
    result = -1;
    break;
  }

  if (result < 0)
  {
    return result;
  }

  size_t mb_bytes = 0;
  size_t mb_length = 0;
  new_item->total_size_and_length (mb_bytes, mb_length);
  this->cur_bytes_ += mb_bytes;
  this->cur_length_ += mb_length;
  ++this->cur_count_;

  if (this->signal_dequeue_waiters () == -1)
  {
    return -1;
  }
  else
  {
    return ACE_Utils::truncate_cast<int> (this->cur_count_);
  }
}
int ACE_Dynamic_Message_Queue< ACE_SYNCH_DECL >::enqueue_tail (ACE_Message_Block *new_item, ACE_Time_Value *timeout = 0) [virtual]
Just calls the priority enqueue method. Tail enqueue semantics for dynamic message queues are unstable: the message may or may not be where it was placed after the queue is refreshed prior to the next enqueue or dequeue operation.
Reimplemented from ACE_Message_Queue< ACE_SYNCH_USE >.
Definition at line 2856 of file Message_Queue_T.cpp.
{
  ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_tail");
  return this->enqueue_prio (new_item, timeout);
}
void ACE_Dynamic_Message_Queue< ACE_SYNCH_DECL >::operator= (const ACE_Dynamic_Message_Queue< _ACE_SYNCH_MUTEX_T, _ACE_SYNCH_CONDITION_T > &) [private]
int ACE_Dynamic_Message_Queue< ACE_SYNCH_DECL >::peek_dequeue_head (ACE_Message_Block *&first_item, ACE_Time_Value *timeout = 0) [private, virtual]
Private method to hide public base class method: just calls base class method.
Reimplemented from ACE_Message_Queue< ACE_SYNCH_USE >.
Definition at line 2845 of file Message_Queue_T.cpp.
{
  return ACE_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head (first_item, timeout);
}
int ACE_Dynamic_Message_Queue< ACE_SYNCH_DECL >::refresh_late_queue (const ACE_Time_Value &current_time) [protected, virtual]
Refresh the late queue using the strategy specific priority status function.
Definition at line 2760 of file Message_Queue_T.cpp.
{
  ACE_Dynamic_Message_Strategy::Priority_Status current_status;

  if (this->late_head_)
  {
    current_status = message_strategy_.priority_status (*this->late_head_,
                                                        current_time);
    switch (current_status)
    {
    case ACE_Dynamic_Message_Strategy::BEYOND_LATE:

      // make sure the head of the beyond late queue is set
      // (there may not have been any beyond late messages previously)
      this->beyond_late_head_ = this->head_;

      // advance through the beyond late messages in the late queue
      do
      {
        this->late_head_ = this->late_head_->next ();

        if (this->late_head_)
          current_status = message_strategy_.priority_status (*this->late_head_,
                                                              current_time);
        else
          break;  // do while
      }
      while (current_status == ACE_Dynamic_Message_Strategy::BEYOND_LATE);

      if (this->late_head_)
      {
        // point tail of beyond late sublist to previous item
        this->beyond_late_tail_ = this->late_head_->prev ();

        if (current_status == ACE_Dynamic_Message_Strategy::PENDING)
        {
          // there are no late messages left in the queue
          this->late_head_ = 0;
          this->late_tail_ = 0;
        }
        else if (current_status != ACE_Dynamic_Message_Strategy::LATE)
          // if we got here, something is *seriously* wrong with the queue
          ACE_ERROR_RETURN ((LM_ERROR,
                             ACE_TEXT ("Unexpected message priority status [%d] (expected LATE)"),
                             (int) current_status),
                            -1);
      }
      else
      {
        // there are no late messages left in the queue
        this->beyond_late_tail_ = this->tail_;
        this->late_head_ = 0;
        this->late_tail_ = 0;
      }

      break;  // switch

    case ACE_Dynamic_Message_Strategy::LATE:
      // do nothing - the late queue is unchanged
      break;  // switch

    case ACE_Dynamic_Message_Strategy::PENDING:
      // if we got here, something is *seriously* wrong with the queue
      ACE_ERROR_RETURN ((LM_ERROR,
                         ACE_TEXT ("Unexpected message priority status ")
                         ACE_TEXT ("[%d] (expected LATE or BEYOND_LATE)"),
                         (int) current_status),
                        -1);
    default:
      // if we got here, something is *seriously* wrong with the queue
      ACE_ERROR_RETURN ((LM_ERROR,
                         ACE_TEXT ("Unknown message priority status [%d]"),
                         (int) current_status),
                        -1);
    }
  }

  return 0;
}
int ACE_Dynamic_Message_Queue< ACE_SYNCH_DECL >::refresh_pending_queue (const ACE_Time_Value &current_time) [protected, virtual]
Refresh the pending queue using the strategy specific priority status function.
Definition at line 2637 of file Message_Queue_T.cpp.
{
  ACE_Dynamic_Message_Strategy::Priority_Status current_status;

  // refresh priority status boundaries in the queue
  if (this->pending_head_)
  {
    current_status = message_strategy_.priority_status (*this->pending_head_,
                                                        current_time);
    switch (current_status)
    {
    case ACE_Dynamic_Message_Strategy::BEYOND_LATE:

      // Make sure the head of the beyond late queue is set (there
      // may not have been any beyond late messages previously)
      this->beyond_late_head_ = this->head_;

      // Zero out the late queue pointers, and set them only if
      // there turn out to be late messages in the pending sublist
      this->late_head_ = 0;
      this->late_tail_ = 0;

      // Advance through the beyond late messages in the pending queue
      do
      {
        this->pending_head_ = this->pending_head_->next ();

        if (this->pending_head_)
          current_status = message_strategy_.priority_status (*this->pending_head_,
                                                              current_time);
        else
          break;  // do while
      }
      while (current_status == ACE_Dynamic_Message_Strategy::BEYOND_LATE);

      if (this->pending_head_)
      {
        // point tail of beyond late sublist to previous item
        this->beyond_late_tail_ = this->pending_head_->prev ();

        if (current_status == ACE_Dynamic_Message_Strategy::PENDING)
          // there are no late messages left in the queue
          break;  // switch
        else if (current_status != ACE_Dynamic_Message_Strategy::LATE)
        {
          // if we got here, something is *seriously* wrong with the queue
          ACE_ERROR_RETURN ((LM_ERROR,
                             ACE_TEXT ("Unexpected message priority status [%d] (expected LATE)"),
                             (int) current_status),
                            -1);
        }
        /* FALLTHRU */
      }
      else
      {
        // There are no pending or late messages left in the
        // queue.
        this->beyond_late_tail_ = this->tail_;
        this->pending_head_ = 0;
        this->pending_tail_ = 0;
        break;  // switch
      }

    case ACE_Dynamic_Message_Strategy::LATE:

      // Make sure the head of the late queue is set (there may
      // not have been any late messages previously, or they may
      // have all become beyond late).
      if (this->late_head_ == 0)
        this->late_head_ = this->pending_head_;

      // advance through the late messages in the pending queue
      do
      {
        this->pending_head_ = this->pending_head_->next ();

        if (this->pending_head_)
          current_status = message_strategy_.priority_status (*this->pending_head_,
                                                              current_time);
        else
          break;  // do while
      }
      while (current_status == ACE_Dynamic_Message_Strategy::LATE);

      if (this->pending_head_)
      {
        if (current_status != ACE_Dynamic_Message_Strategy::PENDING)
          // if we got here, something is *seriously* wrong with the queue
          ACE_ERROR_RETURN ((LM_ERROR,
                             ACE_TEXT ("Unexpected message priority status [%d] (expected PENDING)"),
                             (int) current_status),
                            -1);

        // Point tail of late sublist to previous item
        this->late_tail_ = this->pending_head_->prev ();
      }
      else
      {
        // there are no pending messages left in the queue
        this->late_tail_ = this->tail_;
        this->pending_head_ = 0;
        this->pending_tail_ = 0;
      }

      break;  // switch

    case ACE_Dynamic_Message_Strategy::PENDING:
      // do nothing - the pending queue is unchanged
      break;  // switch

    default:
      // if we got here, something is *seriously* wrong with the queue
      ACE_ERROR_RETURN ((LM_ERROR,
                         ACE_TEXT ("Unknown message priority status [%d]"),
                         (int) current_status),
                        -1);
    }
  }

  return 0;
}
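The refresh step only moves boundaries; it never reorders messages. The idea can be sketched with array indices instead of list pointers. This is a simplified model under stated assumptions: messages are represented only by their deadlines, they are stored in the order in which they age (earliest deadline first), and the `max_late` threshold and the `Boundaries` and `refresh` names are hypothetical.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Index boundaries over a sequence laid out as: beyond late | late | pending.
struct Boundaries {
  std::size_t late_begin = 0;     // [0, late_begin) is beyond late
  std::size_t pending_begin = 0;  // [late_begin, pending_begin) is late
};

// Advance the boundaries to reflect the time `now`.
// Assumes deadlines are sorted ascending, so messages change status in
// list order and each boundary only ever moves toward the tail.
void refresh(const std::vector<long> &deadlines, long now, long max_late,
             Boundaries &b) {
  assert(b.late_begin <= b.pending_begin &&
         b.pending_begin <= deadlines.size());

  // Pending messages whose deadline has passed join the late portion.
  while (b.pending_begin < deadlines.size() &&
         deadlines[b.pending_begin] < now)
    ++b.pending_begin;

  // Late messages overdue by more than max_late join the beyond late portion.
  while (b.late_begin < b.pending_begin &&
         now - deadlines[b.late_begin] > max_late)
    ++b.late_begin;
}
```

With deadlines 10, 20, 30, 40 and `max_late` of 15, refreshing at time 25 makes the first two messages late, and refreshing at time 40 moves those two into the beyond late portion while the third becomes late. Each refresh costs time proportional to the number of messages that changed status.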
int ACE_Dynamic_Message_Queue< ACE_SYNCH_DECL >::refresh_queue (const ACE_Time_Value &current_time) [protected, virtual]
Refresh the queue using the strategy specific priority status function.
Definition at line 2621 of file Message_Queue_T.cpp.
{
  int result;

  result = refresh_pending_queue (current_time);

  if (result != -1)
    result = refresh_late_queue (current_time);

  return result;
}
int ACE_Dynamic_Message_Queue< ACE_SYNCH_DECL >::remove_messages (ACE_Message_Block *&list_head, ACE_Message_Block *&list_tail, u_int status_flags) [virtual]
Detach all messages with status given in the passed flags from the queue and return them by setting passed head and tail pointers to the linked list they comprise. This method is intended primarily as a means of periodically harvesting messages that have missed their deadlines, but is available in its most general form. All messages are returned in priority order, from head to tail, as of the time this method was called.
Definition at line 2054 of file Message_Queue_T.cpp.
{
  // start with an empty list
  list_head = 0;
  list_tail = 0;

  // Get the current time
  ACE_Time_Value current_time = ACE_OS::gettimeofday ();

  // Refresh priority status boundaries in the queue.
  int result = this->refresh_queue (current_time);
  if (result < 0)
    return result;

  if (ACE_BIT_ENABLED (status_flags,
                       (u_int) ACE_Dynamic_Message_Strategy::PENDING)
      && this->pending_head_
      && this->pending_tail_)
  {
    // patch up pointers for the new tail of the queue
    if (this->pending_head_->prev ())
    {
      this->tail_ = this->pending_head_->prev ();
      this->pending_head_->prev ()->next (0);
    }
    else
    {
      // the list has become empty
      this->head_ = 0;
      this->tail_ = 0;
    }

    // point to the head and tail of the list
    list_head = this->pending_head_;
    list_tail = this->pending_tail_;

    // cut the pending messages out of the queue entirely
    this->pending_head_->prev (0);
    this->pending_head_ = 0;
    this->pending_tail_ = 0;
  }

  if (ACE_BIT_ENABLED (status_flags,
                       (u_int) ACE_Dynamic_Message_Strategy::LATE)
      && this->late_head_
      && this->late_tail_)
  {
    // Patch up pointers for the (possibly) new head and tail of the
    // queue.
    if (this->late_tail_->next ())
      this->late_tail_->next ()->prev (this->late_head_->prev ());
    else
      this->tail_ = this->late_head_->prev ();

    if (this->late_head_->prev ())
      this->late_head_->prev ()->next (this->late_tail_->next ());
    else
      this->head_ = this->late_tail_->next ();

    // put late messages behind pending messages (if any) being returned
    this->late_head_->prev (list_tail);
    if (list_tail)
      list_tail->next (this->late_head_);
    else
      list_head = this->late_head_;

    list_tail = this->late_tail_;

    this->late_tail_->next (0);
    this->late_head_ = 0;
    this->late_tail_ = 0;
  }

  if (ACE_BIT_ENABLED (status_flags,
                       (u_int) ACE_Dynamic_Message_Strategy::BEYOND_LATE)
      && this->beyond_late_head_
      && this->beyond_late_tail_)
  {
    // Patch up pointers for the new head of the queue
    if (this->beyond_late_tail_->next ())
    {
      this->head_ = this->beyond_late_tail_->next ();
      this->beyond_late_tail_->next ()->prev (0);
    }
    else
    {
      // the list has become empty
      this->head_ = 0;
      this->tail_ = 0;
    }

    // Put beyond late messages at the end of the list being
    // returned.
    if (list_tail)
    {
      this->beyond_late_head_->prev (list_tail);
      list_tail->next (this->beyond_late_head_);
    }
    else
      list_head = this->beyond_late_head_;

    list_tail = this->beyond_late_tail_;

    this->beyond_late_tail_->next (0);
    this->beyond_late_head_ = 0;
    this->beyond_late_tail_ = 0;
  }

  // Decrement message and size counts for removed messages.
  ACE_Message_Block *temp1;

  for (temp1 = list_head;
       temp1 != 0;
       temp1 = temp1->next ())
  {
    --this->cur_count_;

    size_t mb_bytes = 0;
    size_t mb_length = 0;
    temp1->total_size_and_length (mb_bytes, mb_length);
    // Subtract off all of the bytes associated with this message.
    this->cur_bytes_ -= mb_bytes;
    this->cur_length_ -= mb_length;
  }

  return result;
}
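The `status_flags` argument is tested one bit at a time, so a caller can harvest any combination of statuses in a single call, and the returned list is always ordered pending, then late, then beyond late. The sketch below models that selection logic on a plain vector. The `Tagged` type and `harvest` function are hypothetical, and the bit values given to the enumerators are assumptions made for this sketch rather than values taken from the listing.

```cpp
#include <cassert>
#include <initializer_list>
#include <string>
#include <vector>

// Assumed one-bit-per-status encoding, for illustration only.
enum Status : unsigned { PENDING = 0x01, LATE = 0x02, BEYOND_LATE = 0x04 };

struct Tagged {
  std::string name;
  Status status;
};

// Remove from `queue` every message whose status bit is set in `flags`.
// The removed messages are returned ordered pending, late, beyond late;
// the messages left behind keep their relative order.
std::vector<Tagged> harvest(std::vector<Tagged> &queue, unsigned flags) {
  assert((flags & ~0x07u) == 0);  // only the three known bits are allowed

  std::vector<Tagged> removed;
  for (Status s : {PENDING, LATE, BEYOND_LATE}) {
    if ((flags & s) == 0) continue;
    for (const Tagged &m : queue)
      if (m.status == s) removed.push_back(m);
  }

  std::vector<Tagged> kept;
  for (const Tagged &m : queue)
    if ((flags & m.status) == 0) kept.push_back(m);
  queue.swap(kept);

  return removed;
}
```

A call such as `harvest(q, LATE | BEYOND_LATE)` corresponds to the periodic harvesting of messages that have missed their deadlines mentioned in the method description. Unlike this copying model, the listing above splices whole sublists out of the linked list without visiting individual messages, apart from the final loop that updates the counters.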
int ACE_Dynamic_Message_Queue< ACE_SYNCH_DECL >::sublist_enqueue_i (ACE_Message_Block *new_item, const ACE_Time_Value &current_time, ACE_Message_Block *&sublist_head, ACE_Message_Block *&sublist_tail, ACE_Dynamic_Message_Strategy::Priority_Status status) [protected, virtual]
Enqueue a message in priority order within a given priority status sublist.
Definition at line 2421 of file Message_Queue_T.cpp.
{
  int result = 0;
  ACE_Message_Block *current_item = 0;

  // Find message after which to enqueue new item, based on message
  // priority and priority status.
  for (current_item = sublist_tail;
       current_item;
       current_item = current_item->prev ())
  {
    if (message_strategy_.priority_status (*current_item, current_time) == status)
    {
      if (current_item->msg_priority () >= new_item->msg_priority ())
        break;
    }
    else
    {
      sublist_head = new_item;
      break;
    }
  }

  if (current_item == 0)
  {
    // If the new message has highest priority of any, put it at the
    // head of the list (and sublist).
    new_item->prev (0);
    new_item->next (this->head_);
    if (this->head_ != 0)
      this->head_->prev (new_item);
    else
    {
      this->tail_ = new_item;
      sublist_tail = new_item;
    }
    this->head_ = new_item;
    sublist_head = new_item;
  }
  else
  {
    // insert the new item into the list
    new_item->next (current_item->next ());
    new_item->prev (current_item);

    if (current_item->next ())
      current_item->next ()->prev (new_item);
    else
      this->tail_ = new_item;

    current_item->next (new_item);

    // If the new item has lowest priority of any in the sublist,
    // move the tail pointer of the sublist back to the new item
    if (current_item == sublist_tail)
      sublist_tail = new_item;
  }

  return result;
}
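The search loop above walks backward from the sublist tail and stops at the first message whose priority is greater than or equal to the new one, so the new message is placed behind every message of equal priority. A minimal sketch of that insertion rule on a `std::list` follows. The `Item` type and `insert_by_priority` function are hypothetical, and the sketch treats the sublist as a standalone container, so it omits the status check and the fix-up of the enclosing queue's head and tail.

```cpp
#include <cassert>
#include <iterator>
#include <list>
#include <string>

// Hypothetical stand-in for a message with a static priority.
struct Item {
  std::string name;
  unsigned long priority;
};

// Scan from the tail toward the head and insert after the last element
// whose priority is >= the new item's. Equal priorities keep arrival order.
void insert_by_priority(std::list<Item> &sublist, const Item &item) {
  auto it = sublist.end();
  while (it != sublist.begin() && std::prev(it)->priority < item.priority)
    --it;
  sublist.insert(it, item);  // inserts before `it`, i.e. after std::prev(it)
}
```

Scanning from the tail suits the common case where a newly enqueued message has lower priority than most of what is already queued, since the loop then ends after a few comparisons.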
ACE_Dynamic_Message_Queue< ACE_SYNCH_DECL >::ACE_ALLOC_HOOK_DECLARE |
Declare the dynamic allocation hooks.
Reimplemented from ACE_Message_Queue< ACE_SYNCH_USE >.
Definition at line 823 of file Message_Queue_T.h.
ACE_Message_Block* ACE_Dynamic_Message_Queue< ACE_SYNCH_DECL >::beyond_late_head_ [protected] |
Pointer to head of the beyond late messages.
Definition at line 877 of file Message_Queue_T.h.
ACE_Message_Block* ACE_Dynamic_Message_Queue< ACE_SYNCH_DECL >::beyond_late_tail_ [protected] |
Pointer to tail of the beyond late messages.
Definition at line 880 of file Message_Queue_T.h.
ACE_Message_Block* ACE_Dynamic_Message_Queue< ACE_SYNCH_DECL >::late_head_ [protected] |
Pointer to head of the late messages.
Definition at line 871 of file Message_Queue_T.h.
ACE_Message_Block* ACE_Dynamic_Message_Queue< ACE_SYNCH_DECL >::late_tail_ [protected] |
Pointer to tail of the late messages.
Definition at line 874 of file Message_Queue_T.h.
ACE_Dynamic_Message_Strategy& ACE_Dynamic_Message_Queue< ACE_SYNCH_DECL >::message_strategy_ [protected] |
Reference to the dynamic message strategy object used to evaluate message priorities.
Definition at line 883 of file Message_Queue_T.h.
ACE_Message_Block* ACE_Dynamic_Message_Queue< ACE_SYNCH_DECL >::pending_head_ [protected] |
Pointer to head of the pending messages.
Definition at line 865 of file Message_Queue_T.h.
ACE_Message_Block* ACE_Dynamic_Message_Queue< ACE_SYNCH_DECL >::pending_tail_ [protected] |
Pointer to tail of the pending messages.
Definition at line 868 of file Message_Queue_T.h.