#include <Message_Queue_T.h>
Inherits ACE_Message_Queue< ACE_SYNCH_USE >.
Public Member Functions | |
ACE_Dynamic_Message_Queue (ACE_Dynamic_Message_Strategy &message_strategy, size_t hwm=ACE_Message_Queue_Base::DEFAULT_HWM, size_t lwm=ACE_Message_Queue_Base::DEFAULT_LWM, ACE_Notification_Strategy *=0) | |
virtual | ~ACE_Dynamic_Message_Queue (void) |
Close down the message queue and release all resources. | |
virtual int | remove_messages (ACE_Message_Block *&list_head, ACE_Message_Block *&list_tail, u_int status_flags) |
virtual int | dequeue_head (ACE_Message_Block *&first_item, ACE_Time_Value *timeout=0) |
virtual void | dump (void) const |
Dump the state of the queue. | |
virtual int | enqueue_tail (ACE_Message_Block *new_item, ACE_Time_Value *timeout=0) |
virtual int | enqueue_head (ACE_Message_Block *new_item, ACE_Time_Value *timeout=0) |
Public Attributes | |
ACE_ALLOC_HOOK_DECLARE | |
Declare the dynamic allocation hooks. | |
Protected Member Functions | |
virtual int | enqueue_i (ACE_Message_Block *new_item) |
virtual int | sublist_enqueue_i (ACE_Message_Block *new_item, const ACE_Time_Value &current_time, ACE_Message_Block *&sublist_head, ACE_Message_Block *&sublist_tail, ACE_Dynamic_Message_Strategy::Priority_Status status) |
Enqueue a message in priority order within a given priority status sublist. | |
virtual int | dequeue_head_i (ACE_Message_Block *&first_item) |
virtual int | refresh_queue (const ACE_Time_Value &current_time) |
virtual int | refresh_pending_queue (const ACE_Time_Value &current_time) |
virtual int | refresh_late_queue (const ACE_Time_Value &current_time) |
Protected Attributes | |
ACE_Message_Block * | pending_head_ |
Pointer to head of the pending messages. | |
ACE_Message_Block * | pending_tail_ |
Pointer to tail of the pending messages. | |
ACE_Message_Block * | late_head_ |
Pointer to head of the late messages. | |
ACE_Message_Block * | late_tail_ |
Pointer to tail of the late messages. | |
ACE_Message_Block * | beyond_late_head_ |
Pointer to head of the beyond late messages. | |
ACE_Message_Block * | beyond_late_tail_ |
Pointer to tail of the beyond late messages. | |
ACE_Dynamic_Message_Strategy & | message_strategy_ |
Reference to the dynamic priority evaluation strategy object. | |
Private Member Functions | |
void | operator= (const ACE_Dynamic_Message_Queue< _ACE_SYNCH > &) |
ACE_Dynamic_Message_Queue (const ACE_Dynamic_Message_Queue< _ACE_SYNCH > &) | |
virtual int | peek_dequeue_head (ACE_Message_Block *&first_item, ACE_Time_Value *timeout=0) |
Private method to hide public base class method: just calls base class method. |
The messages in the queue are managed so as to preserve a logical ordering with minimal overhead per enqueue and dequeue operation. For this reason, the actual order of messages in the linked list of the queue may differ from their priority order. As time passes, a message may change from pending status to late status, and eventually to beyond late status. To minimize reordering overhead under this design force, three separate boundaries are maintained within the linked list of messages. Messages are dequeued preferentially from the head of the pending portion, then the head of the late portion, and finally from the head of the beyond late portion. In this way, only the boundaries need to be maintained (which can be done efficiently, as aging messages maintain the same linked list order as they progress from one status to the next), with no reordering of the messages themselves, while providing correct priority ordered dequeueing semantics.

The head and tail enqueue methods inherited from ACE_Message_Queue are overridden to call the priority enqueue method, so that out-of-order messages cannot confuse management of the various portions of the queue.

Messages in the pending portion of the queue whose priority becomes late (according to the specific dynamic strategy) advance into the late portion of the queue. Messages in the late portion of the queue whose priority becomes later than can be represented advance to the beyond late portion of the queue. These behaviors support a limited schedule overrun, with pending messages prioritized ahead of late messages, and late messages ahead of beyond late messages. These behaviors can be modified in derived classes by providing alternative definitions for the appropriate virtual methods.
When filled with messages, the queue's linked list should look like:

  H                                           T
  |                                           |
  B - B - B - B - L - L - L - P - P - P - P - P
  |           |   |       |   |               |
  BH          BT  LH      LT  PH              PT

Where the symbols are as follows:

  H  = Head of the entire list
  T  = Tail of the entire list
  B  = Beyond late message
  BH = Beyond late messages Head
  BT = Beyond late messages Tail
  L  = Late message
  LH = Late messages Head
  LT = Late messages Tail
  P  = Pending message
  PH = Pending messages Head
  PT = Pending messages Tail

Caveat: the virtual methods enqueue_tail, enqueue_head, and peek_dequeue_head have semantics for the static message queues that cannot be guaranteed for dynamic message queues. The peek_dequeue_head method just calls the base class method, while the two enqueue methods call the priority enqueue method. The order of messages in the dynamic queue is a function of message deadlines and how long they are in the queues. You can manipulate these in some cases to ensure the correct semantics, but that is not a very stable or portable approach (discouraged).
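The dequeue preference described above (pending first, then late, then beyond late) can be illustrated with a simplified model. This is only a sketch: it uses one FIFO per status instead of three boundaries inside a single linked list, and `Msg` and `ThreeRegionQueue` are hypothetical names, not ACE types.

```cpp
#include <cassert>
#include <deque>
#include <string>

// Hypothetical stand-in for a queued message; not an ACE type.
struct Msg {
  std::string name;
};

// Simplified model: one FIFO per priority status, rather than three
// boundaries maintained inside a single linked list.
struct ThreeRegionQueue {
  std::deque<Msg> pending;
  std::deque<Msg> late;
  std::deque<Msg> beyond_late;

  // Dequeue from the pending region first, then late, then beyond late.
  // Returns false when all three regions are empty.
  bool dequeue(Msg &out) {
    for (std::deque<Msg> *region : {&pending, &late, &beyond_late}) {
      if (!region->empty()) {
        out = region->front();
        region->pop_front();
        return true;
      }
    }
    return false;
  }
};
```

With one beyond late, one late, and two pending messages queued, this model hands back both pending messages, then the late one, then the beyond late one.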
Definition at line 750 of file Message_Queue_T.h.
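ACE_Dynamic_Message_Queue (ACE_Dynamic_Message_Strategy &message_strategy, size_t hwm=ACE_Message_Queue_Base::DEFAULT_HWM, size_t lwm=ACE_Message_Queue_Base::DEFAULT_LWM, ACE_Notification_Strategy *=0)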
Definition at line 1873 of file Message_Queue_T.cpp. References ACE_SYNCH_USE.
  : ACE_Message_Queue<ACE_SYNCH_USE> (hwm, lwm, ns),
    pending_head_ (0),
    pending_tail_ (0),
    late_head_ (0),
    late_tail_ (0),
    beyond_late_head_ (0),
    beyond_late_tail_ (0),
    message_strategy_ (message_strategy)
{
  // Note, the ACE_Dynamic_Message_Queue assumes full responsibility
  // for the passed ACE_Dynamic_Message_Strategy object, and deletes
  // it in its own dtor
}
virtual ~ACE_Dynamic_Message_Queue (void)
Close down the message queue and release all resources.
Definition at line 1894 of file Message_Queue_T.cpp. References ACE_Dynamic_Message_Queue<>::message_strategy_.
{
  delete &this->message_strategy_;
}
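Because the destructor deletes the strategy object that was passed in by reference, the strategy presumably has to be heap-allocated and must not be deleted again by the caller. The following sketch shows that ownership pattern in isolation; `Strategy` and `OwningQueue` are hypothetical stand-ins, not the ACE classes.

```cpp
#include <cassert>

// Hypothetical strategy type that counts live instances for demonstration.
struct Strategy {
  static int live;
  Strategy() { ++live; }
  ~Strategy() { --live; }
};
int Strategy::live = 0;

// Hypothetical queue that takes a reference but assumes ownership of the
// referenced object, deleting it in its destructor.
class OwningQueue {
public:
  explicit OwningQueue(Strategy &s) : strategy_(s) {}
  ~OwningQueue() { delete &strategy_; }

  // Copying would lead to a double delete, so it is disabled.
  OwningQueue(const OwningQueue &) = delete;
  OwningQueue &operator=(const OwningQueue &) = delete;

private:
  Strategy &strategy_;
};
```

Under this pattern, passing a stack-allocated or shared strategy object would be an error, since the queue deletes it when it is destroyed.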
virtual int dequeue_head (ACE_Message_Block *&first_item, ACE_Time_Value *timeout=0)
Dequeue and return the <ACE_Message_Block *> at the head of the queue. Returns -1 on failure, else the number of items still on the queue. Reimplemented from ACE_Message_Queue< ACE_SYNCH_USE >. Definition at line 2040 of file Message_Queue_T.cpp. References ACE_GUARD_RETURN, ACE_SYNCH_MUTEX_T, ACE_TRACE, ACE_Dynamic_Message_Queue<>::dequeue_head_i(), ESHUTDOWN, ACE_OS::gettimeofday(), ACE_Dynamic_Message_Queue<>::refresh_queue(), and ACE_Message_Queue< ACE_SYNCH_USE >::wait_not_empty_cond().
{
  ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head");

  ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);

  if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
    {
      errno = ESHUTDOWN;
      return -1;
    }

  int result;

  // get the current time
  ACE_Time_Value current_time = ACE_OS::gettimeofday ();

  // refresh priority status boundaries in the queue
  result = this->refresh_queue (current_time);
  if (result < 0)
    return result;

  // *now* it's appropriate to wait for an enqueued item
  result = this->wait_not_empty_cond (ace_mon, timeout);
  if (result == -1)
    return result;

  // call the internal dequeue method, which selects an item from the
  // highest priority status portion of the queue that has messages
  // enqueued.
  result = this->dequeue_head_i (first_item);

  return result;
}
virtual int dequeue_head_i (ACE_Message_Block *&first_item)
Dequeue and return the <ACE_Message_Block *> at the head of the logical queue. Attempts first to dequeue from the pending portion of the queue, or if that is empty from the late portion, or if that is empty from the beyond late portion, or if that is empty just sets the passed pointer to zero and returns -1. Reimplemented from ACE_Message_Queue< ACE_SYNCH_USE >. Definition at line 2321 of file Message_Queue_T.cpp. References ACE_TRACE, ACE_Dynamic_Message_Queue<>::beyond_late_head_, ACE_Dynamic_Message_Queue<>::beyond_late_tail_, ACE_Dynamic_Message_Queue<>::late_head_, ACE_Dynamic_Message_Queue<>::late_tail_, ACE_Message_Block::next(), ACE_Dynamic_Message_Queue<>::pending_head_, ACE_Dynamic_Message_Queue<>::pending_tail_, ACE_Message_Block::prev(), ACE_Message_Queue< ACE_SYNCH_USE >::signal_enqueue_waiters(), and ACE_Message_Block::total_size_and_length(). Referenced by ACE_Dynamic_Message_Queue<>::dequeue_head().
{
  ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i");

  int result = 0;
  int last_in_subqueue = 0;

  // first, try to dequeue from the head of the pending list
  if (this->pending_head_)
    {
      first_item = this->pending_head_;

      if (0 == this->pending_head_->prev ())
        this->head_ = this->pending_head_->next ();
      else
        this->pending_head_->prev ()->next (this->pending_head_->next ());

      if (0 == this->pending_head_->next ())
        {
          this->tail_ = this->pending_head_->prev ();
          this->pending_head_ = 0;
          this->pending_tail_ = 0;
        }
      else
        {
          this->pending_head_->next ()->prev (this->pending_head_->prev ());
          this->pending_head_ = this->pending_head_->next ();
        }

      first_item->prev (0);
      first_item->next (0);
    }

  // Second, try to dequeue from the head of the late list
  else if (this->late_head_)
    {
      last_in_subqueue = this->late_head_ == this->late_tail_ ? 1 : 0;

      first_item = this->late_head_;

      if (0 == this->late_head_->prev ())
        this->head_ = this->late_head_->next ();
      else
        this->late_head_->prev ()->next (this->late_head_->next ());

      if (0 == this->late_head_->next ())
        this->tail_ = this->late_head_->prev ();
      else
        {
          this->late_head_->next ()->prev (this->late_head_->prev ());
          this->late_head_ = this->late_head_->next ();
        }

      if (last_in_subqueue)
        {
          this->late_head_ = 0;
          this->late_tail_ = 0;
        }

      first_item->prev (0);
      first_item->next (0);
    }
  // finally, try to dequeue from the head of the beyond late list
  else if (this->beyond_late_head_)
    {
      last_in_subqueue =
        (this->beyond_late_head_ == this->beyond_late_tail_) ? 1 : 0;

      first_item = this->beyond_late_head_;
      this->head_ = this->beyond_late_head_->next ();

      if (0 == this->beyond_late_head_->next ())
        this->tail_ = this->beyond_late_head_->prev ();
      else
        {
          this->beyond_late_head_->next ()->prev (this->beyond_late_head_->prev ());
          this->beyond_late_head_ = this->beyond_late_head_->next ();
        }

      if (last_in_subqueue)
        {
          this->beyond_late_head_ = 0;
          this->beyond_late_tail_ = 0;
        }

      first_item->prev (0);
      first_item->next (0);
    }
  else
    {
      // nothing to dequeue: set the pointer to zero and return an error code
      first_item = 0;
      result = -1;
    }

  if (result < 0)
    return result;

  size_t mb_bytes = 0;
  size_t mb_length = 0;
  first_item->total_size_and_length (mb_bytes,
                                     mb_length);
  // Subtract off all of the bytes associated with this message.
  this->cur_bytes_ -= mb_bytes;
  this->cur_length_ -= mb_length;
  --this->cur_count_;

  // Only signal enqueueing threads if we've fallen below the low
  // water mark.
  if (this->cur_bytes_ <= this->low_water_mark_
      && this->signal_enqueue_waiters () == -1)
    return -1;
  else
    return this->cur_count_;
}
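Each of the three branches above performs the same pointer surgery: unlink the first node of a sublist from the overall doubly linked list and fix up the list head and tail and the sublist boundaries. A minimal standalone sketch of that step follows, assuming hypothetical `Node` and `List` types in place of ACE_Message_Block and the queue's head_/tail_ members.

```cpp
#include <cassert>

// Hypothetical node standing in for ACE_Message_Block's next()/prev() links.
struct Node {
  int id;
  Node *prev;
  Node *next;
  explicit Node(int i) : id(i), prev(nullptr), next(nullptr) {}
};

// Hypothetical list standing in for the queue's head_ and tail_ pointers.
struct List {
  Node *head = nullptr;
  Node *tail = nullptr;
};

// Append a node at the tail of the list (helper for building examples).
void push_back(List &l, Node &n) {
  n.prev = l.tail;
  n.next = nullptr;
  if (l.tail) l.tail->next = &n; else l.head = &n;
  l.tail = &n;
}

// Unlink the first node of the sublist [sub_head, sub_tail] from the list,
// updating the list head/tail and the sublist boundaries.
// Returns the unlinked node, or nullptr if the sublist is empty.
Node *unlink_sublist_head(List &l, Node *&sub_head, Node *&sub_tail) {
  Node *item = sub_head;
  if (!item) return nullptr;
  const bool last_in_sublist = (sub_head == sub_tail);

  // Bypass the item in the forward and backward directions.
  if (item->prev) item->prev->next = item->next; else l.head = item->next;
  if (item->next) item->next->prev = item->prev; else l.tail = item->prev;

  // Either the sublist becomes empty or its head advances by one.
  if (last_in_sublist) {
    sub_head = nullptr;
    sub_tail = nullptr;
  } else {
    sub_head = item->next;
  }

  item->prev = nullptr;
  item->next = nullptr;
  return item;
}
```

Checking whether the item is the last one in its sublist before touching any pointers mirrors the `last_in_subqueue` flag used in the late and beyond late branches.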
virtual void dump (void) const
Dump the state of the queue.
Reimplemented from ACE_Message_Queue< ACE_SYNCH_USE >. Definition at line 2080 of file Message_Queue_T.cpp. References ACE_BEGIN_DUMP, ACE_DEBUG, ACE_END_DUMP, ACE_TEXT, ACE_TRACE, ACE_Dynamic_Message_Strategy::dump(), ACE_Message_Queue<>::dump(), LM_DEBUG, and ACE_Dynamic_Message_Queue<>::message_strategy_.
{
#if defined (ACE_HAS_DUMP)
  ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dump");
  ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));

  ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("ACE_Message_Queue<ACE_SYNCH_USE> (base class): \n")));
  this->ACE_Message_Queue<ACE_SYNCH_USE>::dump ();

  ACE_DEBUG ((LM_DEBUG,
              ACE_TEXT ("pending_head_ = %u\n")
              ACE_TEXT ("pending_tail_ = %u\n")
              ACE_TEXT ("late_head_ = %u\n")
              ACE_TEXT ("late_tail_ = %u\n")
              ACE_TEXT ("beyond_late_head_ = %u\n")
              ACE_TEXT ("beyond_late_tail_ = %u\n"),
              this->pending_head_,
              this->pending_tail_,
              this->late_head_,
              this->late_tail_,
              this->beyond_late_head_,
              this->beyond_late_tail_));

  ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("message_strategy_ : \n")));
  message_strategy_.dump ();

  ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
#endif /* ACE_HAS_DUMP */
}
virtual int enqueue_head (ACE_Message_Block *new_item, ACE_Time_Value *timeout=0)
Just call priority enqueue method: head enqueue semantics for dynamic message queues are unstable: the message may or may not be where it was placed after the queue is refreshed prior to the next enqueue or dequeue operation. Reimplemented from ACE_Message_Queue< ACE_SYNCH_USE >. Definition at line 2692 of file Message_Queue_T.cpp. References ACE_TRACE, and ACE_Message_Queue< ACE_SYNCH_USE >::enqueue_prio().
{
  ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_head");
  return this->enqueue_prio (new_item, timeout);
}
virtual int enqueue_i (ACE_Message_Block *new_item)
Enqueue an <ACE_Message_Block *> in accordance with its priority. The priority may be *dynamic*, *static*, or a combination of *both*. It calls the priority evaluation function passed into the Dynamic Message Queue constructor to update the priorities of all enqueued messages. Reimplemented from ACE_Message_Queue< ACE_SYNCH_USE >. Definition at line 2112 of file Message_Queue_T.cpp. References ACE_TRACE, ACE_Dynamic_Message_Queue<>::beyond_late_head_, ACE_Dynamic_Message_Queue<>::beyond_late_tail_, ACE_Message_Queue< ACE_SYNCH_USE >::enqueue_head_i(), ACE_Message_Queue< ACE_SYNCH_USE >::enqueue_tail_i(), ACE_OS::gettimeofday(), ACE_Dynamic_Message_Queue<>::late_head_, ACE_Dynamic_Message_Queue<>::late_tail_, ACE_Dynamic_Message_Queue<>::message_strategy_, ACE_Message_Block::next(), ACE_Dynamic_Message_Queue<>::pending_head_, ACE_Dynamic_Message_Queue<>::pending_tail_, ACE_Message_Block::prev(), ACE_Dynamic_Message_Strategy::priority_status(), ACE_Dynamic_Message_Queue<>::refresh_queue(), ACE_Message_Queue< ACE_SYNCH_USE >::signal_dequeue_waiters(), ACE_Dynamic_Message_Queue<>::sublist_enqueue_i(), and ACE_Message_Block::total_size_and_length().
{
  ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_i");

  if (new_item == 0)
    return -1;

  int result = 0;

  // Get the current time.
  ACE_Time_Value current_time = ACE_OS::gettimeofday ();

  // Refresh priority status boundaries in the queue.

  result = this->refresh_queue (current_time);
  if (result < 0)
    return result;

  // Where we enqueue depends on the message's priority status.
  switch (message_strategy_.priority_status (*new_item,
                                             current_time))
    {
    case ACE_Dynamic_Message_Strategy::PENDING:
      if (this->pending_tail_ == 0)
        {
          // Check for simple case of an empty pending queue, where
          // all we need to do is insert <new_item> into the tail of
          // the queue.
          pending_head_ = new_item;
          pending_tail_ = pending_head_;
          return this->enqueue_tail_i (new_item);
        }
      else
        {
          // Enqueue the new message in priority order in the pending
          // sublist
          result = sublist_enqueue_i (new_item,
                                      current_time,
                                      this->pending_head_,
                                      this->pending_tail_,
                                      ACE_Dynamic_Message_Strategy::PENDING);
        }
      break;

    case ACE_Dynamic_Message_Strategy::LATE:
      if (this->late_tail_ == 0)
        {
          late_head_ = new_item;
          late_tail_ = late_head_;

          if (this->pending_head_ == 0)
            // Check for simple case of an empty pending queue,
            // where all we need to do is insert <new_item> into the
            // tail of the queue.
            return this->enqueue_tail_i (new_item);
          else if (this->beyond_late_tail_ == 0)
            // Check for simple case of an empty beyond late queue, where all
            // we need to do is insert <new_item> into the head of the queue.
            return this->enqueue_head_i (new_item);
          else
            {
              // Otherwise, we can just splice the new message in
              // between the pending and beyond late portions of the
              // queue.
              this->beyond_late_tail_->next (new_item);
              new_item->prev (this->beyond_late_tail_);
              this->pending_head_->prev (new_item);
              new_item->next (this->pending_head_);
            }
        }
      else
        {
          // Enqueue the new message in priority order in the late
          // sublist
          result = sublist_enqueue_i (new_item,
                                      current_time,
                                      this->late_head_,
                                      this->late_tail_,
                                      ACE_Dynamic_Message_Strategy::LATE);
        }
      break;

    case ACE_Dynamic_Message_Strategy::BEYOND_LATE:
      if (this->beyond_late_tail_ == 0)
        {
          // Check for simple case of an empty beyond late queue,
          // where all we need to do is insert <new_item> into the
          // head of the queue.
          beyond_late_head_ = new_item;
          beyond_late_tail_ = beyond_late_head_;
          return this->enqueue_head_i (new_item);
        }
      else
        {
          // all beyond late messages have the same (zero) priority,
          // so just put the new one at the end of the beyond late
          // messages
          if (this->beyond_late_tail_->next ())
            this->beyond_late_tail_->next ()->prev (new_item);
          else
            this->tail_ = new_item;

          new_item->next (this->beyond_late_tail_->next ());
          this->beyond_late_tail_->next (new_item);
          new_item->prev (this->beyond_late_tail_);
          this->beyond_late_tail_ = new_item;
        }

      break;

      // should never get here, but just in case...
    default:
      result = -1;
      break;
    }

  if (result < 0)
    return result;

  size_t mb_bytes = 0;
  size_t mb_length = 0;
  new_item->total_size_and_length (mb_bytes,
                                   mb_length);
  this->cur_bytes_ += mb_bytes;
  this->cur_length_ += mb_length;
  ++this->cur_count_;

  if (this->signal_dequeue_waiters () == -1)
    return -1;
  else
    return this->cur_count_;
}
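The switch above depends on the strategy's priority_status() classification, whose implementation is not shown on this page. As a rough illustration only, a deadline-based rule might look like the following sketch. The integer tick times, the `max_late` window, and the exact comparisons are assumptions made for this example, not the behavior of any particular ACE strategy class.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical status values mirroring the three statuses named in the text.
enum class Status { Pending, Late, BeyondLate };

// Assumed deadline rule, using integer ticks:
//   now <= deadline                      -> Pending
//   deadline < now <= deadline + max_late -> Late
//   otherwise                            -> BeyondLate
Status priority_status(std::int64_t deadline, std::int64_t now,
                       std::int64_t max_late) {
  if (now <= deadline) return Status::Pending;
  if (now - deadline <= max_late) return Status::Late;
  return Status::BeyondLate;
}
```

Under such a rule a message's status only moves forward as time passes (pending, then late, then beyond late), which is the property the queue relies on when it advances its boundaries.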
virtual int enqueue_tail (ACE_Message_Block *new_item, ACE_Time_Value *timeout=0)
Just call priority enqueue method: tail enqueue semantics for dynamic message queues are unstable: the message may or may not be where it was placed after the queue is refreshed prior to the next enqueue or dequeue operation. Reimplemented from ACE_Message_Queue< ACE_SYNCH_USE >. Definition at line 2679 of file Message_Queue_T.cpp. References ACE_TRACE, and ACE_Message_Queue< ACE_SYNCH_USE >::enqueue_prio().
{
  ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_tail");
  return this->enqueue_prio (new_item, timeout);
}
virtual int peek_dequeue_head (ACE_Message_Block *&first_item, ACE_Time_Value *timeout=0)
Private method to hide public base class method: just calls base class method.
Reimplemented from ACE_Message_Queue< ACE_SYNCH_USE >. Definition at line 2668 of file Message_Queue_T.cpp. References ACE_Message_Queue<>::peek_dequeue_head().
{
  return ACE_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head (first_item,
                                                              timeout);
}
virtual int refresh_late_queue (const ACE_Time_Value &current_time)
Refresh the late queue using the strategy specific priority status function. Definition at line 2583 of file Message_Queue_T.cpp. References ACE_ERROR_RETURN, ACE_TEXT, ACE_Dynamic_Message_Queue<>::beyond_late_head_, ACE_Dynamic_Message_Queue<>::beyond_late_tail_, ACE_Dynamic_Message_Queue<>::late_head_, ACE_Dynamic_Message_Queue<>::late_tail_, LM_ERROR, ACE_Dynamic_Message_Queue<>::message_strategy_, ACE_Message_Block::next(), ACE_Message_Block::prev(), and ACE_Dynamic_Message_Strategy::priority_status(). Referenced by ACE_Dynamic_Message_Queue<>::refresh_queue().
{
  ACE_Dynamic_Message_Strategy::Priority_Status current_status;

  if (this->late_head_)
    {
      current_status = message_strategy_.priority_status (*this->late_head_,
                                                          current_time);
      switch (current_status)
        {
        case ACE_Dynamic_Message_Strategy::BEYOND_LATE:

          // make sure the head of the beyond late queue is set
          // (there may not have been any beyond late messages previously)
          this->beyond_late_head_ = this->head_;

          // advance through the beyond late messages in the late queue
          do
            {
              this->late_head_ = this->late_head_->next ();

              if (this->late_head_)
                current_status = message_strategy_.priority_status (*this->late_head_,
                                                                    current_time);
              else
                break;  // do while

            }
          while (current_status == ACE_Dynamic_Message_Strategy::BEYOND_LATE);

          if (this->late_head_)
            {
              // point tail of beyond late sublist to previous item
              this->beyond_late_tail_ = this->late_head_->prev ();

              if (current_status == ACE_Dynamic_Message_Strategy::PENDING)
                {
                  // there are no late messages left in the queue
                  this->late_head_ = 0;
                  this->late_tail_ = 0;
                }
              else if (current_status != ACE_Dynamic_Message_Strategy::LATE)
                // if we got here, something is *seriously* wrong with the queue
                ACE_ERROR_RETURN ((LM_ERROR,
                                   ACE_TEXT ("Unexpected message priority status [%d] (expected LATE)"),
                                   (int) current_status),
                                  -1);
            }
          else
            {
              // there are no late messages left in the queue
              this->beyond_late_tail_ = this->tail_;
              this->late_head_ = 0;
              this->late_tail_ = 0;
            }

          break;  // switch

        case ACE_Dynamic_Message_Strategy::LATE:
          // do nothing - the late queue is unchanged
          break;  // switch

        case ACE_Dynamic_Message_Strategy::PENDING:
          // if we got here, something is *seriously* wrong with the queue
          ACE_ERROR_RETURN ((LM_ERROR,
                             ACE_TEXT ("Unexpected message priority status ")
                             ACE_TEXT ("[%d] (expected LATE or BEYOND_LATE)"),
                             (int) current_status),
                            -1);
        default:
          // if we got here, something is *seriously* wrong with the queue
          ACE_ERROR_RETURN ((LM_ERROR,
                             ACE_TEXT ("Unknown message priority status [%d]"),
                             (int) current_status),
                            -1);
        }
    }

  return 0;
}
virtual int refresh_pending_queue (const ACE_Time_Value &current_time)
Refresh the pending queue using the strategy specific priority status function. Definition at line 2460 of file Message_Queue_T.cpp. References ACE_ERROR_RETURN, ACE_TEXT, ACE_Dynamic_Message_Queue<>::beyond_late_head_, ACE_Dynamic_Message_Queue<>::beyond_late_tail_, ACE_Dynamic_Message_Queue<>::late_head_, ACE_Dynamic_Message_Queue<>::late_tail_, LM_ERROR, ACE_Dynamic_Message_Queue<>::message_strategy_, ACE_Message_Block::next(), ACE_Dynamic_Message_Queue<>::pending_head_, ACE_Dynamic_Message_Queue<>::pending_tail_, ACE_Message_Block::prev(), and ACE_Dynamic_Message_Strategy::priority_status(). Referenced by ACE_Dynamic_Message_Queue<>::refresh_queue().
{
  ACE_Dynamic_Message_Strategy::Priority_Status current_status;

  // refresh priority status boundaries in the queue
  if (this->pending_head_)
    {
      current_status = message_strategy_.priority_status (*this->pending_head_,
                                                          current_time);
      switch (current_status)
        {
        case ACE_Dynamic_Message_Strategy::BEYOND_LATE:
          // Make sure the head of the beyond late queue is set (there
          // may not have been any beyond late messages previously)
          this->beyond_late_head_ = this->head_;

          // Zero out the late queue pointers, and set them only if
          // there turn out to be late messages in the pending sublist
          this->late_head_ = 0;
          this->late_tail_ = 0;

          // Advance through the beyond late messages in the pending queue
          do
            {
              this->pending_head_ = this->pending_head_->next ();

              if (this->pending_head_)
                current_status = message_strategy_.priority_status (*this->pending_head_,
                                                                    current_time);
              else
                break;  // do while

            }
          while (current_status == ACE_Dynamic_Message_Strategy::BEYOND_LATE);

          if (this->pending_head_)
            {
              // point tail of beyond late sublist to previous item
              this->beyond_late_tail_ = this->pending_head_->prev ();

              if (current_status == ACE_Dynamic_Message_Strategy::PENDING)
                // there are no late messages left in the queue
                break;  // switch
              else if (current_status != ACE_Dynamic_Message_Strategy::LATE)
                {
                  // if we got here, something is *seriously* wrong with the queue
                  ACE_ERROR_RETURN ((LM_ERROR,
                                     ACE_TEXT ("Unexpected message priority status [%d] (expected LATE)"),
                                     (int) current_status),
                                    -1);
                }
              /* FALLTHRU */
            }
          else
            {
              // There are no pending or late messages left in the
              // queue.
              this->beyond_late_tail_ = this->tail_;
              this->pending_head_ = 0;
              this->pending_tail_ = 0;
              break;  // switch
            }

        case ACE_Dynamic_Message_Strategy::LATE:
          // Make sure the head of the late queue is set (there may
          // not have been any late messages previously, or they may
          // have all become beyond late).
          if (this->late_head_ == 0)
            this->late_head_ = this->pending_head_;

          // advance through the beyond late messages in the pending queue
          do
            {
              this->pending_head_ = this->pending_head_->next ();

              if (this->pending_head_)
                current_status = message_strategy_.priority_status (*this->pending_head_,
                                                                    current_time);
              else
                break;  // do while

            }
          while (current_status == ACE_Dynamic_Message_Strategy::LATE);

          if (this->pending_head_)
            {
              if (current_status != ACE_Dynamic_Message_Strategy::PENDING)
                // if we got here, something is *seriously* wrong with the queue
                ACE_ERROR_RETURN((LM_ERROR,
                                  ACE_TEXT ("Unexpected message priority status [%d] (expected PENDING)"),
                                  (int) current_status),
                                 -1);

              // Point tail of late sublist to previous item
              this->late_tail_ = this->pending_head_->prev ();
            }
          else
            {
              // there are no pending messages left in the queue
              this->late_tail_ = this->tail_;
              this->pending_head_ = 0;
              this->pending_tail_ = 0;
            }

          break;  // switch
        case ACE_Dynamic_Message_Strategy::PENDING:
          // do nothing - the pending queue is unchanged
          break;  // switch
        default:
          // if we got here, something is *seriously* wrong with the queue
          ACE_ERROR_RETURN((LM_ERROR,
                            ACE_TEXT ("Unknown message priority status [%d]"),
                            (int) current_status),
                           -1);
        }
    }
  return 0;
}
virtual int refresh_queue (const ACE_Time_Value &current_time)
Refresh the queue using the strategy specific priority status function. Definition at line 2444 of file Message_Queue_T.cpp. References ACE_Dynamic_Message_Queue<>::refresh_late_queue(), and ACE_Dynamic_Message_Queue<>::refresh_pending_queue(). Referenced by ACE_Dynamic_Message_Queue<>::dequeue_head(), ACE_Dynamic_Message_Queue<>::enqueue_i(), and ACE_Dynamic_Message_Queue<>::remove_messages().
{
  int result;

  result = refresh_pending_queue (current_time);

  if (result != -1)
    result = refresh_late_queue (current_time);

  return result;
}
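The refresh methods only move boundaries; they never reorder messages. That idea can be sketched with indices into an array in place of pointers into the linked list. The sketch assumes deadlines stored in list order (head to tail, non-decreasing) and a hypothetical deadline rule with integer ticks and a `max_late` window; `Boundaries` and `refresh` are illustrative names, not ACE API.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Index boundaries over a list laid out as: beyond late | late | pending.
//   beyond late = [0, late_begin)
//   late        = [late_begin, pending_begin)
//   pending     = [pending_begin, size)
struct Boundaries {
  std::size_t late_begin = 0;
  std::size_t pending_begin = 0;
};

// Advance the boundaries for the current time without moving any message.
// Assumes deadlines are in list order and non-decreasing from head to tail.
void refresh(const std::vector<std::int64_t> &deadlines, std::int64_t now,
             std::int64_t max_late, Boundaries &b) {
  // Pending messages whose deadline has passed join the late region.
  while (b.pending_begin < deadlines.size() &&
         deadlines[b.pending_begin] < now)
    ++b.pending_begin;

  // Late messages overdue by more than max_late join the beyond late region.
  while (b.late_begin < b.pending_begin &&
         now - deadlines[b.late_begin] > max_late)
    ++b.late_begin;
}
```

As in refresh_queue, the pending boundary is refreshed before the late boundary, so a message that skipped straight past the late window is still picked up by the second pass.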
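virtual int remove_messages (ACE_Message_Block *&list_head, ACE_Message_Block *&list_tail, u_int status_flags)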
Detach all messages with status given in the passed flags from the queue and return them by setting passed head and tail pointers to the linked list they comprise. This method is intended primarily as a means of periodically harvesting messages that have missed their deadlines, but is available in its most general form. All messages are returned in priority order, from head to tail, as of the time this method was called. Definition at line 1900 of file Message_Queue_T.cpp. References ACE_BIT_ENABLED, ACE_Dynamic_Message_Queue<>::beyond_late_head_, ACE_Dynamic_Message_Queue<>::beyond_late_tail_, ACE_OS::gettimeofday(), ACE_Dynamic_Message_Queue<>::late_head_, ACE_Dynamic_Message_Queue<>::late_tail_, ACE_Message_Block::next(), ACE_Dynamic_Message_Queue<>::pending_head_, ACE_Dynamic_Message_Queue<>::pending_tail_, ACE_Message_Block::prev(), ACE_Dynamic_Message_Queue<>::refresh_queue(), and ACE_Message_Block::total_size_and_length().
{
  // start with an empty list
  list_head = 0;
  list_tail = 0;

  // Get the current time
  ACE_Time_Value current_time = ACE_OS::gettimeofday ();

  // Refresh priority status boundaries in the queue.
  int result = this->refresh_queue (current_time);
  if (result < 0)
    return result;

  if (ACE_BIT_ENABLED (status_flags,
                       (u_int) ACE_Dynamic_Message_Strategy::PENDING)
      && this->pending_head_
      && this->pending_tail_)
    {
      // patch up pointers for the new tail of the queue
      if (this->pending_head_->prev ())
        {
          this->tail_ = this->pending_head_->prev ();
          this->pending_head_->prev ()->next (0);
        }
      else
        {
          // the list has become empty
          this->head_ = 0;
          this->tail_ = 0;
        }

      // point to the head and tail of the list
      list_head = this->pending_head_;
      list_tail = this->pending_tail_;

      // cut the pending messages out of the queue entirely
      this->pending_head_->prev (0);
      this->pending_head_ = 0;
      this->pending_tail_ = 0;
    }

  if (ACE_BIT_ENABLED (status_flags,
                       (u_int) ACE_Dynamic_Message_Strategy::LATE)
      && this->late_head_
      && this->late_tail_)
    {
      // Patch up pointers for the (possibly) new head and tail of the
      // queue.
      if (this->late_tail_->next ())
        this->late_tail_->next ()->prev (this->late_head_->prev ());
      else
        this->tail_ = this->late_head_->prev ();

      if (this->late_head_->prev ())
        this->late_head_->prev ()->next (this->late_tail_->next ());
      else
        this->head_ = this->late_tail_->next ();

      // put late messages behind pending messages (if any) being returned
      this->late_head_->prev (list_tail);
      if (list_tail)
        list_tail->next (this->late_head_);
      else
        list_head = this->late_head_;

      list_tail = this->late_tail_;

      this->late_tail_->next (0);
      this->late_head_ = 0;
      this->late_tail_ = 0;
    }

  if (ACE_BIT_ENABLED (status_flags,
                       (u_int) ACE_Dynamic_Message_Strategy::BEYOND_LATE)
      && this->beyond_late_head_
      && this->beyond_late_tail_)
    {
      // Patch up pointers for the new tail of the queue
      if (this->beyond_late_tail_->next ())
        {
          this->head_ = this->beyond_late_tail_->next ();
          this->beyond_late_tail_->next ()->prev (0);
        }
      else
        {
          // the list has become empty
          this->head_ = 0;
          this->tail_ = 0;
        }

      // Put beyond late messages at the end of the list being
      // returned.
      if (list_tail)
        {
          this->beyond_late_head_->prev (list_tail);
          list_tail->next (this->beyond_late_head_);
        }
      else
        list_head = this->beyond_late_head_;

      list_tail = this->beyond_late_tail_;

      this->beyond_late_tail_->next (0);
      this->beyond_late_head_ = 0;
      this->beyond_late_tail_ = 0;
    }

  // Decrement message and size counts for removed messages.
  ACE_Message_Block *temp1;

  for (temp1 = list_head;
       temp1 != 0;
       temp1 = temp1->next ())
    {
      --this->cur_count_;

      size_t mb_bytes = 0;
      size_t mb_length = 0;
      temp1->total_size_and_length (mb_bytes,
                                    mb_length);
      // Subtract off all of the bytes associated with this message.
      this->cur_bytes_ -= mb_bytes;
      this->cur_length_ -= mb_length;
    }

  return result;
}
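The harvesting behavior (select sublists by status bit, return them pending first, then late, then beyond late) can be sketched without the pointer manipulation. The bit values below are assumptions chosen for this example, and `Msg` and `harvest` are hypothetical names. The real method detaches whole sublists in place rather than copying messages.

```cpp
#include <cassert>
#include <vector>

// Status bits; the numeric values are assumptions made for this sketch.
enum StatusFlag : unsigned { PENDING = 0x01, LATE = 0x02, BEYOND_LATE = 0x04 };

// Hypothetical message record carrying its current status.
struct Msg {
  int id;
  StatusFlag status;
};

// Remove every message whose status bit is set in flags and return the
// removed messages ordered pending first, then late, then beyond late.
std::vector<Msg> harvest(std::vector<Msg> &queue, unsigned flags) {
  std::vector<Msg> out;
  for (StatusFlag s : {PENDING, LATE, BEYOND_LATE}) {
    if (!(flags & s)) continue;
    for (const Msg &m : queue)
      if (m.status == s) out.push_back(m);
  }

  // Keep only the messages whose status was not requested.
  std::vector<Msg> kept;
  for (const Msg &m : queue)
    if (!(flags & m.status)) kept.push_back(m);
  queue.swap(kept);
  return out;
}
```

A typical use, in line with the description above, would be to call this periodically with `LATE | BEYOND_LATE` to collect messages that have missed their deadlines while leaving pending messages queued.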
|
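The pointer patching in the listing above follows a common pattern: unlink a contiguous range from a doubly linked list, append it to a caller-owned result list, and adjust the running totals. The following is a minimal standalone sketch of that pattern, not the ACE implementation. `Node`, `List`, `push_back` and `splice_out` are hypothetical names introduced only for illustration, and the sketch updates the counts while splicing rather than in a separate pass over the returned list.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical stand-in for ACE_Message_Block: links plus a byte size.
struct Node {
  Node *prev = nullptr;
  Node *next = nullptr;
  std::size_t bytes = 0;
};

// Hypothetical stand-in for the queue: one list plus running totals.
struct List {
  Node *head = nullptr;
  Node *tail = nullptr;
  std::size_t count = 0;
  std::size_t bytes = 0;
};

// Append a node at the back of the list and update the totals.
void push_back(List &q, Node *n) {
  n->prev = q.tail;
  n->next = nullptr;
  if (q.tail)
    q.tail->next = n;
  else
    q.head = n;
  q.tail = n;
  ++q.count;
  q.bytes += n->bytes;
}

// Unlink the range [sub_head, sub_tail] from q and append it to the
// result list described by out_head / out_tail.
void splice_out(List &q, Node *sub_head, Node *sub_tail,
                Node *&out_head, Node *&out_tail) {
  assert(sub_head && sub_tail);

  // Bridge the neighbours on either side of the removed range.
  if (sub_tail->next)
    sub_tail->next->prev = sub_head->prev;
  else
    q.tail = sub_head->prev;

  if (sub_head->prev)
    sub_head->prev->next = sub_tail->next;
  else
    q.head = sub_tail->next;

  // Subtract the removed nodes from the running totals.
  for (Node *n = sub_head;; n = n->next) {
    --q.count;
    q.bytes -= n->bytes;
    if (n == sub_tail)
      break;
  }

  // Append the range behind whatever is already in the result list.
  sub_head->prev = out_tail;
  if (out_tail)
    out_tail->next = sub_head;
  else
    out_head = sub_head;
  out_tail = sub_tail;
  sub_tail->next = nullptr;
}
```

Passing the result list as a pair of reference-to-pointer parameters mirrors the `list_head` / `list_tail` arguments of `remove_messages()`, so successive ranges can be chained onto the same result.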
Enqueue a message in priority order within a given priority status sublist.
Definition at line 2252 of file Message_Queue_T.cpp. References ACE_Dynamic_Message_Queue<>::message_strategy_, ACE_Message_Block::msg_priority(), ACE_Message_Block::next(), ACE_Message_Block::prev(), and ACE_Dynamic_Message_Strategy::priority_status(). Referenced by ACE_Dynamic_Message_Queue<>::enqueue_i().
{
  int result = 0;
  ACE_Message_Block *current_item = 0;

  // Find message after which to enqueue new item, based on message
  // priority and priority status.
  for (current_item = sublist_tail;
       current_item;
       current_item = current_item->prev ())
    {
      if (message_strategy_.priority_status (*current_item, current_time) == status)
        {
          if (current_item->msg_priority () >= new_item->msg_priority ())
            break;
        }
      else
        {
          sublist_head = new_item;
          break;
        }
    }

  if (current_item == 0)
    {
      // If the new message has highest priority of any, put it at the
      // head of the list (and sublist).
      new_item->prev (0);
      new_item->next (this->head_);
      if (this->head_ != 0)
        this->head_->prev (new_item);
      else
        {
          this->tail_ = new_item;
          sublist_tail = new_item;
        }
      this->head_ = new_item;
      sublist_head = new_item;
    }
  else
    {
      // insert the new item into the list
      new_item->next (current_item->next ());
      new_item->prev (current_item);

      if (current_item->next ())
        current_item->next ()->prev (new_item);
      else
        this->tail_ = new_item;

      current_item->next (new_item);

      // If the new item has lowest priority of any in the sublist,
      // move the tail pointer of the sublist back to the new item
      if (current_item == sublist_tail)
        sublist_tail = new_item;
    }

  return result;
}

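The core of the listing above is a backward scan from the tail that stops at the first item whose priority is at least that of the new item, followed by an insertion after that item. The sketch below isolates that step under simplifying assumptions: it works on a single list, omits the `priority_status()` check and the sublist head/tail bookkeeping, and uses the hypothetical names `Item` and `insert_by_priority`, which are not part of ACE.

```cpp
#include <cassert>

// Hypothetical stand-in for ACE_Message_Block: links plus a priority.
struct Item {
  Item *prev = nullptr;
  Item *next = nullptr;
  unsigned long priority = 0;
};

// Insert new_item so that priorities are non-increasing from head to
// tail. Scanning from the tail keeps items of equal priority in
// arrival (FIFO) order.
void insert_by_priority(Item *&head, Item *&tail, Item *new_item) {
  assert(new_item);

  // Walk backwards past every item with strictly lower priority.
  Item *current = tail;
  while (current && current->priority < new_item->priority)
    current = current->prev;

  if (!current) {
    // No item has priority >= the new one: insert at the head.
    new_item->prev = nullptr;
    new_item->next = head;
    if (head)
      head->prev = new_item;
    else
      tail = new_item;
    head = new_item;
  } else {
    // Insert directly after the item where the scan stopped.
    new_item->next = current->next;
    new_item->prev = current;
    if (current->next)
      current->next->prev = new_item;
    else
      tail = new_item;
    current->next = new_item;
  }
}
```

Starting the scan at the tail favours the common case in which a new message has the lowest priority seen so far, because the loop then exits on its first comparison.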
Declare the dynamic allocation hooks.
Reimplemented from ACE_Message_Queue< ACE_SYNCH_USE >. Definition at line 806 of file Message_Queue_T.h.

Pointer to head of the beyond late messages.
Definition at line 860 of file Message_Queue_T.h. Referenced by ACE_Dynamic_Message_Queue<>::dequeue_head_i(), ACE_Dynamic_Message_Queue<>::enqueue_i(), ACE_Dynamic_Message_Queue<>::refresh_late_queue(), ACE_Dynamic_Message_Queue<>::refresh_pending_queue(), and ACE_Dynamic_Message_Queue<>::remove_messages().

Pointer to tail of the beyond late messages.
Definition at line 863 of file Message_Queue_T.h. Referenced by ACE_Dynamic_Message_Queue<>::dequeue_head_i(), ACE_Dynamic_Message_Queue<>::enqueue_i(), ACE_Dynamic_Message_Queue<>::refresh_late_queue(), ACE_Dynamic_Message_Queue<>::refresh_pending_queue(), and ACE_Dynamic_Message_Queue<>::remove_messages().

Pointer to head of the late messages.
Definition at line 854 of file Message_Queue_T.h. Referenced by ACE_Dynamic_Message_Queue<>::dequeue_head_i(), ACE_Dynamic_Message_Queue<>::enqueue_i(), ACE_Dynamic_Message_Queue<>::refresh_late_queue(), ACE_Dynamic_Message_Queue<>::refresh_pending_queue(), and ACE_Dynamic_Message_Queue<>::remove_messages().

Pointer to tail of the late messages.
Definition at line 857 of file Message_Queue_T.h. Referenced by ACE_Dynamic_Message_Queue<>::dequeue_head_i(), ACE_Dynamic_Message_Queue<>::enqueue_i(), ACE_Dynamic_Message_Queue<>::refresh_late_queue(), ACE_Dynamic_Message_Queue<>::refresh_pending_queue(), and ACE_Dynamic_Message_Queue<>::remove_messages().

Pointer to a dynamic priority evaluation function.
Definition at line 866 of file Message_Queue_T.h. Referenced by ACE_Dynamic_Message_Queue<>::dump(), ACE_Dynamic_Message_Queue<>::enqueue_i(), ACE_Dynamic_Message_Queue<>::refresh_late_queue(), ACE_Dynamic_Message_Queue<>::refresh_pending_queue(), ACE_Dynamic_Message_Queue<>::sublist_enqueue_i(), and ACE_Dynamic_Message_Queue<>::~ACE_Dynamic_Message_Queue().

Pointer to head of the pending messages.
Definition at line 848 of file Message_Queue_T.h. Referenced by ACE_Dynamic_Message_Queue<>::dequeue_head_i(), ACE_Dynamic_Message_Queue<>::enqueue_i(), ACE_Dynamic_Message_Queue<>::refresh_pending_queue(), and ACE_Dynamic_Message_Queue<>::remove_messages().

Pointer to tail of the pending messages.
Definition at line 851 of file Message_Queue_T.h. Referenced by ACE_Dynamic_Message_Queue<>::dequeue_head_i(), ACE_Dynamic_Message_Queue<>::enqueue_i(), ACE_Dynamic_Message_Queue<>::refresh_pending_queue(), and ACE_Dynamic_Message_Queue<>::remove_messages().