#include <Message_Queue_T.h>
Inheritance diagram for ACE_Dynamic_Message_Queue<>:


Public Member Functions | |
| ACE_Dynamic_Message_Queue (ACE_Dynamic_Message_Strategy &message_strategy, size_t hwm=ACE_Message_Queue_Base::DEFAULT_HWM, size_t lwm=ACE_Message_Queue_Base::DEFAULT_LWM, ACE_Notification_Strategy *=0) | |
| virtual | ~ACE_Dynamic_Message_Queue (void) |
| Close down the message queue and release all resources. | |
| virtual int | remove_messages (ACE_Message_Block *&list_head, ACE_Message_Block *&list_tail, u_int status_flags) |
| virtual int | dequeue_head (ACE_Message_Block *&first_item, ACE_Time_Value *timeout=0) |
| virtual void | dump (void) const |
| Dump the state of the queue. | |
| virtual int | enqueue_tail (ACE_Message_Block *new_item, ACE_Time_Value *timeout=0) |
| virtual int | enqueue_head (ACE_Message_Block *new_item, ACE_Time_Value *timeout=0) |
Public Attributes | |
| ACE_ALLOC_HOOK_DECLARE | |
| Declare the dynamic allocation hooks. | |
Protected Member Functions | |
| virtual int | enqueue_i (ACE_Message_Block *new_item) |
| virtual int | sublist_enqueue_i (ACE_Message_Block *new_item, const ACE_Time_Value &current_time, ACE_Message_Block *&sublist_head, ACE_Message_Block *&sublist_tail, ACE_Dynamic_Message_Strategy::Priority_Status status) |
| Enqueue a message in priority order within a given priority status sublist. | |
| virtual int | dequeue_head_i (ACE_Message_Block *&first_item) |
| virtual int | refresh_queue (const ACE_Time_Value &current_time) |
| virtual int | refresh_pending_queue (const ACE_Time_Value &current_time) |
| virtual int | refresh_late_queue (const ACE_Time_Value &current_time) |
Protected Attributes | |
| ACE_Message_Block * | pending_head_ |
| Pointer to head of the pending messages. | |
| ACE_Message_Block * | pending_tail_ |
| Pointer to tail of the pending messages. | |
| ACE_Message_Block * | late_head_ |
| Pointer to head of the late messages. | |
| ACE_Message_Block * | late_tail_ |
| Pointer to tail of the late messages. | |
| ACE_Message_Block * | beyond_late_head_ |
| Pointer to head of the beyond late messages. | |
| ACE_Message_Block * | beyond_late_tail_ |
| Pointer to tail of the beyond late messages. | |
| ACE_Dynamic_Message_Strategy & | message_strategy_ |
| Reference to the dynamic priority evaluation strategy. | |
Private Member Functions | |
| void | operator= (const ACE_Dynamic_Message_Queue< _ACE_SYNCH > &) |
| ACE_Dynamic_Message_Queue (const ACE_Dynamic_Message_Queue< _ACE_SYNCH > &) | |
| virtual int | peek_dequeue_head (ACE_Message_Block *&first_item, ACE_Time_Value *timeout=0) |
| Private method to hide public base class method: just calls base class method. | |
The messages in the queue are managed so as to preserve a logical ordering with minimal overhead per enqueue and dequeue operation. For this reason, the actual order of messages in the linked list of the queue may differ from their priority order. As time passes, a message may change from pending status to late status, and eventually to beyond late status. To minimize reordering overhead under this design force, three separate boundaries are maintained within the linked list of messages. Messages are dequeued preferentially from the head of the pending portion, then the head of the late portion, and finally from the head of the beyond late portion. In this way, only the boundaries need to be maintained, with no reordering of the messages themselves, while still providing correct priority-ordered dequeueing semantics. Maintaining the boundaries is efficient because aging messages keep the same linked list order as they progress from one status to the next.

The peek_dequeue_head method inherited from ACE_Message_Queue is made private, and the inherited head and tail enqueue methods are overridden to call the priority enqueue method, so that out-of-order messages cannot confuse management of the various portions of the queue.

Messages in the pending portion of the queue whose priority becomes late (according to the specific dynamic strategy) advance into the late portion of the queue. Messages in the late portion of the queue whose priority becomes later than can be represented advance to the beyond late portion of the queue. These behaviors support a limited schedule overrun, with pending messages prioritized ahead of late messages, and late messages ahead of beyond late messages. They can be modified in derived classes by providing alternative definitions for the appropriate virtual methods.
When filled with messages, the queue's linked list should look like:

    H                                           T
    |                                           |
    B - B - B - B - L - L - L - P - P - P - P - P
    |           |   |       |   |               |
    BH          BT  LH      LT  PH              PT

Where the symbols are as follows:

    H  = Head of the entire list
    T  = Tail of the entire list
    B  = Beyond late message
    BH = Beyond late messages Head
    BT = Beyond late messages Tail
    L  = Late message
    LH = Late messages Head
    LT = Late messages Tail
    P  = Pending message
    PH = Pending messages Head
    PT = Pending messages Tail

Caveat: the virtual methods enqueue_tail, enqueue_head, and peek_dequeue_head have semantics for the static message queues that cannot be guaranteed for dynamic message queues. The peek_dequeue_head method just calls the base class method, while the two enqueue methods call the priority enqueue method. The order of messages in the dynamic queue is a function of message deadlines and how long they are in the queues. You can manipulate these in some cases to ensure the correct semantics, but that approach is neither stable nor portable and is discouraged.
Definition at line 749 of file Message_Queue_T.h.
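The dequeue preference described in the class overview can be modeled with a small standalone sketch. It does not use ACE: `Status`, `Msg`, `classify`, and `pick_next` are hypothetical names, the queue is a plain `std::vector`, and the `late_window` rule is an assumption standing in for whatever the concrete dynamic strategy defines.

```cpp
#include <cstddef>
#include <vector>

// Hypothetical stand-ins for illustration only; these are not ACE types.
enum class Status { Pending, Late, BeyondLate };

struct Msg {
  int id;
  long deadline;  // absolute time by which the message should be handled
};

// Assumed rule: on time until the deadline, late for `late_window`
// ticks after it, beyond late from then on.
Status classify(const Msg& m, long now, long late_window) {
  if (now <= m.deadline) return Status::Pending;
  if (now <= m.deadline + late_window) return Status::Late;
  return Status::BeyondLate;
}

// Index of the message to dequeue next: the first pending message,
// else the first late one, else the first beyond late one; -1 if empty.
int pick_next(const std::vector<Msg>& q, long now, long late_window) {
  const Status order[] = {Status::Pending, Status::Late, Status::BeyondLate};
  for (Status wanted : order)
    for (std::size_t i = 0; i < q.size(); ++i)
      if (classify(q[i], now, late_window) == wanted)
        return static_cast<int>(i);
  return -1;
}
```

The sketch rescans the whole container on every call. The class documented here avoids that cost by keeping head and tail pointers for each portion of a single linked list.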
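ACE_Dynamic_Message_Queue<>::ACE_Dynamic_Message_Queue (ACE_Dynamic_Message_Strategy &message_strategy, size_t hwm=ACE_Message_Queue_Base::DEFAULT_HWM, size_t lwm=ACE_Message_Queue_Base::DEFAULT_LWM, ACE_Notification_Strategy *=0)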
Definition at line 1765 of file Message_Queue_T.cpp. References ACE_SYNCH_USE.
01769 : ACE_Message_Queue<ACE_SYNCH_USE> (hwm, lwm, ns),
01770 pending_head_ (0),
01771 pending_tail_ (0),
01772 late_head_ (0),
01773 late_tail_ (0),
01774 beyond_late_head_ (0),
01775 beyond_late_tail_ (0),
01776 message_strategy_ (message_strategy)
01777 {
01778 // Note, the ACE_Dynamic_Message_Queue assumes full responsibility
01779 // for the passed ACE_Dynamic_Message_Strategy object, and deletes
01780 // it in its own dtor
01781 }
virtual ACE_Dynamic_Message_Queue<>::~ACE_Dynamic_Message_Queue (void)
Close down the message queue and release all resources.
Definition at line 1786 of file Message_Queue_T.cpp. References ACE_Dynamic_Message_Queue<>::message_strategy_.
01787 {
01788 delete &this->message_strategy_;
01789 }
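Because the destructor deletes the object behind a reference, the strategy handed to the constructor has to be heap-allocated and must not be deleted by the caller. The following is a minimal sketch of that ownership idiom; `Strategy`, `Queue`, and `live_after_scope` are hypothetical stand-ins, not the ACE classes.

```cpp
#include <cassert>

// Hypothetical stand-ins; they mirror only the ownership pattern.
struct Strategy {
  static int live;  // number of Strategy objects currently alive
  Strategy() { ++live; }
  virtual ~Strategy() { --live; }
};
int Strategy::live = 0;

class Queue {
public:
  // Takes the strategy by reference but assumes ownership of it.
  explicit Queue(Strategy& s) : strategy_(s) {}
  ~Queue() { delete &strategy_; }  // same idiom as the destructor above
  Queue(const Queue&) = delete;
  Queue& operator=(const Queue&) = delete;

private:
  Strategy& strategy_;
};

// The strategy must come from `new`. Passing a stack or static object
// would make the delete in ~Queue undefined behavior.
int live_after_scope() {
  {
    Queue q(*new Strategy);
    assert(Strategy::live == 1);
  }
  return Strategy::live;  // the queue's destructor released the strategy
}
```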
virtual int ACE_Dynamic_Message_Queue<>::dequeue_head (ACE_Message_Block *&first_item, ACE_Time_Value *timeout=0)
Dequeue and return the <ACE_Message_Block *> at the head of the queue. Returns -1 on failure, else the number of items still on the queue. Reimplemented from ACE_Message_Queue< ACE_SYNCH_USE >. Definition at line 1932 of file Message_Queue_T.cpp. References ACE_GUARD_RETURN, ACE_SYNCH_MUTEX_T, ACE_TRACE, ACE_Dynamic_Message_Queue<>::dequeue_head_i(), ESHUTDOWN, ACE_OS::gettimeofday(), ACE_Dynamic_Message_Queue<>::refresh_queue(), and ACE_Message_Queue< ACE_SYNCH_USE >::wait_not_empty_cond().
01934 {
01935 ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head");
01936
01937 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01938
01939 if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
01940 {
01941 errno = ESHUTDOWN;
01942 return -1;
01943 }
01944
01945 int result;
01946
01947 // get the current time
01948 ACE_Time_Value current_time = ACE_OS::gettimeofday ();
01949
01950 // refresh priority status boundaries in the queue
01951 result = this->refresh_queue (current_time);
01952 if (result < 0)
01953 return result;
01954
01955 // *now* it's appropriate to wait for an enqueued item
01956 result = this->wait_not_empty_cond (ace_mon, timeout);
01957 if (result == -1)
01958 return result;
01959
01960 // call the internal dequeue method, which selects an item from the
01961 // highest priority status portion of the queue that has messages
01962 // enqueued.
01963 result = this->dequeue_head_i (first_item);
01964
01965 return result;
01966 }
virtual int ACE_Dynamic_Message_Queue<>::dequeue_head_i (ACE_Message_Block *&first_item)
Dequeue and return the <ACE_Message_Block *> at the head of the logical queue. Attempts first to dequeue from the pending portion of the queue, or if that is empty from the late portion, or if that is empty from the beyond late portion, or if that is empty just sets the passed pointer to zero and returns -1. Reimplemented from ACE_Message_Queue< ACE_SYNCH_USE >. Definition at line 2213 of file Message_Queue_T.cpp. References ACE_TRACE, ACE_Dynamic_Message_Queue<>::beyond_late_head_, ACE_Dynamic_Message_Queue<>::beyond_late_tail_, ACE_Dynamic_Message_Queue<>::late_head_, ACE_Dynamic_Message_Queue<>::late_tail_, ACE_Message_Block::next(), ACE_Dynamic_Message_Queue<>::pending_head_, ACE_Dynamic_Message_Queue<>::pending_tail_, ACE_Message_Block::prev(), ACE_Message_Queue< ACE_SYNCH_USE >::signal_enqueue_waiters(), and ACE_Message_Block::total_size_and_length(). Referenced by ACE_Dynamic_Message_Queue<>::dequeue_head().
02214 {
02215 ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i");
02216
02217 int result = 0;
02218 int last_in_subqueue = 0;
02219
02220 // first, try to dequeue from the head of the pending list
02221 if (this->pending_head_)
02222 {
02223 first_item = this->pending_head_;
02224
02225 if (0 == this->pending_head_->prev ())
02226 this->head_ = this->pending_head_->next ();
02227 else
02228 this->pending_head_->prev ()->next (this->pending_head_->next ());
02229
02230 if (0 == this->pending_head_->next ())
02231 {
02232 this->tail_ = this->pending_head_->prev ();
02233 this->pending_head_ = 0;
02234 this->pending_tail_ = 0;
02235 }
02236 else
02237 {
02238 this->pending_head_->next ()->prev (this->pending_head_->prev ());
02239 this->pending_head_ = this->pending_head_->next ();
02240 }
02241
02242 first_item->prev (0);
02243 first_item->next (0);
02244 }
02245
02246 // Second, try to dequeue from the head of the late list
02247 else if (this->late_head_)
02248 {
02249 last_in_subqueue = this->late_head_ == this->late_tail_ ? 1 : 0;
02250
02251 first_item = this->late_head_;
02252
02253 if (0 == this->late_head_->prev ())
02254 this->head_ = this->late_head_->next ();
02255 else
02256 this->late_head_->prev ()->next (this->late_head_->next ());
02257
02258 if (0 == this->late_head_->next ())
02259 this->tail_ = this->late_head_->prev ();
02260 else
02261 {
02262 this->late_head_->next ()->prev (this->late_head_->prev ());
02263 this->late_head_ = this->late_head_->next ();
02264 }
02265
02266 if (last_in_subqueue)
02267 {
02268 this->late_head_ = 0;
02269 this->late_tail_ = 0;
02270 }
02271
02272 first_item->prev (0);
02273 first_item->next (0);
02274 }
02275 // finally, try to dequeue from the head of the beyond late list
02276 else if (this->beyond_late_head_)
02277 {
02278 last_in_subqueue =
02279 (this->beyond_late_head_ == this->beyond_late_tail_) ? 1 : 0;
02280
02281 first_item = this->beyond_late_head_;
02282 this->head_ = this->beyond_late_head_->next ();
02283
02284 if (0 == this->beyond_late_head_->next ())
02285 this->tail_ = this->beyond_late_head_->prev ();
02286 else
02287 {
02288 this->beyond_late_head_->next ()->prev (this->beyond_late_head_->prev ());
02289 this->beyond_late_head_ = this->beyond_late_head_->next ();
02290 }
02291
02292 if (last_in_subqueue)
02293 {
02294 this->beyond_late_head_ = 0;
02295 this->beyond_late_tail_ = 0;
02296 }
02297
02298 first_item->prev (0);
02299 first_item->next (0);
02300 }
02301 else
02302 {
02303 // nothing to dequeue: set the pointer to zero and return an error code
02304 first_item = 0;
02305 result = -1;
02306 }
02307
02308 if (result < 0)
02309 return result;
02310
02311 size_t mb_bytes = 0;
02312 size_t mb_length = 0;
02313 first_item->total_size_and_length (mb_bytes,
02314 mb_length);
02315 // Subtract off all of the bytes associated with this message.
02316 this->cur_bytes_ -= mb_bytes;
02317 this->cur_length_ -= mb_length;
02318 --this->cur_count_;
02319
02320 // Only signal enqueueing threads if we've fallen below the low
02321 // water mark.
02322 if (this->cur_bytes_ <= this->low_water_mark_
02323 && this->signal_enqueue_waiters () == -1)
02324 return -1;
02325 else
02326 return this->cur_count_;
02327 }
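All three branches of the listing above perform the same basic operation: unlink the first node of one sublist from the overall doubly linked list and move (or clear) that sublist's boundary pointers. A condensed sketch of that operation follows; `Node`, `List`, and `pop_sublist_head` are hypothetical names, and the byte and count bookkeeping is left out.

```cpp
#include <cassert>

struct Node {  // hypothetical stand-in for the ACE_Message_Block links
  int id;
  Node* prev = nullptr;
  Node* next = nullptr;
  explicit Node(int i) : id(i) {}
};

struct List {
  Node* head = nullptr;      // head of the whole list
  Node* tail = nullptr;      // tail of the whole list
  Node* sub_head = nullptr;  // first node of one status sublist
  Node* sub_tail = nullptr;  // last node of that sublist
};

// Detach and return the first node of the sublist, or nullptr if empty.
Node* pop_sublist_head(List& l) {
  Node* n = l.sub_head;
  if (n == nullptr) return nullptr;
  const bool last_in_sublist = (l.sub_head == l.sub_tail);

  // Bypass n in the overall list, fixing head/tail when n is at an end.
  if (n->prev) n->prev->next = n->next; else l.head = n->next;
  if (n->next) n->next->prev = n->prev; else l.tail = n->prev;

  // Move the sublist boundary, or clear it if n was the only member.
  if (last_in_sublist) l.sub_head = l.sub_tail = nullptr;
  else l.sub_head = n->next;

  n->prev = n->next = nullptr;
  return n;
}
```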
virtual void ACE_Dynamic_Message_Queue<>::dump (void) const
Dump the state of the queue.
Reimplemented from ACE_Message_Queue< ACE_SYNCH_USE >. Definition at line 1972 of file Message_Queue_T.cpp. References ACE_BEGIN_DUMP, ACE_DEBUG, ACE_END_DUMP, ACE_LIB_TEXT, ACE_TRACE, ACE_Dynamic_Message_Strategy::dump(), ACE_Message_Queue<>::dump(), LM_DEBUG, and ACE_Dynamic_Message_Queue<>::message_strategy_.
01973 {
01974 #if defined (ACE_HAS_DUMP)
01975 ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dump");
01976 ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
01977
01978 ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("ACE_Message_Queue<ACE_SYNCH_USE> (base class): \n")));
01979 this->ACE_Message_Queue<ACE_SYNCH_USE>::dump ();
01980
01981 ACE_DEBUG ((LM_DEBUG,
01982 ACE_LIB_TEXT ("pending_head_ = %u\n")
01983 ACE_LIB_TEXT ("pending_tail_ = %u\n")
01984 ACE_LIB_TEXT ("late_head_ = %u\n")
01985 ACE_LIB_TEXT ("late_tail_ = %u\n")
01986 ACE_LIB_TEXT ("beyond_late_head_ = %u\n")
01987 ACE_LIB_TEXT ("beyond_late_tail_ = %u\n"),
01988 this->pending_head_,
01989 this->pending_tail_,
01990 this->late_head_,
01991 this->late_tail_,
01992 this->beyond_late_head_,
01993 this->beyond_late_tail_));
01994
01995 ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("message_strategy_ : \n")));
01996 message_strategy_.dump ();
01997
01998 ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
01999 #endif /* ACE_HAS_DUMP */
02000 }
virtual int ACE_Dynamic_Message_Queue<>::enqueue_head (ACE_Message_Block *new_item, ACE_Time_Value *timeout=0)
Just calls the priority enqueue method. Head enqueue semantics for dynamic message queues are unstable: the message may or may not be where it was placed after the queue is refreshed prior to the next enqueue or dequeue operation. Reimplemented from ACE_Message_Queue< ACE_SYNCH_USE >. Definition at line 2584 of file Message_Queue_T.cpp. References ACE_TRACE, and ACE_Message_Queue< ACE_SYNCH_USE >::enqueue_prio().
02586 {
02587 ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_head");
02588 return this->enqueue_prio (new_item, timeout);
02589 }
virtual int ACE_Dynamic_Message_Queue<>::enqueue_i (ACE_Message_Block *new_item)
Enqueue an <ACE_Message_Block *> in accordance with its priority, which may be *dynamic*, *static*, or a combination of *both*. It calls the priority evaluation function passed into the Dynamic Message Queue constructor to update the priorities of all enqueued messages.
Reimplemented from ACE_Message_Queue< ACE_SYNCH_USE >. Definition at line 2004 of file Message_Queue_T.cpp. References ACE_TRACE, ACE_Dynamic_Message_Queue<>::beyond_late_head_, ACE_Dynamic_Message_Queue<>::beyond_late_tail_, ACE_Message_Queue< ACE_SYNCH_USE >::enqueue_head_i(), ACE_Message_Queue< ACE_SYNCH_USE >::enqueue_tail_i(), ACE_OS::gettimeofday(), ACE_Dynamic_Message_Queue<>::late_head_, ACE_Dynamic_Message_Queue<>::late_tail_, ACE_Dynamic_Message_Queue<>::message_strategy_, ACE_Message_Block::next(), ACE_Dynamic_Message_Queue<>::pending_head_, ACE_Dynamic_Message_Queue<>::pending_tail_, ACE_Message_Block::prev(), ACE_Dynamic_Message_Strategy::priority_status(), ACE_Dynamic_Message_Queue<>::refresh_queue(), ACE_Message_Queue< ACE_SYNCH_USE >::signal_dequeue_waiters(), ACE_Dynamic_Message_Queue<>::sublist_enqueue_i(), and ACE_Message_Block::total_size_and_length().
02005 {
02006 ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_i");
02007
02008 if (new_item == 0)
02009 return -1;
02010
02011 int result = 0;
02012
02013 // Get the current time.
02014 ACE_Time_Value current_time = ACE_OS::gettimeofday ();
02015
02016 // Refresh priority status boundaries in the queue.
02017
02018 result = this->refresh_queue (current_time);
02019 if (result < 0)
02020 return result;
02021
02022 // Where we enqueue depends on the message's priority status.
02023 switch (message_strategy_.priority_status (*new_item,
02024 current_time))
02025 {
02026 case ACE_Dynamic_Message_Strategy::PENDING:
02027 if (this->pending_tail_ == 0)
02028 {
02029 // Check for simple case of an empty pending queue, where
02030 // all we need to do is insert <new_item> into the tail of
02031 // the queue.
02032 pending_head_ = new_item;
02033 pending_tail_ = pending_head_;
02034 return this->enqueue_tail_i (new_item);
02035 }
02036 else
02037 {
02038 // Enqueue the new message in priority order in the pending
02039 // sublist
02040 result = sublist_enqueue_i (new_item,
02041 current_time,
02042 this->pending_head_,
02043 this->pending_tail_,
02044 ACE_Dynamic_Message_Strategy::PENDING);
02045 }
02046 break;
02047
02048 case ACE_Dynamic_Message_Strategy::LATE:
02049 if (this->late_tail_ == 0)
02050 {
02051 late_head_ = new_item;
02052 late_tail_ = late_head_;
02053
02054 if (this->pending_head_ == 0)
02055 // Check for simple case of an empty pending queue,
02056 // where all we need to do is insert <new_item> into the
02057 // tail of the queue.
02058 return this->enqueue_tail_i (new_item);
02059 else if (this->beyond_late_tail_ == 0)
02060 // Check for simple case of an empty beyond late queue, where all
02061 // we need to do is insert <new_item> into the head of the queue.
02062 return this->enqueue_head_i (new_item);
02063 else
02064 {
02065 // Otherwise, we can just splice the new message in
02066 // between the pending and beyond late portions of the
02067 // queue.
02068 this->beyond_late_tail_->next (new_item);
02069 new_item->prev (this->beyond_late_tail_);
02070 this->pending_head_->prev (new_item);
02071 new_item->next (this->pending_head_);
02072 }
02073 }
02074 else
02075 {
02076 // Enqueue the new message in priority order in the late
02077 // sublist
02078 result = sublist_enqueue_i (new_item,
02079 current_time,
02080 this->late_head_,
02081 this->late_tail_,
02082 ACE_Dynamic_Message_Strategy::LATE);
02083 }
02084 break;
02085
02086 case ACE_Dynamic_Message_Strategy::BEYOND_LATE:
02087 if (this->beyond_late_tail_ == 0)
02088 {
02089 // Check for simple case of an empty beyond late queue,
02090 // where all we need to do is insert <new_item> into the
02091 // head of the queue.
02092 beyond_late_head_ = new_item;
02093 beyond_late_tail_ = beyond_late_head_;
02094 return this->enqueue_head_i (new_item);
02095 }
02096 else
02097 {
02098 // all beyond late messages have the same (zero) priority,
02099 // so just put the new one at the end of the beyond late
02100 // messages
02101 if (this->beyond_late_tail_->next ())
02102 this->beyond_late_tail_->next ()->prev (new_item);
02103 else
02104 this->tail_ = new_item;
02105
02106 new_item->next (this->beyond_late_tail_->next ());
02107 this->beyond_late_tail_->next (new_item);
02108 new_item->prev (this->beyond_late_tail_);
02109 this->beyond_late_tail_ = new_item;
02110 }
02111
02112 break;
02113
02114 // should never get here, but just in case...
02115 default:
02116 result = -1;
02117 break;
02118 }
02119
02120 if (result < 0)
02121 return result;
02122
02123 size_t mb_bytes = 0;
02124 size_t mb_length = 0;
02125 new_item->total_size_and_length (mb_bytes,
02126 mb_length);
02127 this->cur_bytes_ += mb_bytes;
02128 this->cur_length_ += mb_length;
02129 ++this->cur_count_;
02130
02131 if (this->signal_dequeue_waiters () == -1)
02132 return -1;
02133 else
02134 return this->cur_count_;
02135 }
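When the late sublist is empty, the LATE case of the listing above distinguishes three situations: append at the tail, prepend at the head, or splice between the beyond late tail and the pending head. The sketch below folds those three situations into one hypothetical helper, `insert_first_late`, operating on plain `Node` and `List` structs rather than ACE types. It assumes that the two neighbors are adjacent whenever both are non-null.

```cpp
#include <cassert>

// Hypothetical stand-ins for illustration only.
struct Node {
  int id;
  Node* prev;
  Node* next;
};

struct List {
  Node* head;
  Node* tail;
};

// Insert the first late node `n` after `beyond_tail` (may be null) and
// before `pending_head` (may be null).
void insert_first_late(List& l, Node* beyond_tail, Node* n,
                       Node* pending_head) {
  assert(n != nullptr);
  n->prev = beyond_tail;
  n->next = pending_head;
  // No beyond late messages: the new node becomes the list head.
  if (beyond_tail) beyond_tail->next = n; else l.head = n;
  // No pending messages: the new node becomes the list tail.
  if (pending_head) pending_head->prev = n; else l.tail = n;
}
```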
virtual int ACE_Dynamic_Message_Queue<>::enqueue_tail (ACE_Message_Block *new_item, ACE_Time_Value *timeout=0)
Just calls the priority enqueue method. Tail enqueue semantics for dynamic message queues are unstable: the message may or may not be where it was placed after the queue is refreshed prior to the next enqueue or dequeue operation. Reimplemented from ACE_Message_Queue< ACE_SYNCH_USE >. Definition at line 2571 of file Message_Queue_T.cpp. References ACE_TRACE, and ACE_Message_Queue< ACE_SYNCH_USE >::enqueue_prio().
02573 {
02574 ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_tail");
02575 return this->enqueue_prio (new_item, timeout);
02576 }
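The practical consequence of both positional enqueue methods forwarding to the priority enqueue can be seen in a toy model. `PrioOnlyQueue` below is hypothetical and built on `std::priority_queue`; it assumes that a larger number means a higher priority and ignores deadlines entirely.

```cpp
#include <cassert>
#include <queue>

// Hypothetical model, not the ACE class: larger number = higher priority.
class PrioOnlyQueue {
public:
  void enqueue_prio(int prio) { q_.push(prio); }

  // Both positional variants ignore the requested position.
  void enqueue_head(int prio) { enqueue_prio(prio); }
  void enqueue_tail(int prio) { enqueue_prio(prio); }

  // Remove and return the highest-priority element.
  int dequeue_head() {
    assert(!q_.empty());
    int top = q_.top();
    q_.pop();
    return top;
  }

private:
  std::priority_queue<int> q_;
};
```

In this model an item added with enqueue_tail can still come out first, which is why callers should not rely on head or tail placement.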
virtual int ACE_Dynamic_Message_Queue<>::peek_dequeue_head (ACE_Message_Block *&first_item, ACE_Time_Value *timeout=0)
Private method to hide public base class method: just calls base class method.
Reimplemented from ACE_Message_Queue< ACE_SYNCH_USE >. Definition at line 2560 of file Message_Queue_T.cpp. References ACE_Message_Queue<>::peek_dequeue_head().
02562 {
02563 return ACE_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head (first_item,
02564 timeout);
02565 }
virtual int ACE_Dynamic_Message_Queue<>::refresh_late_queue (const ACE_Time_Value &current_time)
Refresh the late queue using the strategy specific priority status function. Definition at line 2475 of file Message_Queue_T.cpp. References ACE_ERROR_RETURN, ACE_LIB_TEXT, ACE_Dynamic_Message_Queue<>::beyond_late_head_, ACE_Dynamic_Message_Queue<>::beyond_late_tail_, ACE_Dynamic_Message_Queue<>::late_head_, ACE_Dynamic_Message_Queue<>::late_tail_, LM_ERROR, ACE_Dynamic_Message_Queue<>::message_strategy_, ACE_Message_Block::next(), ACE_Message_Block::prev(), and ACE_Dynamic_Message_Strategy::priority_status(). Referenced by ACE_Dynamic_Message_Queue<>::refresh_queue().
02476 {
02477 ACE_Dynamic_Message_Strategy::Priority_Status current_status;
02478
02479 if (this->late_head_)
02480 {
02481 current_status = message_strategy_.priority_status (*this->late_head_,
02482 current_time);
02483 switch (current_status)
02484 {
02485 case ACE_Dynamic_Message_Strategy::BEYOND_LATE:
02486
02487 // make sure the head of the beyond late queue is set
02488 // (there may not have been any beyond late messages previously)
02489 this->beyond_late_head_ = this->head_;
02490
02491 // advance through the beyond late messages in the late queue
02492 do
02493 {
02494 this->late_head_ = this->late_head_->next ();
02495
02496 if (this->late_head_)
02497 current_status = message_strategy_.priority_status (*this->late_head_,
02498 current_time);
02499 else
02500 break; // do while
02501
02502 }
02503 while (current_status == ACE_Dynamic_Message_Strategy::BEYOND_LATE);
02504
02505 if (this->late_head_)
02506 {
02507 // point tail of beyond late sublist to previous item
02508 this->beyond_late_tail_ = this->late_head_->prev ();
02509
02510 if (current_status == ACE_Dynamic_Message_Strategy::PENDING)
02511 {
02512 // there are no late messages left in the queue
02513 this->late_head_ = 0;
02514 this->late_tail_ = 0;
02515 }
02516 else if (current_status != ACE_Dynamic_Message_Strategy::LATE)
02517 // if we got here, something is *seriously* wrong with the queue
02518 ACE_ERROR_RETURN ((LM_ERROR,
02519 ACE_LIB_TEXT ("Unexpected message priority status [%d] (expected LATE)"),
02520 (int) current_status),
02521 -1);
02522 }
02523 else
02524 {
02525 // there are no late messages left in the queue
02526 this->beyond_late_tail_ = this->tail_;
02527 this->late_head_ = 0;
02528 this->late_tail_ = 0;
02529 }
02530
02531 break; // switch
02532
02533 case ACE_Dynamic_Message_Strategy::LATE:
02534 // do nothing - the late queue is unchanged
02535 break; // switch
02536
02537 case ACE_Dynamic_Message_Strategy::PENDING:
02538 // if we got here, something is *seriously* wrong with the queue
02539 ACE_ERROR_RETURN ((LM_ERROR,
02540 ACE_LIB_TEXT ("Unexpected message priority status ")
02541 ACE_LIB_TEXT ("[%d] (expected LATE or BEYOND_LATE)"),
02542 (int) current_status),
02543 -1);
02544 default:
02545 // if we got here, something is *seriously* wrong with the queue
02546 ACE_ERROR_RETURN ((LM_ERROR,
02547 ACE_LIB_TEXT ("Unknown message priority status [%d]"),
02548 (int) current_status),
02549 -1);
02550 }
02551 }
02552
02553 return 0;
02554 }
virtual int ACE_Dynamic_Message_Queue<>::refresh_pending_queue (const ACE_Time_Value &current_time)
Refresh the pending queue using the strategy specific priority status function. Definition at line 2352 of file Message_Queue_T.cpp. References ACE_ERROR_RETURN, ACE_LIB_TEXT, ACE_Dynamic_Message_Queue<>::beyond_late_head_, ACE_Dynamic_Message_Queue<>::beyond_late_tail_, ACE_Dynamic_Message_Queue<>::late_head_, ACE_Dynamic_Message_Queue<>::late_tail_, LM_ERROR, ACE_Dynamic_Message_Queue<>::message_strategy_, ACE_Message_Block::next(), ACE_Dynamic_Message_Queue<>::pending_head_, ACE_Dynamic_Message_Queue<>::pending_tail_, ACE_Message_Block::prev(), and ACE_Dynamic_Message_Strategy::priority_status(). Referenced by ACE_Dynamic_Message_Queue<>::refresh_queue().
02353 {
02354 ACE_Dynamic_Message_Strategy::Priority_Status current_status;
02355
02356 // refresh priority status boundaries in the queue
02357 if (this->pending_head_)
02358 {
02359 current_status = message_strategy_.priority_status (*this->pending_head_,
02360 current_time);
02361 switch (current_status)
02362 {
02363 case ACE_Dynamic_Message_Strategy::BEYOND_LATE:
02364 // Make sure the head of the beyond late queue is set (there
02365 // may not have been any beyond late messages previously)
02366 this->beyond_late_head_ = this->head_;
02367
02368 // Zero out the late queue pointers, and set them only if
02369 // there turn out to be late messages in the pending sublist
02370 this->late_head_ = 0;
02371 this->late_tail_ = 0;
02372
02373 // Advance through the beyond late messages in the pending queue
02374 do
02375 {
02376 this->pending_head_ = this->pending_head_->next ();
02377
02378 if (this->pending_head_)
02379 current_status = message_strategy_.priority_status (*this->pending_head_,
02380 current_time);
02381 else
02382 break; // do while
02383
02384 }
02385 while (current_status == ACE_Dynamic_Message_Strategy::BEYOND_LATE);
02386
02387 if (this->pending_head_)
02388 {
02389 // point tail of beyond late sublist to previous item
02390 this->beyond_late_tail_ = this->pending_head_->prev ();
02391
02392 if (current_status == ACE_Dynamic_Message_Strategy::PENDING)
02393 // there are no late messages left in the queue
02394 break; // switch
02395 else if (current_status != ACE_Dynamic_Message_Strategy::LATE)
02396 {
02397 // if we got here, something is *seriously* wrong with the queue
02398 ACE_ERROR_RETURN ((LM_ERROR,
02399 ACE_LIB_TEXT ("Unexpected message priority status [%d] (expected LATE)"),
02400 (int) current_status),
02401 -1);
02402 }
02403 /* FALLTHRU */
02404 }
02405 else
02406 {
02407 // There are no pending or late messages left in the
02408 // queue.
02409 this->beyond_late_tail_ = this->tail_;
02410 this->pending_head_ = 0;
02411 this->pending_tail_ = 0;
02412 break; // switch
02413 }
02414
02415 case ACE_Dynamic_Message_Strategy::LATE:
02416 // Make sure the head of the late queue is set (there may
02417 // not have been any late messages previously, or they may
02418 // have all become beyond late).
02419 if (this->late_head_ == 0)
02420 this->late_head_ = this->pending_head_;
02421
02422 // advance through the late messages in the pending queue
02423 do
02424 {
02425 this->pending_head_ = this->pending_head_->next ();
02426
02427 if (this->pending_head_)
02428 current_status = message_strategy_.priority_status (*this->pending_head_,
02429 current_time);
02430 else
02431 break; // do while
02432
02433 }
02434 while (current_status == ACE_Dynamic_Message_Strategy::LATE);
02435
02436 if (this->pending_head_)
02437 {
02438 if (current_status != ACE_Dynamic_Message_Strategy::PENDING)
02439 // if we got here, something is *seriously* wrong with the queue
02440 ACE_ERROR_RETURN((LM_ERROR,
02441 ACE_LIB_TEXT ("Unexpected message priority status [%d] (expected PENDING)"),
02442 (int) current_status),
02443 -1);
02444
02445 // Point tail of late sublist to previous item
02446 this->late_tail_ = this->pending_head_->prev ();
02447 }
02448 else
02449 {
02450 // there are no pending messages left in the queue
02451 this->late_tail_ = this->tail_;
02452 this->pending_head_ = 0;
02453 this->pending_tail_ = 0;
02454 }
02455
02456 break; // switch
02457 case ACE_Dynamic_Message_Strategy::PENDING:
02458 // do nothing - the pending queue is unchanged
02459 break; // switch
02460 default:
02461 // if we got here, something is *seriously* wrong with the queue
02462 ACE_ERROR_RETURN((LM_ERROR,
02463 ACE_LIB_TEXT ("Unknown message priority status [%d]"),
02464 (int) current_status),
02465 -1);
02466 }
02467 }
02468 return 0;
02469 }
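The core idea of the refresh step is that aging messages change sublist without being relinked: only the boundary pointers move. The sketch below reduces this to two statuses (pending and late) and a plain deadline comparison. `Node`, `Bounds`, and `age_pending` are hypothetical names, and the sketch assumes that the messages that have become late form a contiguous run at the front of the pending sublist.

```cpp
// Hypothetical stand-ins for illustration only.
struct Node {
  long deadline;  // message is late once `now` exceeds this
  Node* next;
};

struct Bounds {
  Node* late_head = nullptr;
  Node* late_tail = nullptr;
  Node* pending_head = nullptr;
  Node* pending_tail = nullptr;
};

// Move the late/pending boundary forward past every node at the front of
// the pending sublist whose deadline has passed. No node is relinked.
void age_pending(Bounds& b, long now) {
  Node* n = b.pending_head;
  if (n == nullptr || now <= n->deadline) return;  // nothing has aged

  // The late sublist may have been empty until now.
  if (b.late_head == nullptr) b.late_head = n;

  while (n != nullptr && now > n->deadline) {
    b.late_tail = n;  // this node now belongs to the late sublist
    n = n->next;
  }

  b.pending_head = n;
  if (n == nullptr) b.pending_tail = nullptr;  // no pending messages left
}
```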
virtual int ACE_Dynamic_Message_Queue<>::refresh_queue (const ACE_Time_Value &current_time)
Refresh the queue using the strategy specific priority status function. Definition at line 2336 of file Message_Queue_T.cpp. References ACE_Dynamic_Message_Queue<>::refresh_late_queue(), and ACE_Dynamic_Message_Queue<>::refresh_pending_queue(). Referenced by ACE_Dynamic_Message_Queue<>::dequeue_head(), ACE_Dynamic_Message_Queue<>::enqueue_i(), and ACE_Dynamic_Message_Queue<>::remove_messages().
02337 {
02338 int result;
02339
02340 result = refresh_pending_queue (current_time);
02341
02342 if (result != -1)
02343 result = refresh_late_queue (current_time);
02344
02345 return result;
02346 }
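virtual int ACE_Dynamic_Message_Queue<>::remove_messages (ACE_Message_Block *&list_head, ACE_Message_Block *&list_tail, u_int status_flags)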
Detach all messages with status given in the passed flags from the queue and return them by setting passed head and tail pointers to the linked list they comprise. This method is intended primarily as a means of periodically harvesting messages that have missed their deadlines, but is available in its most general form. All messages are returned in priority order, from head to tail, as of the time this method was called. Definition at line 1792 of file Message_Queue_T.cpp. References ACE_BIT_ENABLED, ACE_Dynamic_Message_Queue<>::beyond_late_head_, ACE_Dynamic_Message_Queue<>::beyond_late_tail_, ACE_OS::gettimeofday(), ACE_Dynamic_Message_Queue<>::late_head_, ACE_Dynamic_Message_Queue<>::late_tail_, ACE_Message_Block::next(), ACE_Dynamic_Message_Queue<>::pending_head_, ACE_Dynamic_Message_Queue<>::pending_tail_, ACE_Message_Block::prev(), ACE_Dynamic_Message_Queue<>::refresh_queue(), and ACE_Message_Block::total_size_and_length().
01795 {
01796 // start with an empty list
01797 list_head = 0;
01798 list_tail = 0;
01799
01800 // Get the current time
01801 ACE_Time_Value current_time = ACE_OS::gettimeofday ();
01802
01803 // Refresh priority status boundaries in the queue.
01804 int result = this->refresh_queue (current_time);
01805 if (result < 0)
01806 return result;
01807
01808 if (ACE_BIT_ENABLED (status_flags,
01809 (u_int) ACE_Dynamic_Message_Strategy::PENDING)
01810 && this->pending_head_
01811 && this->pending_tail_)
01812 {
01813 // patch up pointers for the new tail of the queue
01814 if (this->pending_head_->prev ())
01815 {
01816 this->tail_ = this->pending_head_->prev ();
01817 this->pending_head_->prev ()->next (0);
01818 }
01819 else
01820 {
01821 // the list has become empty
01822 this->head_ = 0;
01823 this->tail_ = 0;
01824 }
01825
01826 // point to the head and tail of the list
01827 list_head = this->pending_head_;
01828 list_tail = this->pending_tail_;
01829
01830 // cut the pending messages out of the queue entirely
01831 this->pending_head_->prev (0);
01832 this->pending_head_ = 0;
01833 this->pending_tail_ = 0;
01834 }
01835
01836 if (ACE_BIT_ENABLED (status_flags,
01837 (u_int) ACE_Dynamic_Message_Strategy::LATE)
01838 && this->late_head_
01839 && this->late_tail_)
01840 {
01841 // Patch up pointers for the (possibly) new head and tail of the
01842 // queue.
01843 if (this->late_tail_->next ())
01844 this->late_tail_->next ()->prev (this->late_head_->prev ());
01845 else
01846 this->tail_ = this->late_head_->prev ();
01847
01848 if (this->late_head_->prev ())
01849 this->late_head_->prev ()->next (this->late_tail_->next ());
01850 else
01851 this->head_ = this->late_tail_->next ();
01852
01853 // put late messages behind pending messages (if any) being returned
01854 this->late_head_->prev (list_tail);
01855 if (list_tail)
01856 list_tail->next (this->late_head_);
01857 else
01858 list_head = this->late_head_;
01859
01860 list_tail = this->late_tail_;
01861
01862 this->late_tail_->next (0);
01863 this->late_head_ = 0;
01864 this->late_tail_ = 0;
01865 }
01866
01867 if (ACE_BIT_ENABLED (status_flags,
01868 (u_int) ACE_Dynamic_Message_Strategy::BEYOND_LATE)
01869 && this->beyond_late_head_
01870 && this->beyond_late_tail_)
01871 {
01872 // Patch up pointers for the new head of the queue
01873 if (this->beyond_late_tail_->next ())
01874 {
01875 this->head_ = this->beyond_late_tail_->next ();
01876 this->beyond_late_tail_->next ()->prev (0);
01877 }
01878 else
01879 {
01880 // the list has become empty
01881 this->head_ = 0;
01882 this->tail_ = 0;
01883 }
01884
01885 // Put beyond late messages at the end of the list being
01886 // returned.
01887 if (list_tail)
01888 {
01889 this->beyond_late_head_->prev (list_tail);
01890 list_tail->next (this->beyond_late_head_);
01891 }
01892 else
01893 list_head = this->beyond_late_head_;
01894
01895 list_tail = this->beyond_late_tail_;
01896
01897 this->beyond_late_tail_->next (0);
01898 this->beyond_late_head_ = 0;
01899 this->beyond_late_tail_ = 0;
01900 }
01901
01902 // Decrement message and size counts for removed messages.
01903 ACE_Message_Block *temp1;
01904
01905 for (temp1 = list_head;
01906 temp1 != 0;
01907 temp1 = temp1->next ())
01908 {
01909 --this->cur_count_;
01910
01911 size_t mb_bytes = 0;
01912 size_t mb_length = 0;
01913 temp1->total_size_and_length (mb_bytes,
01914 mb_length);
01915 // Subtract off all of the bytes associated with this message.
01916 this->cur_bytes_ -= mb_bytes;
01917 this->cur_length_ -= mb_length;
01918 }
01919
01920 return result;
01921 }
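
The pointer surgery in the listing above may be easier to follow in isolation. What follows is a minimal, self-contained sketch of the same idea, not the ACE implementation: `Node`, `Queue`, `splice_out`, `discount`, and `example_remove` are hypothetical names invented for illustration, and `Node` stands in for `ACE_Message_Block` with only its list links and a byte count. It mirrors the LATE branch, which has to cope with a run of messages sitting at the head, the tail, or the middle of the queue, followed by the counter adjustment loop.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical stand-in for ACE_Message_Block: list links plus a size.
struct Node {
  Node *prev = nullptr;
  Node *next = nullptr;
  std::size_t bytes = 0;
};

// Hypothetical stand-in for the queue: overall head/tail and counters.
struct Queue {
  Node *head = nullptr;
  Node *tail = nullptr;
  std::size_t count = 0;
  std::size_t bytes = 0;
};

// Cut the contiguous run [sub_head, sub_tail] out of q, append it to the
// list being returned, and clear the sublist pointers.
void splice_out (Queue &q, Node *&sub_head, Node *&sub_tail,
                 Node *&list_head, Node *&list_tail)
{
  if (!sub_head || !sub_tail)
    return;

  // Reconnect the neighbours around the run, or move q.tail / q.head
  // when the run touches an end of the queue.
  if (sub_tail->next)
    sub_tail->next->prev = sub_head->prev;
  else
    q.tail = sub_head->prev;

  if (sub_head->prev)
    sub_head->prev->next = sub_tail->next;
  else
    q.head = sub_tail->next;

  // Append the run behind whatever has already been collected.
  sub_head->prev = list_tail;
  if (list_tail)
    list_tail->next = sub_head;
  else
    list_head = sub_head;
  list_tail = sub_tail;

  sub_tail->next = nullptr;
  sub_head = nullptr;
  sub_tail = nullptr;
}

// Walk the returned list and subtract its nodes from the queue counters.
void discount (Queue &q, const Node *list_head)
{
  for (const Node *n = list_head; n != nullptr; n = n->next)
    {
      --q.count;
      q.bytes -= n->bytes;
    }
}

// Queue a-b-c-d in which b-c plays the role of the "late" run.
void example_remove ()
{
  Node a, b, c, d;
  a.bytes = b.bytes = c.bytes = d.bytes = 10;
  a.next = &b; b.prev = &a;
  b.next = &c; c.prev = &b;
  c.next = &d; d.prev = &c;

  Queue q;
  q.head = &a; q.tail = &d; q.count = 4; q.bytes = 40;

  Node *late_head = &b, *late_tail = &c;
  Node *list_head = nullptr, *list_tail = nullptr;
  splice_out (q, late_head, late_tail, list_head, list_tail);
  discount (q, list_head);

  assert (q.head == &a && q.tail == &d && a.next == &d && d.prev == &a);
  assert (list_head == &b && list_tail == &c);
  assert (b.prev == nullptr && c.next == nullptr);
  assert (q.count == 2 && q.bytes == 20);
}
```

Calling `splice_out` once per enabled status flag, always with the same `list_head` and `list_tail`, chains the removed runs together in the order the branches execute, which is how the listing builds a single returned list.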
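
virtual int ACE_Dynamic_Message_Queue<>::sublist_enqueue_i (ACE_Message_Block *new_item, const ACE_Time_Value &current_time, ACE_Message_Block *&sublist_head, ACE_Message_Block *&sublist_tail, ACE_Dynamic_Message_Strategy::Priority_Status status)
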
Enqueue a message in priority order within a given priority status sublist.
Definition at line 2144 of file Message_Queue_T.cpp. References ACE_Dynamic_Message_Queue<>::message_strategy_, ACE_Message_Block::msg_priority(), ACE_Message_Block::next(), ACE_Message_Block::prev(), and ACE_Dynamic_Message_Strategy::priority_status(). Referenced by ACE_Dynamic_Message_Queue<>::enqueue_i().
{
  int result = 0;
  ACE_Message_Block *current_item = 0;

  // Find message after which to enqueue new item, based on message
  // priority and priority status.
  for (current_item = sublist_tail;
       current_item;
       current_item = current_item->prev ())
    {
      if (message_strategy_.priority_status (*current_item, current_time) == status)
        {
          if (current_item->msg_priority () >= new_item->msg_priority ())
            break;
        }
      else
        {
          sublist_head = new_item;
          break;
        }
    }

  if (current_item == 0)
    {
      // If the new message has highest priority of any, put it at the
      // head of the list (and sublist).
      new_item->prev (0);
      new_item->next (this->head_);
      if (this->head_ != 0)
        this->head_->prev (new_item);
      else
        {
          this->tail_ = new_item;
          sublist_tail = new_item;
        }
      this->head_ = new_item;
      sublist_head = new_item;
    }
  else
    {
      // insert the new item into the list
      new_item->next (current_item->next ());
      new_item->prev (current_item);

      if (current_item->next ())
        current_item->next ()->prev (new_item);
      else
        this->tail_ = new_item;

      current_item->next (new_item);

      // If the new item has lowest priority of any in the sublist,
      // move the tail pointer of the sublist back to the new item
      if (current_item == sublist_tail)
        sublist_tail = new_item;
    }

  return result;
}
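
To make the backward scan concrete, here is a small standalone sketch of the same insertion logic. It is an illustration under stated assumptions rather than ACE code: `Item`, `List`, `sublist_insert`, and `example_insert` are hypothetical names, and each `Item` carries a stored `status` field, whereas the listing above asks `message_strategy_.priority_status ()` for it at the current time. The sketch also assumes the sublist is non-empty whenever the queue is non-empty.

```cpp
#include <cassert>

// Hypothetical stand-in for ACE_Message_Block.
struct Item {
  Item *prev = nullptr;
  Item *next = nullptr;
  unsigned long priority = 0;
  int status = 0;   // stand-in for the strategy's priority status
};

// Hypothetical stand-in for the queue's overall head and tail.
struct List {
  Item *head = nullptr;
  Item *tail = nullptr;
};

// Insert item into the run [sub_head, sub_tail] of q, keeping the run
// sorted by descending priority (FIFO among equal priorities).
void sublist_insert (List &q, Item *item, Item *&sub_head, Item *&sub_tail)
{
  // Scan backwards from the sublist tail for the node to insert after.
  Item *cur = sub_tail;
  for (; cur != nullptr; cur = cur->prev)
    {
      if (cur->status == item->status)
        {
          if (cur->priority >= item->priority)
            break;
        }
      else
        {
          // Walked off the front of the sublist: item becomes its head.
          sub_head = item;
          break;
        }
    }

  if (cur == nullptr)
    {
      // Highest priority of all: item becomes head of queue and sublist.
      item->prev = nullptr;
      item->next = q.head;
      if (q.head != nullptr)
        q.head->prev = item;
      else
        {
          q.tail = item;
          sub_tail = item;
        }
      q.head = item;
      sub_head = item;
    }
  else
    {
      // Link item in directly after cur.
      item->next = cur->next;
      item->prev = cur;
      if (cur->next != nullptr)
        cur->next->prev = item;
      else
        q.tail = item;
      cur->next = item;
      if (cur == sub_tail)
        sub_tail = item;
    }
}

// Start with priorities 5, 3 and insert 4, 1, 9 (all with one status).
void example_insert ()
{
  Item p5, p3, p4, p1, p9;
  p5.priority = 5; p3.priority = 3; p4.priority = 4;
  p1.priority = 1; p9.priority = 9;
  p5.next = &p3; p3.prev = &p5;

  List q;
  q.head = &p5; q.tail = &p3;
  Item *sub_head = &p5, *sub_tail = &p3;

  sublist_insert (q, &p4, sub_head, sub_tail);
  sublist_insert (q, &p1, sub_head, sub_tail);
  sublist_insert (q, &p9, sub_head, sub_tail);

  // Resulting order: 9, 5, 4, 3, 1.
  assert (q.head == &p9 && p9.next == &p5 && p5.next == &p4);
  assert (p4.next == &p3 && p3.next == &p1 && q.tail == &p1);
  assert (sub_head == &p9 && sub_tail == &p1);
}
```

Scanning from the tail means a new item lands behind existing items of equal priority, so messages of the same priority within a sublist keep their arrival order.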

ACE_ALLOC_HOOK_DECLARE

Declare the dynamic allocation hooks.
Reimplemented from ACE_Message_Queue< ACE_SYNCH_USE >. Definition at line 805 of file Message_Queue_T.h.

ACE_Message_Block * beyond_late_head_

Pointer to head of the beyond late messages.
Definition at line 859 of file Message_Queue_T.h. Referenced by ACE_Dynamic_Message_Queue<>::dequeue_head_i(), ACE_Dynamic_Message_Queue<>::enqueue_i(), ACE_Dynamic_Message_Queue<>::refresh_late_queue(), ACE_Dynamic_Message_Queue<>::refresh_pending_queue(), and ACE_Dynamic_Message_Queue<>::remove_messages().

ACE_Message_Block * beyond_late_tail_

Pointer to tail of the beyond late messages.
Definition at line 862 of file Message_Queue_T.h. Referenced by ACE_Dynamic_Message_Queue<>::dequeue_head_i(), ACE_Dynamic_Message_Queue<>::enqueue_i(), ACE_Dynamic_Message_Queue<>::refresh_late_queue(), ACE_Dynamic_Message_Queue<>::refresh_pending_queue(), and ACE_Dynamic_Message_Queue<>::remove_messages().

ACE_Message_Block * late_head_

Pointer to head of the late messages.
Definition at line 853 of file Message_Queue_T.h. Referenced by ACE_Dynamic_Message_Queue<>::dequeue_head_i(), ACE_Dynamic_Message_Queue<>::enqueue_i(), ACE_Dynamic_Message_Queue<>::refresh_late_queue(), ACE_Dynamic_Message_Queue<>::refresh_pending_queue(), and ACE_Dynamic_Message_Queue<>::remove_messages().

ACE_Message_Block * late_tail_

Pointer to tail of the late messages.
Definition at line 856 of file Message_Queue_T.h. Referenced by ACE_Dynamic_Message_Queue<>::dequeue_head_i(), ACE_Dynamic_Message_Queue<>::enqueue_i(), ACE_Dynamic_Message_Queue<>::refresh_late_queue(), ACE_Dynamic_Message_Queue<>::refresh_pending_queue(), and ACE_Dynamic_Message_Queue<>::remove_messages().

ACE_Dynamic_Message_Strategy & message_strategy_

Pointer to a dynamic priority evaluation function.
Definition at line 865 of file Message_Queue_T.h. Referenced by ACE_Dynamic_Message_Queue<>::dump(), ACE_Dynamic_Message_Queue<>::enqueue_i(), ACE_Dynamic_Message_Queue<>::refresh_late_queue(), ACE_Dynamic_Message_Queue<>::refresh_pending_queue(), ACE_Dynamic_Message_Queue<>::sublist_enqueue_i(), and ACE_Dynamic_Message_Queue<>::~ACE_Dynamic_Message_Queue().

ACE_Message_Block * pending_head_

Pointer to head of the pending messages.
Definition at line 847 of file Message_Queue_T.h. Referenced by ACE_Dynamic_Message_Queue<>::dequeue_head_i(), ACE_Dynamic_Message_Queue<>::enqueue_i(), ACE_Dynamic_Message_Queue<>::refresh_pending_queue(), and ACE_Dynamic_Message_Queue<>::remove_messages().

ACE_Message_Block * pending_tail_

Pointer to tail of the pending messages.
Definition at line 850 of file Message_Queue_T.h. Referenced by ACE_Dynamic_Message_Queue<>::dequeue_head_i(), ACE_Dynamic_Message_Queue<>::enqueue_i(), ACE_Dynamic_Message_Queue<>::refresh_pending_queue(), and ACE_Dynamic_Message_Queue<>::remove_messages().