ACE_Dynamic_Message_Queue<> Class Template Reference

A derived class which adapts the ACE_Message_Queue class in order to maintain dynamic priorities for enqueued and manage the queue order according to these dynamic priorities. More...

#include <Message_Queue_T.h>

Inheritance diagram for ACE_Dynamic_Message_Queue<>:

Inheritance graph
[legend]
Collaboration diagram for ACE_Dynamic_Message_Queue<>:

Collaboration graph
[legend]
List of all members.

Public Member Functions

 ACE_Dynamic_Message_Queue (ACE_Dynamic_Message_Strategy &message_strategy, size_t hwm=ACE_Message_Queue_Base::DEFAULT_HWM, size_t lwm=ACE_Message_Queue_Base::DEFAULT_LWM, ACE_Notification_Strategy *=0)
virtual ~ACE_Dynamic_Message_Queue (void)
 Close down the message queue and release all resources.

virtual int remove_messages (ACE_Message_Block *&list_head, ACE_Message_Block *&list_tail, u_int status_flags)
virtual int dequeue_head (ACE_Message_Block *&first_item, ACE_Time_Value *timeout=0)
virtual void dump (void) const
 Dump the state of the queue.

virtual int enqueue_tail (ACE_Message_Block *new_item, ACE_Time_Value *timeout=0)
virtual int enqueue_head (ACE_Message_Block *new_item, ACE_Time_Value *timeout=0)

Public Attributes

 ACE_ALLOC_HOOK_DECLARE
 Declare the dynamic allocation hooks.


Protected Member Functions

virtual int enqueue_i (ACE_Message_Block *new_item)
virtual int sublist_enqueue_i (ACE_Message_Block *new_item, const ACE_Time_Value &current_time, ACE_Message_Block *&sublist_head, ACE_Message_Block *&sublist_tail, ACE_Dynamic_Message_Strategy::Priority_Status status)
 Enqueue a message in priority order within a given priority status sublist.

virtual int dequeue_head_i (ACE_Message_Block *&first_item)
virtual int refresh_queue (const ACE_Time_Value &current_time)
virtual int refresh_pending_queue (const ACE_Time_Value &current_time)
virtual int refresh_late_queue (const ACE_Time_Value &current_time)

Protected Attributes

ACE_Message_Blockpending_head_
 Pointer to head of the pending messages.

ACE_Message_Blockpending_tail_
 Pointer to tail of the pending messages.

ACE_Message_Blocklate_head_
 Pointer to head of the late messages.

ACE_Message_Blocklate_tail_
 Pointer to tail of the late messages.

ACE_Message_Blockbeyond_late_head_
 Pointer to head of the beyond late messages.

ACE_Message_Blockbeyond_late_tail_
 Pointer to tail of the beyond late messages.

ACE_Dynamic_Message_Strategymessage_strategy_
 Pointer to a dynamic priority evaluation function.


Private Member Functions

void operator= (const ACE_Dynamic_Message_Queue< _ACE_SYNCH > &)
 ACE_Dynamic_Message_Queue (const ACE_Dynamic_Message_Queue< _ACE_SYNCH > &)
virtual int peek_dequeue_head (ACE_Message_Block *&first_item, ACE_Time_Value *timeout=0)
 Private method to hide public base class method: just calls base class method.


Detailed Description

template<ACE_SYNCH_DECL>
class ACE_Dynamic_Message_Queue<>

A derived class which adapts the ACE_Message_Queue class in order to maintain dynamic priorities for enqueued and manage the queue order according to these dynamic priorities.

The messages in the queue are managed so as to preserve a logical ordering with minimal overhead per enqueue and dequeue operation. For this reason, the actual order of messages in the linked list of the queue may differ from their priority order. As time passes, a message may change from pending status to late status, and eventually to beyond late status. To minimize reordering overhead under this design force, three separate boundaries are maintained within the linked list of messages. Messages are dequeued preferentially from the head of the pending portion, then the head of the late portion, and finally from the head of the beyond late portion. In this way, only the boundaries need to be maintained (which can be done efficiently, as aging messages maintain the same linked list order as they progress from one status to the next), with no reordering of the messages themselves, while providing correct priority ordered dequeueing semantics. Head and tail enqueue methods inherited from ACE_Message_Queue are made private to prevent out-of-order messages from confusing management of the various portions of the queue. Messages in the pending portion of the queue whose priority becomes late (according to the specific dynamic strategy) advance into the late portion of the queue. Messages in the late portion of the queue whose priority becomes later than can be represented advance to the beyond_late portion of the queue. These behaviors support a limited schedule overrun, with pending messages prioritized ahead of late messages, and late messages ahead of beyond late messages. These behaviors can be modified in derived classes by providing alternative definitions for the appropriate virtual methods. When filled with messages, the queue's linked list should look like: H T | | B - B - B - B - L - L - L - P - P - P - P - P | | | | | | BH BT LH LT PH PT Where the symbols are as follows: H = Head of the entire list T = Tail of the entire list B = Beyond late message BH = Beyond late messages Head BT = Beyond late messages Tail L = Late message LH = Late messages Head LT = Late messages Tail P = Pending message PH = Pending messages Head PT = Pending messages Tail Caveat: the virtual methods enqueue_tail, enqueue_head, and peek_dequeue_head have semantics for the static message queues that cannot be guaranteed for dynamic message queues. The peek_dequeue_head method just calls the base class method, while the two enqueue methods call the priority enqueue method. The order of messages in the dynamic queue is a function of message deadlines and how long they are in the queues. You can manipulate these in some cases to ensure the correct semantics, but that is not a very stable or portable approach (discouraged).

Definition at line 749 of file Message_Queue_T.h.


Constructor & Destructor Documentation

