ACE_Dynamic_Message_Queue<> Class Template Reference

A derived class which adapts the ACE_Message_Queue class in order to maintain dynamic priorities for enqueued <ACE_Message_Blocks> and manage the queue order according to these dynamic priorities. More...

#include <Message_Queue_T.h>

Inheritance diagram for ACE_Dynamic_Message_Queue<>:

Inheritance graph
[legend]
Collaboration diagram for ACE_Dynamic_Message_Queue<>:

Collaboration graph
[legend]
List of all members.

Public Member Functions

 ACE_Dynamic_Message_Queue (ACE_Dynamic_Message_Strategy &message_strategy, size_t hwm=ACE_Message_Queue_Base::DEFAULT_HWM, size_t lwm=ACE_Message_Queue_Base::DEFAULT_LWM, ACE_Notification_Strategy *=0)
virtual ~ACE_Dynamic_Message_Queue (void)
 Close down the message queue and release all resources.
virtual int remove_messages (ACE_Message_Block *&list_head, ACE_Message_Block *&list_tail, u_int status_flags)
virtual int dequeue_head (ACE_Message_Block *&first_item, ACE_Time_Value *timeout=0)
virtual void dump (void) const
 Dump the state of the queue.
virtual int enqueue_tail (ACE_Message_Block *new_item, ACE_Time_Value *timeout=0)
virtual int enqueue_head (ACE_Message_Block *new_item, ACE_Time_Value *timeout=0)

Public Attributes

 ACE_ALLOC_HOOK_DECLARE
 Declare the dynamic allocation hooks.

Protected Member Functions

virtual int enqueue_i (ACE_Message_Block *new_item)
virtual int sublist_enqueue_i (ACE_Message_Block *new_item, const ACE_Time_Value &current_time, ACE_Message_Block *&sublist_head, ACE_Message_Block *&sublist_tail, ACE_Dynamic_Message_Strategy::Priority_Status status)
 Enqueue a message in priority order within a given priority status sublist.
virtual int dequeue_head_i (ACE_Message_Block *&first_item)
virtual int refresh_queue (const ACE_Time_Value &current_time)
virtual int refresh_pending_queue (const ACE_Time_Value &current_time)
virtual int refresh_late_queue (const ACE_Time_Value &current_time)

Protected Attributes

ACE_Message_Blockpending_head_
 Pointer to head of the pending messages.
ACE_Message_Blockpending_tail_
 Pointer to tail of the pending messages.
ACE_Message_Blocklate_head_
 Pointer to head of the late messages.
ACE_Message_Blocklate_tail_
 Pointer to tail of the late messages.
ACE_Message_Blockbeyond_late_head_
 Pointer to head of the beyond late messages.
ACE_Message_Blockbeyond_late_tail_
 Pointer to tail of the beyond late messages.
ACE_Dynamic_Message_Strategymessage_strategy_
 Pointer to a dynamic priority evaluation function.

Private Member Functions

void operator= (const ACE_Dynamic_Message_Queue< _ACE_SYNCH_MUTEX_T, _ACE_SYNCH_CONDITION_T > &)
 ACE_Dynamic_Message_Queue (const ACE_Dynamic_Message_Queue< _ACE_SYNCH_MUTEX_T, _ACE_SYNCH_CONDITION_T > &)
virtual int peek_dequeue_head (ACE_Message_Block *&first_item, ACE_Time_Value *timeout=0)
 Private method to hide public base class method: just calls base class method.

Detailed Description

template<ACE_SYNCH_DECL>
class ACE_Dynamic_Message_Queue<>

A derived class which adapts the ACE_Message_Queue class in order to maintain dynamic priorities for enqueued <ACE_Message_Blocks> and manage the queue order according to these dynamic priorities.

The messages in the queue are managed so as to preserve a logical ordering with minimal overhead per enqueue and dequeue operation. For this reason, the actual order of messages in the linked list of the queue may differ from their priority order. As time passes, a message may change from pending status to late status, and eventually to beyond late status. To minimize reordering overhead under this design force, three separate boundaries are maintained within the linked list of messages. Messages are dequeued preferentially from the head of the pending portion, then the head of the late portion, and finally from the head of the beyond late portion. In this way, only the boundaries need to be maintained (which can be done efficiently, as aging messages maintain the same linked list order as they progress from one status to the next), with no reordering of the messages themselves, while providing correct priority ordered dequeueing semantics. Head and tail enqueue methods inherited from ACE_Message_Queue are made private to prevent out-of-order messages from confusing management of the various portions of the queue. Messages in the pending portion of the queue whose priority becomes late (according to the specific dynamic strategy) advance into the late portion of the queue. Messages in the late portion of the queue whose priority becomes later than can be represented advance to the beyond_late portion of the queue. These behaviors support a limited schedule overrun, with pending messages prioritized ahead of late messages, and late messages ahead of beyond late messages. These behaviors can be modified in derived classes by providing alternative definitions for the appropriate virtual methods. When filled with messages, the queue's linked list should look like: H T | | B - B - B - B - L - L - L - P - P - P - P - P | | | | | | BH BT LH LT PH PT Where the symbols are as follows: H = Head of the entire list T = Tail of the entire list B = Beyond late message BH = Beyond late messages Head BT = Beyond late messages Tail L = Late message LH = Late messages Head LT = Late messages Tail P = Pending message PH = Pending messages Head PT = Pending messages Tail Caveat: the virtual methods enqueue_tail, enqueue_head, and peek_dequeue_head have semantics for the static message queues that cannot be guaranteed for dynamic message queues. The peek_dequeue_head method just calls the base class method, while the two enqueue methods call the priority enqueue method. The order of messages in the dynamic queue is a function of message deadlines and how long they are in the queues. You can manipulate these in some cases to ensure the correct semantics, but that is not a very stable or portable approach (discouraged).

Definition at line 767 of file Message_Queue_T.h.


Constructor & Destructor Documentation

template<ACE_SYNCH_DECL >
ACE_Dynamic_Message_Queue<>::ACE_Dynamic_Message_Queue ( ACE_Dynamic_Message_Strategy message_strategy,
size_t  hwm = ACE_Message_Queue_Base::DEFAULT_HWM,
size_t  lwm = ACE_Message_Queue_Base::DEFAULT_LWM,
ACE_Notification_Strategy = 0 
)

