#include <Message_Queue_T.h>
Public Member Functions | |
ACE_Dynamic_Message_Queue (ACE_Dynamic_Message_Strategy &message_strategy, size_t hwm=ACE_Message_Queue_Base::DEFAULT_HWM, size_t lwm=ACE_Message_Queue_Base::DEFAULT_LWM, ACE_Notification_Strategy *=0) | |
virtual | ~ACE_Dynamic_Message_Queue (void) |
Close down the message queue and release all resources. | |
virtual int | remove_messages (ACE_Message_Block *&list_head, ACE_Message_Block *&list_tail, u_int status_flags) |
virtual int | dequeue_head (ACE_Message_Block *&first_item, ACE_Time_Value *timeout=0) |
virtual void | dump (void) const |
Dump the state of the queue. | |
virtual int | enqueue_tail (ACE_Message_Block *new_item, ACE_Time_Value *timeout=0) |
virtual int | enqueue_head (ACE_Message_Block *new_item, ACE_Time_Value *timeout=0) |
Public Attributes | |
ACE_ALLOC_HOOK_DECLARE | |
Declare the dynamic allocation hooks. | |
Protected Member Functions | |
virtual int | enqueue_i (ACE_Message_Block *new_item) |
virtual int | sublist_enqueue_i (ACE_Message_Block *new_item, const ACE_Time_Value &current_time, ACE_Message_Block *&sublist_head, ACE_Message_Block *&sublist_tail, ACE_Dynamic_Message_Strategy::Priority_Status status) |
Enqueue a message in priority order within a given priority status sublist. | |
virtual int | dequeue_head_i (ACE_Message_Block *&first_item) |
virtual int | refresh_queue (const ACE_Time_Value &current_time) |
virtual int | refresh_pending_queue (const ACE_Time_Value &current_time) |
virtual int | refresh_late_queue (const ACE_Time_Value &current_time) |
Protected Attributes | |
ACE_Message_Block * | pending_head_ |
Pointer to head of the pending messages. | |
ACE_Message_Block * | pending_tail_ |
Pointer to tail of the pending messages. | |
ACE_Message_Block * | late_head_ |
Pointer to head of the late messages. | |
ACE_Message_Block * | late_tail_ |
Pointer to tail of the late messages. | |
ACE_Message_Block * | beyond_late_head_ |
Pointer to head of the beyond late messages. | |
ACE_Message_Block * | beyond_late_tail_ |
Pointer to tail of the beyond late messages. | |
ACE_Dynamic_Message_Strategy & | message_strategy_ |
Reference to the dynamic priority evaluation strategy. | |
Private Member Functions | |
void | operator= (const ACE_Dynamic_Message_Queue< _ACE_SYNCH_MUTEX_T, _ACE_SYNCH_CONDITION_T > &) |
ACE_Dynamic_Message_Queue (const ACE_Dynamic_Message_Queue< _ACE_SYNCH_MUTEX_T, _ACE_SYNCH_CONDITION_T > &) | |
virtual int | peek_dequeue_head (ACE_Message_Block *&first_item, ACE_Time_Value *timeout=0) |
Private method to hide public base class method: just calls base class method. |
The messages in the queue are managed so as to preserve a logical ordering with minimal overhead per enqueue and dequeue operation. For this reason, the actual order of messages in the linked list of the queue may differ from their priority order. As time passes, a message may change from pending status to late status, and eventually to beyond late status. To minimize reordering overhead under this design force, three separate boundaries are maintained within the linked list of messages. Messages are dequeued preferentially from the head of the pending portion, then the head of the late portion, and finally from the head of the beyond late portion. In this way, only the boundaries need to be maintained (which can be done efficiently, as aging messages maintain the same linked list order as they progress from one status to the next), with no reordering of the messages themselves, while providing correct priority ordered dequeueing semantics.
Head and tail enqueue methods inherited from ACE_Message_Queue are made private to prevent out-of-order messages from confusing management of the various portions of the queue.
Messages in the pending portion of the queue whose priority becomes late (according to the specific dynamic strategy) advance into the late portion of the queue. Messages in the late portion of the queue whose priority becomes later than can be represented advance to the beyond_late portion of the queue. These behaviors support a limited schedule overrun, with pending messages prioritized ahead of late messages, and late messages ahead of beyond late messages. These behaviors can be modified in derived classes by providing alternative definitions for the appropriate virtual methods.
When filled with messages, the queue's linked list should look like:
    H                                           T
    |                                           |
    B - B - B - B - L - L - L - P - P - P - P - P
    |           |   |       |   |               |
    BH          BT  LH      LT  PH              PT
Where the symbols are as follows:
    H  = Head of the entire list
    T  = Tail of the entire list
    B  = Beyond late message
    BH = Beyond late messages Head
    BT = Beyond late messages Tail
    L  = Late message
    LH = Late messages Head
    LT = Late messages Tail
    P  = Pending message
    PH = Pending messages Head
    PT = Pending messages Tail
Caveat: the virtual methods enqueue_tail, enqueue_head, and peek_dequeue_head have semantics for the static message queues that cannot be guaranteed for dynamic message queues. The peek_dequeue_head method just calls the base class method, while the two enqueue methods call the priority enqueue method. The order of messages in the dynamic queue is a function of message deadlines and how long they are in the queues. You can manipulate these in some cases to ensure the correct semantics, but that is not a very stable or portable approach (discouraged).
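The dequeue preference described above (pending first, then late, then beyond late) can be illustrated with a small standalone model. This is only a sketch: `ThreePartQueue` is a hypothetical type, not part of ACE, and it uses three separate containers where the real class keeps a single linked list with boundary pointers.

```cpp
#include <deque>
#include <string>

// Hypothetical stand-in for the three portions of the queue. The real class
// keeps one linked list with boundary pointers; three deques are used here
// only to make the dequeue preference easy to see.
struct ThreePartQueue {
  std::deque<std::string> pending;
  std::deque<std::string> late;
  std::deque<std::string> beyond_late;

  // Take from the head of the pending portion first, then the late portion,
  // then the beyond late portion. Returns false when every portion is empty.
  bool dequeue_head(std::string& first_item) {
    for (std::deque<std::string>* part : {&pending, &late, &beyond_late}) {
      if (!part->empty()) {
        first_item = part->front();
        part->pop_front();
        return true;
      }
    }
    return false;
  }
};
```

With one beyond late, one late, and two pending messages stored, successive calls yield the two pending messages, then the late one, then the beyond late one.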
Definition at line 767 of file Message_Queue_T.h.
ACE_Dynamic_Message_Queue<>::ACE_Dynamic_Message_Queue (ACE_Dynamic_Message_Strategy &message_strategy, size_t hwm = ACE_Message_Queue_Base::DEFAULT_HWM, size_t lwm = ACE_Message_Queue_Base::DEFAULT_LWM, ACE_Notification_Strategy * = 0)
Definition at line 1913 of file Message_Queue_T.cpp.
  : ACE_Message_Queue<ACE_SYNCH_USE> (hwm, lwm, ns),
    pending_head_ (0),
    pending_tail_ (0),
    late_head_ (0),
    late_tail_ (0),
    beyond_late_head_ (0),
    beyond_late_tail_ (0),
    message_strategy_ (message_strategy)
{
  // Note, the ACE_Dynamic_Message_Queue assumes full responsibility
  // for the passed ACE_Dynamic_Message_Strategy object, and deletes
  // it in its own dtor
}
ACE_Dynamic_Message_Queue<>::~ACE_Dynamic_Message_Queue (void) [virtual]
Close down the message queue and release all resources.
Definition at line 1934 of file Message_Queue_T.cpp.
References ACE_Dynamic_Message_Queue<>::message_strategy_.
{
  delete &this->message_strategy_;
}
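Because the destructor deletes the strategy through the stored reference, the strategy passed to the constructor has to be heap-allocated and must not be deleted by the caller. The following sketch reproduces that ownership pattern with hypothetical stand-in types (`Strategy` and `OwningQueue` are not ACE classes) so the lifetime can be observed in isolation.

```cpp
#include <cassert>

// Hypothetical stand-in for a strategy object; counts live instances.
struct Strategy {
  static int live;
  Strategy() { ++live; }
  ~Strategy() { --live; }
};
int Strategy::live = 0;

// Hypothetical stand-in for the queue: takes the strategy by reference but
// assumes ownership of it, as the constructor comment above describes.
class OwningQueue {
public:
  explicit OwningQueue(Strategy& s) : strategy_(s) {}
  // Same pattern as the destructor above: delete through the reference.
  ~OwningQueue() { delete &strategy_; }

private:
  OwningQueue(const OwningQueue&);     // not copyable, like the original
  void operator=(const OwningQueue&);
  Strategy& strategy_;
};

// Returns the number of live strategies after a queue built on a
// heap-allocated strategy has gone out of scope.
int live_after_scope() {
  {
    OwningQueue q(*new Strategy);      // heap allocation is required
    assert(Strategy::live == 1);
  }                                    // q's destructor deletes the strategy
  return Strategy::live;
}
```

Passing a stack-allocated or separately owned strategy to a class written this way would lead to an invalid or double delete, which is why the ownership transfer is worth stating at the call site.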
ACE_Dynamic_Message_Queue<>::ACE_Dynamic_Message_Queue (const ACE_Dynamic_Message_Queue< _ACE_SYNCH_MUTEX_T, _ACE_SYNCH_CONDITION_T > &) [private]
int ACE_Dynamic_Message_Queue<>::dequeue_head (ACE_Message_Block *&first_item, ACE_Time_Value *timeout = 0) [virtual]
Dequeue and return the <ACE_Message_Block *> at the head of the queue. Returns -1 on failure, else the number of items still on the queue.
Reimplemented from ACE_Message_Queue< ACE_SYNCH_USE >.
Definition at line 2080 of file Message_Queue_T.cpp.
References ACE_GUARD_RETURN, ACE_SYNCH_MUTEX_T, ACE_TRACE, ACE_Message_Queue_Base::DEACTIVATED, ACE_Dynamic_Message_Queue<>::dequeue_head_i(), ACE_OS::gettimeofday(), ACE_Dynamic_Message_Queue<>::refresh_queue(), and ACE_Message_Queue< ACE_SYNCH_USE >::wait_not_empty_cond().
{
  ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head");

  ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);

  if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
    {
      errno = ESHUTDOWN;
      return -1;
    }

  int result;

  // get the current time
  ACE_Time_Value current_time = ACE_OS::gettimeofday ();

  // refresh priority status boundaries in the queue
  result = this->refresh_queue (current_time);
  if (result < 0)
    return result;

  // *now* it's appropriate to wait for an enqueued item
  result = this->wait_not_empty_cond (ace_mon, timeout);
  if (result == -1)
    return result;

  // call the internal dequeue method, which selects an item from the
  // highest priority status portion of the queue that has messages
  // enqueued.
  result = this->dequeue_head_i (first_item);

  return result;
}
int ACE_Dynamic_Message_Queue<>::dequeue_head_i (ACE_Message_Block *&first_item) [protected, virtual]
Dequeue and return the <ACE_Message_Block *> at the head of the logical queue. Attempts first to dequeue from the pending portion of the queue, or if that is empty from the late portion, or if that is empty from the beyond late portion, or if that is empty just sets the passed pointer to zero and returns -1.
Reimplemented from ACE_Message_Queue< ACE_SYNCH_USE >.
Definition at line 2361 of file Message_Queue_T.cpp.
References ACE_TRACE, ACE_Dynamic_Message_Queue<>::beyond_late_head_, ACE_Dynamic_Message_Queue<>::beyond_late_tail_, ACE_Message_Queue< ACE_SYNCH_USE >::cur_bytes_, ACE_Message_Queue< ACE_SYNCH_USE >::cur_count_, ACE_Message_Queue< ACE_SYNCH_USE >::cur_length_, ACE_Message_Queue< ACE_SYNCH_USE >::head_, ACE_Dynamic_Message_Queue<>::late_head_, ACE_Dynamic_Message_Queue<>::late_tail_, ACE_Message_Block::next(), ACE_Dynamic_Message_Queue<>::pending_head_, ACE_Dynamic_Message_Queue<>::pending_tail_, ACE_Message_Block::prev(), and ACE_Message_Block::total_size_and_length().
Referenced by ACE_Dynamic_Message_Queue<>::dequeue_head().
{
  ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i");

  int result = 0;
  int last_in_subqueue = 0;

  // first, try to dequeue from the head of the pending list
  if (this->pending_head_)
    {
      first_item = this->pending_head_;

      if (0 == this->pending_head_->prev ())
        this->head_ = this->pending_head_->next ();
      else
        this->pending_head_->prev ()->next (this->pending_head_->next ());

      if (0 == this->pending_head_->next ())
        {
          this->tail_ = this->pending_head_->prev ();
          this->pending_head_ = 0;
          this->pending_tail_ = 0;
        }
      else
        {
          this->pending_head_->next ()->prev (this->pending_head_->prev ());
          this->pending_head_ = this->pending_head_->next ();
        }

      first_item->prev (0);
      first_item->next (0);
    }

  // Second, try to dequeue from the head of the late list
  else if (this->late_head_)
    {
      last_in_subqueue = this->late_head_ == this->late_tail_ ? 1 : 0;

      first_item = this->late_head_;

      if (0 == this->late_head_->prev ())
        this->head_ = this->late_head_->next ();
      else
        this->late_head_->prev ()->next (this->late_head_->next ());

      if (0 == this->late_head_->next ())
        this->tail_ = this->late_head_->prev ();
      else
        {
          this->late_head_->next ()->prev (this->late_head_->prev ());
          this->late_head_ = this->late_head_->next ();
        }

      if (last_in_subqueue)
        {
          this->late_head_ = 0;
          this->late_tail_ = 0;
        }

      first_item->prev (0);
      first_item->next (0);
    }
  // finally, try to dequeue from the head of the beyond late list
  else if (this->beyond_late_head_)
    {
      last_in_subqueue =
        (this->beyond_late_head_ == this->beyond_late_tail_) ? 1 : 0;

      first_item = this->beyond_late_head_;
      this->head_ = this->beyond_late_head_->next ();

      if (0 == this->beyond_late_head_->next ())
        this->tail_ = this->beyond_late_head_->prev ();
      else
        {
          this->beyond_late_head_->next ()->prev (this->beyond_late_head_->prev ());
          this->beyond_late_head_ = this->beyond_late_head_->next ();
        }

      if (last_in_subqueue)
        {
          this->beyond_late_head_ = 0;
          this->beyond_late_tail_ = 0;
        }

      first_item->prev (0);
      first_item->next (0);
    }
  else
    {
      // nothing to dequeue: set the pointer to zero and return an error code
      first_item = 0;
      result = -1;
    }

  if (result < 0)
    return result;

  size_t mb_bytes = 0;
  size_t mb_length = 0;
  first_item->total_size_and_length (mb_bytes,
                                     mb_length);
  // Subtract off all of the bytes associated with this message.
  this->cur_bytes_ -= mb_bytes;
  this->cur_length_ -= mb_length;
  --this->cur_count_;

  // Only signal enqueueing threads if we've fallen below the low
  // water mark.
  if (this->cur_bytes_ <= this->low_water_mark_
      && this->signal_enqueue_waiters () == -1)
    return -1;
  else
    return this->cur_count_;
}
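The three branches of dequeue_head_i repeat one pattern: detach the first node of a sublist from the enclosing doubly linked list, then patch both the list ends and the sublist boundaries. A condensed sketch of that pattern follows, assuming a hypothetical `Node` type with public `prev`/`next` fields (ACE_Message_Block uses accessor methods instead) and a hypothetical helper name `pop_sublist_head`.

```cpp
// Hypothetical minimal node and list types for illustration only.
struct Node {
  int id;
  Node* prev;
  Node* next;
};

struct List {
  Node* head;
  Node* tail;
};

// Unlink the first node of the sublist [sub_head, sub_tail] that lives inside
// `list`, patching the list's head/tail and the sublist boundaries.
// Returns the detached node, or nullptr if the sublist is empty.
Node* pop_sublist_head(List& list, Node*& sub_head, Node*& sub_tail) {
  Node* item = sub_head;
  if (item == nullptr)
    return nullptr;

  const bool last_in_sublist = (sub_head == sub_tail);

  // Bypass the node in the enclosing list.
  if (item->prev) item->prev->next = item->next;
  else            list.head = item->next;
  if (item->next) item->next->prev = item->prev;
  else            list.tail = item->prev;

  // Move or clear the sublist boundaries.
  if (last_in_sublist) {
    sub_head = nullptr;
    sub_tail = nullptr;
  } else {
    sub_head = item->next;
  }

  item->prev = nullptr;
  item->next = nullptr;
  return item;
}
```

Tracking `last_in_sublist` separately matters because the node after the sublist head may belong to the next portion of the queue, so a non-null `next` does not imply that the sublist still has members.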
void ACE_Dynamic_Message_Queue<>::dump (void) const [virtual]
Dump the state of the queue.
Reimplemented from ACE_Message_Queue< ACE_SYNCH_USE >.
Definition at line 2120 of file Message_Queue_T.cpp.
References ACE_BEGIN_DUMP, ACE_DEBUG, ACE_END_DUMP, ACE_TEXT, ACE_TRACE, ACE_Dynamic_Message_Strategy::dump(), ACE_Message_Queue<>::dump(), LM_DEBUG, and ACE_Dynamic_Message_Queue<>::message_strategy_.
{
#if defined (ACE_HAS_DUMP)
  ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dump");
  ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));

  ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("ACE_Message_Queue<ACE_SYNCH_USE> (base class): \n")));
  this->ACE_Message_Queue<ACE_SYNCH_USE>::dump ();

  ACE_DEBUG ((LM_DEBUG,
              ACE_TEXT ("pending_head_ = %u\n")
              ACE_TEXT ("pending_tail_ = %u\n")
              ACE_TEXT ("late_head_ = %u\n")
              ACE_TEXT ("late_tail_ = %u\n")
              ACE_TEXT ("beyond_late_head_ = %u\n")
              ACE_TEXT ("beyond_late_tail_ = %u\n"),
              this->pending_head_,
              this->pending_tail_,
              this->late_head_,
              this->late_tail_,
              this->beyond_late_head_,
              this->beyond_late_tail_));

  ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("message_strategy_ : \n")));
  message_strategy_.dump ();

  ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
#endif /* ACE_HAS_DUMP */
}
int ACE_Dynamic_Message_Queue<>::enqueue_head (ACE_Message_Block *new_item, ACE_Time_Value *timeout = 0) [virtual]
Just calls the priority enqueue method. Head enqueue semantics for dynamic message queues are unstable: the message may or may not be where it was placed after the queue is refreshed prior to the next enqueue or dequeue operation.
Reimplemented from ACE_Message_Queue< ACE_SYNCH_USE >.
Definition at line 2732 of file Message_Queue_T.cpp.
References ACE_TRACE, and ACE_Message_Queue< ACE_SYNCH_USE >::enqueue_prio().
{
  ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_head");
  return this->enqueue_prio (new_item, timeout);
}
int ACE_Dynamic_Message_Queue<>::enqueue_i (ACE_Message_Block *new_item) [protected, virtual]
Enqueue an <ACE_Message_Block *> in accordance with its priority. The priority may be dynamic, static, or a combination of both. It calls the priority evaluation function passed into the Dynamic Message Queue constructor to update the priorities of all enqueued messages.
Reimplemented from ACE_Message_Queue< ACE_SYNCH_USE >.
Definition at line 2152 of file Message_Queue_T.cpp.
References ACE_TRACE, ACE_Dynamic_Message_Strategy::BEYOND_LATE, ACE_Dynamic_Message_Queue<>::beyond_late_head_, ACE_Dynamic_Message_Queue<>::beyond_late_tail_, ACE_Message_Queue< ACE_SYNCH_USE >::cur_bytes_, ACE_Message_Queue< ACE_SYNCH_USE >::cur_count_, ACE_Message_Queue< ACE_SYNCH_USE >::cur_length_, ACE_Message_Queue< ACE_SYNCH_USE >::enqueue_head_i(), ACE_Message_Queue< ACE_SYNCH_USE >::enqueue_tail_i(), ACE_OS::gettimeofday(), ACE_Dynamic_Message_Strategy::LATE, ACE_Dynamic_Message_Queue<>::late_head_, ACE_Dynamic_Message_Queue<>::late_tail_, ACE_Dynamic_Message_Queue<>::message_strategy_, ACE_Message_Block::next(), ACE_Dynamic_Message_Strategy::PENDING, ACE_Dynamic_Message_Queue<>::pending_head_, ACE_Dynamic_Message_Queue<>::pending_tail_, ACE_Message_Block::prev(), ACE_Dynamic_Message_Strategy::priority_status(), ACE_Dynamic_Message_Queue<>::refresh_queue(), ACE_Dynamic_Message_Queue<>::sublist_enqueue_i(), ACE_Message_Queue< ACE_SYNCH_USE >::tail_, and ACE_Message_Block::total_size_and_length().
{
  ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_i");

  if (new_item == 0)
    return -1;

  int result = 0;

  // Get the current time.
  ACE_Time_Value current_time = ACE_OS::gettimeofday ();

  // Refresh priority status boundaries in the queue.

  result = this->refresh_queue (current_time);
  if (result < 0)
    return result;

  // Where we enqueue depends on the message's priority status.
  switch (message_strategy_.priority_status (*new_item,
                                             current_time))
    {
    case ACE_Dynamic_Message_Strategy::PENDING:
      if (this->pending_tail_ == 0)
        {
          // Check for simple case of an empty pending queue, where
          // all we need to do is insert <new_item> into the tail of
          // the queue.
          pending_head_ = new_item;
          pending_tail_ = pending_head_;
          return this->enqueue_tail_i (new_item);
        }
      else
        {
          // Enqueue the new message in priority order in the pending
          // sublist
          result = sublist_enqueue_i (new_item,
                                      current_time,
                                      this->pending_head_,
                                      this->pending_tail_,
                                      ACE_Dynamic_Message_Strategy::PENDING);
        }
      break;

    case ACE_Dynamic_Message_Strategy::LATE:
      if (this->late_tail_ == 0)
        {
          late_head_ = new_item;
          late_tail_ = late_head_;

          if (this->pending_head_ == 0)
            // Check for simple case of an empty pending queue,
            // where all we need to do is insert <new_item> into the
            // tail of the queue.
            return this->enqueue_tail_i (new_item);
          else if (this->beyond_late_tail_ == 0)
            // Check for simple case of an empty beyond late queue, where all
            // we need to do is insert <new_item> into the head of the queue.
            return this->enqueue_head_i (new_item);
          else
            {
              // Otherwise, we can just splice the new message in
              // between the pending and beyond late portions of the
              // queue.
              this->beyond_late_tail_->next (new_item);
              new_item->prev (this->beyond_late_tail_);
              this->pending_head_->prev (new_item);
              new_item->next (this->pending_head_);
            }
        }
      else
        {
          // Enqueue the new message in priority order in the late
          // sublist
          result = sublist_enqueue_i (new_item,
                                      current_time,
                                      this->late_head_,
                                      this->late_tail_,
                                      ACE_Dynamic_Message_Strategy::LATE);
        }
      break;

    case ACE_Dynamic_Message_Strategy::BEYOND_LATE:
      if (this->beyond_late_tail_ == 0)
        {
          // Check for simple case of an empty beyond late queue,
          // where all we need to do is insert <new_item> into the
          // head of the queue.
          beyond_late_head_ = new_item;
          beyond_late_tail_ = beyond_late_head_;
          return this->enqueue_head_i (new_item);
        }
      else
        {
          // all beyond late messages have the same (zero) priority,
          // so just put the new one at the end of the beyond late
          // messages
          if (this->beyond_late_tail_->next ())
            this->beyond_late_tail_->next ()->prev (new_item);
          else
            this->tail_ = new_item;

          new_item->next (this->beyond_late_tail_->next ());
          this->beyond_late_tail_->next (new_item);
          new_item->prev (this->beyond_late_tail_);
          this->beyond_late_tail_ = new_item;
        }

      break;

      // should never get here, but just in case...
    default:
      result = -1;
      break;
    }

  if (result < 0)
    return result;

  size_t mb_bytes = 0;
  size_t mb_length = 0;
  new_item->total_size_and_length (mb_bytes,
                                   mb_length);
  this->cur_bytes_ += mb_bytes;
  this->cur_length_ += mb_length;
  ++this->cur_count_;

  if (this->signal_dequeue_waiters () == -1)
    return -1;
  else
    return this->cur_count_;
}
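The overall shape of enqueue_i (classify the new message by priority status, then insert it into the matching portion) can be modeled without the linked-list bookkeeping. The sketch below is an illustration under stated assumptions: the classification rule in `priority_status`, the `max_late` window, and the earliest-deadline-first ordering inside a portion are all hypothetical choices made for this example; the real rule is whatever the ACE_Dynamic_Message_Strategy in use implements.

```cpp
#include <list>

enum Status { PENDING, LATE, BEYOND_LATE };

struct Msg {
  int id;
  long deadline;
};

// Assumed rule for this sketch: pending until the deadline, late for up to
// `max_late` time units after it, and beyond late from then on.
Status priority_status(const Msg& m, long now, long max_late) {
  if (now <= m.deadline) return PENDING;
  if (now - m.deadline <= max_late) return LATE;
  return BEYOND_LATE;
}

struct SketchQueue {
  std::list<Msg> pending, late, beyond_late;
  long max_late = 10;  // hypothetical lateness window

  // Insert so that earlier deadlines come first; ties keep arrival order.
  static void ordered_insert(std::list<Msg>& sub, const Msg& m) {
    std::list<Msg>::iterator it = sub.begin();
    while (it != sub.end() && it->deadline <= m.deadline)
      ++it;
    sub.insert(it, m);
  }

  // Classify the message, then place it in the matching portion.
  Status enqueue(const Msg& m, long now) {
    const Status s = priority_status(m, now, max_late);
    switch (s) {
      case PENDING:     ordered_insert(pending, m); break;
      case LATE:        ordered_insert(late, m);    break;
      case BEYOND_LATE: beyond_late.push_back(m);   break;  // all equal priority
    }
    return s;
  }
};
```

As in the listing above, beyond late messages are simply appended, since they all share the same priority, while pending and late messages are placed in order within their own portion.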
int ACE_Dynamic_Message_Queue<>::enqueue_tail (ACE_Message_Block *new_item, ACE_Time_Value *timeout = 0) [virtual]
Just calls the priority enqueue method. Tail enqueue semantics for dynamic message queues are unstable: the message may or may not be where it was placed after the queue is refreshed prior to the next enqueue or dequeue operation.
Reimplemented from ACE_Message_Queue< ACE_SYNCH_USE >.
Definition at line 2719 of file Message_Queue_T.cpp.
References ACE_TRACE, and ACE_Message_Queue< ACE_SYNCH_USE >::enqueue_prio().
{
  ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_tail");
  return this->enqueue_prio (new_item, timeout);
}
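Since enqueue_head and enqueue_tail both forward to the priority enqueue, the requested position is not honored. A toy model makes the consequence visible; `PrioOnlyQueue` is hypothetical and orders plain deadline values in ascending order, which is an assumption of this sketch rather than the behavior of any particular ACE strategy.

```cpp
#include <cassert>
#include <vector>

// Hypothetical queue that always keeps its items sorted by ascending deadline.
class PrioOnlyQueue {
public:
  void enqueue_prio(long deadline) {
    std::vector<long>::iterator it = items_.begin();
    while (it != items_.end() && *it <= deadline)
      ++it;
    items_.insert(it, deadline);
  }

  // Both positional variants ignore the requested position and delegate.
  void enqueue_head(long deadline) { enqueue_prio(deadline); }
  void enqueue_tail(long deadline) { enqueue_prio(deadline); }

  long front() const { assert(!items_.empty()); return items_.front(); }
  long back() const  { assert(!items_.empty()); return items_.back(); }

private:
  std::vector<long> items_;
};
```

In this model, calling `enqueue_head(30)` on a queue that already holds 10 leaves 10 at the front, which is the kind of surprise the caveat in the class description warns about.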
void ACE_Dynamic_Message_Queue<>::operator= (const ACE_Dynamic_Message_Queue< _ACE_SYNCH_MUTEX_T, _ACE_SYNCH_CONDITION_T > &) [private]
int ACE_Dynamic_Message_Queue<>::peek_dequeue_head (ACE_Message_Block *&first_item, ACE_Time_Value *timeout = 0) [private, virtual]
Private method to hide public base class method: just calls base class method.
Reimplemented from ACE_Message_Queue< ACE_SYNCH_USE >.
Definition at line 2708 of file Message_Queue_T.cpp.
References ACE_Message_Queue<>::peek_dequeue_head().
{
  return ACE_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head (first_item,
                                                              timeout);
}
int ACE_Dynamic_Message_Queue<>::refresh_late_queue (const ACE_Time_Value &current_time) [protected, virtual]
Refresh the late queue using the strategy specific priority status function.
Definition at line 2623 of file Message_Queue_T.cpp.
References ACE_ERROR_RETURN, ACE_TEXT, ACE_Dynamic_Message_Strategy::BEYOND_LATE, ACE_Dynamic_Message_Queue<>::beyond_late_head_, ACE_Dynamic_Message_Queue<>::beyond_late_tail_, ACE_Message_Queue< ACE_SYNCH_USE >::head_, ACE_Dynamic_Message_Strategy::LATE, ACE_Dynamic_Message_Queue<>::late_head_, ACE_Dynamic_Message_Queue<>::late_tail_, LM_ERROR, ACE_Dynamic_Message_Queue<>::message_strategy_, ACE_Message_Block::next(), ACE_Dynamic_Message_Strategy::PENDING, ACE_Message_Block::prev(), ACE_Dynamic_Message_Strategy::priority_status(), and ACE_Message_Queue< ACE_SYNCH_USE >::tail_.
Referenced by ACE_Dynamic_Message_Queue<>::refresh_queue().
{
  ACE_Dynamic_Message_Strategy::Priority_Status current_status;

  if (this->late_head_)
    {
      current_status = message_strategy_.priority_status (*this->late_head_,
                                                          current_time);
      switch (current_status)
        {
        case ACE_Dynamic_Message_Strategy::BEYOND_LATE:

          // make sure the head of the beyond late queue is set
          // (there may not have been any beyond late messages previously)
          this->beyond_late_head_ = this->head_;

          // advance through the beyond late messages in the late queue
          do
            {
              this->late_head_ = this->late_head_->next ();

              if (this->late_head_)
                current_status = message_strategy_.priority_status (*this->late_head_,
                                                                    current_time);
              else
                break;  // do while

            }
          while (current_status == ACE_Dynamic_Message_Strategy::BEYOND_LATE);

          if (this->late_head_)
            {
              // point tail of beyond late sublist to previous item
              this->beyond_late_tail_ = this->late_head_->prev ();

              if (current_status == ACE_Dynamic_Message_Strategy::PENDING)
                {
                  // there are no late messages left in the queue
                  this->late_head_ = 0;
                  this->late_tail_ = 0;
                }
              else if (current_status != ACE_Dynamic_Message_Strategy::LATE)
                // if we got here, something is *seriously* wrong with the queue
                ACE_ERROR_RETURN ((LM_ERROR,
                                   ACE_TEXT ("Unexpected message priority status [%d] (expected LATE)"),
                                   (int) current_status),
                                  -1);
            }
          else
            {
              // there are no late messages left in the queue
              this->beyond_late_tail_ = this->tail_;
              this->late_head_ = 0;
              this->late_tail_ = 0;
            }

          break;  // switch

        case ACE_Dynamic_Message_Strategy::LATE:
          // do nothing - the late queue is unchanged
          break;  // switch

        case ACE_Dynamic_Message_Strategy::PENDING:
          // if we got here, something is *seriously* wrong with the queue
          ACE_ERROR_RETURN ((LM_ERROR,
                             ACE_TEXT ("Unexpected message priority status ")
                             ACE_TEXT ("[%d] (expected LATE or BEYOND_LATE)"),
                             (int) current_status),
                            -1);
        default:
          // if we got here, something is *seriously* wrong with the queue
          ACE_ERROR_RETURN ((LM_ERROR,
                             ACE_TEXT ("Unknown message priority status [%d]"),
                             (int) current_status),
                            -1);
        }
    }

  return 0;
}
int ACE_Dynamic_Message_Queue<>::refresh_pending_queue (const ACE_Time_Value &current_time) [protected, virtual]
Refresh the pending queue using the strategy specific priority status function.
Definition at line 2500 of file Message_Queue_T.cpp.
References ACE_ERROR_RETURN, ACE_TEXT, ACE_Dynamic_Message_Strategy::BEYOND_LATE, ACE_Dynamic_Message_Queue<>::beyond_late_head_, ACE_Dynamic_Message_Queue<>::beyond_late_tail_, ACE_Message_Queue< ACE_SYNCH_USE >::head_, ACE_Dynamic_Message_Strategy::LATE, ACE_Dynamic_Message_Queue<>::late_head_, ACE_Dynamic_Message_Queue<>::late_tail_, LM_ERROR, ACE_Dynamic_Message_Queue<>::message_strategy_, ACE_Message_Block::next(), ACE_Dynamic_Message_Strategy::PENDING, ACE_Dynamic_Message_Queue<>::pending_head_, ACE_Dynamic_Message_Queue<>::pending_tail_, ACE_Message_Block::prev(), ACE_Dynamic_Message_Strategy::priority_status(), and ACE_Message_Queue< ACE_SYNCH_USE >::tail_.
Referenced by ACE_Dynamic_Message_Queue<>::refresh_queue().
{
  ACE_Dynamic_Message_Strategy::Priority_Status current_status;

  // refresh priority status boundaries in the queue
  if (this->pending_head_)
    {
      current_status = message_strategy_.priority_status (*this->pending_head_,
                                                          current_time);
      switch (current_status)
        {
        case ACE_Dynamic_Message_Strategy::BEYOND_LATE:
          // Make sure the head of the beyond late queue is set (there
          // may not have been any beyond late messages previously)
          this->beyond_late_head_ = this->head_;

          // Zero out the late queue pointers, and set them only if
          // there turn out to be late messages in the pending sublist
          this->late_head_ = 0;
          this->late_tail_ = 0;

          // Advance through the beyond late messages in the pending queue
          do
            {
              this->pending_head_ = this->pending_head_->next ();

              if (this->pending_head_)
                current_status = message_strategy_.priority_status (*this->pending_head_,
                                                                    current_time);
              else
                break;  // do while

            }
          while (current_status == ACE_Dynamic_Message_Strategy::BEYOND_LATE);

          if (this->pending_head_)
            {
              // point tail of beyond late sublist to previous item
              this->beyond_late_tail_ = this->pending_head_->prev ();

              if (current_status == ACE_Dynamic_Message_Strategy::PENDING)
                // there are no late messages left in the queue
                break;  // switch
              else if (current_status != ACE_Dynamic_Message_Strategy::LATE)
                {
                  // if we got here, something is *seriously* wrong with the queue
                  ACE_ERROR_RETURN ((LM_ERROR,
                                     ACE_TEXT ("Unexpected message priority status [%d] (expected LATE)"),
                                     (int) current_status),
                                    -1);
                }
              /* FALLTHRU */
            }
          else
            {
              // There are no pending or late messages left in the
              // queue.
              this->beyond_late_tail_ = this->tail_;
              this->pending_head_ = 0;
              this->pending_tail_ = 0;
              break;  // switch
            }

        case ACE_Dynamic_Message_Strategy::LATE:
          // Make sure the head of the late queue is set (there may
          // not have been any late messages previously, or they may
          // have all become beyond late).
          if (this->late_head_ == 0)
            this->late_head_ = this->pending_head_;

          // advance through the late messages in the pending queue
          do
            {
              this->pending_head_ = this->pending_head_->next ();

              if (this->pending_head_)
                current_status = message_strategy_.priority_status (*this->pending_head_,
                                                                    current_time);
              else
                break;  // do while

            }
          while (current_status == ACE_Dynamic_Message_Strategy::LATE);

          if (this->pending_head_)
            {
              if (current_status != ACE_Dynamic_Message_Strategy::PENDING)
                // if we got here, something is *seriously* wrong with the queue
                ACE_ERROR_RETURN((LM_ERROR,
                                  ACE_TEXT ("Unexpected message priority status [%d] (expected PENDING)"),
                                  (int) current_status),
                                 -1);

              // Point tail of late sublist to previous item
              this->late_tail_ = this->pending_head_->prev ();
            }
          else
            {
              // there are no pending messages left in the queue
              this->late_tail_ = this->tail_;
              this->pending_head_ = 0;
              this->pending_tail_ = 0;
            }

          break;  // switch
        case ACE_Dynamic_Message_Strategy::PENDING:
          // do nothing - the pending queue is unchanged
          break;  // switch
        default:
          // if we got here, something is *seriously* wrong with the queue
          ACE_ERROR_RETURN((LM_ERROR,
                            ACE_TEXT ("Unknown message priority status [%d]"),
                            (int) current_status),
                           -1);
        }
    }
  return 0;
}
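The refresh methods rely on the property stated in the class description: aging messages keep their list order, so only the boundaries between portions have to move. The following sketch models that idea with indices into a vector instead of list pointers. Everything in it is hypothetical: the `Boundaries` type, the `max_late` window, and the assumption that the list is ordered by ascending deadline from the head.

```cpp
#include <cstddef>
#include <vector>

// Indices into a head-first sequence of messages.
// [0, late_begin)              -> beyond late
// [late_begin, pending_begin)  -> late
// [pending_begin, size)        -> pending
struct Boundaries {
  std::size_t late_begin;
  std::size_t pending_begin;
};

// Advance the boundaries for the current time without moving any message.
// Assumes `deadlines` is in list order with ascending deadlines, and that a
// message is late once its deadline has passed and beyond late once it is
// more than `max_late` time units overdue (both assumptions of this sketch).
void refresh(const std::vector<long>& deadlines, long now, long max_late,
             Boundaries& b) {
  // Pending messages whose deadline has passed leave the pending portion.
  while (b.pending_begin < deadlines.size() &&
         deadlines[b.pending_begin] < now)
    ++b.pending_begin;

  // Late messages that are too far overdue join the beyond late portion.
  while (b.late_begin < b.pending_begin &&
         now - deadlines[b.late_begin] > max_late)
    ++b.late_begin;
}
```

Each call only walks over the messages that actually change status, which is the cost profile the boundary design aims for.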
int ACE_Dynamic_Message_Queue<>::refresh_queue (const ACE_Time_Value &current_time) [protected, virtual]
Refresh the queue using the strategy specific priority status function.
Definition at line 2484 of file Message_Queue_T.cpp.
References ACE_Dynamic_Message_Queue<>::refresh_late_queue(), and ACE_Dynamic_Message_Queue<>::refresh_pending_queue().
Referenced by ACE_Dynamic_Message_Queue<>::dequeue_head(), ACE_Dynamic_Message_Queue<>::enqueue_i(), and ACE_Dynamic_Message_Queue<>::remove_messages().
{
  int result;

  result = refresh_pending_queue (current_time);

  if (result != -1)
    result = refresh_late_queue (current_time);

  return result;
}
int ACE_Dynamic_Message_Queue<>::remove_messages (ACE_Message_Block *&list_head, ACE_Message_Block *&list_tail, u_int status_flags) [virtual]
Detach all messages with status given in the passed flags from the queue and return them by setting passed head and tail pointers to the linked list they comprise. This method is intended primarily as a means of periodically harvesting messages that have missed their deadlines, but is available in its most general form. All messages are returned in priority order, from head to tail, as of the time this method was called.
Definition at line 1940 of file Message_Queue_T.cpp.
References ACE_BIT_ENABLED, ACE_Dynamic_Message_Strategy::BEYOND_LATE, ACE_Dynamic_Message_Queue<>::beyond_late_head_, ACE_Dynamic_Message_Queue<>::beyond_late_tail_, ACE_Message_Queue< ACE_SYNCH_USE >::cur_bytes_, ACE_Message_Queue< ACE_SYNCH_USE >::cur_length_, ACE_OS::gettimeofday(), ACE_Message_Queue< ACE_SYNCH_USE >::head_, ACE_Dynamic_Message_Strategy::LATE, ACE_Dynamic_Message_Queue<>::late_head_, ACE_Dynamic_Message_Queue<>::late_tail_, ACE_Message_Block::next(), ACE_Dynamic_Message_Strategy::PENDING, ACE_Dynamic_Message_Queue<>::pending_head_, ACE_Dynamic_Message_Queue<>::pending_tail_, ACE_Message_Block::prev(), ACE_Dynamic_Message_Queue<>::refresh_queue(), ACE_Message_Queue< ACE_SYNCH_USE >::tail_, and ACE_Message_Block::total_size_and_length().
{
  // start with an empty list
  list_head = 0;
  list_tail = 0;

  // Get the current time
  ACE_Time_Value current_time = ACE_OS::gettimeofday ();

  // Refresh priority status boundaries in the queue.
  int result = this->refresh_queue (current_time);
  if (result < 0)
    return result;

  if (ACE_BIT_ENABLED (status_flags,
                       (u_int) ACE_Dynamic_Message_Strategy::PENDING)
      && this->pending_head_
      && this->pending_tail_)
    {
      // patch up pointers for the new tail of the queue
      if (this->pending_head_->prev ())
        {
          this->tail_ = this->pending_head_->prev ();
          this->pending_head_->prev ()->next (0);
        }
      else
        {
          // the list has become empty
          this->head_ = 0;
          this->tail_ = 0;
        }

      // point to the head and tail of the list
      list_head = this->pending_head_;
      list_tail = this->pending_tail_;

      // cut the pending messages out of the queue entirely
      this->pending_head_->prev (0);
      this->pending_head_ = 0;
      this->pending_tail_ = 0;
    }

  if (ACE_BIT_ENABLED (status_flags,
                       (u_int) ACE_Dynamic_Message_Strategy::LATE)
      && this->late_head_
      && this->late_tail_)
    {
      // Patch up pointers for the (possibly) new head and tail of the
      // queue.
      if (this->late_tail_->next ())
        this->late_tail_->next ()->prev (this->late_head_->prev ());
      else
        this->tail_ = this->late_head_->prev ();

      if (this->late_head_->prev ())
        this->late_head_->prev ()->next (this->late_tail_->next ());
      else
        this->head_ = this->late_tail_->next ();

      // put late messages behind pending messages (if any) being returned
      this->late_head_->prev (list_tail);
      if (list_tail)
        list_tail->next (this->late_head_);
      else
        list_head = this->late_head_;

      list_tail = this->late_tail_;

      this->late_tail_->next (0);
      this->late_head_ = 0;
      this->late_tail_ = 0;
    }

  if (ACE_BIT_ENABLED (status_flags,
                       (u_int) ACE_Dynamic_Message_Strategy::BEYOND_LATE)
      && this->beyond_late_head_
      && this->beyond_late_tail_)
    {
      // Patch up pointers for the new head of the queue
      if (this->beyond_late_tail_->next ())
        {
          this->head_ = this->beyond_late_tail_->next ();
          this->beyond_late_tail_->next ()->prev (0);
        }
      else
        {
          // the list has become empty
          this->head_ = 0;
          this->tail_ = 0;
        }

      // Put beyond late messages at the end of the list being
      // returned.
      if (list_tail)
        {
          this->beyond_late_head_->prev (list_tail);
          list_tail->next (this->beyond_late_head_);
        }
      else
        list_head = this->beyond_late_head_;

      list_tail = this->beyond_late_tail_;

      this->beyond_late_tail_->next (0);
      this->beyond_late_head_ = 0;
      this->beyond_late_tail_ = 0;
    }

  // Decrement message and size counts for removed messages.
  ACE_Message_Block *temp1;

  for (temp1 = list_head;
       temp1 != 0;
       temp1 = temp1->next ())
    {
      --this->cur_count_;

      size_t mb_bytes = 0;
      size_t mb_length = 0;
      temp1->total_size_and_length (mb_bytes,
                                    mb_length);
      // Subtract off all of the bytes associated with this message.
      this->cur_bytes_ -= mb_bytes;
      this->cur_length_ -= mb_length;
    }

  return result;
}
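The pointer patching in the listing above amounts to unlinking one contiguous run of messages from a doubly linked list and appending it to the list handed back to the caller. The following is a minimal standalone sketch of that idea, not ACE code: `Node`, `Queue`, `link`, `splice_out`, and `demo_splice` are hypothetical names introduced for illustration, and the sketch follows the general two-sided case handled by the LATE branch.

```cpp
#include <cassert>

// Hypothetical stand-in for ACE_Message_Block: only the list links and an id.
struct Node {
  explicit Node(int i) : id(i), prev(nullptr), next(nullptr) {}
  int id;
  Node *prev;
  Node *next;
};

// Hypothetical stand-in for the queue: overall head and tail only.
struct Queue {
  Node *head = nullptr;
  Node *tail = nullptr;
};

// Link b directly after a.
inline void link(Node &a, Node &b) { a.next = &b; b.prev = &a; }

// Unlink the contiguous run [sub_head, sub_tail] from q and append it to the
// caller's result list [list_head, list_tail].
void splice_out(Queue &q, Node *&sub_head, Node *&sub_tail,
                Node *&list_head, Node *&list_tail) {
  if (!sub_head || !sub_tail)
    return;  // empty sublist: nothing to remove

  Node *before = sub_head->prev;
  Node *after = sub_tail->next;

  // Close the gap in the queue; update head/tail if the run sat at an end.
  if (after) after->prev = before; else q.tail = before;
  if (before) before->next = after; else q.head = after;

  // Append the run to the result list.
  sub_head->prev = list_tail;
  if (list_tail) list_tail->next = sub_head; else list_head = sub_head;
  list_tail = sub_tail;
  sub_tail->next = nullptr;

  // The sublist boundaries no longer refer to anything in the queue.
  sub_head = nullptr;
  sub_tail = nullptr;
}

// Remove the middle run {2, 3} from the queue 1 <-> 2 <-> 3 <-> 4.
void demo_splice() {
  Node a(1), b(2), c(3), d(4);
  link(a, b); link(b, c); link(c, d);
  Queue q;
  q.head = &a;
  q.tail = &d;
  Node *late_head = &b, *late_tail = &c;
  Node *list_head = nullptr, *list_tail = nullptr;

  splice_out(q, late_head, late_tail, list_head, list_tail);

  assert(a.next == &d && d.prev == &a);            // queue is now 1 <-> 4
  assert(list_head == &b && list_tail == &c);      // result is 2 <-> 3
  assert(b.prev == nullptr && c.next == nullptr);  // result is detached
}
```

Because `list_tail` carries over between calls, invoking `splice_out` once per selected sublist concatenates the removed runs in call order, which matches how the listing returns pending, then late, then beyond late messages.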
int ACE_Dynamic_Message_Queue<>::sublist_enqueue_i (ACE_Message_Block *new_item,
    const ACE_Time_Value &current_time,
    ACE_Message_Block *&sublist_head,
    ACE_Message_Block *&sublist_tail,
    ACE_Dynamic_Message_Strategy::Priority_Status status) [protected, virtual]
Enqueue a message in priority order within a given priority status sublist.
Definition at line 2292 of file Message_Queue_T.cpp.
References ACE_Message_Queue< ACE_SYNCH_USE >::head_, ACE_Dynamic_Message_Queue<>::message_strategy_, ACE_Message_Block::msg_priority(), ACE_Message_Block::next(), ACE_Message_Block::prev(), ACE_Dynamic_Message_Strategy::priority_status(), and ACE_Message_Queue< ACE_SYNCH_USE >::tail_.
Referenced by ACE_Dynamic_Message_Queue<>::enqueue_i().
{
  int result = 0;
  ACE_Message_Block *current_item = 0;

  // Find message after which to enqueue new item, based on message
  // priority and priority status.
  for (current_item = sublist_tail;
       current_item;
       current_item = current_item->prev ())
    {
      if (message_strategy_.priority_status (*current_item, current_time) == status)
        {
          if (current_item->msg_priority () >= new_item->msg_priority ())
            break;
        }
      else
        {
          sublist_head = new_item;
          break;
        }
    }

  if (current_item == 0)
    {
      // If the new message has highest priority of any, put it at the
      // head of the list (and sublist).
      new_item->prev (0);
      new_item->next (this->head_);
      if (this->head_ != 0)
        this->head_->prev (new_item);
      else
        {
          this->tail_ = new_item;
          sublist_tail = new_item;
        }
      this->head_ = new_item;
      sublist_head = new_item;
    }
  else
    {
      // insert the new item into the list
      new_item->next (current_item->next ());
      new_item->prev (current_item);

      if (current_item->next ())
        current_item->next ()->prev (new_item);
      else
        this->tail_ = new_item;

      current_item->next (new_item);

      // If the new item has lowest priority of any in the sublist,
      // move the tail pointer of the sublist back to the new item
      if (current_item == sublist_tail)
        sublist_tail = new_item;
    }

  return result;
}
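The scan in the listing above walks backwards from the sublist tail until it finds either a message of equal or higher priority, or a message that belongs to a different priority status. A minimal standalone sketch of the same insertion rule follows. It is not ACE code: `Msg`, `Queue`, `Status`, `link`, `sublist_insert`, and `demo_sublist_insert` are hypothetical names, and a stored `status` field stands in for the call to `message_strategy_.priority_status ()`. Like the listing, it assumes the sublist is non-empty unless the whole queue is empty.

```cpp
#include <cassert>

// Hypothetical status values; the real ones live in ACE_Dynamic_Message_Strategy.
enum Status { PENDING = 1, LATE = 2 };

// Hypothetical stand-in for ACE_Message_Block.
struct Msg {
  Msg(Status s, unsigned long p)
    : status(s), priority(p), prev(nullptr), next(nullptr) {}
  Status status;           // assumption: precomputed instead of asking a strategy
  unsigned long priority;  // larger value means higher priority
  Msg *prev;
  Msg *next;
};

struct Queue {
  Msg *head = nullptr;
  Msg *tail = nullptr;
};

// Link b directly after a.
inline void link(Msg &a, Msg &b) { a.next = &b; b.prev = &a; }

// Insert item into the sublist [sub_head, sub_tail] in descending priority
// order, scanning from the tail so equal priorities stay first-in, first-out.
void sublist_insert(Queue &q, Msg *item, Msg *&sub_head, Msg *&sub_tail) {
  Msg *cur = sub_tail;
  for (; cur; cur = cur->prev) {
    if (cur->status == item->status) {
      if (cur->priority >= item->priority)
        break;            // insert right after this message
    } else {
      sub_head = item;    // walked past the front of the sublist
      break;
    }
  }

  if (!cur) {
    // Highest priority of all: becomes the head of the queue and the sublist.
    item->prev = nullptr;
    item->next = q.head;
    if (q.head) {
      q.head->prev = item;
    } else {
      q.tail = item;
      sub_tail = item;
    }
    q.head = item;
    sub_head = item;
  } else {
    // Insert after cur.
    item->next = cur->next;
    item->prev = cur;
    if (cur->next) cur->next->prev = item; else q.tail = item;
    cur->next = item;
    if (cur == sub_tail)
      sub_tail = item;    // lowest priority in the sublist
  }
}

// Queue: late(5) | pending(9), pending(3); insert pending 5, 10 and 1.
void demo_sublist_insert() {
  Msg l1(LATE, 5), p1(PENDING, 9), p2(PENDING, 3);
  link(l1, p1); link(p1, p2);
  Queue q;
  q.head = &l1;
  q.tail = &p2;
  Msg *sub_head = &p1, *sub_tail = &p2;

  Msg m5(PENDING, 5), m10(PENDING, 10), m1(PENDING, 1);
  sublist_insert(q, &m5, sub_head, sub_tail);
  assert(p1.next == &m5 && m5.next == &p2);      // between 9 and 3
  sublist_insert(q, &m10, sub_head, sub_tail);
  assert(sub_head == &m10 && l1.next == &m10);   // new sublist head
  sublist_insert(q, &m1, sub_head, sub_tail);
  assert(sub_tail == &m1 && q.tail == &m1);      // new sublist and queue tail
}
```

Scanning from the tail rather than the head keeps the common case cheap when new messages tend to have lower or equal priority than those already queued.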
ACE_Dynamic_Message_Queue<>::ACE_ALLOC_HOOK_DECLARE |
Declare the dynamic allocation hooks.
Reimplemented from ACE_Message_Queue< ACE_SYNCH_USE >.
Definition at line 823 of file Message_Queue_T.h.
ACE_Message_Block* ACE_Dynamic_Message_Queue<>::beyond_late_head_ [protected] |
Pointer to head of the beyond late messages.
Definition at line 877 of file Message_Queue_T.h.
Referenced by ACE_Dynamic_Message_Queue<>::dequeue_head_i(), ACE_Dynamic_Message_Queue<>::enqueue_i(), ACE_Dynamic_Message_Queue<>::refresh_late_queue(), ACE_Dynamic_Message_Queue<>::refresh_pending_queue(), and ACE_Dynamic_Message_Queue<>::remove_messages().
ACE_Message_Block* ACE_Dynamic_Message_Queue<>::beyond_late_tail_ [protected] |
Pointer to tail of the beyond late messages.
Definition at line 880 of file Message_Queue_T.h.
Referenced by ACE_Dynamic_Message_Queue<>::dequeue_head_i(), ACE_Dynamic_Message_Queue<>::enqueue_i(), ACE_Dynamic_Message_Queue<>::refresh_late_queue(), ACE_Dynamic_Message_Queue<>::refresh_pending_queue(), and ACE_Dynamic_Message_Queue<>::remove_messages().
ACE_Message_Block* ACE_Dynamic_Message_Queue<>::late_head_ [protected] |
Pointer to head of the late messages.
Definition at line 871 of file Message_Queue_T.h.
Referenced by ACE_Dynamic_Message_Queue<>::dequeue_head_i(), ACE_Dynamic_Message_Queue<>::enqueue_i(), ACE_Dynamic_Message_Queue<>::refresh_late_queue(), ACE_Dynamic_Message_Queue<>::refresh_pending_queue(), and ACE_Dynamic_Message_Queue<>::remove_messages().
ACE_Message_Block* ACE_Dynamic_Message_Queue<>::late_tail_ [protected] |
Pointer to tail of the late messages.
Definition at line 874 of file Message_Queue_T.h.
Referenced by ACE_Dynamic_Message_Queue<>::dequeue_head_i(), ACE_Dynamic_Message_Queue<>::enqueue_i(), ACE_Dynamic_Message_Queue<>::refresh_late_queue(), ACE_Dynamic_Message_Queue<>::refresh_pending_queue(), and ACE_Dynamic_Message_Queue<>::remove_messages().
ACE_Dynamic_Message_Strategy& ACE_Dynamic_Message_Queue<>::message_strategy_ [protected] |
Reference to the dynamic priority evaluation strategy.
Definition at line 883 of file Message_Queue_T.h.
Referenced by ACE_Dynamic_Message_Queue<>::dump(), ACE_Dynamic_Message_Queue<>::enqueue_i(), ACE_Dynamic_Message_Queue<>::refresh_late_queue(), ACE_Dynamic_Message_Queue<>::refresh_pending_queue(), ACE_Dynamic_Message_Queue<>::sublist_enqueue_i(), and ACE_Dynamic_Message_Queue<>::~ACE_Dynamic_Message_Queue().
ACE_Message_Block* ACE_Dynamic_Message_Queue<>::pending_head_ [protected] |
Pointer to head of the pending messages.
Definition at line 865 of file Message_Queue_T.h.
Referenced by ACE_Dynamic_Message_Queue<>::dequeue_head_i(), ACE_Dynamic_Message_Queue<>::enqueue_i(), ACE_Dynamic_Message_Queue<>::refresh_pending_queue(), and ACE_Dynamic_Message_Queue<>::remove_messages().
ACE_Message_Block* ACE_Dynamic_Message_Queue<>::pending_tail_ [protected] |
Pointer to tail of the pending messages.
Definition at line 868 of file Message_Queue_T.h.
Referenced by ACE_Dynamic_Message_Queue<>::dequeue_head_i(), ACE_Dynamic_Message_Queue<>::enqueue_i(), ACE_Dynamic_Message_Queue<>::refresh_pending_queue(), and ACE_Dynamic_Message_Queue<>::remove_messages().