template<ACE_SYNCH_DECL >
ACE_Dynamic_Message_Queue<>::ACE_Dynamic_Message_Queue ACE_Dynamic_Message_Strategy message_strategy,
size_t  hwm = ACE_Message_Queue_Base::DEFAULT_HWM,
size_t  lwm = ACE_Message_Queue_Base::DEFAULT_LWM,
ACE_Notification_Strategy = 0
 

Definition at line 1765 of file Message_Queue_T.cpp.

References ACE_SYNCH_USE.

01769   : ACE_Message_Queue<ACE_SYNCH_USE> (hwm, lwm, ns),
01770     pending_head_ (0),
01771     pending_tail_ (0),
01772     late_head_ (0),
01773     late_tail_ (0),
01774     beyond_late_head_ (0),
01775     beyond_late_tail_ (0),
01776     message_strategy_ (message_strategy)
01777 {
01778   // Note, the ACE_Dynamic_Message_Queue assumes full responsibility
01779   // for the passed ACE_Dynamic_Message_Strategy object, and deletes
01780   // it in its own dtor
01781 }

template<ACE_SYNCH_DECL >
ACE_Dynamic_Message_Queue<>::~ACE_Dynamic_Message_Queue void   )  [virtual]
 

Close down the message queue and release all resources.

Definition at line 1786 of file Message_Queue_T.cpp.

References ACE_Dynamic_Message_Queue<>::message_strategy_.

01787 {
01788   delete &this->message_strategy_;
01789 }

template<ACE_SYNCH_DECL >
ACE_Dynamic_Message_Queue<>::ACE_Dynamic_Message_Queue const ACE_Dynamic_Message_Queue< _ACE_SYNCH > &   )  [private]
 


Member Function Documentation

template<ACE_SYNCH_DECL >
int ACE_Dynamic_Message_Queue<>::dequeue_head ACE_Message_Block *&  first_item,
ACE_Time_Value timeout = 0
[virtual]
 

Dequeue and return the <ACE_Message_Block *> at the head of the queue. Returns -1 on failure, else the number of items still on the queue.

Reimplemented from ACE_Message_Queue< ACE_SYNCH_USE >.

Definition at line 1932 of file Message_Queue_T.cpp.

References ACE_GUARD_RETURN, ACE_SYNCH_MUTEX_T, ACE_TRACE, ACE_Dynamic_Message_Queue<>::dequeue_head_i(), ESHUTDOWN, ACE_OS::gettimeofday(), ACE_Dynamic_Message_Queue<>::refresh_queue(), and ACE_Message_Queue< ACE_SYNCH_USE >::wait_not_empty_cond().

01934 {
01935   ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head");
01936 
01937   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01938 
01939   if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
01940     {
01941       errno = ESHUTDOWN;
01942       return -1;
01943     }
01944 
01945   int result;
01946 
01947   // get the current time
01948   ACE_Time_Value current_time = ACE_OS::gettimeofday ();
01949 
01950   // refresh priority status boundaries in the queue
01951   result = this->refresh_queue (current_time);
01952   if (result < 0)
01953     return result;
01954 
01955   // *now* it's appropriate to wait for an enqueued item
01956   result = this->wait_not_empty_cond (ace_mon, timeout);
01957   if (result == -1)
01958     return result;
01959 
01960   // call the internal dequeue method, which selects an item from the
01961   // highest priority status portion of the queue that has messages
01962   // enqueued.
01963   result = this->dequeue_head_i (first_item);
01964 
01965   return result;
01966 }

template<ACE_SYNCH_DECL >
int ACE_Dynamic_Message_Queue<>::dequeue_head_i ACE_Message_Block *&  first_item  )  [protected, virtual]
 

Dequeue and return the <ACE_Message_Block *> at the head of the logical queue. Attempts first to dequeue from the pending portion of the queue, or if that is empty from the late portion, or if that is empty from the beyond late portion, or if that is empty just sets the passed pointer to zero and returns -1.

Reimplemented from ACE_Message_Queue< ACE_SYNCH_USE >.

Definition at line 2213 of file Message_Queue_T.cpp.

References ACE_TRACE, ACE_Dynamic_Message_Queue<>::beyond_late_head_, ACE_Dynamic_Message_Queue<>::beyond_late_tail_, ACE_Dynamic_Message_Queue<>::late_head_, ACE_Dynamic_Message_Queue<>::late_tail_, ACE_Message_Block::next(), ACE_Dynamic_Message_Queue<>::pending_head_, ACE_Dynamic_Message_Queue<>::pending_tail_, ACE_Message_Block::prev(), ACE_Message_Queue< ACE_SYNCH_USE >::signal_enqueue_waiters(), and ACE_Message_Block::total_size_and_length().

Referenced by ACE_Dynamic_Message_Queue<>::dequeue_head().