Definition at line 1913 of file Message_Queue_T.cpp.

01917   : ACE_Message_Queue<ACE_SYNCH_USE> (hwm, lwm, ns),
01918     pending_head_ (0),
01919     pending_tail_ (0),
01920     late_head_ (0),
01921     late_tail_ (0),
01922     beyond_late_head_ (0),
01923     beyond_late_tail_ (0),
01924     message_strategy_ (message_strategy)
01925 {
01926   // Note, the ACE_Dynamic_Message_Queue assumes full responsibility
01927   // for the passed ACE_Dynamic_Message_Strategy object, and deletes
01928   // it in its own dtor
01929 }

template<ACE_SYNCH_DECL >
ACE_Dynamic_Message_Queue<>::~ACE_Dynamic_Message_Queue ( void   )  [virtual]

Close down the message queue and release all resources.

Definition at line 1934 of file Message_Queue_T.cpp.

References ACE_Dynamic_Message_Queue<>::message_strategy_.

01935 {
01936   delete &this->message_strategy_;
01937 }

template<ACE_SYNCH_DECL >
ACE_Dynamic_Message_Queue<>::ACE_Dynamic_Message_Queue ( const ACE_Dynamic_Message_Queue< _ACE_SYNCH_MUTEX_T, _ACE_SYNCH_CONDITION_T > &   )  [private]


Member Function Documentation

template<ACE_SYNCH_DECL >
int ACE_Dynamic_Message_Queue<>::dequeue_head ( ACE_Message_Block *&  first_item,
ACE_Time_Value timeout = 0 
) [virtual]

Dequeue and return the <ACE_Message_Block *> at the head of the queue. Returns -1 on failure, else the number of items still on the queue.

Reimplemented from ACE_Message_Queue< ACE_SYNCH_USE >.

Definition at line 2080 of file Message_Queue_T.cpp.

References ACE_GUARD_RETURN, ACE_SYNCH_MUTEX_T, ACE_TRACE, ACE_Message_Queue_Base::DEACTIVATED, ACE_Dynamic_Message_Queue<>::dequeue_head_i(), ACE_OS::gettimeofday(), ACE_Dynamic_Message_Queue<>::refresh_queue(), and ACE_Message_Queue< ACE_SYNCH_USE >::wait_not_empty_cond().

02082 {
02083   ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head");
02084 
02085   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
02086 
02087   if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
02088     {
02089       errno = ESHUTDOWN;
02090       return -1;
02091     }
02092 
02093   int result;
02094 
02095   // get the current time
02096   ACE_Time_Value current_time = ACE_OS::gettimeofday ();
02097 
02098   // refresh priority status boundaries in the queue
02099   result = this->refresh_queue (current_time);
02100   if (result < 0)
02101     return result;
02102 
02103   // *now* it's appropriate to wait for an enqueued item
02104   result = this->wait_not_empty_cond (ace_mon, timeout);
02105   if (result == -1)
02106     return result;
02107 
02108   // call the internal dequeue method, which selects an item from the
02109   // highest priority status portion of the queue that has messages
02110   // enqueued.
02111   result = this->dequeue_head_i (first_item);
02112 
02113   return result;
02114 }

template<ACE_SYNCH_DECL >
int ACE_Dynamic_Message_Queue<>::dequeue_head_i ( ACE_Message_Block *&  first_item  )  [protected, virtual]

Dequeue and return the <ACE_Message_Block *> at the head of the logical queue. Attempts first to dequeue from the pending portion of the queue, or if that is empty from the late portion, or if that is empty from the beyond late portion, or if that is empty just sets the passed pointer to zero and returns -1.

Reimplemented from ACE_Message_Queue< ACE_SYNCH_USE >.

Definition at line 2361 of file Message_Queue_T.cpp.

References ACE_TRACE, ACE_Dynamic_Message_Queue<>::beyond_late_head_, ACE_Dynamic_Message_Queue<>::beyond_late_tail_, ACE_Message_Queue< ACE_SYNCH_USE >::cur_bytes_, ACE_Message_Queue< ACE_SYNCH_USE >::cur_count_, ACE_Message_Queue< ACE_SYNCH_USE >::cur_length_, ACE_Message_Queue< ACE_SYNCH_USE >::head_, ACE_Dynamic_Message_Queue<>::late_head_, ACE_Dynamic_Message_Queue<>::late_tail_, ACE_Message_Block::next(), ACE_Dynamic_Message_Queue<>::pending_head_, ACE_Dynamic_Message_Queue<>::pending_tail_, ACE_Message_Block::prev(), and ACE_Message_Block::total_size_and_length().

Referenced by ACE_Dynamic_Message_Queue<>::dequeue_head().

