ACE_Dynamic_Message_Queue<> Class Template Reference

A derived class which adapts the ACE_Message_Queue class in order to maintain dynamic priorities for enqueued and manage the queue order according to these dynamic priorities. More...

#include <Message_Queue_T.h>

Inheritance diagram for ACE_Dynamic_Message_Queue<>:

Inheritance graph
[legend]
Collaboration diagram for ACE_Dynamic_Message_Queue<>:

Collaboration graph
[legend]
List of all members.

Public Member Functions

 ACE_Dynamic_Message_Queue (ACE_Dynamic_Message_Strategy &message_strategy, size_t hwm=ACE_Message_Queue_Base::DEFAULT_HWM, size_t lwm=ACE_Message_Queue_Base::DEFAULT_LWM, ACE_Notification_Strategy *=0)
virtual ~ACE_Dynamic_Message_Queue (void)
 Close down the message queue and release all resources.

virtual int remove_messages (ACE_Message_Block *&list_head, ACE_Message_Block *&list_tail, u_int status_flags)
virtual int dequeue_head (ACE_Message_Block *&first_item, ACE_Time_Value *timeout=0)
virtual void dump (void) const
 Dump the state of the queue.

virtual int enqueue_tail (ACE_Message_Block *new_item, ACE_Time_Value *timeout=0)
virtual int enqueue_head (ACE_Message_Block *new_item, ACE_Time_Value *timeout=0)

Public Attributes

 ACE_ALLOC_HOOK_DECLARE
 Declare the dynamic allocation hooks.


Protected Member Functions

virtual int enqueue_i (ACE_Message_Block *new_item)
virtual int sublist_enqueue_i (ACE_Message_Block *new_item, const ACE_Time_Value &current_time, ACE_Message_Block *&sublist_head, ACE_Message_Block *&sublist_tail, ACE_Dynamic_Message_Strategy::Priority_Status status)
 Enqueue a message in priority order within a given priority status sublist.

virtual int dequeue_head_i (ACE_Message_Block *&first_item)
virtual int refresh_queue (const ACE_Time_Value &current_time)
virtual int refresh_pending_queue (const ACE_Time_Value &current_time)
virtual int refresh_late_queue (const ACE_Time_Value &current_time)

Protected Attributes

ACE_Message_Blockpending_head_
 Pointer to head of the pending messages.

ACE_Message_Blockpending_tail_
 Pointer to tail of the pending messages.

ACE_Message_Blocklate_head_
 Pointer to head of the late messages.

ACE_Message_Blocklate_tail_
 Pointer to tail of the late messages.

ACE_Message_Blockbeyond_late_head_
 Pointer to head of the beyond late messages.

ACE_Message_Blockbeyond_late_tail_
 Pointer to tail of the beyond late messages.

ACE_Dynamic_Message_Strategymessage_strategy_
 Pointer to a dynamic priority evaluation function.


Private Member Functions

void operator= (const ACE_Dynamic_Message_Queue< _ACE_SYNCH > &)
 ACE_Dynamic_Message_Queue (const ACE_Dynamic_Message_Queue< _ACE_SYNCH > &)
virtual int peek_dequeue_head (ACE_Message_Block *&first_item, ACE_Time_Value *timeout=0)
 Private method to hide public base class method: just calls base class method.


Detailed Description

template<ACE_SYNCH_DECL>
class ACE_Dynamic_Message_Queue<>

A derived class which adapts the ACE_Message_Queue class in order to maintain dynamic priorities for enqueued and manage the queue order according to these dynamic priorities.

The messages in the queue are managed so as to preserve a logical ordering with minimal overhead per enqueue and dequeue operation. For this reason, the actual order of messages in the linked list of the queue may differ from their priority order. As time passes, a message may change from pending status to late status, and eventually to beyond late status. To minimize reordering overhead under this design force, three separate boundaries are maintained within the linked list of messages. Messages are dequeued preferentially from the head of the pending portion, then the head of the late portion, and finally from the head of the beyond late portion. In this way, only the boundaries need to be maintained (which can be done efficiently, as aging messages maintain the same linked list order as they progress from one status to the next), with no reordering of the messages themselves, while providing correct priority ordered dequeueing semantics. Head and tail enqueue methods inherited from ACE_Message_Queue are made private to prevent out-of-order messages from confusing management of the various portions of the queue. Messages in the pending portion of the queue whose priority becomes late (according to the specific dynamic strategy) advance into the late portion of the queue. Messages in the late portion of the queue whose priority becomes later than can be represented advance to the beyond_late portion of the queue. These behaviors support a limited schedule overrun, with pending messages prioritized ahead of late messages, and late messages ahead of beyond late messages. These behaviors can be modified in derived classes by providing alternative definitions for the appropriate virtual methods. When filled with messages, the queue's linked list should look like: H T | | B - B - B - B - L - L - L - P - P - P - P - P | | | | | | BH BT LH LT PH PT Where the symbols are as follows: H = Head of the entire list T = Tail of the entire list B = Beyond late message BH = Beyond late messages Head BT = Beyond late messages Tail L = Late message LH = Late messages Head LT = Late messages Tail P = Pending message PH = Pending messages Head PT = Pending messages Tail Caveat: the virtual methods enqueue_tail, enqueue_head, and peek_dequeue_head have semantics for the static message queues that cannot be guaranteed for dynamic message queues. The peek_dequeue_head method just calls the base class method, while the two enqueue methods call the priority enqueue method. The order of messages in the dynamic queue is a function of message deadlines and how long they are in the queues. You can manipulate these in some cases to ensure the correct semantics, but that is not a very stable or portable approach (discouraged).

Definition at line 750 of file Message_Queue_T.h.


Constructor & Destructor Documentation