02214 {
02215   ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i");
02216 
02217   int result = 0;
02218   int last_in_subqueue = 0;
02219 
02220   // first, try to dequeue from the head of the pending list
02221   if (this->pending_head_)
02222     {
02223       first_item = this->pending_head_;
02224 
02225       if (0 == this->pending_head_->prev ())
02226         this->head_ = this->pending_head_->next ();
02227       else
02228         this->pending_head_->prev ()->next (this->pending_head_->next ());
02229 
02230       if (0 == this->pending_head_->next ())
02231         {
02232           this->tail_ = this->pending_head_->prev ();
02233           this->pending_head_ = 0;
02234           this->pending_tail_ = 0;
02235         }
02236       else
02237         {
02238           this->pending_head_->next ()->prev (this->pending_head_->prev ());
02239           this->pending_head_ = this->pending_head_->next ();
02240         }
02241 
02242       first_item->prev (0);
02243       first_item->next (0);
02244     }
02245 
02246   // Second, try to dequeue from the head of the late list
02247   else if (this->late_head_)
02248     {
02249       last_in_subqueue = this->late_head_ == this->late_tail_ ? 1 : 0;
02250 
02251       first_item = this->late_head_;
02252 
02253       if (0 == this->late_head_->prev ())
02254         this->head_ = this->late_head_->next ();
02255       else
02256         this->late_head_->prev ()->next (this->late_head_->next ());
02257 
02258       if (0 == this->late_head_->next ())
02259         this->tail_ = this->late_head_->prev ();
02260       else
02261         {
02262           this->late_head_->next ()->prev (this->late_head_->prev ());
02263           this->late_head_ = this->late_head_->next ();
02264         }
02265 
02266       if (last_in_subqueue)
02267         {
02268           this->late_head_ = 0;
02269           this->late_tail_ = 0;
02270         }
02271 
02272       first_item->prev (0);
02273       first_item->next (0);
02274     }
02275   // finally, try to dequeue from the head of the beyond late list
02276   else if (this->beyond_late_head_)
02277     {
02278       last_in_subqueue =
02279         (this->beyond_late_head_ == this->beyond_late_tail_) ? 1 : 0;
02280 
02281       first_item = this->beyond_late_head_;
02282       this->head_ = this->beyond_late_head_->next ();
02283 
02284       if (0 == this->beyond_late_head_->next ())
02285         this->tail_ = this->beyond_late_head_->prev ();
02286       else
02287         {
02288           this->beyond_late_head_->next ()->prev (this->beyond_late_head_->prev ());
02289           this->beyond_late_head_ = this->beyond_late_head_->next ();
02290         }
02291 
02292       if (last_in_subqueue)
02293         {
02294           this->beyond_late_head_ = 0;
02295           this->beyond_late_tail_ = 0;
02296         }
02297 
02298       first_item->prev (0);
02299       first_item->next (0);
02300     }
02301   else
02302     {
02303       // nothing to dequeue: set the pointer to zero and return an error code
02304       first_item = 0;
02305       result = -1;
02306     }
02307 
02308   if (result < 0)
02309     return result;
02310 
02311   size_t mb_bytes = 0;
02312   size_t mb_length = 0;
02313   first_item->total_size_and_length (mb_bytes,
02314                                      mb_length);
02315   // Subtract off all of the bytes associated with this message.
02316   this->cur_bytes_ -= mb_bytes;
02317   this->cur_length_ -= mb_length;
02318   --this->cur_count_;
02319 
02320   // Only signal enqueueing threads if we've fallen below the low
02321   // water mark.
02322   if (this->cur_bytes_ <= this->low_water_mark_
02323       && this->signal_enqueue_waiters () == -1)
02324     return -1;
02325   else
02326     return this->cur_count_;
02327 }

template<ACE_SYNCH_DECL >
void ACE_Dynamic_Message_Queue<>::dump void   )  const [virtual]
 

Dump the state of the queue.

Reimplemented from ACE_Message_Queue< ACE_SYNCH_USE >.

Definition at line 1972 of file Message_Queue_T.cpp.

References ACE_BEGIN_DUMP, ACE_DEBUG, ACE_END_DUMP, ACE_LIB_TEXT, ACE_TRACE, ACE_Dynamic_Message_Strategy::dump(), ACE_Message_Queue<>::dump(), LM_DEBUG, and ACE_Dynamic_Message_Queue<>::message_strategy_.

01973 {
01974 #if defined (ACE_HAS_DUMP)
01975   ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dump");
01976   ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
01977 
01978   ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("ACE_Message_Queue<ACE_SYNCH_USE> (base class): \n")));
01979   this->ACE_Message_Queue<ACE_SYNCH_USE>::dump ();
01980 
01981   ACE_DEBUG ((LM_DEBUG,
01982               ACE_LIB_TEXT ("pending_head_ = %u\n")
01983               ACE_LIB_TEXT ("pending_tail_ = %u\n")
01984               ACE_LIB_TEXT ("late_head_ = %u\n")
01985               ACE_LIB_TEXT ("late_tail_ = %u\n")
01986               ACE_LIB_TEXT ("beyond_late_head_ = %u\n")
01987               ACE_LIB_TEXT ("beyond_late_tail_ = %u\n"),
01988               this->pending_head_,
01989               this->pending_tail_,
01990               this->late_head_,
01991               this->late_tail_,
01992               this->beyond_late_head_,
01993               this->beyond_late_tail_));
01994 
01995   ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("message_strategy_ : \n")));
01996   message_strategy_.dump ();
01997 
01998   ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
01999 #endif /* ACE_HAS_DUMP */
02000 }

template<ACE_SYNCH_DECL >
int ACE_Dynamic_Message_Queue<>::enqueue_head ACE_Message_Block new_item,
ACE_Time_Value timeout = 0
[virtual]
 

Just call priority enqueue method: head enqueue semantics for dynamic message queues are unstable: the message may or may not be where it was placed after the queue is refreshed prior to the next enqueue or dequeue operation.

Reimplemented from ACE_Message_Queue< ACE_SYNCH_USE >.

Definition at line 2584 of file Message_Queue_T.cpp.

References ACE_TRACE, and ACE_Message_Queue< ACE_SYNCH_USE >::enqueue_prio().

02586 {
02587   ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_head");
02588   return this->enqueue_prio (new_item, timeout);
02589 }

template<ACE_SYNCH_DECL >
int ACE_Dynamic_Message_Queue<>::enqueue_i ACE_Message_Block new_item  )  [protected, virtual]
 

Enqueue an <ACE_Message_Block *> in accordance with its priority. priority may be *dynamic* or *static* or a combination or *both* It calls the priority evaluation function passed into the Dynamic Message Queue constructor to update the priorities of all enqueued messages.

Reimplemented from ACE_Message_Queue< ACE_SYNCH_USE >.

Definition at line 2004 of file Message_Queue_T.cpp.