02362 {
02363   ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i");
02364 
02365   int result = 0;
02366   int last_in_subqueue = 0;
02367 
02368   // first, try to dequeue from the head of the pending list
02369   if (this->pending_head_)
02370     {
02371       first_item = this->pending_head_;
02372 
02373       if (0 == this->pending_head_->prev ())
02374         this->head_ = this->pending_head_->next ();
02375       else
02376         this->pending_head_->prev ()->next (this->pending_head_->next ());
02377 
02378       if (0 == this->pending_head_->next ())
02379         {
02380           this->tail_ = this->pending_head_->prev ();
02381           this->pending_head_ = 0;
02382           this->pending_tail_ = 0;
02383         }
02384       else
02385         {
02386           this->pending_head_->next ()->prev (this->pending_head_->prev ());
02387           this->pending_head_ = this->pending_head_->next ();
02388         }
02389 
02390       first_item->prev (0);
02391       first_item->next (0);
02392     }
02393 
02394   // Second, try to dequeue from the head of the late list
02395   else if (this->late_head_)
02396     {
02397       last_in_subqueue = this->late_head_ == this->late_tail_ ? 1 : 0;
02398 
02399       first_item = this->late_head_;
02400 
02401       if (0 == this->late_head_->prev ())
02402         this->head_ = this->late_head_->next ();
02403       else
02404         this->late_head_->prev ()->next (this->late_head_->next ());
02405 
02406       if (0 == this->late_head_->next ())
02407         this->tail_ = this->late_head_->prev ();
02408       else
02409         {
02410           this->late_head_->next ()->prev (this->late_head_->prev ());
02411           this->late_head_ = this->late_head_->next ();
02412         }
02413 
02414       if (last_in_subqueue)
02415         {
02416           this->late_head_ = 0;
02417           this->late_tail_ = 0;
02418         }
02419 
02420       first_item->prev (0);
02421       first_item->next (0);
02422     }
02423   // finally, try to dequeue from the head of the beyond late list
02424   else if (this->beyond_late_head_)
02425     {
02426       last_in_subqueue =
02427         (this->beyond_late_head_ == this->beyond_late_tail_) ? 1 : 0;
02428 
02429       first_item = this->beyond_late_head_;
02430       this->head_ = this->beyond_late_head_->next ();
02431 
02432       if (0 == this->beyond_late_head_->next ())
02433         this->tail_ = this->beyond_late_head_->prev ();
02434       else
02435         {
02436           this->beyond_late_head_->next ()->prev (this->beyond_late_head_->prev ());
02437           this->beyond_late_head_ = this->beyond_late_head_->next ();
02438         }
02439 
02440       if (last_in_subqueue)
02441         {
02442           this->beyond_late_head_ = 0;
02443           this->beyond_late_tail_ = 0;
02444         }
02445 
02446       first_item->prev (0);
02447       first_item->next (0);
02448     }
02449   else
02450     {
02451       // nothing to dequeue: set the pointer to zero and return an error code
02452       first_item = 0;
02453       result = -1;
02454     }
02455 
02456   if (result < 0)
02457     return result;
02458 
02459   size_t mb_bytes = 0;
02460   size_t mb_length = 0;
02461   first_item->total_size_and_length (mb_bytes,
02462                                      mb_length);
02463   // Subtract off all of the bytes associated with this message.
02464   this->cur_bytes_ -= mb_bytes;
02465   this->cur_length_ -= mb_length;
02466   --this->cur_count_;
02467 
02468   // Only signal enqueueing threads if we've fallen below the low
02469   // water mark.
02470   if (this->cur_bytes_ <= this->low_water_mark_
02471       && this->signal_enqueue_waiters () == -1)
02472     return -1;
02473   else
02474     return this->cur_count_;
02475 }

template<ACE_SYNCH_DECL >
void ACE_Dynamic_Message_Queue<>::dump ( void   )  const [virtual]

Dump the state of the queue.

Reimplemented from ACE_Message_Queue< ACE_SYNCH_USE >.

Definition at line 2120 of file Message_Queue_T.cpp.

References ACE_BEGIN_DUMP, ACE_DEBUG, ACE_END_DUMP, ACE_TEXT, ACE_TRACE, ACE_Dynamic_Message_Strategy::dump(), ACE_Message_Queue<>::dump(), LM_DEBUG, and ACE_Dynamic_Message_Queue<>::message_strategy_.

02121 {
02122 #if defined (ACE_HAS_DUMP)
02123   ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dump");
02124   ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
02125 
02126   ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("ACE_Message_Queue<ACE_SYNCH_USE> (base class): \n")));
02127   this->ACE_Message_Queue<ACE_SYNCH_USE>::dump ();
02128 
02129   ACE_DEBUG ((LM_DEBUG,
02130               ACE_TEXT ("pending_head_ = %u\n")
02131               ACE_TEXT ("pending_tail_ = %u\n")
02132               ACE_TEXT ("late_head_ = %u\n")
02133               ACE_TEXT ("late_tail_ = %u\n")
02134               ACE_TEXT ("beyond_late_head_ = %u\n")
02135               ACE_TEXT ("beyond_late_tail_ = %u\n"),
02136               this->pending_head_,
02137               this->pending_tail_,
02138               this->late_head_,
02139               this->late_tail_,
02140               this->beyond_late_head_,
02141               this->beyond_late_tail_));
02142 
02143   ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("message_strategy_ : \n")));
02144   message_strategy_.dump ();
02145 
02146   ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
02147 #endif /* ACE_HAS_DUMP */
02148 }

template<ACE_SYNCH_DECL >
int ACE_Dynamic_Message_Queue<>::enqueue_head ( ACE_Message_Block new_item,
ACE_Time_Value timeout = 0 
) [virtual]

Just call priority enqueue method: head enqueue semantics for dynamic message queues are unstable: the message may or may not be where it was placed after the queue is refreshed prior to the next enqueue or dequeue operation.

Reimplemented from ACE_Message_Queue< ACE_SYNCH_USE >.

Definition at line 2732 of file Message_Queue_T.cpp.

References ACE_TRACE, and ACE_Message_Queue< ACE_SYNCH_USE >::enqueue_prio().

02734 {
02735   ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_head");
02736   return this->enqueue_prio (new_item, timeout);
02737 }

template<ACE_SYNCH_DECL >
int ACE_Dynamic_Message_Queue<>::enqueue_i ( ACE_Message_Block new_item  )  [protected, virtual]

Enqueue an <ACE_Message_Block *> in accordance with its priority. priority may be *dynamic* or *static* or a combination or *both* It calls the priority evaluation function passed into the Dynamic Message Queue constructor to update the priorities of all enqueued messages.

Reimplemented from ACE_Message_Queue< ACE_SYNCH_USE >.

Definition at line 2152 of file Message_Queue_T.cpp.