template<ACE_SYNCH_DECL >
ACE_Dynamic_Message_Queue<>::ACE_Dynamic_Message_Queue ACE_Dynamic_Message_Strategy message_strategy,
size_t  hwm = ACE_Message_Queue_Base::DEFAULT_HWM,
size_t  lwm = ACE_Message_Queue_Base::DEFAULT_LWM,
ACE_Notification_Strategy = 0
 

Definition at line 1873 of file Message_Queue_T.cpp.

References ACE_SYNCH_USE.

01877   : ACE_Message_Queue<ACE_SYNCH_USE> (hwm, lwm, ns),
01878     pending_head_ (0),
01879     pending_tail_ (0),
01880     late_head_ (0),
01881     late_tail_ (0),
01882     beyond_late_head_ (0),
01883     beyond_late_tail_ (0),
01884     message_strategy_ (message_strategy)
01885 {
01886   // Note, the ACE_Dynamic_Message_Queue assumes full responsibility
01887   // for the passed ACE_Dynamic_Message_Strategy object, and deletes
01888   // it in its own dtor
01889 }

template<ACE_SYNCH_DECL >
ACE_Dynamic_Message_Queue<>::~ACE_Dynamic_Message_Queue void   )  [virtual]
 

Close down the message queue and release all resources.

Definition at line 1894 of file Message_Queue_T.cpp.

References ACE_Dynamic_Message_Queue<>::message_strategy_.

01895 {
01896   delete &this->message_strategy_;
01897 }

template<ACE_SYNCH_DECL >
ACE_Dynamic_Message_Queue<>::ACE_Dynamic_Message_Queue const ACE_Dynamic_Message_Queue< _ACE_SYNCH > &   )  [private]
 


Member Function Documentation

template<ACE_SYNCH_DECL >
int ACE_Dynamic_Message_Queue<>::dequeue_head ACE_Message_Block *&  first_item,
ACE_Time_Value timeout = 0
[virtual]
 

Dequeue and return the <ACE_Message_Block *> at the head of the queue. Returns -1 on failure, else the number of items still on the queue.

Reimplemented from ACE_Message_Queue< ACE_SYNCH_USE >.

Definition at line 2040 of file Message_Queue_T.cpp.

References ACE_GUARD_RETURN, ACE_SYNCH_MUTEX_T, ACE_TRACE, ACE_Dynamic_Message_Queue<>::dequeue_head_i(), ESHUTDOWN, ACE_OS::gettimeofday(), ACE_Dynamic_Message_Queue<>::refresh_queue(), and ACE_Message_Queue< ACE_SYNCH_USE >::wait_not_empty_cond().

02042 {
02043   ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head");
02044 
02045   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
02046 
02047   if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
02048     {
02049       errno = ESHUTDOWN;
02050       return -1;
02051     }
02052 
02053   int result;
02054 
02055   // get the current time
02056   ACE_Time_Value current_time = ACE_OS::gettimeofday ();
02057 
02058   // refresh priority status boundaries in the queue
02059   result = this->refresh_queue (current_time);
02060   if (result < 0)
02061     return result;
02062 
02063   // *now* it's appropriate to wait for an enqueued item
02064   result = this->wait_not_empty_cond (ace_mon, timeout);
02065   if (result == -1)
02066     return result;
02067 
02068   // call the internal dequeue method, which selects an item from the
02069   // highest priority status portion of the queue that has messages
02070   // enqueued.
02071   result = this->dequeue_head_i (first_item);
02072 
02073   return result;
02074 }

template<ACE_SYNCH_DECL >
int ACE_Dynamic_Message_Queue<>::dequeue_head_i ACE_Message_Block *&  first_item  )  [protected, virtual]
 

Dequeue and return the <ACE_Message_Block *> at the head of the logical queue. Attempts first to dequeue from the pending portion of the queue, or if that is empty from the late portion, or if that is empty from the beyond late portion, or if that is empty just sets the passed pointer to zero and returns -1.

Reimplemented from ACE_Message_Queue< ACE_SYNCH_USE >.

Definition at line 2321 of file Message_Queue_T.cpp.

References ACE_TRACE, ACE_Dynamic_Message_Queue<>::beyond_late_head_, ACE_Dynamic_Message_Queue<>::beyond_late_tail_, ACE_Dynamic_Message_Queue<>::late_head_, ACE_Dynamic_Message_Queue<>::late_tail_, ACE_Message_Block::next(), ACE_Dynamic_Message_Queue<>::pending_head_, ACE_Dynamic_Message_Queue<>::pending_tail_, ACE_Message_Block::prev(), ACE_Message_Queue< ACE_SYNCH_USE >::signal_enqueue_waiters(), and ACE_Message_Block::total_size_and_length().

Referenced by ACE_Dynamic_Message_Queue<>::dequeue_head().