References ACE_TRACE, ACE_Dynamic_Message_Queue<>::beyond_late_head_, ACE_Dynamic_Message_Queue<>::beyond_late_tail_, ACE_Message_Queue< ACE_SYNCH_USE >::enqueue_head_i(), ACE_Message_Queue< ACE_SYNCH_USE >::enqueue_tail_i(), ACE_OS::gettimeofday(), ACE_Dynamic_Message_Queue<>::late_head_, ACE_Dynamic_Message_Queue<>::late_tail_, ACE_Dynamic_Message_Queue<>::message_strategy_, ACE_Message_Block::next(), ACE_Dynamic_Message_Queue<>::pending_head_, ACE_Dynamic_Message_Queue<>::pending_tail_, ACE_Message_Block::prev(), ACE_Dynamic_Message_Strategy::priority_status(), ACE_Dynamic_Message_Queue<>::refresh_queue(), ACE_Message_Queue< ACE_SYNCH_USE >::signal_dequeue_waiters(), ACE_Dynamic_Message_Queue<>::sublist_enqueue_i(), and ACE_Message_Block::total_size_and_length().

02005 {
02006   ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_i");
02007 
02008   if (new_item == 0)
02009     return -1;
02010 
02011   int result = 0;
02012 
02013   // Get the current time.
02014   ACE_Time_Value current_time = ACE_OS::gettimeofday ();
02015 
02016   // Refresh priority status boundaries in the queue.
02017 
02018   result = this->refresh_queue (current_time);
02019   if (result < 0)
02020     return result;
02021 
02022   // Where we enqueue depends on the message's priority status.
02023   switch (message_strategy_.priority_status (*new_item,
02024                                              current_time))
02025     {
02026     case ACE_Dynamic_Message_Strategy::PENDING:
02027       if (this->pending_tail_ == 0)
02028         {
02029           // Check for simple case of an empty pending queue, where
02030           // all we need to do is insert <new_item> into the tail of
02031           // the queue.
02032           pending_head_ = new_item;
02033           pending_tail_ = pending_head_;
02034           return this->enqueue_tail_i (new_item);
02035         }
02036       else
02037         {
02038           // Enqueue the new message in priority order in the pending
02039           // sublist
02040           result = sublist_enqueue_i (new_item,
02041                                       current_time,
02042                                       this->pending_head_,
02043                                       this->pending_tail_,
02044                                       ACE_Dynamic_Message_Strategy::PENDING);
02045         }
02046       break;
02047 
02048     case ACE_Dynamic_Message_Strategy::LATE:
02049       if (this->late_tail_ == 0)
02050         {
02051           late_head_ = new_item;
02052           late_tail_ = late_head_;
02053 
02054           if (this->pending_head_ == 0)
02055             // Check for simple case of an empty pending queue,
02056             // where all we need to do is insert <new_item> into the
02057             // tail of the queue.
02058             return this->enqueue_tail_i (new_item);
02059           else if (this->beyond_late_tail_ == 0)
02060             // Check for simple case of an empty beyond late queue, where all
02061             // we need to do is insert <new_item> into the head of the queue.
02062             return this->enqueue_head_i (new_item);
02063           else
02064             {
02065               // Otherwise, we can just splice the new message in
02066               // between the pending and beyond late portions of the
02067               // queue.
02068               this->beyond_late_tail_->next (new_item);
02069               new_item->prev (this->beyond_late_tail_);
02070               this->pending_head_->prev (new_item);
02071               new_item->next (this->pending_head_);
02072             }
02073         }
02074       else
02075         {
02076           // Enqueue the new message in priority order in the late
02077           // sublist
02078           result = sublist_enqueue_i (new_item,
02079                                       current_time,
02080                                       this->late_head_,
02081                                       this->late_tail_,
02082                                       ACE_Dynamic_Message_Strategy::LATE);
02083         }
02084       break;
02085 
02086     case ACE_Dynamic_Message_Strategy::BEYOND_LATE:
02087       if (this->beyond_late_tail_ == 0)
02088         {
02089           // Check for simple case of an empty beyond late queue,
02090           // where all we need to do is insert <new_item> into the
02091           // head of the queue.
02092           beyond_late_head_ = new_item;
02093           beyond_late_tail_ = beyond_late_head_;
02094           return this->enqueue_head_i (new_item);
02095         }
02096       else
02097         {
02098           // all beyond late messages have the same (zero) priority,
02099           // so just put the new one at the end of the beyond late
02100           // messages
02101           if (this->beyond_late_tail_->next ())
02102             this->beyond_late_tail_->next ()->prev (new_item);
02103           else
02104             this->tail_ = new_item;
02105 
02106           new_item->next (this->beyond_late_tail_->next ());
02107           this->beyond_late_tail_->next (new_item);
02108           new_item->prev (this->beyond_late_tail_);
02109           this->beyond_late_tail_ = new_item;
02110         }
02111 
02112       break;
02113 
02114       // should never get here, but just in case...
02115     default:
02116       result = -1;
02117       break;
02118     }
02119 
02120   if (result < 0)
02121     return result;
02122 
02123   size_t mb_bytes = 0;
02124   size_t mb_length = 0;
02125   new_item->total_size_and_length (mb_bytes,
02126                                    mb_length);
02127   this->cur_bytes_ += mb_bytes;
02128   this->cur_length_ += mb_length;
02129   ++this->cur_count_;
02130 
02131   if (this->signal_dequeue_waiters () == -1)
02132     return -1;
02133   else
02134     return this->cur_count_;
02135 }

template<ACE_SYNCH_DECL >
int ACE_Dynamic_Message_Queue<>::enqueue_tail ACE_Message_Block new_item,
ACE_Time_Value timeout = 0
[virtual]
 

Just call priority enqueue method: tail enqueue semantics for dynamic message queues are unstable: the message may or may not be where it was placed after the queue is refreshed prior to the next enqueue or dequeue operation.

Reimplemented from ACE_Message_Queue< ACE_SYNCH_USE >.

Definition at line 2571 of file Message_Queue_T.cpp.

References ACE_TRACE, and ACE_Message_Queue< ACE_SYNCH_USE >::enqueue_prio().

02573 {
02574   ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_tail");
02575   return this->enqueue_prio (new_item, timeout);
02576 }