References ACE_TRACE, ACE_Dynamic_Message_Strategy::BEYOND_LATE, ACE_Dynamic_Message_Queue<>::beyond_late_head_, ACE_Dynamic_Message_Queue<>::beyond_late_tail_, ACE_Message_Queue< ACE_SYNCH_USE >::cur_bytes_, ACE_Message_Queue< ACE_SYNCH_USE >::cur_count_, ACE_Message_Queue< ACE_SYNCH_USE >::cur_length_, ACE_Message_Queue< ACE_SYNCH_USE >::enqueue_head_i(), ACE_Message_Queue< ACE_SYNCH_USE >::enqueue_tail_i(), ACE_OS::gettimeofday(), ACE_Dynamic_Message_Strategy::LATE, ACE_Dynamic_Message_Queue<>::late_head_, ACE_Dynamic_Message_Queue<>::late_tail_, ACE_Dynamic_Message_Queue<>::message_strategy_, ACE_Message_Block::next(), ACE_Dynamic_Message_Strategy::PENDING, ACE_Dynamic_Message_Queue<>::pending_head_, ACE_Dynamic_Message_Queue<>::pending_tail_, ACE_Message_Block::prev(), ACE_Dynamic_Message_Strategy::priority_status(), ACE_Dynamic_Message_Queue<>::refresh_queue(), ACE_Dynamic_Message_Queue<>::sublist_enqueue_i(), ACE_Message_Queue< ACE_SYNCH_USE >::tail_, and ACE_Message_Block::total_size_and_length().

02153 {
02154   ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_i");
02155 
02156   if (new_item == 0)
02157     return -1;
02158 
02159   int result = 0;
02160 
02161   // Get the current time.
02162   ACE_Time_Value current_time = ACE_OS::gettimeofday ();
02163 
02164   // Refresh priority status boundaries in the queue.
02165 
02166   result = this->refresh_queue (current_time);
02167   if (result < 0)
02168     return result;
02169 
02170   // Where we enqueue depends on the message's priority status.
02171   switch (message_strategy_.priority_status (*new_item,
02172                                              current_time))
02173     {
02174     case ACE_Dynamic_Message_Strategy::PENDING:
02175       if (this->pending_tail_ == 0)
02176         {
02177           // Check for simple case of an empty pending queue, where
02178           // all we need to do is insert <new_item> into the tail of
02179           // the queue.
02180           pending_head_ = new_item;
02181           pending_tail_ = pending_head_;
02182           return this->enqueue_tail_i (new_item);
02183         }
02184       else
02185         {
02186           // Enqueue the new message in priority order in the pending
02187           // sublist
02188           result = sublist_enqueue_i (new_item,
02189                                       current_time,
02190                                       this->pending_head_,
02191                                       this->pending_tail_,
02192                                       ACE_Dynamic_Message_Strategy::PENDING);
02193         }
02194       break;
02195 
02196     case ACE_Dynamic_Message_Strategy::LATE:
02197       if (this->late_tail_ == 0)
02198         {
02199           late_head_ = new_item;
02200           late_tail_ = late_head_;
02201 
02202           if (this->pending_head_ == 0)
02203             // Check for simple case of an empty pending queue,
02204             // where all we need to do is insert <new_item> into the
02205             // tail of the queue.
02206             return this->enqueue_tail_i (new_item);
02207           else if (this->beyond_late_tail_ == 0)
02208             // Check for simple case of an empty beyond late queue, where all
02209             // we need to do is insert <new_item> into the head of the queue.
02210             return this->enqueue_head_i (new_item);
02211           else
02212             {
02213               // Otherwise, we can just splice the new message in
02214               // between the pending and beyond late portions of the
02215               // queue.
02216               this->beyond_late_tail_->next (new_item);
02217               new_item->prev (this->beyond_late_tail_);
02218               this->pending_head_->prev (new_item);
02219               new_item->next (this->pending_head_);
02220             }
02221         }
02222       else
02223         {
02224           // Enqueue the new message in priority order in the late
02225           // sublist
02226           result = sublist_enqueue_i (new_item,
02227                                       current_time,
02228                                       this->late_head_,
02229                                       this->late_tail_,
02230                                       ACE_Dynamic_Message_Strategy::LATE);
02231         }
02232       break;
02233 
02234     case ACE_Dynamic_Message_Strategy::BEYOND_LATE:
02235       if (this->beyond_late_tail_ == 0)
02236         {
02237           // Check for simple case of an empty beyond late queue,
02238           // where all we need to do is insert <new_item> into the
02239           // head of the queue.
02240           beyond_late_head_ = new_item;
02241           beyond_late_tail_ = beyond_late_head_;
02242           return this->enqueue_head_i (new_item);
02243         }
02244       else
02245         {
02246           // all beyond late messages have the same (zero) priority,
02247           // so just put the new one at the end of the beyond late
02248           // messages
02249           if (this->beyond_late_tail_->next ())
02250             this->beyond_late_tail_->next ()->prev (new_item);
02251           else
02252             this->tail_ = new_item;
02253 
02254           new_item->next (this->beyond_late_tail_->next ());
02255           this->beyond_late_tail_->next (new_item);
02256           new_item->prev (this->beyond_late_tail_);
02257           this->beyond_late_tail_ = new_item;
02258         }
02259 
02260       break;
02261 
02262       // should never get here, but just in case...
02263     default:
02264       result = -1;
02265       break;
02266     }
02267 
02268   if (result < 0)
02269     return result;
02270 
02271   size_t mb_bytes = 0;
02272   size_t mb_length = 0;
02273   new_item->total_size_and_length (mb_bytes,
02274                                    mb_length);
02275   this->cur_bytes_ += mb_bytes;
02276   this->cur_length_ += mb_length;
02277   ++this->cur_count_;
02278 
02279   if (this->signal_dequeue_waiters () == -1)
02280     return -1;
02281   else
02282     return this->cur_count_;
02283 }

template<ACE_SYNCH_DECL >
int ACE_Dynamic_Message_Queue<>::enqueue_tail ( ACE_Message_Block new_item,
ACE_Time_Value timeout = 0 
) [virtual]

Just call priority enqueue method: tail enqueue semantics for dynamic message queues are unstable: the message may or may not be where it was placed after the queue is refreshed prior to the next enqueue or dequeue operation.

Reimplemented from ACE_Message_Queue< ACE_SYNCH_USE >.

Definition at line 2719 of file Message_Queue_T.cpp.

References ACE_TRACE, and ACE_Message_Queue< ACE_SYNCH_USE >::enqueue_prio().

02721 {
02722   ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_tail");
02723   return this->enqueue_prio (new_item, timeout);
02724 }

template<ACE_SYNCH_DECL >
void ACE_Dynamic_Message_Queue<>::operator= ( const ACE_Dynamic_Message_Queue< _ACE_SYNCH_MUTEX_T, _ACE_SYNCH_CONDITION_T > &   )  [private]