02322 {
02323   ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i");
02324 
02325   int result = 0;
02326   int last_in_subqueue = 0;
02327 
02328   // first, try to dequeue from the head of the pending list
02329   if (this->pending_head_)
02330     {
02331       first_item = this->pending_head_;
02332 
02333       if (0 == this->pending_head_->prev ())
02334         this->head_ = this->pending_head_->next ();
02335       else
02336         this->pending_head_->prev ()->next (this->pending_head_->next ());
02337 
02338       if (0 == this->pending_head_->next ())
02339         {
02340           this->tail_ = this->pending_head_->prev ();
02341           this->pending_head_ = 0;
02342           this->pending_tail_ = 0;
02343         }
02344       else
02345         {
02346           this->pending_head_->next ()->prev (this->pending_head_->prev ());
02347           this->pending_head_ = this->pending_head_->next ();
02348         }
02349 
02350       first_item->prev (0);
02351       first_item->next (0);
02352     }
02353 
02354   // Second, try to dequeue from the head of the late list
02355   else if (this->late_head_)
02356     {
02357       last_in_subqueue = this->late_head_ == this->late_tail_ ? 1 : 0;
02358 
02359       first_item = this->late_head_;
02360 
02361       if (0 == this->late_head_->prev ())
02362         this->head_ = this->late_head_->next ();
02363       else
02364         this->late_head_->prev ()->next (this->late_head_->next ());
02365 
02366       if (0 == this->late_head_->next ())
02367         this->tail_ = this->late_head_->prev ();
02368       else
02369         {
02370           this->late_head_->next ()->prev (this->late_head_->prev ());
02371           this->late_head_ = this->late_head_->next ();
02372         }
02373 
02374       if (last_in_subqueue)
02375         {
02376           this->late_head_ = 0;
02377           this->late_tail_ = 0;
02378         }
02379 
02380       first_item->prev (0);
02381       first_item->next (0);
02382     }
02383   // finally, try to dequeue from the head of the beyond late list
02384   else if (this->beyond_late_head_)
02385     {
02386       last_in_subqueue =
02387         (this->beyond_late_head_ == this->beyond_late_tail_) ? 1 : 0;
02388 
02389       first_item = this->beyond_late_head_;
02390       this->head_ = this->beyond_late_head_->next ();
02391 
02392       if (0 == this->beyond_late_head_->next ())
02393         this->tail_ = this->beyond_late_head_->prev ();
02394       else
02395         {
02396           this->beyond_late_head_->next ()->prev (this->beyond_late_head_->prev ());
02397           this->beyond_late_head_ = this->beyond_late_head_->next ();
02398         }
02399 
02400       if (last_in_subqueue)
02401         {
02402           this->beyond_late_head_ = 0;
02403           this->beyond_late_tail_ = 0;
02404         }
02405 
02406       first_item->prev (0);
02407       first_item->next (0);
02408     }
02409   else
02410     {
02411       // nothing to dequeue: set the pointer to zero and return an error code
02412       first_item = 0;
02413       result = -1;
02414     }
02415 
02416   if (result < 0)
02417     return result;
02418 
02419   size_t mb_bytes = 0;
02420   size_t mb_length = 0;
02421   first_item->total_size_and_length (mb_bytes,
02422                                      mb_length);
02423   // Subtract off all of the bytes associated with this message.
02424   this->cur_bytes_ -= mb_bytes;
02425   this->cur_length_ -= mb_length;
02426   --this->cur_count_;
02427 
02428   // Only signal enqueueing threads if we've fallen below the low
02429   // water mark.
02430   if (this->cur_bytes_ <= this->low_water_mark_
02431       && this->signal_enqueue_waiters () == -1)
02432     return -1;
02433   else
02434     return this->cur_count_;
02435 }

template<ACE_SYNCH_DECL >
void ACE_Dynamic_Message_Queue<>::dump void   )  const [virtual]
 

Dump the state of the queue.

Reimplemented from ACE_Message_Queue< ACE_SYNCH_USE >.

Definition at line 2080 of file Message_Queue_T.cpp.

References ACE_BEGIN_DUMP, ACE_DEBUG, ACE_END_DUMP, ACE_TEXT, ACE_TRACE, ACE_Dynamic_Message_Strategy::dump(), ACE_Message_Queue<>::dump(), LM_DEBUG, and ACE_Dynamic_Message_Queue<>::message_strategy_.

02081 {
02082 #if defined (ACE_HAS_DUMP)
02083   ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dump");
02084   ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
02085 
02086   ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("ACE_Message_Queue<ACE_SYNCH_USE> (base class): \n")));
02087   this->ACE_Message_Queue<ACE_SYNCH_USE>::dump ();
02088 
02089   ACE_DEBUG ((LM_DEBUG,
02090               ACE_TEXT ("pending_head_ = %u\n")
02091               ACE_TEXT ("pending_tail_ = %u\n")
02092               ACE_TEXT ("late_head_ = %u\n")
02093               ACE_TEXT ("late_tail_ = %u\n")
02094               ACE_TEXT ("beyond_late_head_ = %u\n")
02095               ACE_TEXT ("beyond_late_tail_ = %u\n"),
02096               this->pending_head_,
02097               this->pending_tail_,
02098               this->late_head_,
02099               this->late_tail_,
02100               this->beyond_late_head_,
02101               this->beyond_late_tail_));
02102 
02103   ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("message_strategy_ : \n")));
02104   message_strategy_.dump ();
02105 
02106   ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
02107 #endif /* ACE_HAS_DUMP */
02108 }

template<ACE_SYNCH_DECL >
int ACE_Dynamic_Message_Queue<>::enqueue_head ACE_Message_Block new_item,
ACE_Time_Value timeout = 0
[virtual]
 

Just call priority enqueue method: head enqueue semantics for dynamic message queues are unstable: the message may or may not be where it was placed after the queue is refreshed prior to the next enqueue or dequeue operation.

Reimplemented from ACE_Message_Queue< ACE_SYNCH_USE >.

Definition at line 2692 of file Message_Queue_T.cpp.

References ACE_TRACE, and ACE_Message_Queue< ACE_SYNCH_USE >::enqueue_prio().

02694 {
02695   ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_head");
02696   return this->enqueue_prio (new_item, timeout);
02697 }

template<ACE_SYNCH_DECL >
int ACE_Dynamic_Message_Queue<>::enqueue_i ACE_Message_Block new_item  )  [protected, virtual]
 

Enqueue an <ACE_Message_Block *> in accordance with its priority. priority may be *dynamic* or *static* or a combination or *both* It calls the priority evaluation function passed into the Dynamic Message Queue constructor to update the priorities of all enqueued messages.

Reimplemented from ACE_Message_Queue< ACE_SYNCH_USE >.

Definition at line 2112 of file Message_Queue_T.cpp.