template<ACE_SYNCH_DECL >
void ACE_Dynamic_Message_Queue<>::operator= const ACE_Dynamic_Message_Queue< _ACE_SYNCH > &   )  [private]
 

template<ACE_SYNCH_DECL >
int ACE_Dynamic_Message_Queue<>::peek_dequeue_head ACE_Message_Block *&  first_item,
ACE_Time_Value timeout = 0
[private, virtual]
 

Private method to hide public base class method: just calls base class method.

Reimplemented from ACE_Message_Queue< ACE_SYNCH_USE >.

Definition at line 2560 of file Message_Queue_T.cpp.

References ACE_Message_Queue<>::peek_dequeue_head().

02562 {
02563   return ACE_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head (first_item,
02564                                                               timeout);
02565 }

template<ACE_SYNCH_DECL >
int ACE_Dynamic_Message_Queue<>::refresh_late_queue const ACE_Time_Value current_time  )  [protected, virtual]
 

Refresh the late queue using the strategy specific priority status function.

Definition at line 2475 of file Message_Queue_T.cpp.

References ACE_ERROR_RETURN, ACE_LIB_TEXT, ACE_Dynamic_Message_Queue<>::beyond_late_head_, ACE_Dynamic_Message_Queue<>::beyond_late_tail_, ACE_Dynamic_Message_Queue<>::late_head_, ACE_Dynamic_Message_Queue<>::late_tail_, LM_ERROR, ACE_Dynamic_Message_Queue<>::message_strategy_, ACE_Message_Block::next(), ACE_Message_Block::prev(), and ACE_Dynamic_Message_Strategy::priority_status().

Referenced by ACE_Dynamic_Message_Queue<>::refresh_queue().

02476 {
02477   ACE_Dynamic_Message_Strategy::Priority_Status current_status;
02478 
02479   if (this->late_head_)
02480     {
02481       current_status = message_strategy_.priority_status (*this->late_head_,
02482                                                           current_time);
02483       switch (current_status)
02484         {
02485         case ACE_Dynamic_Message_Strategy::BEYOND_LATE:
02486 
02487           // make sure the head of the beyond late queue is set
02488           // (there may not have been any beyond late messages previously)
02489           this->beyond_late_head_ = this->head_;
02490 
02491           // advance through the beyond late messages in the late queue
02492           do
02493             {
02494               this->late_head_ = this->late_head_->next ();
02495 
02496               if (this->late_head_)
02497                 current_status = message_strategy_.priority_status (*this->late_head_,
02498                                                                     current_time);
02499               else
02500                 break;  // do while
02501 
02502             }
02503           while (current_status == ACE_Dynamic_Message_Strategy::BEYOND_LATE);
02504 
02505           if (this->late_head_)
02506             {
02507               // point tail of beyond late sublist to previous item
02508               this->beyond_late_tail_ = this->late_head_->prev ();
02509 
02510               if (current_status == ACE_Dynamic_Message_Strategy::PENDING)
02511                 {
02512                   // there are no late messages left in the queue
02513                   this->late_head_ = 0;
02514                   this->late_tail_ = 0;
02515                 }
02516               else if (current_status != ACE_Dynamic_Message_Strategy::LATE)
02517                 // if we got here, something is *seriously* wrong with the queue
02518                 ACE_ERROR_RETURN ((LM_ERROR,
02519                                    ACE_LIB_TEXT ("Unexpected message priority status [%d] (expected LATE)"),
02520                                    (int) current_status),
02521                                   -1);
02522             }
02523           else
02524             {
02525               // there are no late messages left in the queue
02526               this->beyond_late_tail_ = this->tail_;
02527               this->late_head_ = 0;
02528               this->late_tail_ = 0;
02529             }
02530 
02531           break;  // switch
02532 
02533         case ACE_Dynamic_Message_Strategy::LATE:
02534           // do nothing - the late queue is unchanged
02535           break; // switch
02536 
02537         case ACE_Dynamic_Message_Strategy::PENDING:
02538           // if we got here, something is *seriously* wrong with the queue
02539           ACE_ERROR_RETURN ((LM_ERROR,
02540                              ACE_LIB_TEXT ("Unexpected message priority status ")
02541                              ACE_LIB_TEXT ("[%d] (expected LATE or BEYOND_LATE)"),
02542                              (int) current_status),
02543                             -1);
02544         default:
02545           // if we got here, something is *seriously* wrong with the queue
02546           ACE_ERROR_RETURN ((LM_ERROR,
02547                              ACE_LIB_TEXT ("Unknown message priority status [%d]"),
02548                              (int) current_status),
02549                             -1);
02550         }
02551     }
02552 
02553   return 0;
02554 }

template<ACE_SYNCH_DECL >
int ACE_Dynamic_Message_Queue<>::refresh_pending_queue const ACE_Time_Value current_time  )  [protected, virtual]
 

Refresh the pending queue using the strategy specific priority status function.

Definition at line 2352 of file Message_Queue_T.cpp.

References ACE_ERROR_RETURN, ACE_LIB_TEXT, ACE_Dynamic_Message_Queue<>::beyond_late_head_, ACE_Dynamic_Message_Queue<>::beyond_late_tail_, ACE_Dynamic_Message_Queue<>::late_head_, ACE_Dynamic_Message_Queue<>::late_tail_, LM_ERROR, ACE_Dynamic_Message_Queue<>::message_strategy_, ACE_Message_Block::next(), ACE_Dynamic_Message_Queue<>::pending_head_, ACE_Dynamic_Message_Queue<>::pending_tail_, ACE_Message_Block::prev(), and ACE_Dynamic_Message_Strategy::priority_status().

Referenced by ACE_Dynamic_Message_Queue<>::refresh_queue().