template<ACE_SYNCH_DECL >
int ACE_Dynamic_Message_Queue<>::peek_dequeue_head ( ACE_Message_Block *&  first_item,
ACE_Time_Value timeout = 0 
) [private, virtual]

Private method to hide public base class method: just calls base class method.

Reimplemented from ACE_Message_Queue< ACE_SYNCH_USE >.

Definition at line 2708 of file Message_Queue_T.cpp.

References ACE_Message_Queue<>::peek_dequeue_head().

02710 {
02711   return ACE_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head (first_item,
02712                                                               timeout);
02713 }

template<ACE_SYNCH_DECL >
int ACE_Dynamic_Message_Queue<>::refresh_late_queue ( const ACE_Time_Value current_time  )  [protected, virtual]

Refresh the late queue using the strategy specific priority status function.

Definition at line 2623 of file Message_Queue_T.cpp.

References ACE_ERROR_RETURN, ACE_TEXT, ACE_Dynamic_Message_Strategy::BEYOND_LATE, ACE_Dynamic_Message_Queue<>::beyond_late_head_, ACE_Dynamic_Message_Queue<>::beyond_late_tail_, ACE_Message_Queue< ACE_SYNCH_USE >::head_, ACE_Dynamic_Message_Strategy::LATE, ACE_Dynamic_Message_Queue<>::late_head_, ACE_Dynamic_Message_Queue<>::late_tail_, LM_ERROR, ACE_Dynamic_Message_Queue<>::message_strategy_, ACE_Message_Block::next(), ACE_Dynamic_Message_Strategy::PENDING, ACE_Message_Block::prev(), ACE_Dynamic_Message_Strategy::priority_status(), and ACE_Message_Queue< ACE_SYNCH_USE >::tail_.

Referenced by ACE_Dynamic_Message_Queue<>::refresh_queue().

02624 {
02625   ACE_Dynamic_Message_Strategy::Priority_Status current_status;
02626 
02627   if (this->late_head_)
02628     {
02629       current_status = message_strategy_.priority_status (*this->late_head_,
02630                                                           current_time);
02631       switch (current_status)
02632         {
02633         case ACE_Dynamic_Message_Strategy::BEYOND_LATE:
02634 
02635           // make sure the head of the beyond late queue is set
02636           // (there may not have been any beyond late messages previously)
02637           this->beyond_late_head_ = this->head_;
02638 
02639           // advance through the beyond late messages in the late queue
02640           do
02641             {
02642               this->late_head_ = this->late_head_->next ();
02643 
02644               if (this->late_head_)
02645                 current_status = message_strategy_.priority_status (*this->late_head_,
02646                                                                     current_time);
02647               else
02648                 break;  // do while
02649 
02650             }
02651           while (current_status == ACE_Dynamic_Message_Strategy::BEYOND_LATE);
02652 
02653           if (this->late_head_)
02654             {
02655               // point tail of beyond late sublist to previous item
02656               this->beyond_late_tail_ = this->late_head_->prev ();
02657 
02658               if (current_status == ACE_Dynamic_Message_Strategy::PENDING)
02659                 {
02660                   // there are no late messages left in the queue
02661                   this->late_head_ = 0;
02662                   this->late_tail_ = 0;
02663                 }
02664               else if (current_status != ACE_Dynamic_Message_Strategy::LATE)
02665                 // if we got here, something is *seriously* wrong with the queue
02666                 ACE_ERROR_RETURN ((LM_ERROR,
02667                                    ACE_TEXT ("Unexpected message priority status [%d] (expected LATE)"),
02668                                    (int) current_status),
02669                                   -1);
02670             }
02671           else
02672             {
02673               // there are no late messages left in the queue
02674               this->beyond_late_tail_ = this->tail_;
02675               this->late_head_ = 0;
02676               this->late_tail_ = 0;
02677             }
02678 
02679           break;  // switch
02680 
02681         case ACE_Dynamic_Message_Strategy::LATE:
02682           // do nothing - the late queue is unchanged
02683           break; // switch
02684 
02685         case ACE_Dynamic_Message_Strategy::PENDING:
02686           // if we got here, something is *seriously* wrong with the queue
02687           ACE_ERROR_RETURN ((LM_ERROR,
02688                              ACE_TEXT ("Unexpected message priority status ")
02689                              ACE_TEXT ("[%d] (expected LATE or BEYOND_LATE)"),
02690                              (int) current_status),
02691                             -1);
02692         default:
02693           // if we got here, something is *seriously* wrong with the queue
02694           ACE_ERROR_RETURN ((LM_ERROR,
02695                              ACE_TEXT ("Unknown message priority status [%d]"),
02696                              (int) current_status),
02697                             -1);
02698         }
02699     }
02700 
02701   return 0;
02702 }

template<ACE_SYNCH_DECL >
int ACE_Dynamic_Message_Queue<>::refresh_pending_queue ( const ACE_Time_Value current_time  )  [protected, virtual]

Refresh the pending queue using the strategy specific priority status function.

Definition at line 2500 of file Message_Queue_T.cpp.

References ACE_ERROR_RETURN, ACE_TEXT, ACE_Dynamic_Message_Strategy::BEYOND_LATE, ACE_Dynamic_Message_Queue<>::beyond_late_head_, ACE_Dynamic_Message_Queue<>::beyond_late_tail_, ACE_Message_Queue< ACE_SYNCH_USE >::head_, ACE_Dynamic_Message_Strategy::LATE, ACE_Dynamic_Message_Queue<>::late_head_, ACE_Dynamic_Message_Queue<>::late_tail_, LM_ERROR, ACE_Dynamic_Message_Queue<>::message_strategy_, ACE_Message_Block::next(), ACE_Dynamic_Message_Strategy::PENDING, ACE_Dynamic_Message_Queue<>::pending_head_, ACE_Dynamic_Message_Queue<>::pending_tail_, ACE_Message_Block::prev(), ACE_Dynamic_Message_Strategy::priority_status(), and ACE_Message_Queue< ACE_SYNCH_USE >::tail_.