References ACE_TRACE, ACE_Dynamic_Message_Queue<>::beyond_late_head_, ACE_Dynamic_Message_Queue<>::beyond_late_tail_, ACE_Message_Queue< ACE_SYNCH_USE >::enqueue_head_i(), ACE_Message_Queue< ACE_SYNCH_USE >::enqueue_tail_i(), ACE_OS::gettimeofday(), ACE_Dynamic_Message_Queue<>::late_head_, ACE_Dynamic_Message_Queue<>::late_tail_, ACE_Dynamic_Message_Queue<>::message_strategy_, ACE_Message_Block::next(), ACE_Dynamic_Message_Queue<>::pending_head_, ACE_Dynamic_Message_Queue<>::pending_tail_, ACE_Message_Block::prev(), ACE_Dynamic_Message_Strategy::priority_status(), ACE_Dynamic_Message_Queue<>::refresh_queue(), ACE_Message_Queue< ACE_SYNCH_USE >::signal_dequeue_waiters(), ACE_Dynamic_Message_Queue<>::sublist_enqueue_i(), and ACE_Message_Block::total_size_and_length().

02113 {
02114   ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_i");
02115 
02116   if (new_item == 0)
02117     return -1;
02118 
02119   int result = 0;
02120 
02121   // Get the current time.
02122   ACE_Time_Value current_time = ACE_OS::gettimeofday ();
02123 
02124   // Refresh priority status boundaries in the queue.
02125 
02126   result = this->refresh_queue (current_time);
02127   if (result < 0)
02128     return result;
02129 
02130   // Where we enqueue depends on the message's priority status.
02131   switch (message_strategy_.priority_status (*new_item,
02132                                              current_time))
02133     {
02134     case ACE_Dynamic_Message_Strategy::PENDING:
02135       if (this->pending_tail_ == 0)
02136         {
02137           // Check for simple case of an empty pending queue, where
02138           // all we need to do is insert <new_item> into the tail of
02139           // the queue.
02140           pending_head_ = new_item;
02141           pending_tail_ = pending_head_;
02142           return this->enqueue_tail_i (new_item);
02143         }
02144       else
02145         {
02146           // Enqueue the new message in priority order in the pending
02147           // sublist
02148           result = sublist_enqueue_i (new_item,
02149                                       current_time,
02150                                       this->pending_head_,
02151                                       this->pending_tail_,
02152                                       ACE_Dynamic_Message_Strategy::PENDING);
02153         }
02154       break;
02155 
02156     case ACE_Dynamic_Message_Strategy::LATE:
02157       if (this->late_tail_ == 0)
02158         {
02159           late_head_ = new_item;
02160           late_tail_ = late_head_;
02161 
02162           if (this->pending_head_ == 0)
02163             // Check for simple case of an empty pending queue,
02164             // where all we need to do is insert <new_item> into the
02165             // tail of the queue.
02166             return this->enqueue_tail_i (new_item);
02167           else if (this->beyond_late_tail_ == 0)
02168             // Check for simple case of an empty beyond late queue, where all
02169             // we need to do is insert <new_item> into the head of the queue.
02170             return this->enqueue_head_i (new_item);
02171           else
02172             {
02173               // Otherwise, we can just splice the new message in
02174               // between the pending and beyond late portions of the
02175               // queue.
02176               this->beyond_late_tail_->next (new_item);
02177               new_item->prev (this->beyond_late_tail_);
02178               this->pending_head_->prev (new_item);
02179               new_item->next (this->pending_head_);
02180             }
02181         }
02182       else
02183         {
02184           // Enqueue the new message in priority order in the late
02185           // sublist
02186           result = sublist_enqueue_i (new_item,
02187                                       current_time,
02188                                       this->late_head_,
02189                                       this->late_tail_,
02190                                       ACE_Dynamic_Message_Strategy::LATE);
02191         }
02192       break;
02193 
02194     case ACE_Dynamic_Message_Strategy::BEYOND_LATE:
02195       if (this->beyond_late_tail_ == 0)
02196         {
02197           // Check for simple case of an empty beyond late queue,
02198           // where all we need to do is insert <new_item> into the
02199           // head of the queue.
02200           beyond_late_head_ = new_item;
02201           beyond_late_tail_ = beyond_late_head_;
02202           return this->enqueue_head_i (new_item);
02203         }
02204       else
02205         {
02206           // all beyond late messages have the same (zero) priority,
02207           // so just put the new one at the end of the beyond late
02208           // messages
02209           if (this->beyond_late_tail_->next ())
02210             this->beyond_late_tail_->next ()->prev (new_item);
02211           else
02212             this->tail_ = new_item;
02213 
02214           new_item->next (this->beyond_late_tail_->next ());
02215           this->beyond_late_tail_->next (new_item);
02216           new_item->prev (this->beyond_late_tail_);
02217           this->beyond_late_tail_ = new_item;
02218         }
02219 
02220       break;
02221 
02222       // should never get here, but just in case...
02223     default:
02224       result = -1;
02225       break;
02226     }
02227 
02228   if (result < 0)
02229     return result;
02230 
02231   size_t mb_bytes = 0;
02232   size_t mb_length = 0;
02233   new_item->total_size_and_length (mb_bytes,
02234                                    mb_length);
02235   this->cur_bytes_ += mb_bytes;
02236   this->cur_length_ += mb_length;
02237   ++this->cur_count_;
02238 
02239   if (this->signal_dequeue_waiters () == -1)
02240     return -1;
02241   else
02242     return this->cur_count_;
02243 }

template<ACE_SYNCH_DECL >
int ACE_Dynamic_Message_Queue<>::enqueue_tail ACE_Message_Block new_item,
ACE_Time_Value timeout = 0
[virtual]
 

Just call priority enqueue method: tail enqueue semantics for dynamic message queues are unstable: the message may or may not be where it was placed after the queue is refreshed prior to the next enqueue or dequeue operation.

Reimplemented from ACE_Message_Queue< ACE_SYNCH_USE >.

Definition at line 2679 of file Message_Queue_T.cpp.

References ACE_TRACE, and ACE_Message_Queue< ACE_SYNCH_USE >::enqueue_prio().

02681 {
02682   ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_tail");
02683   return this->enqueue_prio (new_item, timeout);
02684 }