02353 {
02354   ACE_Dynamic_Message_Strategy::Priority_Status current_status;
02355 
02356   // refresh priority status boundaries in the queue
02357   if (this->pending_head_)
02358     {
02359       current_status = message_strategy_.priority_status (*this->pending_head_,
02360                                                           current_time);
02361       switch (current_status)
02362         {
02363         case ACE_Dynamic_Message_Strategy::BEYOND_LATE:
02364           // Make sure the head of the beyond late queue is set (there
02365           // may not have been any beyond late messages previously)
02366           this->beyond_late_head_ = this->head_;
02367 
02368           // Zero out the late queue pointers, and set them only if
02369           // there turn out to be late messages in the pending sublist
02370           this->late_head_ = 0;
02371           this->late_tail_ = 0;
02372 
02373           // Advance through the beyond late messages in the pending queue
02374           do
02375             {
02376               this->pending_head_ = this->pending_head_->next ();
02377 
02378               if (this->pending_head_)
02379                 current_status = message_strategy_.priority_status (*this->pending_head_,
02380                                                                     current_time);
02381               else
02382                 break;  // do while
02383 
02384             }
02385           while (current_status == ACE_Dynamic_Message_Strategy::BEYOND_LATE);
02386 
02387           if (this->pending_head_)
02388             {
02389               // point tail of beyond late sublist to previous item
02390               this->beyond_late_tail_ = this->pending_head_->prev ();
02391 
02392               if (current_status == ACE_Dynamic_Message_Strategy::PENDING)
02393                 // there are no late messages left in the queue
02394                 break; // switch
02395               else if (current_status != ACE_Dynamic_Message_Strategy::LATE)
02396                 {
02397                   // if we got here, something is *seriously* wrong with the queue
02398                   ACE_ERROR_RETURN ((LM_ERROR,
02399                                      ACE_LIB_TEXT ("Unexpected message priority status [%d] (expected LATE)"),
02400                                      (int) current_status),
02401                                     -1);
02402                 }
02403               /* FALLTHRU */
02404             }
02405           else
02406             {
02407               // There are no pending or late messages left in the
02408               // queue.
02409               this->beyond_late_tail_ = this->tail_;
02410               this->pending_head_ = 0;
02411               this->pending_tail_ = 0;
02412               break; // switch
02413             }
02414 
02415         case ACE_Dynamic_Message_Strategy::LATE:
02416           // Make sure the head of the late queue is set (there may
02417           // not have been any late messages previously, or they may
02418           // have all become beyond late).
02419           if (this->late_head_ == 0)
02420             this->late_head_ = this->pending_head_;
02421 
02422           // advance through the beyond late messages in the pending queue
02423           do
02424             {
02425               this->pending_head_ = this->pending_head_->next ();
02426 
02427               if (this->pending_head_)
02428                 current_status = message_strategy_.priority_status (*this->pending_head_,
02429                                                                     current_time);
02430               else
02431                 break;  // do while
02432 
02433             }
02434           while (current_status == ACE_Dynamic_Message_Strategy::LATE);
02435 
02436           if (this->pending_head_)
02437             {
02438               if (current_status != ACE_Dynamic_Message_Strategy::PENDING)
02439                 // if we got here, something is *seriously* wrong with the queue
02440                 ACE_ERROR_RETURN((LM_ERROR,
02441                                   ACE_LIB_TEXT ("Unexpected message priority status [%d] (expected PENDING)"),
02442                                   (int) current_status),
02443                                  -1);
02444 
02445               // Point tail of late sublist to previous item
02446               this->late_tail_ = this->pending_head_->prev ();
02447             }
02448           else
02449             {
02450               // there are no pending messages left in the queue
02451               this->late_tail_ = this->tail_;
02452               this->pending_head_ = 0;
02453               this->pending_tail_ = 0;
02454             }
02455 
02456           break; // switch
02457         case ACE_Dynamic_Message_Strategy::PENDING:
02458           // do nothing - the pending queue is unchanged
02459           break; // switch
02460         default:
02461           // if we got here, something is *seriously* wrong with the queue
02462           ACE_ERROR_RETURN((LM_ERROR,
02463                             ACE_LIB_TEXT ("Unknown message priority status [%d]"),
02464                             (int) current_status),
02465                            -1);
02466         }
02467     }
02468   return 0;
02469 }

template<ACE_SYNCH_DECL >
int ACE_Dynamic_Message_Queue<>::refresh_queue const ACE_Time_Value current_time  )  [protected, virtual]
 

Refresh the queue using the strategy specific priority status function.

Definition at line 2336 of file Message_Queue_T.cpp.

References ACE_Dynamic_Message_Queue<>::refresh_late_queue(), and ACE_Dynamic_Message_Queue<>::refresh_pending_queue().

Referenced by ACE_Dynamic_Message_Queue<>::dequeue_head(), ACE_Dynamic_Message_Queue<>::enqueue_i(), and ACE_Dynamic_Message_Queue<>::remove_messages().

02337 {
02338   int result;
02339 
02340   result = refresh_pending_queue (current_time);
02341 
02342   if (result != -1)
02343     result = refresh_late_queue (current_time);
02344 
02345   return result;
02346 }

template<ACE_SYNCH_DECL >
int ACE_Dynamic_Message_Queue<>::remove_messages ACE_Message_Block *&  list_head,
ACE_Message_Block *&  list_tail,
u_int  status_flags
[virtual]
 

Detach all messages with status given in the passed flags from the queue and return them by setting passed head and tail pointers to the linked list they comprise. This method is intended primarily as a means of periodically harvesting messages that have missed their deadlines, but is available in its most general form. All messages are returned in priority order, from head to tail, as of the time this method was called.

Definition at line 1792 of file Message_Queue_T.cpp.