Referenced by ACE_Dynamic_Message_Queue<>::refresh_queue().

02501 {
02502   ACE_Dynamic_Message_Strategy::Priority_Status current_status;
02503 
02504   // refresh priority status boundaries in the queue
02505   if (this->pending_head_)
02506     {
02507       current_status = message_strategy_.priority_status (*this->pending_head_,
02508                                                           current_time);
02509       switch (current_status)
02510         {
02511         case ACE_Dynamic_Message_Strategy::BEYOND_LATE:
02512           // Make sure the head of the beyond late queue is set (there
02513           // may not have been any beyond late messages previously)
02514           this->beyond_late_head_ = this->head_;
02515 
02516           // Zero out the late queue pointers, and set them only if
02517           // there turn out to be late messages in the pending sublist
02518           this->late_head_ = 0;
02519           this->late_tail_ = 0;
02520 
02521           // Advance through the beyond late messages in the pending queue
02522           do
02523             {
02524               this->pending_head_ = this->pending_head_->next ();
02525 
02526               if (this->pending_head_)
02527                 current_status = message_strategy_.priority_status (*this->pending_head_,
02528                                                                     current_time);
02529               else
02530                 break;  // do while
02531 
02532             }
02533           while (current_status == ACE_Dynamic_Message_Strategy::BEYOND_LATE);
02534 
02535           if (this->pending_head_)
02536             {
02537               // point tail of beyond late sublist to previous item
02538               this->beyond_late_tail_ = this->pending_head_->prev ();
02539 
02540               if (current_status == ACE_Dynamic_Message_Strategy::PENDING)
02541                 // there are no late messages left in the queue
02542                 break; // switch
02543               else if (current_status != ACE_Dynamic_Message_Strategy::LATE)
02544                 {
02545                   // if we got here, something is *seriously* wrong with the queue
02546                   ACE_ERROR_RETURN ((LM_ERROR,
02547                                      ACE_TEXT ("Unexpected message priority status [%d] (expected LATE)"),
02548                                      (int) current_status),
02549                                     -1);
02550                 }
02551               /* FALLTHRU */
02552             }
02553           else
02554             {
02555               // There are no pending or late messages left in the
02556               // queue.
02557               this->beyond_late_tail_ = this->tail_;
02558               this->pending_head_ = 0;
02559               this->pending_tail_ = 0;
02560               break; // switch
02561             }
02562 
02563         case ACE_Dynamic_Message_Strategy::LATE:
02564           // Make sure the head of the late queue is set (there may
02565           // not have been any late messages previously, or they may
02566           // have all become beyond late).
02567           if (this->late_head_ == 0)
02568             this->late_head_ = this->pending_head_;
02569 
02570           // advance through the beyond late messages in the pending queue
02571           do
02572             {
02573               this->pending_head_ = this->pending_head_->next ();
02574 
02575               if (this->pending_head_)
02576                 current_status = message_strategy_.priority_status (*this->pending_head_,
02577                                                                     current_time);
02578               else
02579                 break;  // do while
02580 
02581             }
02582           while (current_status == ACE_Dynamic_Message_Strategy::LATE);
02583 
02584           if (this->pending_head_)
02585             {
02586               if (current_status != ACE_Dynamic_Message_Strategy::PENDING)
02587                 // if we got here, something is *seriously* wrong with the queue
02588                 ACE_ERROR_RETURN((LM_ERROR,
02589                                   ACE_TEXT ("Unexpected message priority status [%d] (expected PENDING)"),
02590                                   (int) current_status),
02591                                  -1);
02592 
02593               // Point tail of late sublist to previous item
02594               this->late_tail_ = this->pending_head_->prev ();
02595             }
02596           else
02597             {
02598               // there are no pending messages left in the queue
02599               this->late_tail_ = this->tail_;
02600               this->pending_head_ = 0;
02601               this->pending_tail_ = 0;
02602             }
02603 
02604           break; // switch
02605         case ACE_Dynamic_Message_Strategy::PENDING:
02606           // do nothing - the pending queue is unchanged
02607           break; // switch
02608         default:
02609           // if we got here, something is *seriously* wrong with the queue
02610           ACE_ERROR_RETURN((LM_ERROR,
02611                             ACE_TEXT ("Unknown message priority status [%d]"),
02612                             (int) current_status),
02613                            -1);
02614         }
02615     }
02616   return 0;
02617 }

template<ACE_SYNCH_DECL >
int ACE_Dynamic_Message_Queue<>::refresh_queue ( const ACE_Time_Value current_time  )  [protected, virtual]

Refresh the queue using the strategy specific priority status function.

Definition at line 2484 of file Message_Queue_T.cpp.

References ACE_Dynamic_Message_Queue<>::refresh_late_queue(), and ACE_Dynamic_Message_Queue<>::refresh_pending_queue().

Referenced by ACE_Dynamic_Message_Queue<>::dequeue_head(), ACE_Dynamic_Message_Queue<>::enqueue_i(), and ACE_Dynamic_Message_Queue<>::remove_messages().

02485 {
02486   int result;
02487 
02488   result = refresh_pending_queue (current_time);
02489 
02490   if (result != -1)
02491     result = refresh_late_queue (current_time);
02492 
02493   return result;
02494 }

template<ACE_SYNCH_DECL >
int ACE_Dynamic_Message_Queue<>::remove_messages ( ACE_Message_Block *&  list_head,
ACE_Message_Block *&  list_tail,
u_int  status_flags 
) [virtual]

Detach all messages with status given in the passed flags from the queue and return them by setting passed head and tail pointers to the linked list they comprise. This method is intended primarily as a means of periodically harvesting messages that have missed their deadlines, but is available in its most general form. All messages are returned in priority order, from head to tail, as of the time this method was called.

Definition at line 1940 of file Message_Queue_T.cpp.