template<ACE_SYNCH_DECL >
void ACE_Dynamic_Message_Queue<>::operator= const ACE_Dynamic_Message_Queue< _ACE_SYNCH > &   )  [private]
 

template<ACE_SYNCH_DECL >
int ACE_Dynamic_Message_Queue<>::peek_dequeue_head ACE_Message_Block *&  first_item,
ACE_Time_Value timeout = 0
[private, virtual]
 

Private method to hide public base class method: just calls base class method.

Reimplemented from ACE_Message_Queue< ACE_SYNCH_USE >.

Definition at line 2668 of file Message_Queue_T.cpp.

References ACE_Message_Queue<>::peek_dequeue_head().

02670 {
02671   return ACE_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head (first_item,
02672                                                               timeout);
02673 }

template<ACE_SYNCH_DECL >
int ACE_Dynamic_Message_Queue<>::refresh_late_queue const ACE_Time_Value current_time  )  [protected, virtual]
 

Refresh the late queue using the strategy specific priority status function.

Definition at line 2583 of file Message_Queue_T.cpp.

References ACE_ERROR_RETURN, ACE_TEXT, ACE_Dynamic_Message_Queue<>::beyond_late_head_, ACE_Dynamic_Message_Queue<>::beyond_late_tail_, ACE_Dynamic_Message_Queue<>::late_head_, ACE_Dynamic_Message_Queue<>::late_tail_, LM_ERROR, ACE_Dynamic_Message_Queue<>::message_strategy_, ACE_Message_Block::next(), ACE_Message_Block::prev(), and ACE_Dynamic_Message_Strategy::priority_status().

Referenced by ACE_Dynamic_Message_Queue<>::refresh_queue().

02584 {
02585   ACE_Dynamic_Message_Strategy::Priority_Status current_status;
02586 
02587   if (this->late_head_)
02588     {
02589       current_status = message_strategy_.priority_status (*this->late_head_,
02590                                                           current_time);
02591       switch (current_status)
02592         {
02593         case ACE_Dynamic_Message_Strategy::BEYOND_LATE:
02594 
02595           // make sure the head of the beyond late queue is set
02596           // (there may not have been any beyond late messages previously)
02597           this->beyond_late_head_ = this->head_;
02598 
02599           // advance through the beyond late messages in the late queue
02600           do
02601             {
02602               this->late_head_ = this->late_head_->next ();
02603 
02604               if (this->late_head_)
02605                 current_status = message_strategy_.priority_status (*this->late_head_,
02606                                                                     current_time);
02607               else
02608                 break;  // do while
02609 
02610             }
02611           while (current_status == ACE_Dynamic_Message_Strategy::BEYOND_LATE);
02612 
02613           if (this->late_head_)
02614             {
02615               // point tail of beyond late sublist to previous item
02616               this->beyond_late_tail_ = this->late_head_->prev ();
02617 
02618               if (current_status == ACE_Dynamic_Message_Strategy::PENDING)
02619                 {
02620                   // there are no late messages left in the queue
02621                   this->late_head_ = 0;
02622                   this->late_tail_ = 0;
02623                 }
02624               else if (current_status != ACE_Dynamic_Message_Strategy::LATE)
02625                 // if we got here, something is *seriously* wrong with the queue
02626                 ACE_ERROR_RETURN ((LM_ERROR,
02627                                    ACE_TEXT ("Unexpected message priority status [%d] (expected LATE)"),
02628                                    (int) current_status),
02629                                   -1);
02630             }
02631           else
02632             {
02633               // there are no late messages left in the queue
02634               this->beyond_late_tail_ = this->tail_;
02635               this->late_head_ = 0;
02636               this->late_tail_ = 0;
02637             }
02638 
02639           break;  // switch
02640 
02641         case ACE_Dynamic_Message_Strategy::LATE:
02642           // do nothing - the late queue is unchanged
02643           break; // switch
02644 
02645         case ACE_Dynamic_Message_Strategy::PENDING:
02646           // if we got here, something is *seriously* wrong with the queue
02647           ACE_ERROR_RETURN ((LM_ERROR,
02648                              ACE_TEXT ("Unexpected message priority status ")
02649                              ACE_TEXT ("[%d] (expected LATE or BEYOND_LATE)"),
02650                              (int) current_status),
02651                             -1);
02652         default:
02653           // if we got here, something is *seriously* wrong with the queue
02654           ACE_ERROR_RETURN ((LM_ERROR,
02655                              ACE_TEXT ("Unknown message priority status [%d]"),
02656                              (int) current_status),
02657                             -1);
02658         }
02659     }
02660 
02661   return 0;
02662 }

template<ACE_SYNCH_DECL >
int ACE_Dynamic_Message_Queue<>::refresh_pending_queue const ACE_Time_Value current_time  )  [protected, virtual]
 

Refresh the pending queue using the strategy specific priority status function.

Definition at line 2460 of file Message_Queue_T.cpp.

References ACE_ERROR_RETURN, ACE_TEXT, ACE_Dynamic_Message_Queue<>::beyond_late_head_, ACE_Dynamic_Message_Queue<>::beyond_late_tail_, ACE_Dynamic_Message_Queue<>::late_head_, ACE_Dynamic_Message_Queue<>::late_tail_, LM_ERROR, ACE_Dynamic_Message_Queue<>::message_strategy_, ACE_Message_Block::next(), ACE_Dynamic_Message_Queue<>::pending_head_, ACE_Dynamic_Message_Queue<>::pending_tail_, ACE_Message_Block::prev(), and ACE_Dynamic_Message_Strategy::priority_status().

Referenced by ACE_Dynamic_Message_Queue<>::refresh_queue().