References ACE_BIT_ENABLED, ACE_Dynamic_Message_Queue<>::beyond_late_head_, ACE_Dynamic_Message_Queue<>::beyond_late_tail_, ACE_OS::gettimeofday(), ACE_Dynamic_Message_Queue<>::late_head_, ACE_Dynamic_Message_Queue<>::late_tail_, ACE_Message_Block::next(), ACE_Dynamic_Message_Queue<>::pending_head_, ACE_Dynamic_Message_Queue<>::pending_tail_, ACE_Message_Block::prev(), ACE_Dynamic_Message_Queue<>::refresh_queue(), and ACE_Message_Block::total_size_and_length().

01795 {
01796   // start with an empty list
01797   list_head = 0;
01798   list_tail = 0;
01799 
01800   // Get the current time
01801   ACE_Time_Value current_time = ACE_OS::gettimeofday ();
01802 
01803   // Refresh priority status boundaries in the queue.
01804   int result = this->refresh_queue (current_time);
01805   if (result < 0)
01806     return result;
01807 
01808   if (ACE_BIT_ENABLED (status_flags,
01809                        (u_int) ACE_Dynamic_Message_Strategy::PENDING)
01810       && this->pending_head_
01811       && this->pending_tail_)
01812     {
01813       // patch up pointers for the new tail of the queue
01814       if (this->pending_head_->prev ())
01815         {
01816           this->tail_ = this->pending_head_->prev ();
01817           this->pending_head_->prev ()->next (0);
01818         }
01819       else
01820         {
01821           // the list has become empty
01822           this->head_ = 0;
01823           this->tail_ = 0;
01824         }
01825 
01826       // point to the head and tail of the list
01827       list_head = this->pending_head_;
01828       list_tail = this->pending_tail_;
01829 
01830       // cut the pending messages out of the queue entirely
01831       this->pending_head_->prev (0);
01832       this->pending_head_ = 0;
01833       this->pending_tail_ = 0;
01834     }
01835 
01836   if (ACE_BIT_ENABLED (status_flags,
01837                        (u_int) ACE_Dynamic_Message_Strategy::LATE)
01838       && this->late_head_
01839       && this->late_tail_)
01840     {
01841       // Patch up pointers for the (possibly) new head and tail of the
01842       // queue.
01843       if (this->late_tail_->next ())
01844         this->late_tail_->next ()->prev (this->late_head_->prev ());
01845       else
01846         this->tail_ = this->late_head_->prev ();
01847 
01848       if (this->late_head_->prev ())
01849         this->late_head_->prev ()->next (this->late_tail_->next ());
01850       else
01851         this->head_ = this->late_tail_->next ();
01852 
01853       // put late messages behind pending messages (if any) being returned
01854       this->late_head_->prev (list_tail);
01855       if (list_tail)
01856         list_tail->next (this->late_head_);
01857       else
01858         list_head = this->late_head_;
01859 
01860       list_tail = this->late_tail_;
01861 
01862       this->late_tail_->next (0);
01863       this->late_head_ = 0;
01864       this->late_tail_ = 0;
01865     }
01866 
01867   if (ACE_BIT_ENABLED (status_flags,
01868       (u_int) ACE_Dynamic_Message_Strategy::BEYOND_LATE)
01869       && this->beyond_late_head_
01870       && this->beyond_late_tail_)
01871     {
01872       // Patch up pointers for the new tail of the queue
01873       if (this->beyond_late_tail_->next ())
01874         {
01875           this->head_ = this->beyond_late_tail_->next ();
01876           this->beyond_late_tail_->next ()->prev (0);
01877         }
01878       else
01879         {
01880           // the list has become empty
01881           this->head_ = 0;
01882           this->tail_ = 0;
01883         }
01884 
01885       // Put beyond late messages at the end of the list being
01886       // returned.
01887       if (list_tail)
01888         {
01889           this->beyond_late_head_->prev (list_tail);
01890           list_tail->next (this->beyond_late_head_);
01891         }
01892       else
01893         list_head = this->beyond_late_head_;
01894 
01895       list_tail = this->beyond_late_tail_;
01896 
01897       this->beyond_late_tail_->next (0);
01898       this->beyond_late_head_ = 0;
01899       this->beyond_late_tail_ = 0;
01900     }
01901 
01902   // Decrement message and size counts for removed messages.
01903   ACE_Message_Block *temp1;
01904 
01905   for (temp1 = list_head;
01906        temp1 != 0;
01907        temp1 = temp1->next ())
01908     {
01909       --this->cur_count_;
01910 
01911       size_t mb_bytes = 0;
01912       size_t mb_length = 0;
01913       temp1->total_size_and_length (mb_bytes,
01914                                     mb_length);
01915       // Subtract off all of the bytes associated with this message.
01916       this->cur_bytes_ -= mb_bytes;
01917       this->cur_length_ -= mb_length;
01918     }
01919 
01920   return result;
01921 }

template<ACE_SYNCH_DECL >
int ACE_Dynamic_Message_Queue<>::sublist_enqueue_i ACE_Message_Block new_item,
const ACE_Time_Value current_time,
ACE_Message_Block *&  sublist_head,
ACE_Message_Block *&  sublist_tail,
ACE_Dynamic_Message_Strategy::Priority_Status  status
[protected, virtual]
 

Enqueue a message in priority order within a given priority status sublist.

Definition at line 2144 of file Message_Queue_T.cpp.

References ACE_Dynamic_Message_Queue<>::message_strategy_, ACE_Message_Block::msg_priority(), ACE_Message_Block::next(), ACE_Message_Block::prev(), and ACE_Dynamic_Message_Strategy::priority_status().

Referenced by ACE_Dynamic_Message_Queue<>::enqueue_i().

