#include <Message_Queue_T.h>
Inheritance diagram for ACE_Dynamic_Message_Queue<>:
Public Member Functions | |
ACE_Dynamic_Message_Queue (ACE_Dynamic_Message_Strategy &message_strategy, size_t hwm=ACE_Message_Queue_Base::DEFAULT_HWM, size_t lwm=ACE_Message_Queue_Base::DEFAULT_LWM, ACE_Notification_Strategy *=0) | |
virtual | ~ACE_Dynamic_Message_Queue (void) |
Close down the message queue and release all resources. | |
virtual int | remove_messages (ACE_Message_Block *&list_head, ACE_Message_Block *&list_tail, u_int status_flags) |
virtual int | dequeue_head (ACE_Message_Block *&first_item, ACE_Time_Value *timeout=0) |
virtual void | dump (void) const |
Dump the state of the queue. | |
virtual int | enqueue_tail (ACE_Message_Block *new_item, ACE_Time_Value *timeout=0) |
virtual int | enqueue_head (ACE_Message_Block *new_item, ACE_Time_Value *timeout=0) |
Public Attributes | |
ACE_ALLOC_HOOK_DECLARE | |
Declare the dynamic allocation hooks. | |
Protected Member Functions | |
virtual int | enqueue_i (ACE_Message_Block *new_item) |
virtual int | sublist_enqueue_i (ACE_Message_Block *new_item, const ACE_Time_Value ¤t_time, ACE_Message_Block *&sublist_head, ACE_Message_Block *&sublist_tail, ACE_Dynamic_Message_Strategy::Priority_Status status) |
Enqueue a message in priority order within a given priority status sublist. | |
virtual int | dequeue_head_i (ACE_Message_Block *&first_item) |
virtual int | refresh_queue (const ACE_Time_Value ¤t_time) |
virtual int | refresh_pending_queue (const ACE_Time_Value ¤t_time) |
virtual int | refresh_late_queue (const ACE_Time_Value ¤t_time) |
Protected Attributes | |
ACE_Message_Block * | pending_head_ |
Pointer to head of the pending messages. | |
ACE_Message_Block * | pending_tail_ |
Pointer to tail of the pending messages. | |
ACE_Message_Block * | late_head_ |
Pointer to head of the late messages. | |
ACE_Message_Block * | late_tail_ |
Pointer to tail of the late messages. | |
ACE_Message_Block * | beyond_late_head_ |
Pointer to head of the beyond late messages. | |
ACE_Message_Block * | beyond_late_tail_ |
Pointer to tail of the beyond late messages. | |
ACE_Dynamic_Message_Strategy & | message_strategy_ |
Pointer to a dynamic priority evaluation function. | |
Private Member Functions | |
void | operator= (const ACE_Dynamic_Message_Queue< _ACE_SYNCH > &) |
ACE_Dynamic_Message_Queue (const ACE_Dynamic_Message_Queue< _ACE_SYNCH > &) | |
virtual int | peek_dequeue_head (ACE_Message_Block *&first_item, ACE_Time_Value *timeout=0) |
Private method to hide public base class method: just calls base class method. |
The messages in the queue are managed so as to preserve a logical ordering with minimal overhead per enqueue and dequeue operation. For this reason, the actual order of messages in the linked list of the queue may differ from their priority order. As time passes, a message may change from pending status to late status, and eventually to beyond late status. To minimize reordering overhead under this design force, three separate boundaries are maintained within the linked list of messages. Messages are dequeued preferentially from the head of the pending portion, then the head of the late portion, and finally from the head of the beyond late portion. In this way, only the boundaries need to be maintained (which can be done efficiently, as aging messages maintain the same linked list order as they progress from one status to the next), with no reordering of the messages themselves, while providing correct priority ordered dequeueing semantics. Head and tail enqueue methods inherited from ACE_Message_Queue are made private to prevent out-of-order messages from confusing management of the various portions of the queue. Messages in the pending portion of the queue whose priority becomes late (according to the specific dynamic strategy) advance into the late portion of the queue. Messages in the late portion of the queue whose priority becomes later than can be represented advance to the beyond_late portion of the queue. These behaviors support a limited schedule overrun, with pending messages prioritized ahead of late messages, and late messages ahead of beyond late messages. These behaviors can be modified in derived classes by providing alternative definitions for the appropriate virtual methods. When filled with messages, the queue's linked list should look like: H T | | B - B - B - B - L - L - L - P - P - P - P - P | | | | | | BH BT LH LT PH PT Where the symbols are as follows: H = Head of the entire list T = Tail of the entire list B = Beyond late message BH = Beyond late messages Head BT = Beyond late messages Tail L = Late message LH = Late messages Head LT = Late messages Tail P = Pending message PH = Pending messages Head PT = Pending messages Tail Caveat: the virtual methods enqueue_tail, enqueue_head, and peek_dequeue_head have semantics for the static message queues that cannot be guaranteed for dynamic message queues. The peek_dequeue_head method just calls the base class method, while the two enqueue methods call the priority enqueue method. The order of messages in the dynamic queue is a function of message deadlines and how long they are in the queues. You can manipulate these in some cases to ensure the correct semantics, but that is not a very stable or portable approach (discouraged).
Definition at line 749 of file Message_Queue_T.h.
|
Definition at line 1765 of file Message_Queue_T.cpp. References ACE_SYNCH_USE.
01769 : ACE_Message_Queue<ACE_SYNCH_USE> (hwm, lwm, ns), 01770 pending_head_ (0), 01771 pending_tail_ (0), 01772 late_head_ (0), 01773 late_tail_ (0), 01774 beyond_late_head_ (0), 01775 beyond_late_tail_ (0), 01776 message_strategy_ (message_strategy) 01777 { 01778 // Note, the ACE_Dynamic_Message_Queue assumes full responsibility 01779 // for the passed ACE_Dynamic_Message_Strategy object, and deletes 01780 // it in its own dtor 01781 } |
|
Close down the message queue and release all resources.
Definition at line 1786 of file Message_Queue_T.cpp. References ACE_Dynamic_Message_Queue<>::message_strategy_.
01787 { 01788 delete &this->message_strategy_; 01789 } |
|
|
|
Dequeue and return the <ACE_Message_Block *> at the head of the queue. Returns -1 on failure, else the number of items still on the queue. Reimplemented from ACE_Message_Queue< ACE_SYNCH_USE >. Definition at line 1932 of file Message_Queue_T.cpp. References ACE_GUARD_RETURN, ACE_SYNCH_MUTEX_T, ACE_TRACE, ACE_Dynamic_Message_Queue<>::dequeue_head_i(), ESHUTDOWN, ACE_OS::gettimeofday(), ACE_Dynamic_Message_Queue<>::refresh_queue(), and ACE_Message_Queue< ACE_SYNCH_USE >::wait_not_empty_cond().
01934 { 01935 ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head"); 01936 01937 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1); 01938 01939 if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED) 01940 { 01941 errno = ESHUTDOWN; 01942 return -1; 01943 } 01944 01945 int result; 01946 01947 // get the current time 01948 ACE_Time_Value current_time = ACE_OS::gettimeofday (); 01949 01950 // refresh priority status boundaries in the queue 01951 result = this->refresh_queue (current_time); 01952 if (result < 0) 01953 return result; 01954 01955 // *now* it's appropriate to wait for an enqueued item 01956 result = this->wait_not_empty_cond (ace_mon, timeout); 01957 if (result == -1) 01958 return result; 01959 01960 // call the internal dequeue method, which selects an item from the 01961 // highest priority status portion of the queue that has messages 01962 // enqueued. 01963 result = this->dequeue_head_i (first_item); 01964 01965 return result; 01966 } |
|
Dequeue and return the <ACE_Message_Block *> at the head of the logical queue. Attempts first to dequeue from the pending portion of the queue, or if that is empty from the late portion, or if that is empty from the beyond late portion, or if that is empty just sets the passed pointer to zero and returns -1. Reimplemented from ACE_Message_Queue< ACE_SYNCH_USE >. Definition at line 2213 of file Message_Queue_T.cpp. References ACE_TRACE, ACE_Dynamic_Message_Queue<>::beyond_late_head_, ACE_Dynamic_Message_Queue<>::beyond_late_tail_, ACE_Dynamic_Message_Queue<>::late_head_, ACE_Dynamic_Message_Queue<>::late_tail_, ACE_Message_Block::next(), ACE_Dynamic_Message_Queue<>::pending_head_, ACE_Dynamic_Message_Queue<>::pending_tail_, ACE_Message_Block::prev(), ACE_Message_Queue< ACE_SYNCH_USE >::signal_enqueue_waiters(), and ACE_Message_Block::total_size_and_length(). Referenced by ACE_Dynamic_Message_Queue<>::dequeue_head().
02214 { 02215 ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i"); 02216 02217 int result = 0; 02218 int last_in_subqueue = 0; 02219 02220 // first, try to dequeue from the head of the pending list 02221 if (this->pending_head_) 02222 { 02223 first_item = this->pending_head_; 02224 02225 if (0 == this->pending_head_->prev ()) 02226 this->head_ = this->pending_head_->next (); 02227 else 02228 this->pending_head_->prev ()->next (this->pending_head_->next ()); 02229 02230 if (0 == this->pending_head_->next ()) 02231 { 02232 this->tail_ = this->pending_head_->prev (); 02233 this->pending_head_ = 0; 02234 this->pending_tail_ = 0; 02235 } 02236 else 02237 { 02238 this->pending_head_->next ()->prev (this->pending_head_->prev ()); 02239 this->pending_head_ = this->pending_head_->next (); 02240 } 02241 02242 first_item->prev (0); 02243 first_item->next (0); 02244 } 02245 02246 // Second, try to dequeue from the head of the late list 02247 else if (this->late_head_) 02248 { 02249 last_in_subqueue = this->late_head_ == this->late_tail_ ? 1 : 0; 02250 02251 first_item = this->late_head_; 02252 02253 if (0 == this->late_head_->prev ()) 02254 this->head_ = this->late_head_->next (); 02255 else 02256 this->late_head_->prev ()->next (this->late_head_->next ()); 02257 02258 if (0 == this->late_head_->next ()) 02259 this->tail_ = this->late_head_->prev (); 02260 else 02261 { 02262 this->late_head_->next ()->prev (this->late_head_->prev ()); 02263 this->late_head_ = this->late_head_->next (); 02264 } 02265 02266 if (last_in_subqueue) 02267 { 02268 this->late_head_ = 0; 02269 this->late_tail_ = 0; 02270 } 02271 02272 first_item->prev (0); 02273 first_item->next (0); 02274 } 02275 // finally, try to dequeue from the head of the beyond late list 02276 else if (this->beyond_late_head_) 02277 { 02278 last_in_subqueue = 02279 (this->beyond_late_head_ == this->beyond_late_tail_) ? 1 : 0; 02280 02281 first_item = this->beyond_late_head_; 02282 this->head_ = this->beyond_late_head_->next (); 02283 02284 if (0 == this->beyond_late_head_->next ()) 02285 this->tail_ = this->beyond_late_head_->prev (); 02286 else 02287 { 02288 this->beyond_late_head_->next ()->prev (this->beyond_late_head_->prev ()); 02289 this->beyond_late_head_ = this->beyond_late_head_->next (); 02290 } 02291 02292 if (last_in_subqueue) 02293 { 02294 this->beyond_late_head_ = 0; 02295 this->beyond_late_tail_ = 0; 02296 } 02297 02298 first_item->prev (0); 02299 first_item->next (0); 02300 } 02301 else 02302 { 02303 // nothing to dequeue: set the pointer to zero and return an error code 02304 first_item = 0; 02305 result = -1; 02306 } 02307 02308 if (result < 0) 02309 return result; 02310 02311 size_t mb_bytes = 0; 02312 size_t mb_length = 0; 02313 first_item->total_size_and_length (mb_bytes, 02314 mb_length); 02315 // Subtract off all of the bytes associated with this message. 02316 this->cur_bytes_ -= mb_bytes; 02317 this->cur_length_ -= mb_length; 02318 --this->cur_count_; 02319 02320 // Only signal enqueueing threads if we've fallen below the low 02321 // water mark. 02322 if (this->cur_bytes_ <= this->low_water_mark_ 02323 && this->signal_enqueue_waiters () == -1) 02324 return -1; 02325 else 02326 return this->cur_count_; 02327 } |
|
Dump the state of the queue.
Reimplemented from ACE_Message_Queue< ACE_SYNCH_USE >. Definition at line 1972 of file Message_Queue_T.cpp. References ACE_BEGIN_DUMP, ACE_DEBUG, ACE_END_DUMP, ACE_LIB_TEXT, ACE_TRACE, ACE_Dynamic_Message_Strategy::dump(), ACE_Message_Queue<>::dump(), LM_DEBUG, and ACE_Dynamic_Message_Queue<>::message_strategy_.
01973 { 01974 #if defined (ACE_HAS_DUMP) 01975 ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dump"); 01976 ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this)); 01977 01978 ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("ACE_Message_Queue<ACE_SYNCH_USE> (base class): \n"))); 01979 this->ACE_Message_Queue<ACE_SYNCH_USE>::dump (); 01980 01981 ACE_DEBUG ((LM_DEBUG, 01982 ACE_LIB_TEXT ("pending_head_ = %u\n") 01983 ACE_LIB_TEXT ("pending_tail_ = %u\n") 01984 ACE_LIB_TEXT ("late_head_ = %u\n") 01985 ACE_LIB_TEXT ("late_tail_ = %u\n") 01986 ACE_LIB_TEXT ("beyond_late_head_ = %u\n") 01987 ACE_LIB_TEXT ("beyond_late_tail_ = %u\n"), 01988 this->pending_head_, 01989 this->pending_tail_, 01990 this->late_head_, 01991 this->late_tail_, 01992 this->beyond_late_head_, 01993 this->beyond_late_tail_)); 01994 01995 ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("message_strategy_ : \n"))); 01996 message_strategy_.dump (); 01997 01998 ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP)); 01999 #endif /* ACE_HAS_DUMP */ 02000 } |
|
Just call priority enqueue method: head enqueue semantics for dynamic message queues are unstable: the message may or may not be where it was placed after the queue is refreshed prior to the next enqueue or dequeue operation. Reimplemented from ACE_Message_Queue< ACE_SYNCH_USE >. Definition at line 2584 of file Message_Queue_T.cpp. References ACE_TRACE, and ACE_Message_Queue< ACE_SYNCH_USE >::enqueue_prio().
02586 { 02587 ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_head"); 02588 return this->enqueue_prio (new_item, timeout); 02589 } |
|
Enqueue an <ACE_Message_Block *> in accordance with its priority. priority may be *dynamic* or *static* or a combination or *both* It calls the priority evaluation function passed into the Dynamic Message Queue constructor to update the priorities of all enqueued messages. Reimplemented from ACE_Message_Queue< ACE_SYNCH_USE >. Definition at line 2004 of file Message_Queue_T.cpp. References ACE_TRACE, ACE_Dynamic_Message_Queue<>::beyond_late_head_, ACE_Dynamic_Message_Queue<>::beyond_late_tail_, ACE_Message_Queue< ACE_SYNCH_USE >::enqueue_head_i(), ACE_Message_Queue< ACE_SYNCH_USE >::enqueue_tail_i(), ACE_OS::gettimeofday(), ACE_Dynamic_Message_Queue<>::late_head_, ACE_Dynamic_Message_Queue<>::late_tail_, ACE_Dynamic_Message_Queue<>::message_strategy_, ACE_Message_Block::next(), ACE_Dynamic_Message_Queue<>::pending_head_, ACE_Dynamic_Message_Queue<>::pending_tail_, ACE_Message_Block::prev(), ACE_Dynamic_Message_Strategy::priority_status(), ACE_Dynamic_Message_Queue<>::refresh_queue(), ACE_Message_Queue< ACE_SYNCH_USE >::signal_dequeue_waiters(), ACE_Dynamic_Message_Queue<>::sublist_enqueue_i(), and ACE_Message_Block::total_size_and_length().
02005 { 02006 ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_i"); 02007 02008 if (new_item == 0) 02009 return -1; 02010 02011 int result = 0; 02012 02013 // Get the current time. 02014 ACE_Time_Value current_time = ACE_OS::gettimeofday (); 02015 02016 // Refresh priority status boundaries in the queue. 02017 02018 result = this->refresh_queue (current_time); 02019 if (result < 0) 02020 return result; 02021 02022 // Where we enqueue depends on the message's priority status. 02023 switch (message_strategy_.priority_status (*new_item, 02024 current_time)) 02025 { 02026 case ACE_Dynamic_Message_Strategy::PENDING: 02027 if (this->pending_tail_ == 0) 02028 { 02029 // Check for simple case of an empty pending queue, where 02030 // all we need to do is insert <new_item> into the tail of 02031 // the queue. 02032 pending_head_ = new_item; 02033 pending_tail_ = pending_head_; 02034 return this->enqueue_tail_i (new_item); 02035 } 02036 else 02037 { 02038 // Enqueue the new message in priority order in the pending 02039 // sublist 02040 result = sublist_enqueue_i (new_item, 02041 current_time, 02042 this->pending_head_, 02043 this->pending_tail_, 02044 ACE_Dynamic_Message_Strategy::PENDING); 02045 } 02046 break; 02047 02048 case ACE_Dynamic_Message_Strategy::LATE: 02049 if (this->late_tail_ == 0) 02050 { 02051 late_head_ = new_item; 02052 late_tail_ = late_head_; 02053 02054 if (this->pending_head_ == 0) 02055 // Check for simple case of an empty pending queue, 02056 // where all we need to do is insert <new_item> into the 02057 // tail of the queue. 02058 return this->enqueue_tail_i (new_item); 02059 else if (this->beyond_late_tail_ == 0) 02060 // Check for simple case of an empty beyond late queue, where all 02061 // we need to do is insert <new_item> into the head of the queue. 02062 return this->enqueue_head_i (new_item); 02063 else 02064 { 02065 // Otherwise, we can just splice the new message in 02066 // between the pending and beyond late portions of the 02067 // queue. 02068 this->beyond_late_tail_->next (new_item); 02069 new_item->prev (this->beyond_late_tail_); 02070 this->pending_head_->prev (new_item); 02071 new_item->next (this->pending_head_); 02072 } 02073 } 02074 else 02075 { 02076 // Enqueue the new message in priority order in the late 02077 // sublist 02078 result = sublist_enqueue_i (new_item, 02079 current_time, 02080 this->late_head_, 02081 this->late_tail_, 02082 ACE_Dynamic_Message_Strategy::LATE); 02083 } 02084 break; 02085 02086 case ACE_Dynamic_Message_Strategy::BEYOND_LATE: 02087 if (this->beyond_late_tail_ == 0) 02088 { 02089 // Check for simple case of an empty beyond late queue, 02090 // where all we need to do is insert <new_item> into the 02091 // head of the queue. 02092 beyond_late_head_ = new_item; 02093 beyond_late_tail_ = beyond_late_head_; 02094 return this->enqueue_head_i (new_item); 02095 } 02096 else 02097 { 02098 // all beyond late messages have the same (zero) priority, 02099 // so just put the new one at the end of the beyond late 02100 // messages 02101 if (this->beyond_late_tail_->next ()) 02102 this->beyond_late_tail_->next ()->prev (new_item); 02103 else 02104 this->tail_ = new_item; 02105 02106 new_item->next (this->beyond_late_tail_->next ()); 02107 this->beyond_late_tail_->next (new_item); 02108 new_item->prev (this->beyond_late_tail_); 02109 this->beyond_late_tail_ = new_item; 02110 } 02111 02112 break; 02113 02114 // should never get here, but just in case... 02115 default: 02116 result = -1; 02117 break; 02118 } 02119 02120 if (result < 0) 02121 return result; 02122 02123 size_t mb_bytes = 0; 02124 size_t mb_length = 0; 02125 new_item->total_size_and_length (mb_bytes, 02126 mb_length); 02127 this->cur_bytes_ += mb_bytes; 02128 this->cur_length_ += mb_length; 02129 ++this->cur_count_; 02130 02131 if (this->signal_dequeue_waiters () == -1) 02132 return -1; 02133 else 02134 return this->cur_count_; 02135 } |
|
Just call priority enqueue method: tail enqueue semantics for dynamic message queues are unstable: the message may or may not be where it was placed after the queue is refreshed prior to the next enqueue or dequeue operation. Reimplemented from ACE_Message_Queue< ACE_SYNCH_USE >. Definition at line 2571 of file Message_Queue_T.cpp. References ACE_TRACE, and ACE_Message_Queue< ACE_SYNCH_USE >::enqueue_prio().
02573 { 02574 ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_tail"); 02575 return this->enqueue_prio (new_item, timeout); 02576 } |
|
|
|
Private method to hide public base class method: just calls base class method.
Reimplemented from ACE_Message_Queue< ACE_SYNCH_USE >. Definition at line 2560 of file Message_Queue_T.cpp. References ACE_Message_Queue<>::peek_dequeue_head().
02562 { 02563 return ACE_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head (first_item, 02564 timeout); 02565 } |
|
Refresh the late queue using the strategy specific priority status function. Definition at line 2475 of file Message_Queue_T.cpp. References ACE_ERROR_RETURN, ACE_LIB_TEXT, ACE_Dynamic_Message_Queue<>::beyond_late_head_, ACE_Dynamic_Message_Queue<>::beyond_late_tail_, ACE_Dynamic_Message_Queue<>::late_head_, ACE_Dynamic_Message_Queue<>::late_tail_, LM_ERROR, ACE_Dynamic_Message_Queue<>::message_strategy_, ACE_Message_Block::next(), ACE_Message_Block::prev(), and ACE_Dynamic_Message_Strategy::priority_status(). Referenced by ACE_Dynamic_Message_Queue<>::refresh_queue().
02476 { 02477 ACE_Dynamic_Message_Strategy::Priority_Status current_status; 02478 02479 if (this->late_head_) 02480 { 02481 current_status = message_strategy_.priority_status (*this->late_head_, 02482 current_time); 02483 switch (current_status) 02484 { 02485 case ACE_Dynamic_Message_Strategy::BEYOND_LATE: 02486 02487 // make sure the head of the beyond late queue is set 02488 // (there may not have been any beyond late messages previously) 02489 this->beyond_late_head_ = this->head_; 02490 02491 // advance through the beyond late messages in the late queue 02492 do 02493 { 02494 this->late_head_ = this->late_head_->next (); 02495 02496 if (this->late_head_) 02497 current_status = message_strategy_.priority_status (*this->late_head_, 02498 current_time); 02499 else 02500 break; // do while 02501 02502 } 02503 while (current_status == ACE_Dynamic_Message_Strategy::BEYOND_LATE); 02504 02505 if (this->late_head_) 02506 { 02507 // point tail of beyond late sublist to previous item 02508 this->beyond_late_tail_ = this->late_head_->prev (); 02509 02510 if (current_status == ACE_Dynamic_Message_Strategy::PENDING) 02511 { 02512 // there are no late messages left in the queue 02513 this->late_head_ = 0; 02514 this->late_tail_ = 0; 02515 } 02516 else if (current_status != ACE_Dynamic_Message_Strategy::LATE) 02517 // if we got here, something is *seriously* wrong with the queue 02518 ACE_ERROR_RETURN ((LM_ERROR, 02519 ACE_LIB_TEXT ("Unexpected message priority status [%d] (expected LATE)"), 02520 (int) current_status), 02521 -1); 02522 } 02523 else 02524 { 02525 // there are no late messages left in the queue 02526 this->beyond_late_tail_ = this->tail_; 02527 this->late_head_ = 0; 02528 this->late_tail_ = 0; 02529 } 02530 02531 break; // switch 02532 02533 case ACE_Dynamic_Message_Strategy::LATE: 02534 // do nothing - the late queue is unchanged 02535 break; // switch 02536 02537 case ACE_Dynamic_Message_Strategy::PENDING: 02538 // if we got here, something is *seriously* wrong with the queue 02539 ACE_ERROR_RETURN ((LM_ERROR, 02540 ACE_LIB_TEXT ("Unexpected message priority status ") 02541 ACE_LIB_TEXT ("[%d] (expected LATE or BEYOND_LATE)"), 02542 (int) current_status), 02543 -1); 02544 default: 02545 // if we got here, something is *seriously* wrong with the queue 02546 ACE_ERROR_RETURN ((LM_ERROR, 02547 ACE_LIB_TEXT ("Unknown message priority status [%d]"), 02548 (int) current_status), 02549 -1); 02550 } 02551 } 02552 02553 return 0; 02554 } |
|
Refresh the pending queue using the strategy specific priority status function. Definition at line 2352 of file Message_Queue_T.cpp. References ACE_ERROR_RETURN, ACE_LIB_TEXT, ACE_Dynamic_Message_Queue<>::beyond_late_head_, ACE_Dynamic_Message_Queue<>::beyond_late_tail_, ACE_Dynamic_Message_Queue<>::late_head_, ACE_Dynamic_Message_Queue<>::late_tail_, LM_ERROR, ACE_Dynamic_Message_Queue<>::message_strategy_, ACE_Message_Block::next(), ACE_Dynamic_Message_Queue<>::pending_head_, ACE_Dynamic_Message_Queue<>::pending_tail_, ACE_Message_Block::prev(), and ACE_Dynamic_Message_Strategy::priority_status(). Referenced by ACE_Dynamic_Message_Queue<>::refresh_queue().
02353 { 02354 ACE_Dynamic_Message_Strategy::Priority_Status current_status; 02355 02356 // refresh priority status boundaries in the queue 02357 if (this->pending_head_) 02358 { 02359 current_status = message_strategy_.priority_status (*this->pending_head_, 02360 current_time); 02361 switch (current_status) 02362 { 02363 case ACE_Dynamic_Message_Strategy::BEYOND_LATE: 02364 // Make sure the head of the beyond late queue is set (there 02365 // may not have been any beyond late messages previously) 02366 this->beyond_late_head_ = this->head_; 02367 02368 // Zero out the late queue pointers, and set them only if 02369 // there turn out to be late messages in the pending sublist 02370 this->late_head_ = 0; 02371 this->late_tail_ = 0; 02372 02373 // Advance through the beyond late messages in the pending queue 02374 do 02375 { 02376 this->pending_head_ = this->pending_head_->next (); 02377 02378 if (this->pending_head_) 02379 current_status = message_strategy_.priority_status (*this->pending_head_, 02380 current_time); 02381 else 02382 break; // do while 02383 02384 } 02385 while (current_status == ACE_Dynamic_Message_Strategy::BEYOND_LATE); 02386 02387 if (this->pending_head_) 02388 { 02389 // point tail of beyond late sublist to previous item 02390 this->beyond_late_tail_ = this->pending_head_->prev (); 02391 02392 if (current_status == ACE_Dynamic_Message_Strategy::PENDING) 02393 // there are no late messages left in the queue 02394 break; // switch 02395 else if (current_status != ACE_Dynamic_Message_Strategy::LATE) 02396 { 02397 // if we got here, something is *seriously* wrong with the queue 02398 ACE_ERROR_RETURN ((LM_ERROR, 02399 ACE_LIB_TEXT ("Unexpected message priority status [%d] (expected LATE)"), 02400 (int) current_status), 02401 -1); 02402 } 02403 /* FALLTHRU */ 02404 } 02405 else 02406 { 02407 // There are no pending or late messages left in the 02408 // queue. 02409 this->beyond_late_tail_ = this->tail_; 02410 this->pending_head_ = 0; 02411 this->pending_tail_ = 0; 02412 break; // switch 02413 } 02414 02415 case ACE_Dynamic_Message_Strategy::LATE: 02416 // Make sure the head of the late queue is set (there may 02417 // not have been any late messages previously, or they may 02418 // have all become beyond late). 02419 if (this->late_head_ == 0) 02420 this->late_head_ = this->pending_head_; 02421 02422 // advance through the beyond late messages in the pending queue 02423 do 02424 { 02425 this->pending_head_ = this->pending_head_->next (); 02426 02427 if (this->pending_head_) 02428 current_status = message_strategy_.priority_status (*this->pending_head_, 02429 current_time); 02430 else 02431 break; // do while 02432 02433 } 02434 while (current_status == ACE_Dynamic_Message_Strategy::LATE); 02435 02436 if (this->pending_head_) 02437 { 02438 if (current_status != ACE_Dynamic_Message_Strategy::PENDING) 02439 // if we got here, something is *seriously* wrong with the queue 02440 ACE_ERROR_RETURN((LM_ERROR, 02441 ACE_LIB_TEXT ("Unexpected message priority status [%d] (expected PENDING)"), 02442 (int) current_status), 02443 -1); 02444 02445 // Point tail of late sublist to previous item 02446 this->late_tail_ = this->pending_head_->prev (); 02447 } 02448 else 02449 { 02450 // there are no pending messages left in the queue 02451 this->late_tail_ = this->tail_; 02452 this->pending_head_ = 0; 02453 this->pending_tail_ = 0; 02454 } 02455 02456 break; // switch 02457 case ACE_Dynamic_Message_Strategy::PENDING: 02458 // do nothing - the pending queue is unchanged 02459 break; // switch 02460 default: 02461 // if we got here, something is *seriously* wrong with the queue 02462 ACE_ERROR_RETURN((LM_ERROR, 02463 ACE_LIB_TEXT ("Unknown message priority status [%d]"), 02464 (int) current_status), 02465 -1); 02466 } 02467 } 02468 return 0; 02469 } |
|
Refresh the queue using the strategy specific priority status function. Definition at line 2336 of file Message_Queue_T.cpp. References ACE_Dynamic_Message_Queue<>::refresh_late_queue(), and ACE_Dynamic_Message_Queue<>::refresh_pending_queue(). Referenced by ACE_Dynamic_Message_Queue<>::dequeue_head(), ACE_Dynamic_Message_Queue<>::enqueue_i(), and ACE_Dynamic_Message_Queue<>::remove_messages().
02337 { 02338 int result; 02339 02340 result = refresh_pending_queue (current_time); 02341 02342 if (result != -1) 02343 result = refresh_late_queue (current_time); 02344 02345 return result; 02346 } |
|
Detach all messages with status given in the passed flags from the queue and return them by setting passed head and tail pointers to the linked list they comprise. This method is intended primarily as a means of periodically harvesting messages that have missed their deadlines, but is available in its most general form. All messages are returned in priority order, from head to tail, as of the time this method was called. Definition at line 1792 of file Message_Queue_T.cpp. References ACE_BIT_ENABLED, ACE_Dynamic_Message_Queue<>::beyond_late_head_, ACE_Dynamic_Message_Queue<>::beyond_late_tail_, ACE_OS::gettimeofday(), ACE_Dynamic_Message_Queue<>::late_head_, ACE_Dynamic_Message_Queue<>::late_tail_, ACE_Message_Block::next(), ACE_Dynamic_Message_Queue<>::pending_head_, ACE_Dynamic_Message_Queue<>::pending_tail_, ACE_Message_Block::prev(), ACE_Dynamic_Message_Queue<>::refresh_queue(), and ACE_Message_Block::total_size_and_length().
01795 { 01796 // start with an empty list 01797 list_head = 0; 01798 list_tail = 0; 01799 01800 // Get the current time 01801 ACE_Time_Value current_time = ACE_OS::gettimeofday (); 01802 01803 // Refresh priority status boundaries in the queue. 01804 int result = this->refresh_queue (current_time); 01805 if (result < 0) 01806 return result; 01807 01808 if (ACE_BIT_ENABLED (status_flags, 01809 (u_int) ACE_Dynamic_Message_Strategy::PENDING) 01810 && this->pending_head_ 01811 && this->pending_tail_) 01812 { 01813 // patch up pointers for the new tail of the queue 01814 if (this->pending_head_->prev ()) 01815 { 01816 this->tail_ = this->pending_head_->prev (); 01817 this->pending_head_->prev ()->next (0); 01818 } 01819 else 01820 { 01821 // the list has become empty 01822 this->head_ = 0; 01823 this->tail_ = 0; 01824 } 01825 01826 // point to the head and tail of the list 01827 list_head = this->pending_head_; 01828 list_tail = this->pending_tail_; 01829 01830 // cut the pending messages out of the queue entirely 01831 this->pending_head_->prev (0); 01832 this->pending_head_ = 0; 01833 this->pending_tail_ = 0; 01834 } 01835 01836 if (ACE_BIT_ENABLED (status_flags, 01837 (u_int) ACE_Dynamic_Message_Strategy::LATE) 01838 && this->late_head_ 01839 && this->late_tail_) 01840 { 01841 // Patch up pointers for the (possibly) new head and tail of the 01842 // queue. 01843 if (this->late_tail_->next ()) 01844 this->late_tail_->next ()->prev (this->late_head_->prev ()); 01845 else 01846 this->tail_ = this->late_head_->prev (); 01847 01848 if (this->late_head_->prev ()) 01849 this->late_head_->prev ()->next (this->late_tail_->next ()); 01850 else 01851 this->head_ = this->late_tail_->next (); 01852 01853 // put late messages behind pending messages (if any) being returned 01854 this->late_head_->prev (list_tail); 01855 if (list_tail) 01856 list_tail->next (this->late_head_); 01857 else 01858 list_head = this->late_head_; 01859 01860 list_tail = this->late_tail_; 01861 01862 this->late_tail_->next (0); 01863 this->late_head_ = 0; 01864 this->late_tail_ = 0; 01865 } 01866 01867 if (ACE_BIT_ENABLED (status_flags, 01868 (u_int) ACE_Dynamic_Message_Strategy::BEYOND_LATE) 01869 && this->beyond_late_head_ 01870 && this->beyond_late_tail_) 01871 { 01872 // Patch up pointers for the new tail of the queue 01873 if (this->beyond_late_tail_->next ()) 01874 { 01875 this->head_ = this->beyond_late_tail_->next (); 01876 this->beyond_late_tail_->next ()->prev (0); 01877 } 01878 else 01879 { 01880 // the list has become empty 01881 this->head_ = 0; 01882 this->tail_ = 0; 01883 } 01884 01885 // Put beyond late messages at the end of the list being 01886 // returned. 01887 if (list_tail) 01888 { 01889 this->beyond_late_head_->prev (list_tail); 01890 list_tail->next (this->beyond_late_head_); 01891 } 01892 else 01893 list_head = this->beyond_late_head_; 01894 01895 list_tail = this->beyond_late_tail_; 01896 01897 this->beyond_late_tail_->next (0); 01898 this->beyond_late_head_ = 0; 01899 this->beyond_late_tail_ = 0; 01900 } 01901 01902 // Decrement message and size counts for removed messages. 01903 ACE_Message_Block *temp1; 01904 01905 for (temp1 = list_head; 01906 temp1 != 0; 01907 temp1 = temp1->next ()) 01908 { 01909 --this->cur_count_; 01910 01911 size_t mb_bytes = 0; 01912 size_t mb_length = 0; 01913 temp1->total_size_and_length (mb_bytes, 01914 mb_length); 01915 // Subtract off all of the bytes associated with this message. 01916 this->cur_bytes_ -= mb_bytes; 01917 this->cur_length_ -= mb_length; 01918 } 01919 01920 return result; 01921 } |
|
Enqueue a message in priority order within a given priority status sublist.
Definition at line 2144 of file Message_Queue_T.cpp. References ACE_Dynamic_Message_Queue<>::message_strategy_, ACE_Message_Block::msg_priority(), ACE_Message_Block::next(), ACE_Message_Block::prev(), and ACE_Dynamic_Message_Strategy::priority_status(). Referenced by ACE_Dynamic_Message_Queue<>::enqueue_i().
02149 { 02150 int result = 0; 02151 ACE_Message_Block *current_item = 0; 02152 02153 // Find message after which to enqueue new item, based on message 02154 // priority and priority status. 02155 for (current_item = sublist_tail; 02156 current_item; 02157 current_item = current_item->prev ()) 02158 { 02159 if (message_strategy_.priority_status (*current_item, current_time) == status) 02160 { 02161 if (current_item->msg_priority () >= new_item->msg_priority ()) 02162 break; 02163 } 02164 else 02165 { 02166 sublist_head = new_item; 02167 break; 02168 } 02169 } 02170 02171 if (current_item == 0) 02172 { 02173 // If the new message has highest priority of any, put it at the 02174 // head of the list (and sublist). 02175 new_item->prev (0); 02176 new_item->next (this->head_); 02177 if (this->head_ != 0) 02178 this->head_->prev (new_item); 02179 else 02180 { 02181 this->tail_ = new_item; 02182 sublist_tail = new_item; 02183 } 02184 this->head_ = new_item; 02185 sublist_head = new_item; 02186 } 02187 else 02188 { 02189 // insert the new item into the list 02190 new_item->next (current_item->next ()); 02191 new_item->prev (current_item); 02192 02193 if (current_item->next ()) 02194 current_item->next ()->prev (new_item); 02195 else 02196 this->tail_ = new_item; 02197 02198 current_item->next (new_item); 02199 02200 // If the new item has lowest priority of any in the sublist, 02201 // move the tail pointer of the sublist back to the new item 02202 if (current_item == sublist_tail) 02203 sublist_tail = new_item; 02204 } 02205 02206 return result; 02207 } |
|
Declare the dynamic allocation hooks.
Reimplemented from ACE_Message_Queue< ACE_SYNCH_USE >. Definition at line 805 of file Message_Queue_T.h. |
|
Pointer to head of the beyond late messages.
Definition at line 859 of file Message_Queue_T.h. Referenced by ACE_Dynamic_Message_Queue<>::dequeue_head_i(), ACE_Dynamic_Message_Queue<>::enqueue_i(), ACE_Dynamic_Message_Queue<>::refresh_late_queue(), ACE_Dynamic_Message_Queue<>::refresh_pending_queue(), and ACE_Dynamic_Message_Queue<>::remove_messages(). |
|
Pointer to tail of the beyond late messages.
Definition at line 862 of file Message_Queue_T.h. Referenced by ACE_Dynamic_Message_Queue<>::dequeue_head_i(), ACE_Dynamic_Message_Queue<>::enqueue_i(), ACE_Dynamic_Message_Queue<>::refresh_late_queue(), ACE_Dynamic_Message_Queue<>::refresh_pending_queue(), and ACE_Dynamic_Message_Queue<>::remove_messages(). |
|
Pointer to head of the late messages.
Definition at line 853 of file Message_Queue_T.h. Referenced by ACE_Dynamic_Message_Queue<>::dequeue_head_i(), ACE_Dynamic_Message_Queue<>::enqueue_i(), ACE_Dynamic_Message_Queue<>::refresh_late_queue(), ACE_Dynamic_Message_Queue<>::refresh_pending_queue(), and ACE_Dynamic_Message_Queue<>::remove_messages(). |
|
Pointer to tail of the late messages.
Definition at line 856 of file Message_Queue_T.h. Referenced by ACE_Dynamic_Message_Queue<>::dequeue_head_i(), ACE_Dynamic_Message_Queue<>::enqueue_i(), ACE_Dynamic_Message_Queue<>::refresh_late_queue(), ACE_Dynamic_Message_Queue<>::refresh_pending_queue(), and ACE_Dynamic_Message_Queue<>::remove_messages(). |
|
Pointer to a dynamic priority evaluation function.
Definition at line 865 of file Message_Queue_T.h. Referenced by ACE_Dynamic_Message_Queue<>::dump(), ACE_Dynamic_Message_Queue<>::enqueue_i(), ACE_Dynamic_Message_Queue<>::refresh_late_queue(), ACE_Dynamic_Message_Queue<>::refresh_pending_queue(), ACE_Dynamic_Message_Queue<>::sublist_enqueue_i(), and ACE_Dynamic_Message_Queue<>::~ACE_Dynamic_Message_Queue(). |
|
Pointer to head of the pending messages.
Definition at line 847 of file Message_Queue_T.h. Referenced by ACE_Dynamic_Message_Queue<>::dequeue_head_i(), ACE_Dynamic_Message_Queue<>::enqueue_i(), ACE_Dynamic_Message_Queue<>::refresh_pending_queue(), and ACE_Dynamic_Message_Queue<>::remove_messages(). |
|
Pointer to tail of the pending messages.
Definition at line 850 of file Message_Queue_T.h. Referenced by ACE_Dynamic_Message_Queue<>::dequeue_head_i(), ACE_Dynamic_Message_Queue<>::enqueue_i(), ACE_Dynamic_Message_Queue<>::refresh_pending_queue(), and ACE_Dynamic_Message_Queue<>::remove_messages(). |