02461 {
02462   ACE_Dynamic_Message_Strategy::Priority_Status current_status;
02463 
02464   // refresh priority status boundaries in the queue
02465   if (this->pending_head_)
02466     {
02467       current_status = message_strategy_.priority_status (*this->pending_head_,
02468                                                           current_time);
02469       switch (current_status)
02470         {
02471         case ACE_Dynamic_Message_Strategy::BEYOND_LATE:
02472           // Make sure the head of the beyond late queue is set (there
02473           // may not have been any beyond late messages previously)
02474           this->beyond_late_head_ = this->head_;
02475 
02476           // Zero out the late queue pointers, and set them only if
02477           // there turn out to be late messages in the pending sublist
02478           this->late_head_ = 0;
02479           this->late_tail_ = 0;
02480 
02481           // Advance through the beyond late messages in the pending queue
02482           do
02483             {
02484               this->pending_head_ = this->pending_head_->next ();
02485 
02486               if (this->pending_head_)
02487                 current_status = message_strategy_.priority_status (*this->pending_head_,
02488                                                                     current_time);
02489               else
02490                 break;  // do while
02491 
02492             }
02493           while (current_status == ACE_Dynamic_Message_Strategy::BEYOND_LATE);
02494 
02495           if (this->pending_head_)
02496             {
02497               // point tail of beyond late sublist to previous item
02498               this->beyond_late_tail_ = this->pending_head_->prev ();
02499 
02500               if (current_status == ACE_Dynamic_Message_Strategy::PENDING)
02501                 // there are no late messages left in the queue
02502                 break; // switch
02503               else if (current_status != ACE_Dynamic_Message_Strategy::LATE)
02504                 {
02505                   // if we got here, something is *seriously* wrong with the queue
02506                   ACE_ERROR_RETURN ((LM_ERROR,
02507                                      ACE_TEXT ("Unexpected message priority status [%d] (expected LATE)"),
02508                                      (int) current_status),
02509                                     -1);
02510                 }
02511               /* FALLTHRU */
02512             }
02513           else
02514             {
02515               // There are no pending or late messages left in the
02516               // queue.
02517               this->beyond_late_tail_ = this->tail_;
02518               this->pending_head_ = 0;
02519               this->pending_tail_ = 0;
02520               break; // switch
02521             }
02522 
02523         case ACE_Dynamic_Message_Strategy::LATE:
02524           // Make sure the head of the late queue is set (there may
02525           // not have been any late messages previously, or they may
02526           // have all become beyond late).
02527           if (this->late_head_ == 0)
02528             this->late_head_ = this->pending_head_;
02529 
02530           // advance through the beyond late messages in the pending queue
02531           do
02532             {
02533               this->pending_head_ = this->pending_head_->next ();
02534 
02535               if (this->pending_head_)
02536                 current_status = message_strategy_.priority_status (*this->pending_head_,
02537                                                                     current_time);
02538               else
02539                 break;  // do while
02540 
02541             }
02542           while (current_status == ACE_Dynamic_Message_Strategy::LATE);
02543 
02544           if (this->pending_head_)
02545             {
02546               if (current_status != ACE_Dynamic_Message_Strategy::PENDING)
02547                 // if we got here, something is *seriously* wrong with the queue
02548                 ACE_ERROR_RETURN((LM_ERROR,
02549                                   ACE_TEXT ("Unexpected message priority status [%d] (expected PENDING)"),
02550                                   (int) current_status),
02551                                  -1);
02552 
02553               // Point tail of late sublist to previous item
02554               this->late_tail_ = this->pending_head_->prev ();
02555             }
02556           else
02557             {
02558               // there are no pending messages left in the queue
02559               this->late_tail_ = this->tail_;
02560               this->pending_head_ = 0;
02561               this->pending_tail_ = 0;
02562             }
02563 
02564           break; // switch
02565         case ACE_Dynamic_Message_Strategy::PENDING:
02566           // do nothing - the pending queue is unchanged
02567           break; // switch
02568         default:
02569           // if we got here, something is *seriously* wrong with the queue
02570           ACE_ERROR_RETURN((LM_ERROR,
02571                             ACE_TEXT ("Unknown message priority status [%d]"),
02572                             (int) current_status),
02573                            -1);
02574         }
02575     }
02576   return 0;
02577 }

template<ACE_SYNCH_DECL >
int ACE_Dynamic_Message_Queue<>::refresh_queue const ACE_Time_Value current_time  )  [protected, virtual]
 

Refresh the queue using the strategy specific priority status function.

Definition at line 2444 of file Message_Queue_T.cpp.

References ACE_Dynamic_Message_Queue<>::refresh_late_queue(), and ACE_Dynamic_Message_Queue<>::refresh_pending_queue().

Referenced by ACE_Dynamic_Message_Queue<>::dequeue_head(), ACE_Dynamic_Message_Queue<>::enqueue_i(), and ACE_Dynamic_Message_Queue<>::remove_messages().

02445 {
02446   int result;
02447 
02448   result = refresh_pending_queue (current_time);
02449 
02450   if (result != -1)
02451     result = refresh_late_queue (current_time);
02452 
02453   return result;
02454 }

template<ACE_SYNCH_DECL >
int ACE_Dynamic_Message_Queue<>::remove_messages ACE_Message_Block *&  list_head,
ACE_Message_Block *&  list_tail,
u_int  status_flags
[virtual]
 

Detach all messages with status given in the passed flags from the queue and return them by setting passed head and tail pointers to the linked list they comprise. This method is intended primarily as a means of periodically harvesting messages that have missed their deadlines, but is available in its most general form. All messages are returned in priority order, from head to tail, as of the time this method was called.

Definition at line 1900 of file Message_Queue_T.cpp.

References ACE_BIT_ENABLED, ACE_Dynamic_Message_Queue<>::beyond_late_head_, ACE_Dynamic_Message_Queue<>::beyond_late_tail_, ACE_OS::gettimeofday(), ACE_Dynamic_Message_Queue<>::late_head_, ACE_Dynamic_Message_Queue<>::late_tail_, ACE_Message_Block::next(), ACE_Dynamic_Message_Queue<>::pending_head_, ACE_Dynamic_Message_Queue<>::pending_tail_, ACE_Message_Block::prev(), ACE_Dynamic_Message_Queue<>::refresh_queue(), and ACE_Message_Block::total_size_and_length().

