#include <Message_Queue_T.h>
Inherits ACE_Message_Queue< ACE_SYNCH_USE >.
Public Member Functions | |
| ACE_Dynamic_Message_Queue (ACE_Dynamic_Message_Strategy &message_strategy, size_t hwm=ACE_Message_Queue_Base::DEFAULT_HWM, size_t lwm=ACE_Message_Queue_Base::DEFAULT_LWM, ACE_Notification_Strategy *=0) | |
| virtual | ~ACE_Dynamic_Message_Queue (void) |
| Close down the message queue and release all resources. | |
| virtual int | remove_messages (ACE_Message_Block *&list_head, ACE_Message_Block *&list_tail, u_int status_flags) |
| virtual int | dequeue_head (ACE_Message_Block *&first_item, ACE_Time_Value *timeout=0) |
| virtual void | dump (void) const |
| Dump the state of the queue. | |
| virtual int | enqueue_tail (ACE_Message_Block *new_item, ACE_Time_Value *timeout=0) |
| virtual int | enqueue_head (ACE_Message_Block *new_item, ACE_Time_Value *timeout=0) |
Public Attributes | |
| ACE_ALLOC_HOOK_DECLARE | |
| Declare the dynamic allocation hooks. | |
Protected Member Functions | |
| virtual int | enqueue_i (ACE_Message_Block *new_item) |
| virtual int | sublist_enqueue_i (ACE_Message_Block *new_item, const ACE_Time_Value &current_time, ACE_Message_Block *&sublist_head, ACE_Message_Block *&sublist_tail, ACE_Dynamic_Message_Strategy::Priority_Status status) |
| Enqueue a message in priority order within a given priority status sublist. | |
| virtual int | dequeue_head_i (ACE_Message_Block *&first_item) |
| virtual int | refresh_queue (const ACE_Time_Value &current_time) |
| virtual int | refresh_pending_queue (const ACE_Time_Value &current_time) |
| virtual int | refresh_late_queue (const ACE_Time_Value &current_time) |
Protected Attributes | |
| ACE_Message_Block * | pending_head_ |
| Pointer to head of the pending messages. | |
| ACE_Message_Block * | pending_tail_ |
| Pointer to tail of the pending messages. | |
| ACE_Message_Block * | late_head_ |
| Pointer to head of the late messages. | |
| ACE_Message_Block * | late_tail_ |
| Pointer to tail of the late messages. | |
| ACE_Message_Block * | beyond_late_head_ |
| Pointer to head of the beyond late messages. | |
| ACE_Message_Block * | beyond_late_tail_ |
| Pointer to tail of the beyond late messages. | |
| ACE_Dynamic_Message_Strategy & | message_strategy_ |
| Reference to the dynamic priority evaluation strategy. | |
Private Member Functions | |
| void | operator= (const ACE_Dynamic_Message_Queue< _ACE_SYNCH > &) |
| ACE_Dynamic_Message_Queue (const ACE_Dynamic_Message_Queue< _ACE_SYNCH > &) | |
| virtual int | peek_dequeue_head (ACE_Message_Block *&first_item, ACE_Time_Value *timeout=0) |
| Private method to hide public base class method: just calls base class method. | |
The messages in the queue are managed so as to preserve a logical ordering with minimal overhead per enqueue and dequeue operation. For this reason, the actual order of messages in the linked list of the queue may differ from their priority order. As time passes, a message may change from pending status to late status, and eventually to beyond late status. To minimize reordering overhead under this design force, three separate boundaries are maintained within the linked list of messages. Messages are dequeued preferentially from the head of the pending portion, then the head of the late portion, and finally from the head of the beyond late portion. In this way, only the boundaries need to be maintained (which can be done efficiently, as aging messages maintain the same linked list order as they progress from one status to the next), with no reordering of the messages themselves, while providing correct priority ordered dequeueing semantics.

Head and tail enqueue methods inherited from ACE_Message_Queue are made private to prevent out-of-order messages from confusing management of the various portions of the queue. Messages in the pending portion of the queue whose priority becomes late (according to the specific dynamic strategy) advance into the late portion of the queue. Messages in the late portion of the queue whose priority becomes later than can be represented advance to the beyond late portion of the queue. These behaviors support a limited schedule overrun, with pending messages prioritized ahead of late messages, and late messages ahead of beyond late messages. These behaviors can be modified in derived classes by providing alternative definitions for the appropriate virtual methods.

When filled with messages, the queue's linked list should look like:

    H                                           T
    |                                           |
    B - B - B - B - L - L - L - P - P - P - P - P
    |           |   |       |   |               |
    BH          BT  LH      LT  PH              PT

Where the symbols are as follows:

    H  = Head of the entire list
    T  = Tail of the entire list
    B  = Beyond late message
    BH = Beyond late messages Head
    BT = Beyond late messages Tail
    L  = Late message
    LH = Late messages Head
    LT = Late messages Tail
    P  = Pending message
    PH = Pending messages Head
    PT = Pending messages Tail

Caveat: the virtual methods enqueue_tail, enqueue_head, and peek_dequeue_head have semantics for the static message queues that cannot be guaranteed for dynamic message queues. The peek_dequeue_head method just calls the base class method, while the two enqueue methods call the priority enqueue method. The order of messages in the dynamic queue is a function of message deadlines and how long they are in the queues. You can manipulate these in some cases to ensure the correct semantics, but that is not a very stable or portable approach (discouraged).
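The dequeue preference described above can be illustrated with a minimal, self-contained sketch. It does not use ACE: the names `Status`, `Msg`, and `ThreePartQueue` are hypothetical, and one `std::deque` per status stands in for the three boundary-delimited portions that the real class keeps inside a single linked list.

```cpp
#include <cassert>
#include <deque>
#include <string>

// Hypothetical stand-ins for illustration; none of these names exist in ACE.
enum class Status { Pending, Late, BeyondLate };

struct Msg {
  std::string name;
  Status status;
};

// One container per portion. The real queue instead keeps a single linked
// list plus head/tail boundary pointers for each portion.
struct ThreePartQueue {
  std::deque<Msg> beyond_late, late, pending;

  void enqueue(const Msg &m) {
    switch (m.status) {
      case Status::Pending:    pending.push_back(m);     break;
      case Status::Late:       late.push_back(m);        break;
      case Status::BeyondLate: beyond_late.push_back(m); break;
    }
  }

  // Dequeue preferentially from pending, then late, then beyond late.
  // Returns false (and leaves `out` untouched) when nothing is queued.
  bool dequeue(Msg &out) {
    for (std::deque<Msg> *part : {&pending, &late, &beyond_late}) {
      if (!part->empty()) {
        out = part->front();
        part->pop_front();
        return true;
      }
    }
    return false;
  }
};
```

Enqueueing one beyond late, one late, and two pending messages and then draining the sketch yields the two pending messages first, then the late one, then the beyond late one, regardless of enqueue order.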
Definition at line 750 of file Message_Queue_T.h.
ACE_Dynamic_Message_Queue (ACE_Dynamic_Message_Strategy &message_strategy, size_t hwm=ACE_Message_Queue_Base::DEFAULT_HWM, size_t lwm=ACE_Message_Queue_Base::DEFAULT_LWM, ACE_Notification_Strategy *=0)
Definition at line 1873 of file Message_Queue_T.cpp. References ACE_SYNCH_USE.
01877 : ACE_Message_Queue<ACE_SYNCH_USE> (hwm, lwm, ns),
01878 pending_head_ (0),
01879 pending_tail_ (0),
01880 late_head_ (0),
01881 late_tail_ (0),
01882 beyond_late_head_ (0),
01883 beyond_late_tail_ (0),
01884 message_strategy_ (message_strategy)
01885 {
01886 // Note, the ACE_Dynamic_Message_Queue assumes full responsibility
01887 // for the passed ACE_Dynamic_Message_Strategy object, and deletes
01888 // it in its own dtor
01889 }
~ACE_Dynamic_Message_Queue (void) [virtual]
Close down the message queue and release all resources.
Definition at line 1894 of file Message_Queue_T.cpp. References ACE_Dynamic_Message_Queue<>::message_strategy_.
01895 {
01896 delete &this->message_strategy_;
01897 }
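Because the destructor deletes the object behind the `message_strategy_` reference, the strategy passed to the constructor has to be heap-allocated and must not be deleted by the caller. The following sketch shows that ownership pattern in isolation. `SketchStrategy`, `OwningQueue`, and `demo_ownership` are hypothetical stand-ins rather than ACE types.

```cpp
#include <cassert>

// Hypothetical stand-in for ACE_Dynamic_Message_Strategy.
struct SketchStrategy {
  explicit SketchStrategy(int *destroyed) : destroyed_(destroyed) {}
  virtual ~SketchStrategy() { ++*destroyed_; }
  int *destroyed_;  // counts destructor calls, for demonstration only
};

// Hypothetical stand-in for the queue: it holds a reference, yet owns the
// referenced object and deletes it on destruction.
class OwningQueue {
public:
  explicit OwningQueue(SketchStrategy &s) : strategy_(s) {}
  ~OwningQueue() { delete &strategy_; }  // same pattern as the listing above
  OwningQueue(const OwningQueue &) = delete;
  OwningQueue &operator=(const OwningQueue &) = delete;

private:
  SketchStrategy &strategy_;
};

// Returns how many times the strategy was destroyed once the queue is gone.
int demo_ownership() {
  int destroyed = 0;
  {
    // The strategy must come from `new`: the queue's destructor deletes it.
    OwningQueue q(*new SketchStrategy(&destroyed));
    assert(destroyed == 0);  // still alive while the queue exists
  }
  return destroyed;
}
```

Passing a stack-allocated strategy to such a class would lead to `delete` being applied to an object that was never allocated with `new`, which is undefined behavior.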
ACE_Dynamic_Message_Queue (const ACE_Dynamic_Message_Queue< _ACE_SYNCH > &) [private]

int dequeue_head (ACE_Message_Block *&first_item, ACE_Time_Value *timeout=0) [virtual]
Dequeue and return the <ACE_Message_Block *> at the head of the queue. Returns -1 on failure, else the number of items still on the queue.
Reimplemented from ACE_Message_Queue< ACE_SYNCH_USE >. Definition at line 2040 of file Message_Queue_T.cpp. References ACE_GUARD_RETURN, ACE_SYNCH_MUTEX_T, ACE_TRACE, ACE_Dynamic_Message_Queue<>::dequeue_head_i(), ESHUTDOWN, ACE_OS::gettimeofday(), ACE_Dynamic_Message_Queue<>::refresh_queue(), and ACE_Message_Queue< ACE_SYNCH_USE >::wait_not_empty_cond().
02042 {
02043 ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head");
02044
02045 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
02046
02047 if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
02048 {
02049 errno = ESHUTDOWN;
02050 return -1;
02051 }
02052
02053 int result;
02054
02055 // get the current time
02056 ACE_Time_Value current_time = ACE_OS::gettimeofday ();
02057
02058 // refresh priority status boundaries in the queue
02059 result = this->refresh_queue (current_time);
02060 if (result < 0)
02061 return result;
02062
02063 // *now* it's appropriate to wait for an enqueued item
02064 result = this->wait_not_empty_cond (ace_mon, timeout);
02065 if (result == -1)
02066 return result;
02067
02068 // call the internal dequeue method, which selects an item from the
02069 // highest priority status portion of the queue that has messages
02070 // enqueued.
02071 result = this->dequeue_head_i (first_item);
02072
02073 return result;
02074 }
int dequeue_head_i (ACE_Message_Block *&first_item) [protected, virtual]
Dequeue and return the <ACE_Message_Block *> at the head of the logical queue. Attempts first to dequeue from the pending portion of the queue, or if that is empty from the late portion, or if that is empty from the beyond late portion, or if that is empty just sets the passed pointer to zero and returns -1.
Reimplemented from ACE_Message_Queue< ACE_SYNCH_USE >. Definition at line 2321 of file Message_Queue_T.cpp. References ACE_TRACE, ACE_Dynamic_Message_Queue<>::beyond_late_head_, ACE_Dynamic_Message_Queue<>::beyond_late_tail_, ACE_Dynamic_Message_Queue<>::late_head_, ACE_Dynamic_Message_Queue<>::late_tail_, ACE_Message_Block::next(), ACE_Dynamic_Message_Queue<>::pending_head_, ACE_Dynamic_Message_Queue<>::pending_tail_, ACE_Message_Block::prev(), ACE_Message_Queue< ACE_SYNCH_USE >::signal_enqueue_waiters(), and ACE_Message_Block::total_size_and_length(). Referenced by ACE_Dynamic_Message_Queue<>::dequeue_head().
02322 {
02323 ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i");
02324
02325 int result = 0;
02326 int last_in_subqueue = 0;
02327
02328 // first, try to dequeue from the head of the pending list
02329 if (this->pending_head_)
02330 {
02331 first_item = this->pending_head_;
02332
02333 if (0 == this->pending_head_->prev ())
02334 this->head_ = this->pending_head_->next ();
02335 else
02336 this->pending_head_->prev ()->next (this->pending_head_->next ());
02337
02338 if (0 == this->pending_head_->next ())
02339 {
02340 this->tail_ = this->pending_head_->prev ();
02341 this->pending_head_ = 0;
02342 this->pending_tail_ = 0;
02343 }
02344 else
02345 {
02346 this->pending_head_->next ()->prev (this->pending_head_->prev ());
02347 this->pending_head_ = this->pending_head_->next ();
02348 }
02349
02350 first_item->prev (0);
02351 first_item->next (0);
02352 }
02353
02354 // Second, try to dequeue from the head of the late list
02355 else if (this->late_head_)
02356 {
02357 last_in_subqueue = this->late_head_ == this->late_tail_ ? 1 : 0;
02358
02359 first_item = this->late_head_;
02360
02361 if (0 == this->late_head_->prev ())
02362 this->head_ = this->late_head_->next ();
02363 else
02364 this->late_head_->prev ()->next (this->late_head_->next ());
02365
02366 if (0 == this->late_head_->next ())
02367 this->tail_ = this->late_head_->prev ();
02368 else
02369 {
02370 this->late_head_->next ()->prev (this->late_head_->prev ());
02371 this->late_head_ = this->late_head_->next ();
02372 }
02373
02374 if (last_in_subqueue)
02375 {
02376 this->late_head_ = 0;
02377 this->late_tail_ = 0;
02378 }
02379
02380 first_item->prev (0);
02381 first_item->next (0);
02382 }
02383 // finally, try to dequeue from the head of the beyond late list
02384 else if (this->beyond_late_head_)
02385 {
02386 last_in_subqueue =
02387 (this->beyond_late_head_ == this->beyond_late_tail_) ? 1 : 0;
02388
02389 first_item = this->beyond_late_head_;
02390 this->head_ = this->beyond_late_head_->next ();
02391
02392 if (0 == this->beyond_late_head_->next ())
02393 this->tail_ = this->beyond_late_head_->prev ();
02394 else
02395 {
02396 this->beyond_late_head_->next ()->prev (this->beyond_late_head_->prev ());
02397 this->beyond_late_head_ = this->beyond_late_head_->next ();
02398 }
02399
02400 if (last_in_subqueue)
02401 {
02402 this->beyond_late_head_ = 0;
02403 this->beyond_late_tail_ = 0;
02404 }
02405
02406 first_item->prev (0);
02407 first_item->next (0);
02408 }
02409 else
02410 {
02411 // nothing to dequeue: set the pointer to zero and return an error code
02412 first_item = 0;
02413 result = -1;
02414 }
02415
02416 if (result < 0)
02417 return result;
02418
02419 size_t mb_bytes = 0;
02420 size_t mb_length = 0;
02421 first_item->total_size_and_length (mb_bytes,
02422 mb_length);
02423 // Subtract off all of the bytes associated with this message.
02424 this->cur_bytes_ -= mb_bytes;
02425 this->cur_length_ -= mb_length;
02426 --this->cur_count_;
02427
02428 // Only signal enqueueing threads if we've fallen below the low
02429 // water mark.
02430 if (this->cur_bytes_ <= this->low_water_mark_
02431 && this->signal_enqueue_waiters () == -1)
02432 return -1;
02433 else
02434 return this->cur_count_;
02435 }
void dump (void) const [virtual]
Dump the state of the queue.
Reimplemented from ACE_Message_Queue< ACE_SYNCH_USE >. Definition at line 2080 of file Message_Queue_T.cpp. References ACE_BEGIN_DUMP, ACE_DEBUG, ACE_END_DUMP, ACE_TEXT, ACE_TRACE, ACE_Dynamic_Message_Strategy::dump(), ACE_Message_Queue<>::dump(), LM_DEBUG, and ACE_Dynamic_Message_Queue<>::message_strategy_.
02081 {
02082 #if defined (ACE_HAS_DUMP)
02083 ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dump");
02084 ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
02085
02086 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("ACE_Message_Queue<ACE_SYNCH_USE> (base class): \n")));
02087 this->ACE_Message_Queue<ACE_SYNCH_USE>::dump ();
02088
02089 ACE_DEBUG ((LM_DEBUG,
02090 ACE_TEXT ("pending_head_ = %u\n")
02091 ACE_TEXT ("pending_tail_ = %u\n")
02092 ACE_TEXT ("late_head_ = %u\n")
02093 ACE_TEXT ("late_tail_ = %u\n")
02094 ACE_TEXT ("beyond_late_head_ = %u\n")
02095 ACE_TEXT ("beyond_late_tail_ = %u\n"),
02096 this->pending_head_,
02097 this->pending_tail_,
02098 this->late_head_,
02099 this->late_tail_,
02100 this->beyond_late_head_,
02101 this->beyond_late_tail_));
02102
02103 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("message_strategy_ : \n")));
02104 message_strategy_.dump ();
02105
02106 ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
02107 #endif /* ACE_HAS_DUMP */
02108 }
int enqueue_head (ACE_Message_Block *new_item, ACE_Time_Value *timeout=0) [virtual]
Just calls the priority enqueue method. Head enqueue semantics for dynamic message queues are unstable: the message may or may not be where it was placed after the queue is refreshed prior to the next enqueue or dequeue operation.
Reimplemented from ACE_Message_Queue< ACE_SYNCH_USE >. Definition at line 2692 of file Message_Queue_T.cpp. References ACE_TRACE, and ACE_Message_Queue< ACE_SYNCH_USE >::enqueue_prio().
02694 {
02695 ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_head");
02696 return this->enqueue_prio (new_item, timeout);
02697 }
int enqueue_i (ACE_Message_Block *new_item) [protected, virtual]
Enqueue an <ACE_Message_Block *> in accordance with its priority. The priority may be dynamic, static, or a combination of both. This method calls the priority evaluation function passed into the ACE_Dynamic_Message_Queue constructor to update the priorities of all enqueued messages.
Reimplemented from ACE_Message_Queue< ACE_SYNCH_USE >. Definition at line 2112 of file Message_Queue_T.cpp. References ACE_TRACE, ACE_Dynamic_Message_Queue<>::beyond_late_head_, ACE_Dynamic_Message_Queue<>::beyond_late_tail_, ACE_Message_Queue< ACE_SYNCH_USE >::enqueue_head_i(), ACE_Message_Queue< ACE_SYNCH_USE >::enqueue_tail_i(), ACE_OS::gettimeofday(), ACE_Dynamic_Message_Queue<>::late_head_, ACE_Dynamic_Message_Queue<>::late_tail_, ACE_Dynamic_Message_Queue<>::message_strategy_, ACE_Message_Block::next(), ACE_Dynamic_Message_Queue<>::pending_head_, ACE_Dynamic_Message_Queue<>::pending_tail_, ACE_Message_Block::prev(), ACE_Dynamic_Message_Strategy::priority_status(), ACE_Dynamic_Message_Queue<>::refresh_queue(), ACE_Message_Queue< ACE_SYNCH_USE >::signal_dequeue_waiters(), ACE_Dynamic_Message_Queue<>::sublist_enqueue_i(), and ACE_Message_Block::total_size_and_length().
02113 {
02114 ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_i");
02115
02116 if (new_item == 0)
02117 return -1;
02118
02119 int result = 0;
02120
02121 // Get the current time.
02122 ACE_Time_Value current_time = ACE_OS::gettimeofday ();
02123
02124 // Refresh priority status boundaries in the queue.
02125
02126 result = this->refresh_queue (current_time);
02127 if (result < 0)
02128 return result;
02129
02130 // Where we enqueue depends on the message's priority status.
02131 switch (message_strategy_.priority_status (*new_item,
02132 current_time))
02133 {
02134 case ACE_Dynamic_Message_Strategy::PENDING:
02135 if (this->pending_tail_ == 0)
02136 {
02137 // Check for simple case of an empty pending queue, where
02138 // all we need to do is insert <new_item> into the tail of
02139 // the queue.
02140 pending_head_ = new_item;
02141 pending_tail_ = pending_head_;
02142 return this->enqueue_tail_i (new_item);
02143 }
02144 else
02145 {
02146 // Enqueue the new message in priority order in the pending
02147 // sublist
02148 result = sublist_enqueue_i (new_item,
02149 current_time,
02150 this->pending_head_,
02151 this->pending_tail_,
02152 ACE_Dynamic_Message_Strategy::PENDING);
02153 }
02154 break;
02155
02156 case ACE_Dynamic_Message_Strategy::LATE:
02157 if (this->late_tail_ == 0)
02158 {
02159 late_head_ = new_item;
02160 late_tail_ = late_head_;
02161
02162 if (this->pending_head_ == 0)
02163 // Check for simple case of an empty pending queue,
02164 // where all we need to do is insert <new_item> into the
02165 // tail of the queue.
02166 return this->enqueue_tail_i (new_item);
02167 else if (this->beyond_late_tail_ == 0)
02168 // Check for simple case of an empty beyond late queue, where all
02169 // we need to do is insert <new_item> into the head of the queue.
02170 return this->enqueue_head_i (new_item);
02171 else
02172 {
02173 // Otherwise, we can just splice the new message in
02174 // between the pending and beyond late portions of the
02175 // queue.
02176 this->beyond_late_tail_->next (new_item);
02177 new_item->prev (this->beyond_late_tail_);
02178 this->pending_head_->prev (new_item);
02179 new_item->next (this->pending_head_);
02180 }
02181 }
02182 else
02183 {
02184 // Enqueue the new message in priority order in the late
02185 // sublist
02186 result = sublist_enqueue_i (new_item,
02187 current_time,
02188 this->late_head_,
02189 this->late_tail_,
02190 ACE_Dynamic_Message_Strategy::LATE);
02191 }
02192 break;
02193
02194 case ACE_Dynamic_Message_Strategy::BEYOND_LATE:
02195 if (this->beyond_late_tail_ == 0)
02196 {
02197 // Check for simple case of an empty beyond late queue,
02198 // where all we need to do is insert <new_item> into the
02199 // head of the queue.
02200 beyond_late_head_ = new_item;
02201 beyond_late_tail_ = beyond_late_head_;
02202 return this->enqueue_head_i (new_item);
02203 }
02204 else
02205 {
02206 // all beyond late messages have the same (zero) priority,
02207 // so just put the new one at the end of the beyond late
02208 // messages
02209 if (this->beyond_late_tail_->next ())
02210 this->beyond_late_tail_->next ()->prev (new_item);
02211 else
02212 this->tail_ = new_item;
02213
02214 new_item->next (this->beyond_late_tail_->next ());
02215 this->beyond_late_tail_->next (new_item);
02216 new_item->prev (this->beyond_late_tail_);
02217 this->beyond_late_tail_ = new_item;
02218 }
02219
02220 break;
02221
02222 // should never get here, but just in case...
02223 default:
02224 result = -1;
02225 break;
02226 }
02227
02228 if (result < 0)
02229 return result;
02230
02231 size_t mb_bytes = 0;
02232 size_t mb_length = 0;
02233 new_item->total_size_and_length (mb_bytes,
02234 mb_length);
02235 this->cur_bytes_ += mb_bytes;
02236 this->cur_length_ += mb_length;
02237 ++this->cur_count_;
02238
02239 if (this->signal_dequeue_waiters () == -1)
02240 return -1;
02241 else
02242 return this->cur_count_;
02243 }
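When the target sublist is not empty, enqueue_i hands the message to sublist_enqueue_i, which scans backward from the sublist tail and stops at the first message whose priority is greater than or equal to that of the new message. The sketch below reproduces only that scan over a plain vector of priorities. `sublist_insert` is a hypothetical helper, and the vector (highest priority first) stands in for one sublist of the linked list.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Insert `prio` into `sublist`, which is ordered highest priority first.
// Scan backward from the tail and stop at the first element whose priority
// is >= the new one; the new element goes immediately after it. Elements of
// equal priority therefore keep their arrival order.
// Returns the index at which the new element was inserted.
std::size_t sublist_insert(std::vector<unsigned long> &sublist,
                           unsigned long prio) {
  std::size_t pos = sublist.size();
  while (pos > 0 && sublist[pos - 1] < prio)
    --pos;
  assert(pos <= sublist.size());
  sublist.insert(sublist.begin() + static_cast<std::ptrdiff_t>(pos), prio);
  return pos;
}
```

Scanning from the tail keeps the common case cheap when new messages tend to have priority no higher than those already queued, since the loop then ends after a single comparison.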
int enqueue_tail (ACE_Message_Block *new_item, ACE_Time_Value *timeout=0) [virtual]
Just calls the priority enqueue method. Tail enqueue semantics for dynamic message queues are unstable: the message may or may not be where it was placed after the queue is refreshed prior to the next enqueue or dequeue operation.
Reimplemented from ACE_Message_Queue< ACE_SYNCH_USE >. Definition at line 2679 of file Message_Queue_T.cpp. References ACE_TRACE, and ACE_Message_Queue< ACE_SYNCH_USE >::enqueue_prio().
02681 {
02682 ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_tail");
02683 return this->enqueue_prio (new_item, timeout);
02684 }
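void operator= (const ACE_Dynamic_Message_Queue< _ACE_SYNCH > &) [private]

int peek_dequeue_head (ACE_Message_Block *&first_item, ACE_Time_Value *timeout=0) [private, virtual]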
Private method to hide public base class method: just calls base class method.
Reimplemented from ACE_Message_Queue< ACE_SYNCH_USE >. Definition at line 2668 of file Message_Queue_T.cpp. References ACE_Message_Queue<>::peek_dequeue_head().
02670 {
02671 return ACE_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head (first_item,
02672 timeout);
02673 }
int refresh_late_queue (const ACE_Time_Value &current_time) [protected, virtual]
Refresh the late queue using the strategy specific priority status function.
Definition at line 2583 of file Message_Queue_T.cpp. References ACE_ERROR_RETURN, ACE_TEXT, ACE_Dynamic_Message_Queue<>::beyond_late_head_, ACE_Dynamic_Message_Queue<>::beyond_late_tail_, ACE_Dynamic_Message_Queue<>::late_head_, ACE_Dynamic_Message_Queue<>::late_tail_, LM_ERROR, ACE_Dynamic_Message_Queue<>::message_strategy_, ACE_Message_Block::next(), ACE_Message_Block::prev(), and ACE_Dynamic_Message_Strategy::priority_status(). Referenced by ACE_Dynamic_Message_Queue<>::refresh_queue().
02584 {
02585 ACE_Dynamic_Message_Strategy::Priority_Status current_status;
02586
02587 if (this->late_head_)
02588 {
02589 current_status = message_strategy_.priority_status (*this->late_head_,
02590 current_time);
02591 switch (current_status)
02592 {
02593 case ACE_Dynamic_Message_Strategy::BEYOND_LATE:
02594
02595 // make sure the head of the beyond late queue is set
02596 // (there may not have been any beyond late messages previously)
02597 this->beyond_late_head_ = this->head_;
02598
02599 // advance through the beyond late messages in the late queue
02600 do
02601 {
02602 this->late_head_ = this->late_head_->next ();
02603
02604 if (this->late_head_)
02605 current_status = message_strategy_.priority_status (*this->late_head_,
02606 current_time);
02607 else
02608 break; // do while
02609
02610 }
02611 while (current_status == ACE_Dynamic_Message_Strategy::BEYOND_LATE);
02612
02613 if (this->late_head_)
02614 {
02615 // point tail of beyond late sublist to previous item
02616 this->beyond_late_tail_ = this->late_head_->prev ();
02617
02618 if (current_status == ACE_Dynamic_Message_Strategy::PENDING)
02619 {
02620 // there are no late messages left in the queue
02621 this->late_head_ = 0;
02622 this->late_tail_ = 0;
02623 }
02624 else if (current_status != ACE_Dynamic_Message_Strategy::LATE)
02625 // if we got here, something is *seriously* wrong with the queue
02626 ACE_ERROR_RETURN ((LM_ERROR,
02627 ACE_TEXT ("Unexpected message priority status [%d] (expected LATE)"),
02628 (int) current_status),
02629 -1);
02630 }
02631 else
02632 {
02633 // there are no late messages left in the queue
02634 this->beyond_late_tail_ = this->tail_;
02635 this->late_head_ = 0;
02636 this->late_tail_ = 0;
02637 }
02638
02639 break; // switch
02640
02641 case ACE_Dynamic_Message_Strategy::LATE:
02642 // do nothing - the late queue is unchanged
02643 break; // switch
02644
02645 case ACE_Dynamic_Message_Strategy::PENDING:
02646 // if we got here, something is *seriously* wrong with the queue
02647 ACE_ERROR_RETURN ((LM_ERROR,
02648 ACE_TEXT ("Unexpected message priority status ")
02649 ACE_TEXT ("[%d] (expected LATE or BEYOND_LATE)"),
02650 (int) current_status),
02651 -1);
02652 default:
02653 // if we got here, something is *seriously* wrong with the queue
02654 ACE_ERROR_RETURN ((LM_ERROR,
02655 ACE_TEXT ("Unknown message priority status [%d]"),
02656 (int) current_status),
02657 -1);
02658 }
02659 }
02660
02661 return 0;
02662 }
int refresh_pending_queue (const ACE_Time_Value &current_time) [protected, virtual]
Refresh the pending queue using the strategy specific priority status function.
Definition at line 2460 of file Message_Queue_T.cpp. References ACE_ERROR_RETURN, ACE_TEXT, ACE_Dynamic_Message_Queue<>::beyond_late_head_, ACE_Dynamic_Message_Queue<>::beyond_late_tail_, ACE_Dynamic_Message_Queue<>::late_head_, ACE_Dynamic_Message_Queue<>::late_tail_, LM_ERROR, ACE_Dynamic_Message_Queue<>::message_strategy_, ACE_Message_Block::next(), ACE_Dynamic_Message_Queue<>::pending_head_, ACE_Dynamic_Message_Queue<>::pending_tail_, ACE_Message_Block::prev(), and ACE_Dynamic_Message_Strategy::priority_status(). Referenced by ACE_Dynamic_Message_Queue<>::refresh_queue().
02461 {
02462 ACE_Dynamic_Message_Strategy::Priority_Status current_status;
02463
02464 // refresh priority status boundaries in the queue
02465 if (this->pending_head_)
02466 {
02467 current_status = message_strategy_.priority_status (*this->pending_head_,
02468 current_time);
02469 switch (current_status)
02470 {
02471 case ACE_Dynamic_Message_Strategy::BEYOND_LATE:
02472 // Make sure the head of the beyond late queue is set (there
02473 // may not have been any beyond late messages previously)
02474 this->beyond_late_head_ = this->head_;
02475
02476 // Zero out the late queue pointers, and set them only if
02477 // there turn out to be late messages in the pending sublist
02478 this->late_head_ = 0;
02479 this->late_tail_ = 0;
02480
02481 // Advance through the beyond late messages in the pending queue
02482 do
02483 {
02484 this->pending_head_ = this->pending_head_->next ();
02485
02486 if (this->pending_head_)
02487 current_status = message_strategy_.priority_status (*this->pending_head_,
02488 current_time);
02489 else
02490 break; // do while
02491
02492 }
02493 while (current_status == ACE_Dynamic_Message_Strategy::BEYOND_LATE);
02494
02495 if (this->pending_head_)
02496 {
02497 // point tail of beyond late sublist to previous item
02498 this->beyond_late_tail_ = this->pending_head_->prev ();
02499
02500 if (current_status == ACE_Dynamic_Message_Strategy::PENDING)
02501 // there are no late messages left in the queue
02502 break; // switch
02503 else if (current_status != ACE_Dynamic_Message_Strategy::LATE)
02504 {
02505 // if we got here, something is *seriously* wrong with the queue
02506 ACE_ERROR_RETURN ((LM_ERROR,
02507 ACE_TEXT ("Unexpected message priority status [%d] (expected LATE)"),
02508 (int) current_status),
02509 -1);
02510 }
02511 /* FALLTHRU */
02512 }
02513 else
02514 {
02515 // There are no pending or late messages left in the
02516 // queue.
02517 this->beyond_late_tail_ = this->tail_;
02518 this->pending_head_ = 0;
02519 this->pending_tail_ = 0;
02520 break; // switch
02521 }
02522
02523 case ACE_Dynamic_Message_Strategy::LATE:
02524 // Make sure the head of the late queue is set (there may
02525 // not have been any late messages previously, or they may
02526 // have all become beyond late).
02527 if (this->late_head_ == 0)
02528 this->late_head_ = this->pending_head_;
02529
02530 // advance through the late messages in the pending queue
02531 do
02532 {
02533 this->pending_head_ = this->pending_head_->next ();
02534
02535 if (this->pending_head_)
02536 current_status = message_strategy_.priority_status (*this->pending_head_,
02537 current_time);
02538 else
02539 break; // do while
02540
02541 }
02542 while (current_status == ACE_Dynamic_Message_Strategy::LATE);
02543
02544 if (this->pending_head_)
02545 {
02546 if (current_status != ACE_Dynamic_Message_Strategy::PENDING)
02547 // if we got here, something is *seriously* wrong with the queue
02548 ACE_ERROR_RETURN((LM_ERROR,
02549 ACE_TEXT ("Unexpected message priority status [%d] (expected PENDING)"),
02550 (int) current_status),
02551 -1);
02552
02553 // Point tail of late sublist to previous item
02554 this->late_tail_ = this->pending_head_->prev ();
02555 }
02556 else
02557 {
02558 // there are no pending messages left in the queue
02559 this->late_tail_ = this->tail_;
02560 this->pending_head_ = 0;
02561 this->pending_tail_ = 0;
02562 }
02563
02564 break; // switch
02565 case ACE_Dynamic_Message_Strategy::PENDING:
02566 // do nothing - the pending queue is unchanged
02567 break; // switch
02568 default:
02569 // if we got here, something is *seriously* wrong with the queue
02570 ACE_ERROR_RETURN((LM_ERROR,
02571 ACE_TEXT ("Unknown message priority status [%d]"),
02572 (int) current_status),
02573 -1);
02574 }
02575 }
02576 return 0;
02577 }
int refresh_queue (const ACE_Time_Value &current_time) [protected, virtual]
Refresh the queue using the strategy specific priority status function.
Definition at line 2444 of file Message_Queue_T.cpp. References ACE_Dynamic_Message_Queue<>::refresh_late_queue(), and ACE_Dynamic_Message_Queue<>::refresh_pending_queue(). Referenced by ACE_Dynamic_Message_Queue<>::dequeue_head(), ACE_Dynamic_Message_Queue<>::enqueue_i(), and ACE_Dynamic_Message_Queue<>::remove_messages().
02445 {
02446 int result;
02447
02448 result = refresh_pending_queue (current_time);
02449
02450 if (result != -1)
02451 result = refresh_late_queue (current_time);
02452
02453 return result;
02454 }
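The refresh methods never reorder messages; they only move the boundaries between the beyond late, late, and pending portions. A rough sketch of that idea over an array of deadlines follows. Everything in it is an assumption made for illustration: the deadlines are taken to be sorted in ascending order, `status_of` is a made-up status rule (late once the deadline has passed, beyond late once it is more than `max_late` time units overdue), and `Boundaries` and `refresh` are hypothetical names.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

enum class Status { Pending, Late, BeyondLate };

// Hypothetical status rule, not the ACE strategy implementation.
Status status_of(long deadline, long now, long max_late) {
  if (now <= deadline) return Status::Pending;
  if (now - deadline <= max_late) return Status::Late;
  return Status::BeyondLate;
}

// Index boundaries into the deadline array:
//   [0, late_begin)             beyond late
//   [late_begin, pending_begin) late
//   [pending_begin, size)       pending
struct Boundaries {
  std::size_t late_begin;
  std::size_t pending_begin;
};

// Advance the boundaries for the current time. As in refresh_queue, the
// pending portion is refreshed first, then the late portion.
void refresh(const std::vector<long> &deadlines, long now, long max_late,
             Boundaries &b) {
  while (b.pending_begin < deadlines.size() &&
         status_of(deadlines[b.pending_begin], now, max_late) != Status::Pending)
    ++b.pending_begin;
  while (b.late_begin < b.pending_begin &&
         status_of(deadlines[b.late_begin], now, max_late) == Status::BeyondLate)
    ++b.late_begin;
  assert(b.late_begin <= b.pending_begin);
}
```

Each boundary only ever moves forward, so the total work across all refreshes is proportional to the number of messages rather than to the number of refresh calls times the queue length.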
int remove_messages (ACE_Message_Block *&list_head, ACE_Message_Block *&list_tail, u_int status_flags) [virtual]
Detach all messages with status given in the passed flags from the queue and return them by setting passed head and tail pointers to the linked list they comprise. This method is intended primarily as a means of periodically harvesting messages that have missed their deadlines, but is available in its most general form. All messages are returned in priority order, from head to tail, as of the time this method was called.
Definition at line 1900 of file Message_Queue_T.cpp. References ACE_BIT_ENABLED, ACE_Dynamic_Message_Queue<>::beyond_late_head_, ACE_Dynamic_Message_Queue<>::beyond_late_tail_, ACE_OS::gettimeofday(), ACE_Dynamic_Message_Queue<>::late_head_, ACE_Dynamic_Message_Queue<>::late_tail_, ACE_Message_Block::next(), ACE_Dynamic_Message_Queue<>::pending_head_, ACE_Dynamic_Message_Queue<>::pending_tail_, ACE_Message_Block::prev(), ACE_Dynamic_Message_Queue<>::refresh_queue(), and ACE_Message_Block::total_size_and_length().
01903 {
01904 // start with an empty list
01905 list_head = 0;
01906 list_tail = 0;
01907
01908 // Get the current time
01909 ACE_Time_Value current_time = ACE_OS::gettimeofday ();
01910
01911 // Refresh priority status boundaries in the queue.
01912 int result = this->refresh_queue (current_time);
01913 if (result < 0)
01914 return result;
01915
01916 if (ACE_BIT_ENABLED (status_flags,
01917 (u_int) ACE_Dynamic_Message_Strategy::PENDING)
01918 && this->pending_head_
01919 && this->pending_tail_)
01920 {
01921 // patch up pointers for the new tail of the queue
01922 if (this->pending_head_->prev ())
01923 {
01924 this->tail_ = this->pending_head_->prev ();
01925 this->pending_head_->prev ()->next (0);
01926 }
01927 else
01928 {
01929 // the list has become empty
01930 this->head_ = 0;
01931 this->tail_ = 0;
01932 }
01933
01934 // point to the head and tail of the list
01935 list_head = this->pending_head_;
01936 list_tail = this->pending_tail_;
01937
01938 // cut the pending messages out of the queue entirely
01939 this->pending_head_->prev (0);
01940 this->pending_head_ = 0;
01941 this->pending_tail_ = 0;
01942 }
01943
01944 if (ACE_BIT_ENABLED (status_flags,
01945 (u_int) ACE_Dynamic_Message_Strategy::LATE)
01946 && this->late_head_
01947 && this->late_tail_)
01948 {
01949 // Patch up pointers for the (possibly) new head and tail of the
01950 // queue.
01951 if (this->late_tail_->next ())
01952 this->late_tail_->next ()->prev (this->late_head_->prev ());
01953 else
01954 this->tail_ = this->late_head_->prev ();
01955
01956 if (this->late_head_->prev ())
01957 this->late_head_->prev ()->next (this->late_tail_->next ());
01958 else
01959 this->head_ = this->late_tail_->next ();
01960
01961 // put late messages behind pending messages (if any) being returned
01962 this->late_head_->prev (list_tail);
01963 if (list_tail)
01964 list_tail->next (this->late_head_);
01965 else
01966 list_head = this->late_head_;
01967
01968 list_tail = this->late_tail_;
01969
01970 this->late_tail_->next (0);
01971 this->late_head_ = 0;
01972 this->late_tail_ = 0;
01973 }
01974
01975 if (ACE_BIT_ENABLED (status_flags,
01976 (u_int) ACE_Dynamic_Message_Strategy::BEYOND_LATE)
01977 && this->beyond_late_head_
01978 && this->beyond_late_tail_)
01979 {
01980 // Patch up pointers for the new head of the queue
01981 if (this->beyond_late_tail_->next ())
01982 {
01983 this->head_ = this->beyond_late_tail_->next ();
01984 this->beyond_late_tail_->next ()->prev (0);
01985 }
01986 else
01987 {
01988 // the list has become empty
01989 this->head_ = 0;
01990 this->tail_ = 0;
01991 }
01992
01993 // Put beyond late messages at the end of the list being
01994 // returned.
01995 if (list_tail)
01996 {
01997 this->beyond_late_head_->prev (list_tail);
01998 list_tail->next (this->beyond_late_head_);
01999 }
02000 else
02001 list_head = this->beyond_late_head_;
02002
02003 list_tail = this->beyond_late_tail_;
02004
02005 this->beyond_late_tail_->next (0);
02006 this->beyond_late_head_ = 0;
02007 this->beyond_late_tail_ = 0;
02008 }
02009
02010 // Decrement message and size counts for removed messages.
02011 ACE_Message_Block *temp1;
02012
02013 for (temp1 = list_head;
02014 temp1 != 0;
02015 temp1 = temp1->next ())
02016 {
02017 --this->cur_count_;
02018
02019 size_t mb_bytes = 0;
02020 size_t mb_length = 0;
02021 temp1->total_size_and_length (mb_bytes,
02022 mb_length);
02023 // Subtract off all of the bytes associated with this message.
02024 this->cur_bytes_ -= mb_bytes;
02025 this->cur_length_ -= mb_length;
02026 }
02027
02028 return result;
02029 }
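The pointer patching in the listing above follows one pattern for each sublist: bridge the sublist's neighbours, append the detached run to the list being returned, then reduce the counters. The following is a minimal, self-contained sketch of that pattern, not the ACE implementation. `Node`, `List`, `push_back`, and `splice_out` are hypothetical names introduced for illustration, and a plain byte field stands in for `ACE_Message_Block::total_size_and_length()`. Unlike the listing, the sketch adjusts the counters once per splice instead of in a single pass at the end.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical stand-in for ACE_Message_Block: a doubly linked node.
struct Node {
  std::size_t bytes;
  Node *prev;
  Node *next;
  explicit Node(std::size_t b) : bytes(b), prev(nullptr), next(nullptr) {}
};

// Hypothetical stand-in for the queue's head/tail pointers and counters.
struct List {
  Node *head = nullptr;
  Node *tail = nullptr;
  std::size_t count = 0;
  std::size_t bytes = 0;
};

// Append a node at the tail and update the counters.
void push_back(List &q, Node &n) {
  n.prev = q.tail;
  n.next = nullptr;
  if (q.tail) q.tail->next = &n; else q.head = &n;
  q.tail = &n;
  ++q.count;
  q.bytes += n.bytes;
}

// Detach the contiguous run [first, last] from q and append it to the
// result list described by out_head/out_tail.
void splice_out(List &q, Node *first, Node *last,
                Node *&out_head, Node *&out_tail) {
  assert(first != nullptr && last != nullptr);

  // Bridge the neighbours, or move the queue's tail/head if the run
  // touched either end of the queue.
  if (last->next) last->next->prev = first->prev; else q.tail = first->prev;
  if (first->prev) first->prev->next = last->next; else q.head = last->next;

  // Append the run behind anything already collected.
  first->prev = out_tail;
  if (out_tail) out_tail->next = first; else out_head = first;
  out_tail = last;
  last->next = nullptr;

  // Decrement the counters for the nodes just removed.
  for (Node *n = first; n != nullptr; n = n->next) {
    --q.count;
    q.bytes -= n->bytes;
  }
}
```

Calling `splice_out` repeatedly with the same `out_head`/`out_tail` pair chains the removed runs in call order, which mirrors how the listing returns pending, then late, then beyond late messages.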
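int ACE_Dynamic_Message_Queue<>::sublist_enqueue_i (ACE_Message_Block *new_item, const ACE_Time_Value &current_time, ACE_Message_Block *&sublist_head, ACE_Message_Block *&sublist_tail, ACE_Dynamic_Message_Strategy::Priority_Status status) [protected, virtual]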
Enqueue a message in priority order within a given priority status sublist.
Definition at line 2252 of file Message_Queue_T.cpp. References ACE_Dynamic_Message_Queue<>::message_strategy_, ACE_Message_Block::msg_priority(), ACE_Message_Block::next(), ACE_Message_Block::prev(), and ACE_Dynamic_Message_Strategy::priority_status(). Referenced by ACE_Dynamic_Message_Queue<>::enqueue_i().
02257 {
02258 int result = 0;
02259 ACE_Message_Block *current_item = 0;
02260
02261 // Find message after which to enqueue new item, based on message
02262 // priority and priority status.
02263 for (current_item = sublist_tail;
02264 current_item;
02265 current_item = current_item->prev ())
02266 {
02267 if (message_strategy_.priority_status (*current_item, current_time) == status)
02268 {
02269 if (current_item->msg_priority () >= new_item->msg_priority ())
02270 break;
02271 }
02272 else
02273 {
02274 sublist_head = new_item;
02275 break;
02276 }
02277 }
02278
02279 if (current_item == 0)
02280 {
02281 // If the new message has highest priority of any, put it at the
02282 // head of the list (and sublist).
02283 new_item->prev (0);
02284 new_item->next (this->head_);
02285 if (this->head_ != 0)
02286 this->head_->prev (new_item);
02287 else
02288 {
02289 this->tail_ = new_item;
02290 sublist_tail = new_item;
02291 }
02292 this->head_ = new_item;
02293 sublist_head = new_item;
02294 }
02295 else
02296 {
02297 // insert the new item into the list
02298 new_item->next (current_item->next ());
02299 new_item->prev (current_item);
02300
02301 if (current_item->next ())
02302 current_item->next ()->prev (new_item);
02303 else
02304 this->tail_ = new_item;
02305
02306 current_item->next (new_item);
02307
02308 // If the new item has lowest priority of any in the sublist,
02309 // move the tail pointer of the sublist back to the new item
02310 if (current_item == sublist_tail)
02311 sublist_tail = new_item;
02312 }
02313
02314 return result;
02315 }
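The core of the listing above is a backward scan from the sublist tail that stops at the first message whose priority is at least that of the new one, followed by a doubly linked insertion. The sketch below isolates that idea under simplifying assumptions: it handles one standalone list and omits the priority status check against neighbouring sublists. `Item`, `Sublist`, and `insert_by_priority` are hypothetical names, not part of ACE.

```cpp
#include <cassert>

// Hypothetical stand-in for ACE_Message_Block with a static priority.
struct Item {
  int priority;
  Item *prev;
  Item *next;
  explicit Item(int p) : priority(p), prev(nullptr), next(nullptr) {}
};

// Hypothetical stand-in for one sublist's head and tail pointers.
struct Sublist {
  Item *head = nullptr;
  Item *tail = nullptr;
};

// Insert n so that priorities never increase from head to tail.
// Items of equal priority keep their arrival (FIFO) order.
void insert_by_priority(Sublist &s, Item &n) {
  assert(n.prev == nullptr && n.next == nullptr);

  // Walk backward from the tail until an item with priority >= n's.
  Item *cur = s.tail;
  while (cur != nullptr && cur->priority < n.priority)
    cur = cur->prev;

  if (cur == nullptr) {
    // n outranks everything (or the list is empty): it becomes the head.
    n.next = s.head;
    if (s.head) s.head->prev = &n; else s.tail = &n;
    s.head = &n;
  } else {
    // Link n directly after cur; update the tail if cur was last.
    n.next = cur->next;
    n.prev = cur;
    if (cur->next) cur->next->prev = &n; else s.tail = &n;
    cur->next = &n;
  }
}
```

Scanning from the tail keeps the common case cheap when most new messages have a priority no higher than those already queued, since the loop then exits after one comparison.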
ACE_Dynamic_Message_Queue<>::ACE_ALLOC_HOOK_DECLARE
Declare the dynamic allocation hooks.
Reimplemented from ACE_Message_Queue< ACE_SYNCH_USE >. Definition at line 806 of file Message_Queue_T.h.
ACE_Message_Block* ACE_Dynamic_Message_Queue<>::beyond_late_head_ [protected]
Pointer to head of the beyond late messages.
Definition at line 860 of file Message_Queue_T.h. Referenced by ACE_Dynamic_Message_Queue<>::dequeue_head_i(), ACE_Dynamic_Message_Queue<>::enqueue_i(), ACE_Dynamic_Message_Queue<>::refresh_late_queue(), ACE_Dynamic_Message_Queue<>::refresh_pending_queue(), and ACE_Dynamic_Message_Queue<>::remove_messages().
ACE_Message_Block* ACE_Dynamic_Message_Queue<>::beyond_late_tail_ [protected]
Pointer to tail of the beyond late messages.
Definition at line 863 of file Message_Queue_T.h. Referenced by ACE_Dynamic_Message_Queue<>::dequeue_head_i(), ACE_Dynamic_Message_Queue<>::enqueue_i(), ACE_Dynamic_Message_Queue<>::refresh_late_queue(), ACE_Dynamic_Message_Queue<>::refresh_pending_queue(), and ACE_Dynamic_Message_Queue<>::remove_messages().
ACE_Message_Block* ACE_Dynamic_Message_Queue<>::late_head_ [protected]
Pointer to head of the late messages.
Definition at line 854 of file Message_Queue_T.h. Referenced by ACE_Dynamic_Message_Queue<>::dequeue_head_i(), ACE_Dynamic_Message_Queue<>::enqueue_i(), ACE_Dynamic_Message_Queue<>::refresh_late_queue(), ACE_Dynamic_Message_Queue<>::refresh_pending_queue(), and ACE_Dynamic_Message_Queue<>::remove_messages().
ACE_Message_Block* ACE_Dynamic_Message_Queue<>::late_tail_ [protected]
Pointer to tail of the late messages.
Definition at line 857 of file Message_Queue_T.h. Referenced by ACE_Dynamic_Message_Queue<>::dequeue_head_i(), ACE_Dynamic_Message_Queue<>::enqueue_i(), ACE_Dynamic_Message_Queue<>::refresh_late_queue(), ACE_Dynamic_Message_Queue<>::refresh_pending_queue(), and ACE_Dynamic_Message_Queue<>::remove_messages().
ACE_Dynamic_Message_Strategy& ACE_Dynamic_Message_Queue<>::message_strategy_ [protected]
Pointer to a dynamic priority evaluation function.
Definition at line 866 of file Message_Queue_T.h. Referenced by ACE_Dynamic_Message_Queue<>::dump(), ACE_Dynamic_Message_Queue<>::enqueue_i(), ACE_Dynamic_Message_Queue<>::refresh_late_queue(), ACE_Dynamic_Message_Queue<>::refresh_pending_queue(), ACE_Dynamic_Message_Queue<>::sublist_enqueue_i(), and ACE_Dynamic_Message_Queue<>::~ACE_Dynamic_Message_Queue().
ACE_Message_Block* ACE_Dynamic_Message_Queue<>::pending_head_ [protected]
Pointer to head of the pending messages.
Definition at line 848 of file Message_Queue_T.h. Referenced by ACE_Dynamic_Message_Queue<>::dequeue_head_i(), ACE_Dynamic_Message_Queue<>::enqueue_i(), ACE_Dynamic_Message_Queue<>::refresh_pending_queue(), and ACE_Dynamic_Message_Queue<>::remove_messages().
ACE_Message_Block* ACE_Dynamic_Message_Queue<>::pending_tail_ [protected]
Pointer to tail of the pending messages.
Definition at line 851 of file Message_Queue_T.h. Referenced by ACE_Dynamic_Message_Queue<>::dequeue_head_i(), ACE_Dynamic_Message_Queue<>::enqueue_i(), ACE_Dynamic_Message_Queue<>::refresh_pending_queue(), and ACE_Dynamic_Message_Queue<>::remove_messages().