02149 {
02150   int result = 0;
02151   ACE_Message_Block *current_item = 0;
02152 
02153   // Find message after which to enqueue new item, based on message
02154   // priority and priority status.
02155   for (current_item = sublist_tail;
02156        current_item;
02157        current_item = current_item->prev ())
02158     {
02159       if (message_strategy_.priority_status (*current_item, current_time) == status)
02160         {
02161           if (current_item->msg_priority () >= new_item->msg_priority ())
02162             break;
02163         }
02164       else
02165         {
02166           sublist_head = new_item;
02167           break;
02168         }
02169     }
02170 
02171   if (current_item == 0)
02172     {
02173       // If the new message has highest priority of any, put it at the
02174       // head of the list (and sublist).
02175       new_item->prev (0);
02176       new_item->next (this->head_);
02177       if (this->head_ != 0)
02178         this->head_->prev (new_item);
02179       else
02180         {
02181           this->tail_ = new_item;
02182           sublist_tail = new_item;
02183         }
02184       this->head_ = new_item;
02185       sublist_head = new_item;
02186     }
02187   else
02188     {
02189       // insert the new item into the list
02190       new_item->next (current_item->next ());
02191       new_item->prev (current_item);
02192 
02193       if (current_item->next ())
02194         current_item->next ()->prev (new_item);
02195       else
02196         this->tail_ = new_item;
02197 
02198       current_item->next (new_item);
02199 
02200       // If the new item has lowest priority of any in the sublist,
02201       // move the tail pointer of the sublist back to the new item
02202       if (current_item == sublist_tail)
02203         sublist_tail = new_item;
02204     }
02205 
02206   return result;
02207 }


Member Data Documentation

template<ACE_SYNCH_DECL >
ACE_Dynamic_Message_Queue<>::ACE_ALLOC_HOOK_DECLARE
 

Declare the dynamic allocation hooks.

Reimplemented from ACE_Message_Queue< ACE_SYNCH_USE >.

Definition at line 805 of file Message_Queue_T.h.

template<ACE_SYNCH_DECL >
ACE_Message_Block* ACE_Dynamic_Message_Queue<>::beyond_late_head_ [protected]
 

Pointer to head of the beyond late messages.

Definition at line 859 of file Message_Queue_T.h.

Referenced by ACE_Dynamic_Message_Queue<>::dequeue_head_i(), ACE_Dynamic_Message_Queue<>::enqueue_i(), ACE_Dynamic_Message_Queue<>::refresh_late_queue(), ACE_Dynamic_Message_Queue<>::refresh_pending_queue(), and ACE_Dynamic_Message_Queue<>::remove_messages().

template<ACE_SYNCH_DECL >
ACE_Message_Block* ACE_Dynamic_Message_Queue<>::beyond_late_tail_ [protected]
 

Pointer to tail of the beyond late messages.

Definition at line 862 of file Message_Queue_T.h.

Referenced by ACE_Dynamic_Message_Queue<>::dequeue_head_i(), ACE_Dynamic_Message_Queue<>::enqueue_i(), ACE_Dynamic_Message_Queue<>::refresh_late_queue(), ACE_Dynamic_Message_Queue<>::refresh_pending_queue(), and ACE_Dynamic_Message_Queue<>::remove_messages().

template<ACE_SYNCH_DECL >
ACE_Message_Block* ACE_Dynamic_Message_Queue<>::late_head_ [protected]
 

Pointer to head of the late messages.

Definition at line 853 of file Message_Queue_T.h.

Referenced by ACE_Dynamic_Message_Queue<>::dequeue_head_i(), ACE_Dynamic_Message_Queue<>::enqueue_i(), ACE_Dynamic_Message_Queue<>::refresh_late_queue(), ACE_Dynamic_Message_Queue<>::refresh_pending_queue(), and ACE_Dynamic_Message_Queue<>::remove_messages().

template<ACE_SYNCH_DECL >
ACE_Message_Block* ACE_Dynamic_Message_Queue<>::late_tail_ [protected]
 

Pointer to tail of the late messages.

Definition at line 856 of file Message_Queue_T.h.

Referenced by ACE_Dynamic_Message_Queue<>::dequeue_head_i(), ACE_Dynamic_Message_Queue<>::enqueue_i(), ACE_Dynamic_Message_Queue<>::refresh_late_queue(), ACE_Dynamic_Message_Queue<>::refresh_pending_queue(), and ACE_Dynamic_Message_Queue<>::remove_messages().

template<ACE_SYNCH_DECL >
ACE_Dynamic_Message_Strategy& ACE_Dynamic_Message_Queue<>::message_strategy_ [protected]
 

Pointer to a dynamic priority evaluation function.

Definition at line 865 of file Message_Queue_T.h.

Referenced by ACE_Dynamic_Message_Queue<>::dump(), ACE_Dynamic_Message_Queue<>::enqueue_i(), ACE_Dynamic_Message_Queue<>::refresh_late_queue(), ACE_Dynamic_Message_Queue<>::refresh_pending_queue(), ACE_Dynamic_Message_Queue<>::sublist_enqueue_i(), and ACE_Dynamic_Message_Queue<>::~ACE_Dynamic_Message_Queue().

template<ACE_SYNCH_DECL >
ACE_Message_Block* ACE_Dynamic_Message_Queue<>::pending_head_ [protected]
 

Pointer to head of the pending messages.

Definition at line 847 of file Message_Queue_T.h.

Referenced by ACE_Dynamic_Message_Queue<>::dequeue_head_i(), ACE_Dynamic_Message_Queue<>::enqueue_i(), ACE_Dynamic_Message_Queue<>::refresh_pending_queue(), and ACE_Dynamic_Message_Queue<>::remove_messages().

template<ACE_SYNCH_DECL >
ACE_Message_Block* ACE_Dynamic_Message_Queue<>::pending_tail_ [protected]
 

Pointer to tail of the pending messages.

Definition at line 850 of file Message_Queue_T.h.

Referenced by ACE_Dynamic_Message_Queue<>::dequeue_head_i(), ACE_Dynamic_Message_Queue<>::enqueue_i(), ACE_Dynamic_Message_Queue<>::refresh_pending_queue(), and ACE_Dynamic_Message_Queue<>::remove_messages().


The documentation for this class was generated from the following files:
Generated on Thu Nov 9 11:21:57 2006 for ACE by doxygen 1.3.6