01903 {
01904   // start with an empty list
01905   list_head = 0;
01906   list_tail = 0;
01907 
01908   // Get the current time
01909   ACE_Time_Value current_time = ACE_OS::gettimeofday ();
01910 
01911   // Refresh priority status boundaries in the queue.
01912   int result = this->refresh_queue (current_time);
01913   if (result < 0)
01914     return result;
01915 
01916   if (ACE_BIT_ENABLED (status_flags,
01917                        (u_int) ACE_Dynamic_Message_Strategy::PENDING)
01918       && this->pending_head_
01919       && this->pending_tail_)
01920     {
01921       // patch up pointers for the new tail of the queue
01922       if (this->pending_head_->prev ())
01923         {
01924           this->tail_ = this->pending_head_->prev ();
01925           this->pending_head_->prev ()->next (0);
01926         }
01927       else
01928         {
01929           // the list has become empty
01930           this->head_ = 0;
01931           this->tail_ = 0;
01932         }
01933 
01934       // point to the head and tail of the list
01935       list_head = this->pending_head_;
01936       list_tail = this->pending_tail_;
01937 
01938       // cut the pending messages out of the queue entirely
01939       this->pending_head_->prev (0);
01940       this->pending_head_ = 0;
01941       this->pending_tail_ = 0;
01942     }
01943 
01944   if (ACE_BIT_ENABLED (status_flags,
01945                        (u_int) ACE_Dynamic_Message_Strategy::LATE)
01946       && this->late_head_
01947       && this->late_tail_)
01948     {
01949       // Patch up pointers for the (possibly) new head and tail of the
01950       // queue.
01951       if (this->late_tail_->next ())
01952         this->late_tail_->next ()->prev (this->late_head_->prev ());
01953       else
01954         this->tail_ = this->late_head_->prev ();
01955 
01956       if (this->late_head_->prev ())
01957         this->late_head_->prev ()->next (this->late_tail_->next ());
01958       else
01959         this->head_ = this->late_tail_->next ();
01960 
01961       // put late messages behind pending messages (if any) being returned
01962       this->late_head_->prev (list_tail);
01963       if (list_tail)
01964         list_tail->next (this->late_head_);
01965       else
01966         list_head = this->late_head_;
01967 
01968       list_tail = this->late_tail_;
01969 
01970       this->late_tail_->next (0);
01971       this->late_head_ = 0;
01972       this->late_tail_ = 0;
01973     }
01974 
01975   if (ACE_BIT_ENABLED (status_flags,
01976       (u_int) ACE_Dynamic_Message_Strategy::BEYOND_LATE)
01977       && this->beyond_late_head_
01978       && this->beyond_late_tail_)
01979     {
01980       // Patch up pointers for the new tail of the queue
01981       if (this->beyond_late_tail_->next ())
01982         {
01983           this->head_ = this->beyond_late_tail_->next ();
01984           this->beyond_late_tail_->next ()->prev (0);
01985         }
01986       else
01987         {
01988           // the list has become empty
01989           this->head_ = 0;
01990           this->tail_ = 0;
01991         }
01992 
01993       // Put beyond late messages at the end of the list being
01994       // returned.
01995       if (list_tail)
01996         {
01997           this->beyond_late_head_->prev (list_tail);
01998           list_tail->next (this->beyond_late_head_);
01999         }
02000       else
02001         list_head = this->beyond_late_head_;
02002 
02003       list_tail = this->beyond_late_tail_;
02004 
02005       this->beyond_late_tail_->next (0);
02006       this->beyond_late_head_ = 0;
02007       this->beyond_late_tail_ = 0;
02008     }
02009 
02010   // Decrement message and size counts for removed messages.
02011   ACE_Message_Block *temp1;
02012 
02013   for (temp1 = list_head;
02014        temp1 != 0;
02015        temp1 = temp1->next ())
02016     {
02017       --this->cur_count_;
02018 
02019       size_t mb_bytes = 0;
02020       size_t mb_length = 0;
02021       temp1->total_size_and_length (mb_bytes,
02022                                     mb_length);
02023       // Subtract off all of the bytes associated with this message.
02024       this->cur_bytes_ -= mb_bytes;
02025       this->cur_length_ -= mb_length;
02026     }
02027 
02028   return result;
02029 }

template<ACE_SYNCH_DECL >
int ACE_Dynamic_Message_Queue<>::sublist_enqueue_i ACE_Message_Block new_item,
const ACE_Time_Value current_time,
ACE_Message_Block *&  sublist_head,
ACE_Message_Block *&  sublist_tail,
ACE_Dynamic_Message_Strategy::Priority_Status  status
[protected, virtual]
 

Enqueue a message in priority order within a given priority status sublist.

Definition at line 2252 of file Message_Queue_T.cpp.

References ACE_Dynamic_Message_Queue<>::message_strategy_, ACE_Message_Block::msg_priority(), ACE_Message_Block::next(), ACE_Message_Block::prev(), and ACE_Dynamic_Message_Strategy::priority_status().

Referenced by ACE_Dynamic_Message_Queue<>::enqueue_i().