References ACE_BIT_ENABLED, ACE_Dynamic_Message_Strategy::BEYOND_LATE, ACE_Dynamic_Message_Queue<>::beyond_late_head_, ACE_Dynamic_Message_Queue<>::beyond_late_tail_, ACE_Message_Queue< ACE_SYNCH_USE >::cur_bytes_, ACE_Message_Queue< ACE_SYNCH_USE >::cur_length_, ACE_OS::gettimeofday(), ACE_Message_Queue< ACE_SYNCH_USE >::head_, ACE_Dynamic_Message_Strategy::LATE, ACE_Dynamic_Message_Queue<>::late_head_, ACE_Dynamic_Message_Queue<>::late_tail_, ACE_Message_Block::next(), ACE_Dynamic_Message_Strategy::PENDING, ACE_Dynamic_Message_Queue<>::pending_head_, ACE_Dynamic_Message_Queue<>::pending_tail_, ACE_Message_Block::prev(), ACE_Dynamic_Message_Queue<>::refresh_queue(), ACE_Message_Queue< ACE_SYNCH_USE >::tail_, and ACE_Message_Block::total_size_and_length().

01943 {
01944   // start with an empty list
01945   list_head = 0;
01946   list_tail = 0;
01947 
01948   // Get the current time
01949   ACE_Time_Value current_time = ACE_OS::gettimeofday ();
01950 
01951   // Refresh priority status boundaries in the queue.
01952   int result = this->refresh_queue (current_time);
01953   if (result < 0)
01954     return result;
01955 
01956   if (ACE_BIT_ENABLED (status_flags,
01957                        (u_int) ACE_Dynamic_Message_Strategy::PENDING)
01958       && this->pending_head_
01959       && this->pending_tail_)
01960     {
01961       // patch up pointers for the new tail of the queue
01962       if (this->pending_head_->prev ())
01963         {
01964           this->tail_ = this->pending_head_->prev ();
01965           this->pending_head_->prev ()->next (0);
01966         }
01967       else
01968         {
01969           // the list has become empty
01970           this->head_ = 0;
01971           this->tail_ = 0;
01972         }
01973 
01974       // point to the head and tail of the list
01975       list_head = this->pending_head_;
01976       list_tail = this->pending_tail_;
01977 
01978       // cut the pending messages out of the queue entirely
01979       this->pending_head_->prev (0);
01980       this->pending_head_ = 0;
01981       this->pending_tail_ = 0;
01982     }
01983 
01984   if (ACE_BIT_ENABLED (status_flags,
01985                        (u_int) ACE_Dynamic_Message_Strategy::LATE)
01986       && this->late_head_
01987       && this->late_tail_)
01988     {
01989       // Patch up pointers for the (possibly) new head and tail of the
01990       // queue.
01991       if (this->late_tail_->next ())
01992         this->late_tail_->next ()->prev (this->late_head_->prev ());
01993       else
01994         this->tail_ = this->late_head_->prev ();
01995 
01996       if (this->late_head_->prev ())
01997         this->late_head_->prev ()->next (this->late_tail_->next ());
01998       else
01999         this->head_ = this->late_tail_->next ();
02000 
02001       // put late messages behind pending messages (if any) being returned
02002       this->late_head_->prev (list_tail);
02003       if (list_tail)
02004         list_tail->next (this->late_head_);
02005       else
02006         list_head = this->late_head_;
02007 
02008       list_tail = this->late_tail_;
02009 
02010       this->late_tail_->next (0);
02011       this->late_head_ = 0;
02012       this->late_tail_ = 0;
02013     }
02014 
02015   if (ACE_BIT_ENABLED (status_flags,
02016       (u_int) ACE_Dynamic_Message_Strategy::BEYOND_LATE)
02017       && this->beyond_late_head_
02018       && this->beyond_late_tail_)
02019     {
02020       // Patch up pointers for the new tail of the queue
02021       if (this->beyond_late_tail_->next ())
02022         {
02023           this->head_ = this->beyond_late_tail_->next ();
02024           this->beyond_late_tail_->next ()->prev (0);
02025         }
02026       else
02027         {
02028           // the list has become empty
02029           this->head_ = 0;
02030           this->tail_ = 0;
02031         }
02032 
02033       // Put beyond late messages at the end of the list being
02034       // returned.
02035       if (list_tail)
02036         {
02037           this->beyond_late_head_->prev (list_tail);
02038           list_tail->next (this->beyond_late_head_);
02039         }
02040       else
02041         list_head = this->beyond_late_head_;
02042 
02043       list_tail = this->beyond_late_tail_;
02044 
02045       this->beyond_late_tail_->next (0);
02046       this->beyond_late_head_ = 0;
02047       this->beyond_late_tail_ = 0;
02048     }
02049 
02050   // Decrement message and size counts for removed messages.
02051   ACE_Message_Block *temp1;
02052 
02053   for (temp1 = list_head;
02054        temp1 != 0;
02055        temp1 = temp1->next ())
02056     {
02057       --this->cur_count_;
02058 
02059       size_t mb_bytes = 0;
02060       size_t mb_length = 0;
02061       temp1->total_size_and_length (mb_bytes,
02062                                     mb_length);
02063       // Subtract off all of the bytes associated with this message.
02064       this->cur_bytes_ -= mb_bytes;
02065       this->cur_length_ -= mb_length;
02066     }
02067 
02068   return result;
02069 }

template<ACE_SYNCH_DECL >
int ACE_Dynamic_Message_Queue<>::sublist_enqueue_i ( ACE_Message_Block new_item,
const ACE_Time_Value current_time,
ACE_Message_Block *&  sublist_head,
ACE_Message_Block *&  sublist_tail,
ACE_Dynamic_Message_Strategy::Priority_Status  status 
) [protected, virtual]

Enqueue a message in priority order within a given priority status sublist.

Definition at line 2292 of file Message_Queue_T.cpp.

References ACE_Message_Queue< ACE_SYNCH_USE >::head_, ACE_Dynamic_Message_Queue<>::message_strategy_, ACE_Message_Block::msg_priority(), ACE_Message_Block::next(), ACE_Message_Block::prev(), ACE_Dynamic_Message_Strategy::priority_status(), and ACE_Message_Queue< ACE_SYNCH_USE >::tail_.

Referenced by ACE_Dynamic_Message_Queue<>::enqueue_i().