02257 {
02258   int result = 0;
02259   ACE_Message_Block *current_item = 0;
02260 
02261   // Find message after which to enqueue new item, based on message
02262   // priority and priority status.
02263   for (current_item = sublist_tail;
02264        current_item;
02265        current_item = current_item->prev ())
02266     {
02267       if (message_strategy_.priority_status (*current_item, current_time) == status)
02268         {
02269           if (current_item->msg_priority () >= new_item->msg_priority ())
02270             break;
02271         }
02272       else
02273         {
02274           sublist_head = new_item;
02275           break;
02276         }
02277     }
02278 
02279   if (current_item == 0)
02280     {
02281       // If the new message has highest priority of any, put it at the
02282       // head of the list (and sublist).
02283       new_item->prev (0);
02284       new_item->next (this->head_);
02285       if (this->head_ != 0)
02286         this->head_->prev (new_item);
02287       else
02288         {
02289           this->tail_ = new_item;
02290           sublist_tail = new_item;
02291         }
02292       this->head_ = new_item;
02293       sublist_head = new_item;
02294     }
02295   else
02296     {
02297       // insert the new item into the list
02298       new_item->next (current_item->next ());
02299       new_item->prev (current_item);
02300 
02301       if (current_item->next ())
02302         current_item->next ()->prev (new_item);
02303       else
02304         this->tail_ = new_item;
02305 
02306       current_item->next (new_item);
02307 
02308       // If the new item has lowest priority of any in the sublist,
02309       // move the tail pointer of the sublist back to the new item
02310       if (current_item == sublist_tail)
02311         sublist_tail = new_item;
02312     }
02313 
02314   return result;
02315 }


Member Data Documentation

template<ACE_SYNCH_DECL >
ACE_Dynamic_Message_Queue<>::ACE_ALLOC_HOOK_DECLARE
 

Declare the dynamic allocation hooks.

Reimplemented from ACE_Message_Queue< ACE_SYNCH_USE >.

Definition at line 806 of file Message_Queue_T.h.

template<ACE_SYNCH_DECL >
ACE_Message_Block* ACE_Dynamic_Message_Queue<>::beyond_late_head_ [protected]
 

Pointer to head of the beyond late messages.

Definition at line 860 of file Message_Queue_T.h.

Referenced by ACE_Dynamic_Message_Queue<>::dequeue_head_i(), ACE_Dynamic_Message_Queue<>::enqueue_i(), ACE_Dynamic_Message_Queue<>::refresh_late_queue(), ACE_Dynamic_Message_Queue<>::refresh_pending_queue(), and ACE_Dynamic_Message_Queue<>::remove_messages().

template<ACE_SYNCH_DECL >
ACE_Message_Block* ACE_Dynamic_Message_Queue<>::beyond_late_tail_ [protected]
 

Pointer to tail of the beyond late messages.

Definition at line 863 of file Message_Queue_T.h.

Referenced by ACE_Dynamic_Message_Queue<>::dequeue_head_i(), ACE_Dynamic_Message_Queue<>::enqueue_i(), ACE_Dynamic_Message_Queue<>::refresh_late_queue(), ACE_Dynamic_Message_Queue<>::refresh_pending_queue(), and ACE_Dynamic_Message_Queue<>::remove_messages().

template<ACE_SYNCH_DECL >
ACE_Message_Block* ACE_Dynamic_Message_Queue<>::late_head_ [protected]
 

Pointer to head of the late messages.

Definition at line 854 of file Message_Queue_T.h.

Referenced by ACE_Dynamic_Message_Queue<>::dequeue_head_i(), ACE_Dynamic_Message_Queue<>::enqueue_i(), ACE_Dynamic_Message_Queue<>::refresh_late_queue(), ACE_Dynamic_Message_Queue<>::refresh_pending_queue(), and ACE_Dynamic_Message_Queue<>::remove_messages().

template<ACE_SYNCH_DECL >
ACE_Message_Block* ACE_Dynamic_Message_Queue<>::late_tail_ [protected]
 

Pointer to tail of the late messages.

Definition at line 857 of file Message_Queue_T.h.

Referenced by ACE_Dynamic_Message_Queue<>::dequeue_head_i(), ACE_Dynamic_Message_Queue<>::enqueue_i(), ACE_Dynamic_Message_Queue<>::refresh_late_queue(), ACE_Dynamic_Message_Queue<>::refresh_pending_queue(), and ACE_Dynamic_Message_Queue<>::remove_messages().

template<ACE_SYNCH_DECL >
ACE_Dynamic_Message_Strategy& ACE_Dynamic_Message_Queue<>::message_strategy_ [protected]
 

Pointer to a dynamic priority evaluation function.

Definition at line 866 of file Message_Queue_T.h.

Referenced by ACE_Dynamic_Message_Queue<>::dump(), ACE_Dynamic_Message_Queue<>::enqueue_i(), ACE_Dynamic_Message_Queue<>::refresh_late_queue(), ACE_Dynamic_Message_Queue<>::refresh_pending_queue(), ACE_Dynamic_Message_Queue<>::sublist_enqueue_i(), and ACE_Dynamic_Message_Queue<>::~ACE_Dynamic_Message_Queue().

template<ACE_SYNCH_DECL >
ACE_Message_Block* ACE_Dynamic_Message_Queue<>::pending_head_ [protected]
 

Pointer to head of the pending messages.

Definition at line 848 of file Message_Queue_T.h.

Referenced by ACE_Dynamic_Message_Queue<>::dequeue_head_i(), ACE_Dynamic_Message_Queue<>::enqueue_i(), ACE_Dynamic_Message_Queue<>::refresh_pending_queue(), and ACE_Dynamic_Message_Queue<>::remove_messages().

template<ACE_SYNCH_DECL >
ACE_Message_Block* ACE_Dynamic_Message_Queue<>::pending_tail_ [protected]
 

Pointer to tail of the pending messages.

Definition at line 851 of file Message_Queue_T.h.

Referenced by ACE_Dynamic_Message_Queue<>::dequeue_head_i(), ACE_Dynamic_Message_Queue<>::enqueue_i(), ACE_Dynamic_Message_Queue<>::refresh_pending_queue(), and ACE_Dynamic_Message_Queue<>::remove_messages().


The documentation for this class was generated from the following files:
Generated on Sun Jan 27 12:54:34 2008 for ACE by doxygen 1.3.6