02297 {
02298   int result = 0;
02299   ACE_Message_Block *current_item = 0;
02300 
02301   // Find message after which to enqueue new item, based on message
02302   // priority and priority status.
02303   for (current_item = sublist_tail;
02304        current_item;
02305        current_item = current_item->prev ())
02306     {
02307       if (message_strategy_.priority_status (*current_item, current_time) == status)
02308         {
02309           if (current_item->msg_priority () >= new_item->msg_priority ())
02310             break;
02311         }
02312       else
02313         {
02314           sublist_head = new_item;
02315           break;
02316         }
02317     }
02318 
02319   if (current_item == 0)
02320     {
02321       // If the new message has highest priority of any, put it at the
02322       // head of the list (and sublist).
02323       new_item->prev (0);
02324       new_item->next (this->head_);
02325       if (this->head_ != 0)
02326         this->head_->prev (new_item);
02327       else
02328         {
02329           this->tail_ = new_item;
02330           sublist_tail = new_item;
02331         }
02332       this->head_ = new_item;
02333       sublist_head = new_item;
02334     }
02335   else
02336     {
02337       // insert the new item into the list
02338       new_item->next (current_item->next ());
02339       new_item->prev (current_item);
02340 
02341       if (current_item->next ())
02342         current_item->next ()->prev (new_item);
02343       else
02344         this->tail_ = new_item;
02345 
02346       current_item->next (new_item);
02347 
02348       // If the new item has lowest priority of any in the sublist,
02349       // move the tail pointer of the sublist back to the new item
02350       if (current_item == sublist_tail)
02351         sublist_tail = new_item;
02352     }
02353 
02354   return result;
02355 }


Member Data Documentation

template<ACE_SYNCH_DECL >
ACE_Dynamic_Message_Queue<>::ACE_ALLOC_HOOK_DECLARE

Declare the dynamic allocation hooks.

Reimplemented from ACE_Message_Queue< ACE_SYNCH_USE >.

Definition at line 823 of file Message_Queue_T.h.

template<ACE_SYNCH_DECL >
ACE_Message_Block* ACE_Dynamic_Message_Queue<>::beyond_late_head_ [protected]

Pointer to head of the beyond late messages.

Definition at line 877 of file Message_Queue_T.h.

Referenced by ACE_Dynamic_Message_Queue<>::dequeue_head_i(), ACE_Dynamic_Message_Queue<>::enqueue_i(), ACE_Dynamic_Message_Queue<>::refresh_late_queue(), ACE_Dynamic_Message_Queue<>::refresh_pending_queue(), and ACE_Dynamic_Message_Queue<>::remove_messages().

template<ACE_SYNCH_DECL >
ACE_Message_Block* ACE_Dynamic_Message_Queue<>::beyond_late_tail_ [protected]

Pointer to tail of the beyond late messages.

Definition at line 880 of file Message_Queue_T.h.

Referenced by ACE_Dynamic_Message_Queue<>::dequeue_head_i(), ACE_Dynamic_Message_Queue<>::enqueue_i(), ACE_Dynamic_Message_Queue<>::refresh_late_queue(), ACE_Dynamic_Message_Queue<>::refresh_pending_queue(), and ACE_Dynamic_Message_Queue<>::remove_messages().

template<ACE_SYNCH_DECL >
ACE_Message_Block* ACE_Dynamic_Message_Queue<>::late_head_ [protected]

Pointer to head of the late messages.

Definition at line 871 of file Message_Queue_T.h.

Referenced by ACE_Dynamic_Message_Queue<>::dequeue_head_i(), ACE_Dynamic_Message_Queue<>::enqueue_i(), ACE_Dynamic_Message_Queue<>::refresh_late_queue(), ACE_Dynamic_Message_Queue<>::refresh_pending_queue(), and ACE_Dynamic_Message_Queue<>::remove_messages().

template<ACE_SYNCH_DECL >
ACE_Message_Block* ACE_Dynamic_Message_Queue<>::late_tail_ [protected]

Pointer to tail of the late messages.

Definition at line 874 of file Message_Queue_T.h.

Referenced by ACE_Dynamic_Message_Queue<>::dequeue_head_i(), ACE_Dynamic_Message_Queue<>::enqueue_i(), ACE_Dynamic_Message_Queue<>::refresh_late_queue(), ACE_Dynamic_Message_Queue<>::refresh_pending_queue(), and ACE_Dynamic_Message_Queue<>::remove_messages().

template<ACE_SYNCH_DECL >
ACE_Dynamic_Message_Strategy& ACE_Dynamic_Message_Queue<>::message_strategy_ [protected]

Pointer to a dynamic priority evaluation function.

Definition at line 883 of file Message_Queue_T.h.

Referenced by ACE_Dynamic_Message_Queue<>::dump(), ACE_Dynamic_Message_Queue<>::enqueue_i(), ACE_Dynamic_Message_Queue<>::refresh_late_queue(), ACE_Dynamic_Message_Queue<>::refresh_pending_queue(), ACE_Dynamic_Message_Queue<>::sublist_enqueue_i(), and ACE_Dynamic_Message_Queue<>::~ACE_Dynamic_Message_Queue().

template<ACE_SYNCH_DECL >
ACE_Message_Block* ACE_Dynamic_Message_Queue<>::pending_head_ [protected]

Pointer to head of the pending messages.

Definition at line 865 of file Message_Queue_T.h.

Referenced by ACE_Dynamic_Message_Queue<>::dequeue_head_i(), ACE_Dynamic_Message_Queue<>::enqueue_i(), ACE_Dynamic_Message_Queue<>::refresh_pending_queue(), and ACE_Dynamic_Message_Queue<>::remove_messages().

template<ACE_SYNCH_DECL >
ACE_Message_Block* ACE_Dynamic_Message_Queue<>::pending_tail_ [protected]

Pointer to tail of the pending messages.

Definition at line 868 of file Message_Queue_T.h.

Referenced by ACE_Dynamic_Message_Queue<>::dequeue_head_i(), ACE_Dynamic_Message_Queue<>::enqueue_i(), ACE_Dynamic_Message_Queue<>::refresh_pending_queue(), and ACE_Dynamic_Message_Queue<>::remove_messages().


The documentation for this class was generated from the following files:
Generated on Tue Feb 2 17:35:05 2010 for ACE by  doxygen 1.4.7