Message_Queue_T.cpp

Go to the documentation of this file.
00001 // Message_Queue_T.cpp,v 4.82 2006/05/30 10:57:22 jwillemsen Exp
00002 
00003 #ifndef ACE_MESSAGE_QUEUE_T_CPP
00004 #define ACE_MESSAGE_QUEUE_T_CPP
00005 
00006 // #include Message_Queue.h instead of Message_Queue_T.h to avoid
00007 // circular include problems.
00008 #include "ace/Message_Queue.h"
00009 #include "ace/Log_Msg.h"
00010 #include "ace/OS_NS_sys_time.h"
00011 
00012 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00013 # pragma once
00014 #endif /* ACE_LACKS_PRAGMA_ONCE */
00015 
00016 #include "ace/Notification_Strategy.h"
00017 #include "ace/Truncate.h"
00018 
00019 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
00020 
// NOTE(review): ACE_ALLOC_HOOK_DEFINE is defined outside this file;
// presumably each use emits the allocation-hook definitions for the
// named class -- confirm against the macro's definition.
00021 ACE_ALLOC_HOOK_DEFINE(ACE_Message_Queue)
00022 ACE_ALLOC_HOOK_DEFINE(ACE_Dynamic_Message_Queue)
00023 ACE_ALLOC_HOOK_DEFINE(ACE_Message_Queue_Ex)
00024 
00025 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> void
00026 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dump (void) const
00027 {
00028 #if defined (ACE_HAS_DUMP)
00029   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dump");
00030 
00031   this->queue_.dump ();
00032 #endif /* ACE_HAS_DUMP */
00033 }
00034 
00035 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> void
00036 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::message_bytes (size_t new_value)
00037 {
00038   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::message_bytes");
00039 
00040   this->queue_.message_bytes (new_value);
00041 }
00042 
00043 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> void
00044 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::message_length (size_t new_value)
00045 {
00046   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::message_length");
00047 
00048   this->queue_.message_length (new_value);
00049 }
00050 
00051 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL>
00052 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::ACE_Message_Queue_Ex (size_t hwm,
00053                                                                              size_t lwm,
00054                                                                              ACE_Notification_Strategy *ns)
00055 {
00056   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::ACE_Message_Queue_Ex");
00057 
00058   if (this->queue_.open (hwm, lwm, ns) == -1)
00059     ACE_ERROR ((LM_ERROR,
00060                 ACE_LIB_TEXT ("ACE_Message_Queue_Ex")));
00061 }
00062 
00063 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL>
00064 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::~ACE_Message_Queue_Ex (void)
00065 {
00066   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::~ACE_Message_Queue_Ex");
00067 }
00068 
00069 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00070 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::open (size_t hwm,
00071                                                              size_t lwm,
00072                                                              ACE_Notification_Strategy *ns)
00073 {
00074   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::open");
00075 
00076   return this->queue_.open (hwm, lwm, ns);
00077 }
00078 
00079 // Clean up the queue if we have not already done so!
00080 
00081 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00082 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::close (void)
00083 {
00084   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::close");
00085 
00086   return this->queue_.close ();
00087 }
00088 
00089 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00090 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::flush (void)
00091 {
00092   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::flush");
00093 
00094   return this->queue_.flush ();
00095 }
00096 
00097 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00098 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::flush_i (void)
00099 {
00100   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::flush_i");
00101 
00102   return this->queue_.flush_i ();
00103 }
00104 
00105 // Take a look at the first item without removing it.
00106 
00107 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00108 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::peek_dequeue_head (ACE_MESSAGE_TYPE *&first_item,
00109                                                                           ACE_Time_Value *timeout)
00110 {
00111   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::peek_dequeue_head");
00112 
00113   ACE_Message_Block *mb = 0;
00114 
00115   int const cur_count = this->queue_.peek_dequeue_head (mb, timeout);
00116 
00117   if (cur_count != -1)
00118     first_item  = reinterpret_cast<ACE_MESSAGE_TYPE *> (mb->base ());
00119 
00120   return cur_count;
00121 }
00122 
00123 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00124 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_head (ACE_MESSAGE_TYPE *new_item,
00125                                                                      ACE_Time_Value *timeout)
00126 {
00127   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_head");
00128 
00129   ACE_Message_Block *mb = 0;
00130 
00131   ACE_NEW_RETURN (mb,
00132                   ACE_Message_Block ((char *) new_item,
00133                                      sizeof (*new_item),
00134                                      ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::DEFAULT_PRIORITY),
00135                   -1);
00136 
00137   int const result = this->queue_.enqueue_head (mb, timeout);
00138   if (result == -1)
00139     // Zap the message.
00140     mb->release ();
00141   return result;
00142 }
00143 
00144 // Enqueue an <ACE_MESSAGE_TYPE *> into the <Message_Queue> in
00145 // accordance with its <msg_priority> (0 is lowest priority).  Returns
00146 // -1 on failure, else the number of items still on the queue.
00147 
00148 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00149 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue (ACE_MESSAGE_TYPE *new_item,
00150                                                                 ACE_Time_Value *timeout)
00151 {
00152   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue");
00153 
00154   return this->enqueue_prio (new_item, timeout);
00155 }
00156 
00157 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00158 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_prio (ACE_MESSAGE_TYPE *new_item,
00159                                                                      ACE_Time_Value *timeout)
00160 {
00161   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_prio");
00162 
00163   ACE_Message_Block *mb = 0;
00164 
00165   ACE_NEW_RETURN (mb,
00166                   ACE_Message_Block ((char *) new_item,
00167                                      sizeof (*new_item),
00168                                      ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::DEFAULT_PRIORITY),
00169                   -1);
00170 
00171   int const result = this->queue_.enqueue_prio (mb, timeout);
00172   if (result == -1)
00173     // Zap the message.
00174     mb->release ();
00175 
00176   return result;
00177 }
00178 
00179 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00180 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_deadline (ACE_MESSAGE_TYPE *new_item,
00181                                                                          ACE_Time_Value *timeout)
00182 {
00183   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_deadline");
00184 
00185   ACE_Message_Block *mb = 0;
00186 
00187   ACE_NEW_RETURN (mb,
00188                   ACE_Message_Block ((char *) new_item,
00189                                      sizeof (*new_item),
00190                                      ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::DEFAULT_PRIORITY ),
00191                   -1);
00192 
00193   int const result = this->queue_.enqueue_deadline (mb, timeout);
00194   if (result == -1)
00195     // Zap the message.
00196     mb->release ();
00197 
00198   return result;
00199 }
00200 
00201 // Enqueue an item at the tail of the queue, waiting (subject to
00202 // <timeout>) for the enqueue to complete; does not ignore alerts (e.g., signals).
00203 
00204 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00205 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_tail (ACE_MESSAGE_TYPE *new_item,
00206                                                                      ACE_Time_Value *timeout)
00207 {
00208   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_tail");
00209 
00210   ACE_Message_Block *mb = 0;
00211 
00212   ACE_NEW_RETURN (mb,
00213                   ACE_Message_Block ((char *) new_item,
00214                                      sizeof (*new_item),
00215                                      ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::DEFAULT_PRIORITY),
00216                   -1);
00217 
00218   int const result = this->queue_.enqueue_tail (mb, timeout);
00219   if (result == -1)
00220     // Zap the message.
00221     mb->release ();
00222   return result;
00223 }
00224 
00225 // Remove an item from the front of the queue.  If timeout == 0 block
00226 // indefinitely (or until an alert occurs).  Otherwise, block for up to
00227 // the amount of time specified by timeout.
00228 
00229 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00230 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dequeue_head (ACE_MESSAGE_TYPE *&first_item,
00231                                                                      ACE_Time_Value *timeout)
00232 {
00233   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dequeue_head");
00234 
00235   ACE_Message_Block *mb = 0;
00236 
00237   int const cur_count = this->queue_.dequeue_head (mb, timeout);
00238 
00239   // Dequeue the message.
00240   if (cur_count != -1)
00241     {
00242       first_item = reinterpret_cast<ACE_MESSAGE_TYPE *> (mb->base ());
00243       // Delete the message block.
00244       mb->release ();
00245     }
00246 
00247   return cur_count;
00248 }
00249 
00250 // Remove the item with the lowest priority from the queue.  If timeout == 0
00251 // block indefinitely (or until an alert occurs).  Otherwise, block for up to
00252 // the amount of time specified by timeout.
00253 
00254 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00255 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dequeue_prio (ACE_MESSAGE_TYPE *&dequeued,
00256                                                                      ACE_Time_Value *timeout)
00257 {
00258   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dequeue_prio");
00259 
00260   ACE_Message_Block *mb = 0;
00261 
00262   int const cur_count = this->queue_.dequeue_prio (mb, timeout);
00263 
00264   // Dequeue the message.
00265   if (cur_count != -1)
00266     {
00267       dequeued = reinterpret_cast<ACE_MESSAGE_TYPE *> (mb->base ());
00268       // Delete the message block.
00269       mb->release ();
00270     }
00271 
00272   return cur_count;
00273 }
00274 
00275 // Remove an item from the end of the queue.  If timeout == 0 block
00276 // indefinitely (or until an alert occurs).  Otherwise, block for up to
00277 // the amount of time specified by timeout.
00278 
00279 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00280 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dequeue_tail (ACE_MESSAGE_TYPE *&dequeued,
00281                                                                      ACE_Time_Value *timeout)
00282 {
00283   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dequeue_tail");
00284 
00285   ACE_Message_Block *mb = 0;
00286 
00287   int const cur_count = this->queue_.dequeue_tail (mb, timeout);
00288 
00289   // Dequeue the message.
00290   if (cur_count != -1)
00291     {
00292       dequeued = reinterpret_cast<ACE_MESSAGE_TYPE *> (mb->base ());
00293       // Delete the message block.
00294       mb->release ();
00295     }
00296 
00297   return cur_count;
00298 }
00299 
00300 // Remove an item with the lowest deadline time.  If timeout == 0 block
00301 // indefinitely (or until an alert occurs).  Otherwise, block for up to
00302 // the amount of time specified by timeout.
00303 
00304 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00305 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dequeue_deadline (ACE_MESSAGE_TYPE *&dequeued,
00306                                                                          ACE_Time_Value *timeout)
00307 {
00308   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dequeue_deadline");
00309 
00310   ACE_Message_Block *mb = 0;
00311 
00312   int const cur_count = this->queue_.dequeue_deadline (mb, timeout);
00313 
00314   // Dequeue the message.
00315   if (cur_count != -1)
00316     {
00317       dequeued = reinterpret_cast<ACE_MESSAGE_TYPE *> (mb->base ());
00318       // Delete the message block.
00319       mb->release ();
00320     }
00321 
00322   return cur_count;
00323 }
00324 
00325 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00326 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::notify (void)
00327 {
00328   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::notify");
00329 
00330   return this->queue_.notify ();
00331 }
00332 
00333 
// NOTE(review): ACE_ALLOC_HOOK_DEFINE is defined outside this file;
// presumably it emits the allocation-hook definitions for
// ACE_Message_Queue_Reverse_Iterator -- confirm against the macro.
00334 ACE_ALLOC_HOOK_DEFINE(ACE_Message_Queue_Reverse_Iterator)
00335 
00336 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00337 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dequeue (ACE_MESSAGE_TYPE *&first_item,
00338                                                                 ACE_Time_Value *timeout)
00339 {
00340   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dequeue");
00341 
00342   return this->dequeue_head (first_item, timeout);
00343 }
00344 
00345 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> ACE_Notification_Strategy *
00346 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::notification_strategy (void)
00347 {
00348   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::notification_strategy");
00349 
00350   return this->queue_.notification_strategy ();
00351 }
00352 
00353 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> void
00354 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::notification_strategy (ACE_Notification_Strategy *s)
00355 {
00356   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::notification_strategy");
00357 
00358   this->queue_.notification_strategy (s);
00359 }
00360 
00361 // Check if queue is empty (holds locks).
00362 
00363 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00364 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::is_empty (void)
00365 {
00366   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::is_empty");
00367 
00368   return this->queue_.is_empty ();
00369 }
00370 
00371 // Check if queue is full (holds locks).
00372 
00373 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00374 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::is_full (void)
00375 {
00376   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::is_full");
00377 
00378   return this->queue_.is_full ();
00379 }
00380 
00381 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> size_t
00382 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::high_water_mark (void)
00383 {
00384   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::high_water_mark");
00385 
00386   return this->queue_.high_water_mark ();
00387 }
00388 
00389 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> void
00390 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::high_water_mark (size_t hwm)
00391 {
00392   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::high_water_mark");
00393 
00394   this->queue_.high_water_mark (hwm);
00395 }
00396 
00397 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> size_t
00398 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::low_water_mark (void)
00399 {
00400   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::low_water_mark");
00401 
00402   return this->queue_.low_water_mark ();
00403 }
00404 
00405 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> void
00406 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::low_water_mark (size_t lwm)
00407 {
00408   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::low_water_mark");
00409 
00410   this->queue_.low_water_mark (lwm);
00411 }
00412 
00413 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> size_t
00414 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::message_bytes (void)
00415 {
00416   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::message_bytes");
00417 
00418   return this->queue_.message_bytes ();
00419 }
00420 
00421 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> size_t
00422 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::message_length (void)
00423 {
00424   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::message_length");
00425 
00426   return this->queue_.message_length ();
00427 }
00428 
00429 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> size_t
00430 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::message_count (void)
00431 {
00432   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::message_count");
00433 
00434   return this->queue_.message_count ();
00435 }
00436 
00437 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00438 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::deactivate (void)
00439 {
00440   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::deactivate");
00441 
00442   return this->queue_.deactivate ();
00443 }
00444 
00445 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00446 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::activate (void)
00447 {
00448   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::activate");
00449 
00450   return this->queue_.activate ();
00451 }
00452 
00453 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00454 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::pulse (void)
00455 {
00456   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::pulse");
00457 
00458   return this->queue_.pulse ();
00459 }
00460 
00461 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00462 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::deactivated (void)
00463 {
00464   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::deactivated");
00465 
00466   return this->queue_.deactivated ();
00467 }
00468 
00469 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00470 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::state (void)
00471 {
00472   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::state");
00473 
00474   return this->queue_.state ();
00475 }
00476 
00477 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> ACE_SYNCH_MUTEX_T &
00478 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::lock (void)
00479 {
00480   return this->queue_.lock ();
00481 }
00482 
00483 template <ACE_SYNCH_DECL>
00484 ACE_Message_Queue_Iterator<ACE_SYNCH_USE>::ACE_Message_Queue_Iterator (ACE_Message_Queue <ACE_SYNCH_USE> &q)
00485   : queue_ (q),
00486     curr_ (q.head_)
00487 {
00488 }
00489 
00490 template <ACE_SYNCH_DECL> int
00491 ACE_Message_Queue_Iterator<ACE_SYNCH_USE>::next (ACE_Message_Block *&entry)
00492 {
00493   ACE_READ_GUARD_RETURN (ACE_SYNCH_MUTEX_T, m, this->queue_.lock_, -1)
00494 
00495   if (this->curr_ != 0)
00496     {
00497       entry = this->curr_;
00498       return 1;
00499     }
00500 
00501   return 0;
00502 }
00503 
00504 template <ACE_SYNCH_DECL> int
00505 ACE_Message_Queue_Iterator<ACE_SYNCH_USE>::done (void) const
00506 {
00507   ACE_READ_GUARD_RETURN (ACE_SYNCH_MUTEX_T, m, this->queue_.lock_, -1)
00508 
00509   return this->curr_ == 0;
00510 }
00511 
00512 template <ACE_SYNCH_DECL> int
00513 ACE_Message_Queue_Iterator<ACE_SYNCH_USE>::advance (void)
00514 {
00515   ACE_READ_GUARD_RETURN (ACE_SYNCH_MUTEX_T, m, this->queue_.lock_, -1)
00516 
00517   if (this->curr_)
00518     this->curr_ = this->curr_->next ();
00519   return this->curr_ != 0;
00520 }
00521 
00522 template <ACE_SYNCH_DECL> void
00523 ACE_Message_Queue_Iterator<ACE_SYNCH_USE>::dump (void) const
00524 {
00525 #if defined (ACE_HAS_DUMP)
00526 #endif /* ACE_HAS_DUMP */
00527 }
00528 
// NOTE(review): ACE_ALLOC_HOOK_DEFINE is defined outside this file;
// presumably it emits the allocation-hook definitions for
// ACE_Message_Queue_Iterator -- confirm against the macro.
00529 ACE_ALLOC_HOOK_DEFINE(ACE_Message_Queue_Iterator)
00530 
00531 template <ACE_SYNCH_DECL>
00532 ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>::ACE_Message_Queue_Reverse_Iterator (ACE_Message_Queue <ACE_SYNCH_USE> &q)
00533   : queue_ (q),
00534     curr_ (queue_.tail_)
00535 {
00536 }
00537 
00538 template <ACE_SYNCH_DECL> int
00539 ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>::next (ACE_Message_Block *&entry)
00540 {
00541   ACE_READ_GUARD_RETURN (ACE_SYNCH_MUTEX_T, m, this->queue_.lock_, -1)
00542 
00543   if (this->curr_ != 0)
00544     {
00545       entry = this->curr_;
00546       return 1;
00547     }
00548 
00549   return 0;
00550 }
00551 
00552 template <ACE_SYNCH_DECL> int
00553 ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>::done (void) const
00554 {
00555   ACE_READ_GUARD_RETURN (ACE_SYNCH_MUTEX_T, m, this->queue_.lock_, -1)
00556 
00557   return this->curr_ == 0;
00558 }
00559 
00560 template <ACE_SYNCH_DECL> int
00561 ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>::advance (void)
00562 {
00563   ACE_READ_GUARD_RETURN (ACE_SYNCH_MUTEX_T, m, this->queue_.lock_, -1)
00564 
00565   if (this->curr_)
00566     this->curr_ = this->curr_->prev ();
00567   return this->curr_ != 0;
00568 }
00569 
00570 template <ACE_SYNCH_DECL> void
00571 ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>::dump (void) const
00572 {
00573 #if defined (ACE_HAS_DUMP)
00574 #endif /* ACE_HAS_DUMP */
00575 }
00576 
00577 template <ACE_SYNCH_DECL> int
00578 ACE_Message_Queue<ACE_SYNCH_USE>::dequeue (ACE_Message_Block *&first_item,
00579                                            ACE_Time_Value *timeout)
00580 {
00581   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue");
00582   return this->dequeue_head (first_item, timeout);
00583 }
00584 
00585 template <ACE_SYNCH_DECL> ACE_Notification_Strategy *
00586 ACE_Message_Queue<ACE_SYNCH_USE>::notification_strategy (void)
00587 {
00588   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::notification_strategy");
00589 
00590   return this->notification_strategy_;
00591 }
00592 
00593 template <ACE_SYNCH_DECL> void
00594 ACE_Message_Queue<ACE_SYNCH_USE>::notification_strategy (ACE_Notification_Strategy *s)
00595 {
00596   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::notification_strategy");
00597 
00598   this->notification_strategy_ = s;
00599 }
00600 
00601 // Check if queue is empty (does not hold locks).
00602 
00603 template <ACE_SYNCH_DECL> int
00604 ACE_Message_Queue<ACE_SYNCH_USE>::is_empty_i (void)
00605 {
00606   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::is_empty_i");
00607   return this->tail_ == 0;
00608 }
00609 
00610 // Check if queue is full (does not hold locks).
00611 
00612 template <ACE_SYNCH_DECL> int
00613 ACE_Message_Queue<ACE_SYNCH_USE>::is_full_i (void)
00614 {
00615   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::is_full_i");
00616   return this->cur_bytes_ >= this->high_water_mark_;
00617 }
00618 
00619 // Check if queue is empty (holds locks).
00620 
00621 template <ACE_SYNCH_DECL> int
00622 ACE_Message_Queue<ACE_SYNCH_USE>::is_empty (void)
00623 {
00624   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::is_empty");
00625   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
00626 
00627   return this->is_empty_i ();
00628 }
00629 
00630 // Check if queue is full (holds locks).
00631 
00632 template <ACE_SYNCH_DECL> int
00633 ACE_Message_Queue<ACE_SYNCH_USE>::is_full (void)
00634 {
00635   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::is_full");
00636   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
00637 
00638   return this->is_full_i ();
00639 }
00640 
00641 template <ACE_SYNCH_DECL> size_t
00642 ACE_Message_Queue<ACE_SYNCH_USE>::high_water_mark (void)
00643 {
00644   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::high_water_mark");
00645   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, 0);
00646 
00647   return this->high_water_mark_;
00648 }
00649 
00650 template <ACE_SYNCH_DECL> void
00651 ACE_Message_Queue<ACE_SYNCH_USE>::high_water_mark (size_t hwm)
00652 {
00653   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::high_water_mark");
00654   ACE_GUARD (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_);
00655 
00656   this->high_water_mark_ = hwm;
00657 }
00658 
00659 template <ACE_SYNCH_DECL> size_t
00660 ACE_Message_Queue<ACE_SYNCH_USE>::low_water_mark (void)
00661 {
00662   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::low_water_mark");
00663   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, 0);
00664 
00665   return this->low_water_mark_;
00666 }
00667 
00668 template <ACE_SYNCH_DECL> void
00669 ACE_Message_Queue<ACE_SYNCH_USE>::low_water_mark (size_t lwm)
00670 {
00671   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::low_water_mark");
00672   ACE_GUARD (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_);
00673 
00674   this->low_water_mark_ = lwm;
00675 }
00676 
00677 template <ACE_SYNCH_DECL> size_t
00678 ACE_Message_Queue<ACE_SYNCH_USE>::message_bytes (void)
00679 {
00680   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::message_bytes");
00681   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, 0);
00682 
00683   return this->cur_bytes_;
00684 }
00685 
00686 template <ACE_SYNCH_DECL> size_t
00687 ACE_Message_Queue<ACE_SYNCH_USE>::message_length (void)
00688 {
00689   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::message_length");
00690   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, 0);
00691 
00692   return this->cur_length_;
00693 }
00694 
00695 template <ACE_SYNCH_DECL> size_t
00696 ACE_Message_Queue<ACE_SYNCH_USE>::message_count (void)
00697 {
00698   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::message_count");
00699   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, 0);
00700 
00701   return this->cur_count_;
00702 }
00703 
00704 template <ACE_SYNCH_DECL> int
00705 ACE_Message_Queue<ACE_SYNCH_USE>::deactivate ()
00706 {
00707   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::deactivate");
00708   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
00709 
00710   return this->deactivate_i (0);   // Not a pulse
00711 }
00712 
00713 template <ACE_SYNCH_DECL> int
00714 ACE_Message_Queue<ACE_SYNCH_USE>::activate (void)
00715 {
00716   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::activate");
00717   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
00718 
00719   return this->activate_i ();
00720 }
00721 
00722 template <ACE_SYNCH_DECL> int
00723 ACE_Message_Queue<ACE_SYNCH_USE>::pulse ()
00724 {
00725   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::pulse");
00726   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
00727 
00728   return this->deactivate_i (1);   // Just a pulse
00729 }
00730 
00731 template <ACE_SYNCH_DECL> int
00732 ACE_Message_Queue<ACE_SYNCH_USE>::deactivated (void)
00733 {
00734   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::deactivated");
00735 
00736   return this->state_ == ACE_Message_Queue_Base::DEACTIVATED;
00737 }
00738 
00739 template <ACE_SYNCH_DECL> int
00740 ACE_Message_Queue<ACE_SYNCH_USE>::state (void)
00741 {
00742   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::state");
00743 
00744   return this->state_;
00745 }
00746 
00747 template <ACE_SYNCH_DECL> ACE_SYNCH_MUTEX_T &
00748 ACE_Message_Queue<ACE_SYNCH_USE>::lock (void)
00749 {
00750   return this->lock_;
00751 }
00752 
00753 template <ACE_SYNCH_DECL> void
00754 ACE_Message_Queue<ACE_SYNCH_USE>::dump (void) const
00755 {
00756 #if defined (ACE_HAS_DUMP)
00757   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dump");
00758   ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
00759   switch (this->state_)
00760     {
00761     case ACE_Message_Queue_Base::ACTIVATED:
00762       ACE_DEBUG ((LM_DEBUG,
00763                   ACE_LIB_TEXT ("state = ACTIVATED\n")));
00764       break;
00765     case ACE_Message_Queue_Base::DEACTIVATED:
00766       ACE_DEBUG ((LM_DEBUG,
00767                   ACE_LIB_TEXT ("state = DEACTIVATED\n")));
00768       break;
00769     case ACE_Message_Queue_Base::PULSED:
00770       ACE_DEBUG ((LM_DEBUG,
00771                   ACE_LIB_TEXT ("state = PULSED\n")));
00772       break;
00773     }
00774   ACE_DEBUG ((LM_DEBUG,
00775               ACE_LIB_TEXT ("low_water_mark = %d\n")
00776               ACE_LIB_TEXT ("high_water_mark = %d\n")
00777               ACE_LIB_TEXT ("cur_bytes = %d\n")
00778               ACE_LIB_TEXT ("cur_length = %d\n")
00779               ACE_LIB_TEXT ("cur_count = %d\n")
00780               ACE_LIB_TEXT ("head_ = %u\n")
00781               ACE_LIB_TEXT ("tail_ = %u\n"),
00782               this->low_water_mark_,
00783               this->high_water_mark_,
00784               this->cur_bytes_,
00785               this->cur_length_,
00786               this->cur_count_,
00787               this->head_,
00788               this->tail_));
00789   ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("not_full_cond: \n")));
00790   not_full_cond_.dump ();
00791   ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("not_empty_cond: \n")));
00792   not_empty_cond_.dump ();
00793   ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
00794 #endif /* ACE_HAS_DUMP */
00795 }
00796 
00797 template <ACE_SYNCH_DECL> void
00798 ACE_Message_Queue<ACE_SYNCH_USE>::message_bytes (size_t new_value)
00799 {
00800   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::message_bytes");
00801   ACE_GUARD (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_);
00802 
00803   this->cur_bytes_ = new_value;
00804 }
00805 
00806 template <ACE_SYNCH_DECL> void
00807 ACE_Message_Queue<ACE_SYNCH_USE>::message_length (size_t new_value)
00808 {
00809   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::message_length");
00810   ACE_GUARD (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_);
00811 
00812   this->cur_length_ = new_value;
00813 }
00814 
00815 template <ACE_SYNCH_DECL>
00816 ACE_Message_Queue<ACE_SYNCH_USE>::ACE_Message_Queue (size_t hwm,
00817                                                      size_t lwm,
00818                                                      ACE_Notification_Strategy *ns)
00819   : not_empty_cond_ (this->lock_),
00820     not_full_cond_ (this->lock_)
00821 {
00822   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::ACE_Message_Queue");
00823 
00824   if (this->open (hwm, lwm, ns) == -1)
00825     ACE_ERROR ((LM_ERROR,
00826                 ACE_LIB_TEXT ("open")));
00827 }
00828 
00829 template <ACE_SYNCH_DECL>
00830 ACE_Message_Queue<ACE_SYNCH_USE>::~ACE_Message_Queue (void)
00831 {
00832   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::~ACE_Message_Queue");
00833   if (this->head_ != 0 && this->close () == -1)
00834     ACE_ERROR ((LM_ERROR,
00835                 ACE_LIB_TEXT ("close")));
00836 }
00837 
00838 template <ACE_SYNCH_DECL> int
00839 ACE_Message_Queue<ACE_SYNCH_USE>::flush_i (void)
00840 {
00841   int number_flushed = 0;
00842 
00843   // Remove all the <ACE_Message_Block>s in the <ACE_Message_Queue>
00844   // and <release> their memory.
00845   for (this->tail_ = 0; this->head_ != 0; )
00846     {
00847       ++number_flushed;
00848 
00849       size_t mb_bytes = 0;
00850       size_t mb_length = 0;
00851       this->head_->total_size_and_length (mb_bytes,
00852                                           mb_length);
00853       // Subtract off all of the bytes associated with this message.
00854       this->cur_bytes_ -= mb_bytes;
00855       this->cur_length_ -= mb_length;
00856       --this->cur_count_;
00857 
00858       ACE_Message_Block *temp = this->head_;
00859       this->head_ = this->head_->next ();
00860 
00861       // Make sure to use <release> rather than <delete> since this is
00862       // reference counted.
00863       temp->release ();
00864     }
00865 
00866   return number_flushed;
00867 }
00868 
00869 // Don't bother locking since if someone calls this function more than
00870 // once for the same queue, we're in bigger trouble than just
00871 // concurrency control!
00872 
00873 template <ACE_SYNCH_DECL> int
00874 ACE_Message_Queue<ACE_SYNCH_USE>::open (size_t hwm,
00875                                         size_t lwm,
00876                                         ACE_Notification_Strategy *ns)
00877 {
00878   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::open");
00879   this->high_water_mark_ = hwm;
00880   this->low_water_mark_  = lwm;
00881   this->state_ = ACE_Message_Queue_Base::ACTIVATED;
00882   this->cur_bytes_ = 0;
00883   this->cur_length_ = 0;
00884   this->cur_count_ = 0;
00885   this->tail_ = 0;
00886   this->head_ = 0;
00887   this->notification_strategy_ = ns;
00888   return 0;
00889 }
00890 
00891 // Implementation of the public deactivate() method
00892 // (assumes locks are held).
00893 
00894 template <ACE_SYNCH_DECL> int
00895 ACE_Message_Queue<ACE_SYNCH_USE>::deactivate_i (int pulse)
00896 {
00897   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::deactivate_i");
00898   int const previous_state = this->state_;
00899 
00900   if (previous_state != ACE_Message_Queue_Base::DEACTIVATED)
00901     {
00902       // Wakeup all waiters.
00903       this->not_empty_cond_.broadcast ();
00904       this->not_full_cond_.broadcast ();
00905 
00906       if (pulse)
00907         this->state_ = ACE_Message_Queue_Base::PULSED;
00908       else
00909         this->state_ = ACE_Message_Queue_Base::DEACTIVATED;
00910     }
00911   return previous_state;
00912 }
00913 
00914 template <ACE_SYNCH_DECL> int
00915 ACE_Message_Queue<ACE_SYNCH_USE>::activate_i (void)
00916 {
00917   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::activate_i");
00918   int const previous_state = this->state_;
00919   this->state_ = ACE_Message_Queue_Base::ACTIVATED;
00920   return previous_state;
00921 }
00922 
00923 template <ACE_SYNCH_DECL> int
00924 ACE_Message_Queue<ACE_SYNCH_USE>::flush (void)
00925 {
00926   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::flush");
00927   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
00928 
00929   // Free up the remaining messages on the queue.
00930   return this->flush_i ();
00931 }
00932 
00933 // Clean up the queue if we have not already done so!
00934 
00935 template <ACE_SYNCH_DECL> int
00936 ACE_Message_Queue<ACE_SYNCH_USE>::close (void)
00937 {
00938   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::close");
00939   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
00940 
00941   int const result = this->deactivate_i ();
00942 
00943   // Free up the remaining messages on the queue.
00944   this->flush_i ();
00945 
00946   return result;
00947 }
00948 
00949 template <ACE_SYNCH_DECL> int
00950 ACE_Message_Queue<ACE_SYNCH_USE>::signal_enqueue_waiters (void)
00951 {
00952   if (this->not_full_cond_.signal () != 0)
00953     return -1;
00954   return 0;
00955 }
00956 
00957 template <ACE_SYNCH_DECL> int
00958 ACE_Message_Queue<ACE_SYNCH_USE>::signal_dequeue_waiters (void)
00959 {
00960   // Tell any blocked threads that the queue has a new item!
00961   if (this->not_empty_cond_.signal () != 0)
00962     return -1;
00963   return 0;
00964 }
00965 
00966 // Actually put the node at the end (no locking so must be called with
00967 // locks held).
00968 
00969 template <ACE_SYNCH_DECL> int
00970 ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_tail_i (ACE_Message_Block *new_item)
00971 {
00972   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_tail_i");
00973 
00974   if (new_item == 0)
00975     return -1;
00976 
00977   // Update the queued size and length, taking into account any chained
00978   // blocks (total_size_and_length() counts all continuation blocks).
00979   // Keep count of how many blocks we're adding and, if there is a chain of
00980   // blocks, find the end in seq_tail and be sure they're properly
00981   // back-connected along the way.
00982   ACE_Message_Block *seq_tail = new_item;
00983   ++this->cur_count_;
00984   new_item->total_size_and_length (this->cur_bytes_,
00985                                    this->cur_length_);
00986   while (seq_tail->next () != 0)
00987     {
00988       seq_tail->next ()->prev (seq_tail);
00989       seq_tail = seq_tail->next ();
00990       ++this->cur_count_;
00991       seq_tail->total_size_and_length (this->cur_bytes_,
00992                                        this->cur_length_);
00993     }
00994 
00995   // List was empty, so build a new one.
00996   if (this->tail_ == 0)
00997     {
00998       this->head_ = new_item;
00999       this->tail_ = seq_tail;
01000       // seq_tail->next (0);   This is a condition of the while() loop above.
01001       new_item->prev (0);
01002     }
01003   // Link at the end.
01004   else
01005     {
01006       // seq_tail->next (0);   This is a condition of the while() loop above.
01007       this->tail_->next (new_item);
01008       new_item->prev (this->tail_);
01009       this->tail_ = seq_tail;
01010     }
01011 
01012   if (this->signal_dequeue_waiters () == -1)
01013     return -1;
01014   else
01015     return ACE_Utils::Truncate (this->cur_count_);
01016 }
01017 
01018 // Actually put the node(s) at the head (no locking)
01019 
01020 template <ACE_SYNCH_DECL> int
01021 ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_head_i (ACE_Message_Block *new_item)
01022 {
01023   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_head_i");
01024 
01025   if (new_item == 0)
01026     return -1;
01027 
01028   // Update the queued size and length, taking into account any chained
01029   // blocks (total_size_and_length() counts all continuation blocks).
01030   // Keep count of how many blocks we're adding and, if there is a chain of
01031   // blocks, find the end in seq_tail and be sure they're properly
01032   // back-connected along the way.
01033   ACE_Message_Block *seq_tail = new_item;
01034   ++this->cur_count_;
01035   new_item->total_size_and_length (this->cur_bytes_,
01036                                    this->cur_length_);
01037   while (seq_tail->next () != 0)
01038     {
01039       seq_tail->next ()->prev (seq_tail);
01040       seq_tail = seq_tail->next ();
01041       ++this->cur_count_;
01042       seq_tail->total_size_and_length (this->cur_bytes_,
01043                                        this->cur_length_);
01044     }
01045 
01046   new_item->prev (0);
01047   seq_tail->next (this->head_);
01048 
01049   if (this->head_ != 0)
01050     this->head_->prev (seq_tail);
01051   else
01052     this->tail_ = seq_tail;
01053 
01054   this->head_ = new_item;
01055 
01056   if (this->signal_dequeue_waiters () == -1)
01057     return -1;
01058   else
01059     return ACE_Utils::Truncate (this->cur_count_);
01060 }
01061 
01062 // Actually put the node at its proper position relative to its
01063 // priority.
01064 
01065 template <ACE_SYNCH_DECL> int
01066 ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_i (ACE_Message_Block *new_item)
01067 {
01068   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_i");
01069 
01070   if (new_item == 0)
01071     return -1;
01072 
01073   // Since this method uses enqueue_head_i() and enqueue_tail_i() for
01074   // special situations, and this method doesn't support enqueueing
01075   // chains of blocks off the 'next' pointer, make sure the new_item's
01076   // next pointer is 0.
01077   new_item->next (0);
01078 
01079   if (this->head_ == 0)
01080     // Check for simple case of an empty queue, where all we need to
01081     // do is insert <new_item> into the head.
01082     return this->enqueue_head_i (new_item);
01083   else
01084     {
01085       ACE_Message_Block *temp = 0;
01086 
01087       // Figure out where the new item goes relative to its priority.
01088       // We start looking from the lowest priority (at the tail) to
01089       // the highest priority (at the head).
01090 
01091       for (temp = this->tail_;
01092            temp != 0;
01093            temp = temp->prev ())
01094         if (temp->msg_priority () >= new_item->msg_priority ())
01095           // Break out when we've located an item that has
01096           // greater or equal priority.
01097           break;
01098 
01099       if (temp == 0)
01100         // Check for simple case of inserting at the head of the queue,
01101         // where all we need to do is insert <new_item> before the
01102         // current head.
01103         return this->enqueue_head_i (new_item);
01104       else if (temp->next () == 0)
01105         // Check for simple case of inserting at the tail of the
01106         // queue, where all we need to do is insert <new_item> after
01107         // the current tail.
01108         return this->enqueue_tail_i (new_item);
01109       else
01110         {
01111           // Insert the new message behind the message of greater or
01112           // equal priority.  This ensures that FIFO order is
01113           // maintained when messages of the same priority are
01114           // inserted consecutively.
01115           new_item->prev (temp);
01116           new_item->next (temp->next ());
01117           temp->next ()->prev (new_item);
01118           temp->next (new_item);
01119         }
01120     }
01121 
01122   // Make sure to count all the bytes in a composite message!!!
01123   new_item->total_size_and_length (this->cur_bytes_,
01124                                    this->cur_length_);
01125   ++this->cur_count_;
01126 
01127   if (this->signal_dequeue_waiters () == -1)
01128     return -1;
01129   else
01130     return ACE_Utils::Truncate (this->cur_count_);
01131 }
01132 
01133 // Actually put the node at its proper position relative to its
01134 // deadline time.
01135 
01136 template <ACE_SYNCH_DECL> int
01137 ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_deadline_i (ACE_Message_Block *new_item)
01138 {
01139 #if defined (ACE_HAS_TIMED_MESSAGE_BLOCKS)
01140   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_deadline_i");
01141 
01142   if (new_item == 0)
01143     return -1;
01144 
01145   // Since this method uses enqueue_head_i() and enqueue_tail_i() for
01146   // special situations, and this method doesn't support enqueueing
01147   // chains of blocks off the 'next' pointer, make sure the new_item's
01148   // next pointer is 0.
01149   new_item->next (0);
01150 
01151   if (this->head_ == 0)
01152     // Check for simple case of an empty queue, where all we need to
01153     // do is insert <new_item> into the head.
01154     return this->enqueue_head_i (new_item);
01155   else
01156     {
01157       ACE_Message_Block *temp = 0;
01158 
01159       // Figure out where the new item goes relative to its priority.
01160       // We start looking from the smallest deadline to the highest
01161       // deadline.
01162 
01163       for (temp = this->head_;
01164            temp != 0;
01165            temp = temp->next ())
01166         if (new_item->msg_deadline_time () < temp->msg_deadline_time ())
01167           // Break out when we've located an item that has
01168           // greater or equal priority.
01169           break;
01170 
01171       if (temp == 0 || temp->next () == 0)
01172         // Check for simple case of inserting at the tail of the queue,
01173         // where all we need to do is insert <new_item> after the
01174         // current tail.
01175         return this->enqueue_tail_i (new_item);
01176       else
01177         {
01178           // Insert the new message behind the message of
01179           // lesser or equal deadline time.  This ensures that FIFO order is
01180           // maintained when messages of the same priority are
01181           // inserted consecutively.
01182           new_item->prev (temp);
01183           new_item->next (temp->next ());
01184           temp->next ()->prev (new_item);
01185           temp->next (new_item);
01186         }
01187     }
01188 
01189   // Make sure to count all the bytes in a composite message!!!
01190   new_item->total_size_and_length (this->cur_bytes_,
01191                                    this->cur_length_);
01192   ++this->cur_count_;
01193 
01194   if (this->signal_dequeue_waiters () == -1)
01195     return -1;
01196   else
01197     return this->cur_count_;
01198 #else
01199   return this->enqueue_tail_i (new_item);
01200 #endif /* ACE_HAS_TIMED_MESSAGE_BLOCKS */
01201 }
01202 
01203 // Actually get the first ACE_Message_Block (no locking, so must be
01204 // called with locks held).  This method assumes that the queue has at
01205 // least one item in it when it is called.
01206 
01207 template <ACE_SYNCH_DECL> int
01208 ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i (ACE_Message_Block *&first_item)
01209 {
01210   if (this->head_ ==0)
01211     ACE_ERROR_RETURN ((LM_ERROR,
01212                        ACE_LIB_TEXT ("Attempting to dequeue from empty queue")),
01213                       -1);
01214   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i");
01215   first_item = this->head_;
01216   this->head_ = this->head_->next ();
01217 
01218   if (this->head_ == 0)
01219     this->tail_ = 0;
01220   else
01221     // The prev pointer of first message block must point to 0...
01222     this->head_->prev (0);
01223 
01224   size_t mb_bytes = 0;
01225   size_t mb_length = 0;
01226   first_item->total_size_and_length (mb_bytes,
01227                                      mb_length);
01228   // Subtract off all of the bytes associated with this message.
01229   this->cur_bytes_ -= mb_bytes;
01230   this->cur_length_ -= mb_length;
01231   --this->cur_count_;
01232 
01233   if (this->cur_count_ == 0 && this->head_ == this->tail_)
01234     this->head_ = this->tail_ = 0;
01235 
01236   // Make sure that the prev and next fields are 0!
01237   first_item->prev (0);
01238   first_item->next (0);
01239 
01240   // Only signal enqueueing threads if we've fallen below the low
01241   // water mark.
01242   if (this->cur_bytes_ <= this->low_water_mark_
01243       && this->signal_enqueue_waiters () == -1)
01244     return -1;
01245   else
01246     return ACE_Utils::Truncate (this->cur_count_);
01247 }
01248 
01249 // Get the earliest (i.e., FIFO) ACE_Message_Block with the lowest
01250 // priority (no locking, so must be called with locks held).  This
01251 // method assumes that the queue has at least one item in it when it
01252 // is called.
01253 
01254 template <ACE_SYNCH_DECL> int
01255 ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_prio_i (ACE_Message_Block *&dequeued)
01256 {
01257   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_prio_i");
01258 
01259   if (this->head_ == 0)
01260     return -1;
01261 
01262   // Find the earliest (i.e., FIFO) message enqueued with the lowest
01263   // priority.
01264   ACE_Message_Block *chosen = 0;
01265   u_long priority = ULONG_MAX;
01266 
01267   for (ACE_Message_Block *temp = this->tail_;
01268        temp != 0;
01269        temp = temp->prev ())
01270     {
01271       // Find the first version of the earliest message (i.e.,
01272       // preserve FIFO order for messages at the same priority).
01273       if (temp->msg_priority () <= priority)
01274         {
01275           priority = temp->msg_priority ();
01276           chosen = temp;
01277         }
01278     }
01279 
01280   // If every message block is the same priority, pass back the first
01281   // one.
01282   if (chosen == 0)
01283     chosen = this->head_;
01284 
01285   // Patch up the queue.  If we don't have a previous then we are at
01286   // the head of the queue.
01287   if (chosen->prev () == 0)
01288     this->head_ = chosen->next ();
01289   else
01290     chosen->prev ()->next (chosen->next ());
01291 
01292   if (chosen->next () == 0)
01293     this->tail_ = chosen->prev ();
01294   else
01295     chosen->next ()->prev (chosen->prev ());
01296 
01297   // Pass back the chosen block
01298   dequeued = chosen;
01299 
01300   size_t mb_bytes = 0;
01301   size_t mb_length = 0;
01302   dequeued->total_size_and_length (mb_bytes,
01303                                    mb_length);
01304   // Subtract off all of the bytes associated with this message.
01305   this->cur_bytes_ -= mb_bytes;
01306   this->cur_length_ -= mb_length;
01307   --this->cur_count_;
01308 
01309   if (this->cur_count_ == 0 && this->head_ == this->tail_)
01310     this->head_ = this->tail_ = 0;
01311 
01312   // Make sure that the prev and next fields are 0!
01313   dequeued->prev (0);
01314   dequeued->next (0);
01315 
01316   // Only signal enqueueing threads if we've fallen below the low
01317   // water mark.
01318   if (this->cur_bytes_ <= this->low_water_mark_
01319       && this->signal_enqueue_waiters () == -1)
01320     return -1;
01321   else
01322     return ACE_Utils::Truncate (this->cur_count_);
01323 }
01324 
01325 // Actually get the last ACE_Message_Block (no locking, so must be
01326 // called with locks held).  This method assumes that the queue has at
01327 // least one item in it when it is called.
01328 
01329 template <ACE_SYNCH_DECL> int
01330 ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_tail_i (ACE_Message_Block *&dequeued)
01331 {
01332   if (this->head_ == 0)
01333     ACE_ERROR_RETURN ((LM_ERROR,
01334                        ACE_LIB_TEXT ("Attempting to dequeue from empty queue")),
01335                       -1);
01336   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_tail_i");
01337   dequeued = this->tail_;
01338   if (this->tail_->prev () == 0)
01339     {
01340       this->head_ = 0;
01341       this->tail_ = 0;
01342     }
01343   else
01344     {
01345       this->tail_->prev ()->next (0);
01346       this->tail_ = this->tail_->prev ();
01347     }
01348 
01349   size_t mb_bytes = 0;
01350   size_t mb_length = 0;
01351   dequeued->total_size_and_length (mb_bytes,
01352                                    mb_length);
01353   // Subtract off all of the bytes associated with this message.
01354   this->cur_bytes_ -= mb_bytes;
01355   this->cur_length_ -= mb_length;
01356   --this->cur_count_;
01357 
01358   if (this->cur_count_ == 0 && this->head_ == this->tail_)
01359     this->head_ = this->tail_ = 0;
01360 
01361   // Make sure that the prev and next fields are 0!
01362   dequeued->prev (0);
01363   dequeued->next (0);
01364 
01365   // Only signal enqueueing threads if we've fallen below the low
01366   // water mark.
01367   if (this->cur_bytes_ <= this->low_water_mark_
01368       && this->signal_enqueue_waiters () == -1)
01369     return -1;
01370   else
01371     return ACE_Utils::Truncate (this->cur_count_);
01372 }
01373 
01374 // Actually get the ACE_Message_Block with the lowest deadline time
01375 // (no locking, so must be called with locks held).  This method assumes
01376 // that the queue has at least one item in it when it is called.
01377 
01378 template <ACE_SYNCH_DECL> int
01379 ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_deadline_i (ACE_Message_Block *&dequeued)
01380 {
01381 #if defined (ACE_HAS_TIMED_MESSAGE_BLOCKS)
01382   if (this->head_ == 0)
01383     ACE_ERROR_RETURN ((LM_ERROR,
01384                        ACE_LIB_TEXT ("Attempting to dequeue from empty queue")),
01385                       -1);
01386   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_deadline_i");
01387 
01388   // Find the last message enqueued with the lowest deadline time
01389   ACE_Message_Block* chosen = 0;
01390   ACE_Time_Value deadline = ACE_Time_Value::max_time;
01391   for (ACE_Message_Block *temp = this->head_; temp != 0; temp = temp->next ())
01392     if (temp->msg_deadline_time () < deadline)
01393       {
01394         deadline = temp->msg_deadline_time ();
01395         chosen = temp;
01396       }
01397 
01398   // If every message block is the same deadline time,
01399   // pass back the first one
01400   if (chosen == 0)
01401     chosen = this->head_;
01402 
01403   // Patch up the queue.  If we don't have a previous
01404   // then we are at the head of the queue.
01405   if (chosen->prev () == 0)
01406     this->head_ = chosen->next ();
01407   else
01408     chosen->prev ()->next (chosen->next ());
01409 
01410   if (chosen->next () == 0)
01411     this->tail_ = chosen->prev ();
01412   else
01413     chosen->next ()->prev (chosen->prev ());
01414 
01415   // Pass back the chosen block
01416   dequeued = chosen;
01417 
01418   size_t mb_bytes = 0;
01419   size_t mb_length = 0;
01420   dequeued->total_size_and_length (mb_bytes,
01421                                    mb_length);
01422   // Subtract off all of the bytes associated with this message.
01423   this->cur_bytes_ -= mb_bytes;
01424   this->cur_length_ -= mb_length;
01425   --this->cur_count_;
01426 
01427   if (this->cur_count_ == 0 && this->head_ == this->tail_)
01428     this->head_ = this->tail_ = 0;
01429 
01430   // Make sure that the prev and next fields are 0!
01431   dequeued->prev (0);
01432   dequeued->next (0);
01433 
01434   // Only signal enqueueing threads if we've fallen below the low
01435   // water mark.
01436   if (this->cur_bytes_ <= this->low_water_mark_
01437       && this->signal_enqueue_waiters () == -1)
01438     return -1;
01439   else
01440     return this->cur_count_;
01441 #else
01442   return this->dequeue_head_i (dequeued);
01443 #endif /* ACE_HAS_TIMED_MESSAGE_BLOCKS */
01444 }
01445 
01446 // Take a look at the first item without removing it.
01447 
01448 template <ACE_SYNCH_DECL> int
01449 ACE_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head (ACE_Message_Block *&first_item,
01450                                                      ACE_Time_Value *timeout)
01451 {
01452   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head");
01453   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01454 
01455   if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
01456     {
01457       errno = ESHUTDOWN;
01458       return -1;
01459     }
01460 
01461   // Wait for at least one item to become available.
01462 
01463   if (this->wait_not_empty_cond (ace_mon, timeout) == -1)
01464     return -1;
01465 
01466   first_item = this->head_;
01467   return ACE_Utils::Truncate (this->cur_count_);
01468 }
01469 
01470 template <ACE_SYNCH_DECL> int
01471 ACE_Message_Queue<ACE_SYNCH_USE>::wait_not_full_cond (ACE_Guard<ACE_SYNCH_MUTEX_T> &,
01472                                                       ACE_Time_Value *timeout)
01473 {
01474   int result = 0;
01475 
01476   // Wait while the queue is full.
01477 
01478   while (this->is_full_i ())
01479     {
01480       if (this->not_full_cond_.wait (timeout) == -1)
01481         {
01482           if (errno == ETIME)
01483             errno = EWOULDBLOCK;
01484           result = -1;
01485           break;
01486         }
01487       if (this->state_ != ACE_Message_Queue_Base::ACTIVATED)
01488         {
01489           errno = ESHUTDOWN;
01490           result = -1;
01491           break;
01492         }
01493     }
01494   return result;
01495 }
01496 
01497 template <ACE_SYNCH_DECL> int
01498 ACE_Message_Queue<ACE_SYNCH_USE>::wait_not_empty_cond
01499     (ACE_Guard<ACE_SYNCH_MUTEX_T> &, ACE_Time_Value *timeout)
01500 {
01501   int result = 0;
01502 
01503   // Wait while the queue is empty.
01504 
01505   while (this->is_empty_i ())
01506     {
01507       if (this->not_empty_cond_.wait (timeout) == -1)
01508         {
01509           if (errno == ETIME)
01510             errno = EWOULDBLOCK;
01511           result = -1;
01512           break;
01513         }
01514       if (this->state_ != ACE_Message_Queue_Base::ACTIVATED)
01515         {
01516           errno = ESHUTDOWN;
01517           result = -1;
01518           break;
01519         }
01520     }
01521   return result;
01522 }
01523 
01524 // Block indefinitely waiting for an item to arrive, does not ignore
01525 // alerts (e.g., signals).
01526 
01527 template <ACE_SYNCH_DECL> int
01528 ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_head (ACE_Message_Block *new_item,
01529                                                 ACE_Time_Value *timeout)
01530 {
01531   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_head");
01532   int queue_count = 0;
01533   {
01534     ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01535 
01536     if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
01537       {
01538         errno = ESHUTDOWN;
01539         return -1;
01540       }
01541 
01542     if (this->wait_not_full_cond (ace_mon, timeout) == -1)
01543       return -1;
01544 
01545     queue_count = this->enqueue_head_i (new_item);
01546 
01547     if (queue_count == -1)
01548       return -1;
01549 
01550     this->notify ();
01551   }
01552   return queue_count;
01553 }
01554 
01555 // Enqueue an <ACE_Message_Block *> into the <Message_Queue> in
01556 // accordance with its <msg_priority> (0 is lowest priority).  Returns
01557 // -1 on failure, else the number of items still on the queue.
01558 
01559 template <ACE_SYNCH_DECL> int
01560 ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_prio (ACE_Message_Block *new_item,
01561                                                 ACE_Time_Value *timeout)
01562 {
01563   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_prio");
01564   int queue_count = 0;
01565   {
01566     ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01567 
01568     if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
01569       {
01570         errno = ESHUTDOWN;
01571         return -1;
01572       }
01573 
01574     if (this->wait_not_full_cond (ace_mon, timeout) == -1)
01575       return -1;
01576 
01577     queue_count = this->enqueue_i (new_item);
01578 
01579     if (queue_count == -1)
01580       return -1;
01581 
01582     this->notify ();
01583   }
01584   return queue_count;
01585 }
01586 
01587 // Enqueue an <ACE_Message_Block *> into the <Message_Queue> in
01588 // accordance with its <msg_deadline_time>.  Returns
01589 // -1 on failure, else the number of items still on the queue.
01590 
01591 template <ACE_SYNCH_DECL> int
01592 ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_deadline (ACE_Message_Block *new_item,
01593                                                     ACE_Time_Value *timeout)
01594 {
01595   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_deadline");
01596   int queue_count = 0;
01597   {
01598     ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01599 
01600     if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
01601       {
01602         errno = ESHUTDOWN;
01603         return -1;
01604       }
01605 
01606     if (this->wait_not_full_cond (ace_mon, timeout) == -1)
01607       return -1;
01608 
01609     queue_count = this->enqueue_deadline_i (new_item);
01610 
01611     if (queue_count == -1)
01612       return -1;
01613 
01614     this->notify ();
01615   }
01616   return queue_count;
01617 }
01618 
01619 template <ACE_SYNCH_DECL> int
01620 ACE_Message_Queue<ACE_SYNCH_USE>::enqueue (ACE_Message_Block *new_item,
01621                                            ACE_Time_Value *timeout)
01622 {
01623   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue");
01624   return this->enqueue_prio (new_item, timeout);
01625 }
01626 
01627 // Block indefinitely waiting for an item to arrive,
01628 // does not ignore alerts (e.g., signals).
01629 
01630 template <ACE_SYNCH_DECL> int
01631 ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_tail (ACE_Message_Block *new_item,
01632                                               ACE_Time_Value *timeout)
01633 {
01634   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_tail");
01635   int queue_count = 0;
01636   {
01637     ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01638 
01639     if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
01640       {
01641         errno = ESHUTDOWN;
01642         return -1;
01643       }
01644 
01645     if (this->wait_not_full_cond (ace_mon, timeout) == -1)
01646       return -1;
01647 
01648     queue_count = this->enqueue_tail_i (new_item);
01649 
01650     if (queue_count == -1)
01651       return -1;
01652 
01653     this->notify ();
01654   }
01655   return queue_count;
01656 }
01657 
01658 // Remove an item from the front of the queue.  If timeout == 0 block
01659 // indefinitely (or until an alert occurs).  Otherwise, block for upto
01660 // the amount of time specified by timeout.
01661 
01662 template <ACE_SYNCH_DECL> int
01663 ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head (ACE_Message_Block *&first_item,
01664                                                 ACE_Time_Value *timeout)
01665 {
01666   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head");
01667   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01668 
01669   if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
01670     {
01671       errno = ESHUTDOWN;
01672       return -1;
01673     }
01674 
01675   if (this->wait_not_empty_cond (ace_mon, timeout) == -1)
01676     return -1;
01677 
01678   return this->dequeue_head_i (first_item);
01679 }
01680 
01681 // Remove item with the lowest priority from the queue.  If timeout == 0 block
01682 // indefinitely (or until an alert occurs).  Otherwise, block for upto
01683 // the amount of time specified by timeout.
01684 
01685 template <ACE_SYNCH_DECL> int
01686 ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_prio (ACE_Message_Block *&dequeued,
01687                                                 ACE_Time_Value *timeout)
01688 {
01689   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_prio");
01690   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01691 
01692   if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
01693     {
01694       errno = ESHUTDOWN;
01695       return -1;
01696     }
01697 
01698   if (this->wait_not_empty_cond (ace_mon, timeout) == -1)
01699     return -1;
01700 
01701   return this->dequeue_prio_i (dequeued);
01702 }
01703 
01704 // Remove an item from the end of the queue.  If timeout == 0 block
01705 // indefinitely (or until an alert occurs).  Otherwise, block for upto
01706 // the amount of time specified by timeout.
01707 
01708 template <ACE_SYNCH_DECL> int
01709 ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_tail (ACE_Message_Block *&dequeued,
01710                                                 ACE_Time_Value *timeout)
01711 {
01712   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_tail");
01713   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01714 
01715   if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
01716     {
01717       errno = ESHUTDOWN;
01718       return -1;
01719     }
01720 
01721   if (this->wait_not_empty_cond (ace_mon, timeout) == -1)
01722     return -1;
01723 
01724   return this->dequeue_tail_i (dequeued);
01725 }
01726 
01727 // Remove an item with the lowest deadline time.  If timeout == 0 block
01728 // indefinitely (or until an alert occurs).  Otherwise, block for upto
01729 // the amount of time specified by timeout.
01730 
01731 template <ACE_SYNCH_DECL> int
01732 ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_deadline (ACE_Message_Block *&dequeued,
01733                                                     ACE_Time_Value *timeout)
01734 {
01735   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_deadline");
01736   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01737 
01738   if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
01739     {
01740       errno = ESHUTDOWN;
01741       return -1;
01742     }
01743 
01744   if (this->wait_not_empty_cond (ace_mon, timeout) == -1)
01745     return -1;
01746 
01747   return this->dequeue_deadline_i (dequeued);
01748 }
01749 
01750 template <ACE_SYNCH_DECL> int
01751 ACE_Message_Queue<ACE_SYNCH_USE>::notify (void)
01752 {
01753   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::notify");
01754 
01755   // By default, don't do anything.
01756   if (this->notification_strategy_ == 0)
01757     return 0;
01758   else
01759     return this->notification_strategy_->notify ();
01760 }
01761 
01762 
01763 // = Initialization and termination methods.
01764 template <ACE_SYNCH_DECL>
01765 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::ACE_Dynamic_Message_Queue (ACE_Dynamic_Message_Strategy & message_strategy,
01766                                                                      size_t hwm,
01767                                                                      size_t lwm,
01768                                                                      ACE_Notification_Strategy *ns)
01769   : ACE_Message_Queue<ACE_SYNCH_USE> (hwm, lwm, ns),
01770     pending_head_ (0),
01771     pending_tail_ (0),
01772     late_head_ (0),
01773     late_tail_ (0),
01774     beyond_late_head_ (0),
01775     beyond_late_tail_ (0),
01776     message_strategy_ (message_strategy)
01777 {
01778   // Note, the ACE_Dynamic_Message_Queue assumes full responsibility
01779   // for the passed ACE_Dynamic_Message_Strategy object, and deletes
01780   // it in its own dtor
01781 }
01782 
01783 // dtor: free message strategy and let base class dtor do the rest.
01784 
01785 template <ACE_SYNCH_DECL>
01786 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::~ACE_Dynamic_Message_Queue (void)
01787 {
01788   delete &this->message_strategy_;
01789 }
01790 
01791 template <ACE_SYNCH_DECL> int
01792 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::remove_messages (ACE_Message_Block *&list_head,
01793                                                            ACE_Message_Block *&list_tail,
01794                                                            u_int status_flags)
01795 {
01796   // start with an empty list
01797   list_head = 0;
01798   list_tail = 0;
01799 
01800   // Get the current time
01801   ACE_Time_Value current_time = ACE_OS::gettimeofday ();
01802 
01803   // Refresh priority status boundaries in the queue.
01804   int result = this->refresh_queue (current_time);
01805   if (result < 0)
01806     return result;
01807 
01808   if (ACE_BIT_ENABLED (status_flags,
01809                        (u_int) ACE_Dynamic_Message_Strategy::PENDING)
01810       && this->pending_head_
01811       && this->pending_tail_)
01812     {
01813       // patch up pointers for the new tail of the queue
01814       if (this->pending_head_->prev ())
01815         {
01816           this->tail_ = this->pending_head_->prev ();
01817           this->pending_head_->prev ()->next (0);
01818         }
01819       else
01820         {
01821           // the list has become empty
01822           this->head_ = 0;
01823           this->tail_ = 0;
01824         }
01825 
01826       // point to the head and tail of the list
01827       list_head = this->pending_head_;
01828       list_tail = this->pending_tail_;
01829 
01830       // cut the pending messages out of the queue entirely
01831       this->pending_head_->prev (0);
01832       this->pending_head_ = 0;
01833       this->pending_tail_ = 0;
01834     }
01835 
01836   if (ACE_BIT_ENABLED (status_flags,
01837                        (u_int) ACE_Dynamic_Message_Strategy::LATE)
01838       && this->late_head_
01839       && this->late_tail_)
01840     {
01841       // Patch up pointers for the (possibly) new head and tail of the
01842       // queue.
01843       if (this->late_tail_->next ())
01844         this->late_tail_->next ()->prev (this->late_head_->prev ());
01845       else
01846         this->tail_ = this->late_head_->prev ();
01847 
01848       if (this->late_head_->prev ())
01849         this->late_head_->prev ()->next (this->late_tail_->next ());
01850       else
01851         this->head_ = this->late_tail_->next ();
01852 
01853       // put late messages behind pending messages (if any) being returned
01854       this->late_head_->prev (list_tail);
01855       if (list_tail)
01856         list_tail->next (this->late_head_);
01857       else
01858         list_head = this->late_head_;
01859 
01860       list_tail = this->late_tail_;
01861 
01862       this->late_tail_->next (0);
01863       this->late_head_ = 0;
01864       this->late_tail_ = 0;
01865     }
01866 
01867   if (ACE_BIT_ENABLED (status_flags,
01868       (u_int) ACE_Dynamic_Message_Strategy::BEYOND_LATE)
01869       && this->beyond_late_head_
01870       && this->beyond_late_tail_)
01871     {
01872       // Patch up pointers for the new tail of the queue
01873       if (this->beyond_late_tail_->next ())
01874         {
01875           this->head_ = this->beyond_late_tail_->next ();
01876           this->beyond_late_tail_->next ()->prev (0);
01877         }
01878       else
01879         {
01880           // the list has become empty
01881           this->head_ = 0;
01882           this->tail_ = 0;
01883         }
01884 
01885       // Put beyond late messages at the end of the list being
01886       // returned.
01887       if (list_tail)
01888         {
01889           this->beyond_late_head_->prev (list_tail);
01890           list_tail->next (this->beyond_late_head_);
01891         }
01892       else
01893         list_head = this->beyond_late_head_;
01894 
01895       list_tail = this->beyond_late_tail_;
01896 
01897       this->beyond_late_tail_->next (0);
01898       this->beyond_late_head_ = 0;
01899       this->beyond_late_tail_ = 0;
01900     }
01901 
01902   // Decrement message and size counts for removed messages.
01903   ACE_Message_Block *temp1;
01904 
01905   for (temp1 = list_head;
01906        temp1 != 0;
01907        temp1 = temp1->next ())
01908     {
01909       --this->cur_count_;
01910 
01911       size_t mb_bytes = 0;
01912       size_t mb_length = 0;
01913       temp1->total_size_and_length (mb_bytes,
01914                                     mb_length);
01915       // Subtract off all of the bytes associated with this message.
01916       this->cur_bytes_ -= mb_bytes;
01917       this->cur_length_ -= mb_length;
01918     }
01919 
01920   return result;
01921 }
01922 
01923 // Detach all messages with status given in the passed flags from the
01924 // queue and return them by setting passed head and tail pointers to
01925 // the linked list they comprise.  This method is intended primarily
01926 // as a means of periodically harvesting messages that have missed
01927 // their deadlines, but is available in its most general form.  All
01928 // messages are returned in priority order, from head to tail, as of
01929 // the time this method was called.
01930 
01931 template <ACE_SYNCH_DECL> int
01932 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head (ACE_Message_Block *&first_item,
01933                                                         ACE_Time_Value *timeout)
01934 {
01935   ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head");
01936 
01937   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01938 
01939   if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
01940     {
01941       errno = ESHUTDOWN;
01942       return -1;
01943     }
01944 
01945   int result;
01946 
01947   // get the current time
01948   ACE_Time_Value current_time = ACE_OS::gettimeofday ();
01949 
01950   // refresh priority status boundaries in the queue
01951   result = this->refresh_queue (current_time);
01952   if (result < 0)
01953     return result;
01954 
01955   // *now* it's appropriate to wait for an enqueued item
01956   result = this->wait_not_empty_cond (ace_mon, timeout);
01957   if (result == -1)
01958     return result;
01959 
01960   // call the internal dequeue method, which selects an item from the
01961   // highest priority status portion of the queue that has messages
01962   // enqueued.
01963   result = this->dequeue_head_i (first_item);
01964 
01965   return result;
01966 }
01967 
01968 // Dequeue and return the <ACE_Message_Block *> at the (logical) head
01969 // of the queue.
01970 
01971 template <ACE_SYNCH_DECL> void
01972 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dump (void) const
01973 {
01974 #if defined (ACE_HAS_DUMP)
01975   ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dump");
01976   ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
01977 
01978   ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("ACE_Message_Queue<ACE_SYNCH_USE> (base class): \n")));
01979   this->ACE_Message_Queue<ACE_SYNCH_USE>::dump ();
01980 
01981   ACE_DEBUG ((LM_DEBUG,
01982               ACE_LIB_TEXT ("pending_head_ = %u\n")
01983               ACE_LIB_TEXT ("pending_tail_ = %u\n")
01984               ACE_LIB_TEXT ("late_head_ = %u\n")
01985               ACE_LIB_TEXT ("late_tail_ = %u\n")
01986               ACE_LIB_TEXT ("beyond_late_head_ = %u\n")
01987               ACE_LIB_TEXT ("beyond_late_tail_ = %u\n"),
01988               this->pending_head_,
01989               this->pending_tail_,
01990               this->late_head_,
01991               this->late_tail_,
01992               this->beyond_late_head_,
01993               this->beyond_late_tail_));
01994 
01995   ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("message_strategy_ : \n")));
01996   message_strategy_.dump ();
01997 
01998   ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
01999 #endif /* ACE_HAS_DUMP */
02000 }
02001   // dump the state of the queue
02002 
02003 template <ACE_SYNCH_DECL> int
02004 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_i (ACE_Message_Block *new_item)
02005 {
02006   ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_i");
02007 
02008   if (new_item == 0)
02009     return -1;
02010 
02011   int result = 0;
02012 
02013   // Get the current time.
02014   ACE_Time_Value current_time = ACE_OS::gettimeofday ();
02015 
02016   // Refresh priority status boundaries in the queue.
02017 
02018   result = this->refresh_queue (current_time);
02019   if (result < 0)
02020     return result;
02021 
02022   // Where we enqueue depends on the message's priority status.
02023   switch (message_strategy_.priority_status (*new_item,
02024                                              current_time))
02025     {
02026     case ACE_Dynamic_Message_Strategy::PENDING:
02027       if (this->pending_tail_ == 0)
02028         {
02029           // Check for simple case of an empty pending queue, where
02030           // all we need to do is insert <new_item> into the tail of
02031           // the queue.
02032           pending_head_ = new_item;
02033           pending_tail_ = pending_head_;
02034           return this->enqueue_tail_i (new_item);
02035         }
02036       else
02037         {
02038           // Enqueue the new message in priority order in the pending
02039           // sublist
02040           result = sublist_enqueue_i (new_item,
02041                                       current_time,
02042                                       this->pending_head_,
02043                                       this->pending_tail_,
02044                                       ACE_Dynamic_Message_Strategy::PENDING);
02045         }
02046       break;
02047 
02048     case ACE_Dynamic_Message_Strategy::LATE:
02049       if (this->late_tail_ == 0)
02050         {
02051           late_head_ = new_item;
02052           late_tail_ = late_head_;
02053 
02054           if (this->pending_head_ == 0)
02055             // Check for simple case of an empty pending queue,
02056             // where all we need to do is insert <new_item> into the
02057             // tail of the queue.
02058             return this->enqueue_tail_i (new_item);
02059           else if (this->beyond_late_tail_ == 0)
02060             // Check for simple case of an empty beyond late queue, where all
02061             // we need to do is insert <new_item> into the head of the queue.
02062             return this->enqueue_head_i (new_item);
02063           else
02064             {
02065               // Otherwise, we can just splice the new message in
02066               // between the pending and beyond late portions of the
02067               // queue.
02068               this->beyond_late_tail_->next (new_item);
02069               new_item->prev (this->beyond_late_tail_);
02070               this->pending_head_->prev (new_item);
02071               new_item->next (this->pending_head_);
02072             }
02073         }
02074       else
02075         {
02076           // Enqueue the new message in priority order in the late
02077           // sublist
02078           result = sublist_enqueue_i (new_item,
02079                                       current_time,
02080                                       this->late_head_,
02081                                       this->late_tail_,
02082                                       ACE_Dynamic_Message_Strategy::LATE);
02083         }
02084       break;
02085 
02086     case ACE_Dynamic_Message_Strategy::BEYOND_LATE:
02087       if (this->beyond_late_tail_ == 0)
02088         {
02089           // Check for simple case of an empty beyond late queue,
02090           // where all we need to do is insert <new_item> into the
02091           // head of the queue.
02092           beyond_late_head_ = new_item;
02093           beyond_late_tail_ = beyond_late_head_;
02094           return this->enqueue_head_i (new_item);
02095         }
02096       else
02097         {
02098           // all beyond late messages have the same (zero) priority,
02099           // so just put the new one at the end of the beyond late
02100           // messages
02101           if (this->beyond_late_tail_->next ())
02102             this->beyond_late_tail_->next ()->prev (new_item);
02103           else
02104             this->tail_ = new_item;
02105 
02106           new_item->next (this->beyond_late_tail_->next ());
02107           this->beyond_late_tail_->next (new_item);
02108           new_item->prev (this->beyond_late_tail_);
02109           this->beyond_late_tail_ = new_item;
02110         }
02111 
02112       break;
02113 
02114       // should never get here, but just in case...
02115     default:
02116       result = -1;
02117       break;
02118     }
02119 
02120   if (result < 0)
02121     return result;
02122 
02123   size_t mb_bytes = 0;
02124   size_t mb_length = 0;
02125   new_item->total_size_and_length (mb_bytes,
02126                                    mb_length);
02127   this->cur_bytes_ += mb_bytes;
02128   this->cur_length_ += mb_length;
02129   ++this->cur_count_;
02130 
02131   if (this->signal_dequeue_waiters () == -1)
02132     return -1;
02133   else
02134     return this->cur_count_;
02135 }
02136 
// Enqueue an <ACE_Message_Block *> in accordance with its priority.
// The priority may be *dynamic*, *static*, or a combination of both.
// It calls the priority evaluation function passed into the Dynamic
// Message Queue constructor to update the priorities of all enqueued
// messages.
02142 
02143 template <ACE_SYNCH_DECL> int
02144 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::sublist_enqueue_i (ACE_Message_Block *new_item,
02145                                                              const ACE_Time_Value &current_time,
02146                                                              ACE_Message_Block *&sublist_head,
02147                                                              ACE_Message_Block *&sublist_tail,
02148                                                              ACE_Dynamic_Message_Strategy::Priority_Status status)
02149 {
02150   int result = 0;
02151   ACE_Message_Block *current_item = 0;
02152 
02153   // Find message after which to enqueue new item, based on message
02154   // priority and priority status.
02155   for (current_item = sublist_tail;
02156        current_item;
02157        current_item = current_item->prev ())
02158     {
02159       if (message_strategy_.priority_status (*current_item, current_time) == status)
02160         {
02161           if (current_item->msg_priority () >= new_item->msg_priority ())
02162             break;
02163         }
02164       else
02165         {
02166           sublist_head = new_item;
02167           break;
02168         }
02169     }
02170 
02171   if (current_item == 0)
02172     {
02173       // If the new message has highest priority of any, put it at the
02174       // head of the list (and sublist).
02175       new_item->prev (0);
02176       new_item->next (this->head_);
02177       if (this->head_ != 0)
02178         this->head_->prev (new_item);
02179       else
02180         {
02181           this->tail_ = new_item;
02182           sublist_tail = new_item;
02183         }
02184       this->head_ = new_item;
02185       sublist_head = new_item;
02186     }
02187   else
02188     {
02189       // insert the new item into the list
02190       new_item->next (current_item->next ());
02191       new_item->prev (current_item);
02192 
02193       if (current_item->next ())
02194         current_item->next ()->prev (new_item);
02195       else
02196         this->tail_ = new_item;
02197 
02198       current_item->next (new_item);
02199 
02200       // If the new item has lowest priority of any in the sublist,
02201       // move the tail pointer of the sublist back to the new item
02202       if (current_item == sublist_tail)
02203         sublist_tail = new_item;
02204     }
02205 
02206   return result;
02207 }
02208 
02209 // Enqueue a message in priority order within a given priority status
02210 // sublist.
02211 
02212 template <ACE_SYNCH_DECL> int
02213 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i (ACE_Message_Block *&first_item)
02214 {
02215   ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i");
02216 
02217   int result = 0;
02218   int last_in_subqueue = 0;
02219 
02220   // first, try to dequeue from the head of the pending list
02221   if (this->pending_head_)
02222     {
02223       first_item = this->pending_head_;
02224 
02225       if (0 == this->pending_head_->prev ())
02226         this->head_ = this->pending_head_->next ();
02227       else
02228         this->pending_head_->prev ()->next (this->pending_head_->next ());
02229 
02230       if (0 == this->pending_head_->next ())
02231         {
02232           this->tail_ = this->pending_head_->prev ();
02233           this->pending_head_ = 0;
02234           this->pending_tail_ = 0;
02235         }
02236       else
02237         {
02238           this->pending_head_->next ()->prev (this->pending_head_->prev ());
02239           this->pending_head_ = this->pending_head_->next ();
02240         }
02241 
02242       first_item->prev (0);
02243       first_item->next (0);
02244     }
02245 
02246   // Second, try to dequeue from the head of the late list
02247   else if (this->late_head_)
02248     {
02249       last_in_subqueue = this->late_head_ == this->late_tail_ ? 1 : 0;
02250 
02251       first_item = this->late_head_;
02252 
02253       if (0 == this->late_head_->prev ())
02254         this->head_ = this->late_head_->next ();
02255       else
02256         this->late_head_->prev ()->next (this->late_head_->next ());
02257 
02258       if (0 == this->late_head_->next ())
02259         this->tail_ = this->late_head_->prev ();
02260       else
02261         {
02262           this->late_head_->next ()->prev (this->late_head_->prev ());
02263           this->late_head_ = this->late_head_->next ();
02264         }
02265 
02266       if (last_in_subqueue)
02267         {
02268           this->late_head_ = 0;
02269           this->late_tail_ = 0;
02270         }
02271 
02272       first_item->prev (0);
02273       first_item->next (0);
02274     }
02275   // finally, try to dequeue from the head of the beyond late list
02276   else if (this->beyond_late_head_)
02277     {
02278       last_in_subqueue =
02279         (this->beyond_late_head_ == this->beyond_late_tail_) ? 1 : 0;
02280 
02281       first_item = this->beyond_late_head_;
02282       this->head_ = this->beyond_late_head_->next ();
02283 
02284       if (0 == this->beyond_late_head_->next ())
02285         this->tail_ = this->beyond_late_head_->prev ();
02286       else
02287         {
02288           this->beyond_late_head_->next ()->prev (this->beyond_late_head_->prev ());
02289           this->beyond_late_head_ = this->beyond_late_head_->next ();
02290         }
02291 
02292       if (last_in_subqueue)
02293         {
02294           this->beyond_late_head_ = 0;
02295           this->beyond_late_tail_ = 0;
02296         }
02297 
02298       first_item->prev (0);
02299       first_item->next (0);
02300     }
02301   else
02302     {
02303       // nothing to dequeue: set the pointer to zero and return an error code
02304       first_item = 0;
02305       result = -1;
02306     }
02307 
02308   if (result < 0)
02309     return result;
02310 
02311   size_t mb_bytes = 0;
02312   size_t mb_length = 0;
02313   first_item->total_size_and_length (mb_bytes,
02314                                      mb_length);
02315   // Subtract off all of the bytes associated with this message.
02316   this->cur_bytes_ -= mb_bytes;
02317   this->cur_length_ -= mb_length;
02318   --this->cur_count_;
02319 
02320   // Only signal enqueueing threads if we've fallen below the low
02321   // water mark.
02322   if (this->cur_bytes_ <= this->low_water_mark_
02323       && this->signal_enqueue_waiters () == -1)
02324     return -1;
02325   else
02326     return this->cur_count_;
02327 }
02328 
02329 // Dequeue and return the <ACE_Message_Block *> at the head of the
02330 // logical queue.  Attempts first to dequeue from the pending portion
02331 // of the queue, or if that is empty from the late portion, or if that
02332 // is empty from the beyond late portion, or if that is empty just
02333 // sets the passed pointer to zero and returns -1.
02334 
02335 template <ACE_SYNCH_DECL> int
02336 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::refresh_queue (const ACE_Time_Value &current_time)
02337 {
02338   int result;
02339 
02340   result = refresh_pending_queue (current_time);
02341 
02342   if (result != -1)
02343     result = refresh_late_queue (current_time);
02344 
02345   return result;
02346 }
02347 
02348 // Refresh the queue using the strategy specific priority status
02349 // function.
02350 
02351 template <ACE_SYNCH_DECL> int
02352 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::refresh_pending_queue (const ACE_Time_Value &current_time)
02353 {
02354   ACE_Dynamic_Message_Strategy::Priority_Status current_status;
02355 
02356   // refresh priority status boundaries in the queue
02357   if (this->pending_head_)
02358     {
02359       current_status = message_strategy_.priority_status (*this->pending_head_,
02360                                                           current_time);
02361       switch (current_status)
02362         {
02363         case ACE_Dynamic_Message_Strategy::BEYOND_LATE:
02364           // Make sure the head of the beyond late queue is set (there
02365           // may not have been any beyond late messages previously)
02366           this->beyond_late_head_ = this->head_;
02367 
02368           // Zero out the late queue pointers, and set them only if
02369           // there turn out to be late messages in the pending sublist
02370           this->late_head_ = 0;
02371           this->late_tail_ = 0;
02372 
02373           // Advance through the beyond late messages in the pending queue
02374           do
02375             {
02376               this->pending_head_ = this->pending_head_->next ();
02377 
02378               if (this->pending_head_)
02379                 current_status = message_strategy_.priority_status (*this->pending_head_,
02380                                                                     current_time);
02381               else
02382                 break;  // do while
02383 
02384             }
02385           while (current_status == ACE_Dynamic_Message_Strategy::BEYOND_LATE);
02386 
02387           if (this->pending_head_)
02388             {
02389               // point tail of beyond late sublist to previous item
02390               this->beyond_late_tail_ = this->pending_head_->prev ();
02391 
02392               if (current_status == ACE_Dynamic_Message_Strategy::PENDING)
02393                 // there are no late messages left in the queue
02394                 break; // switch
02395               else if (current_status != ACE_Dynamic_Message_Strategy::LATE)
02396                 {
02397                   // if we got here, something is *seriously* wrong with the queue
02398                   ACE_ERROR_RETURN ((LM_ERROR,
02399                                      ACE_LIB_TEXT ("Unexpected message priority status [%d] (expected LATE)"),
02400                                      (int) current_status),
02401                                     -1);
02402                 }
02403               /* FALLTHRU */
02404             }
02405           else
02406             {
02407               // There are no pending or late messages left in the
02408               // queue.
02409               this->beyond_late_tail_ = this->tail_;
02410               this->pending_head_ = 0;
02411               this->pending_tail_ = 0;
02412               break; // switch
02413             }
02414 
02415         case ACE_Dynamic_Message_Strategy::LATE:
02416           // Make sure the head of the late queue is set (there may
02417           // not have been any late messages previously, or they may
02418           // have all become beyond late).
02419           if (this->late_head_ == 0)
02420             this->late_head_ = this->pending_head_;
02421 
02422           // advance through the beyond late messages in the pending queue
02423           do
02424             {
02425               this->pending_head_ = this->pending_head_->next ();
02426 
02427               if (this->pending_head_)
02428                 current_status = message_strategy_.priority_status (*this->pending_head_,
02429                                                                     current_time);
02430               else
02431                 break;  // do while
02432 
02433             }
02434           while (current_status == ACE_Dynamic_Message_Strategy::LATE);
02435 
02436           if (this->pending_head_)
02437             {
02438               if (current_status != ACE_Dynamic_Message_Strategy::PENDING)
02439                 // if we got here, something is *seriously* wrong with the queue
02440                 ACE_ERROR_RETURN((LM_ERROR,
02441                                   ACE_LIB_TEXT ("Unexpected message priority status [%d] (expected PENDING)"),
02442                                   (int) current_status),
02443                                  -1);
02444 
02445               // Point tail of late sublist to previous item
02446               this->late_tail_ = this->pending_head_->prev ();
02447             }
02448           else
02449             {
02450               // there are no pending messages left in the queue
02451               this->late_tail_ = this->tail_;
02452               this->pending_head_ = 0;
02453               this->pending_tail_ = 0;
02454             }
02455 
02456           break; // switch
02457         case ACE_Dynamic_Message_Strategy::PENDING:
02458           // do nothing - the pending queue is unchanged
02459           break; // switch
02460         default:
02461           // if we got here, something is *seriously* wrong with the queue
02462           ACE_ERROR_RETURN((LM_ERROR,
02463                             ACE_LIB_TEXT ("Unknown message priority status [%d]"),
02464                             (int) current_status),
02465                            -1);
02466         }
02467     }
02468   return 0;
02469 }
02470 
02471 // Refresh the pending queue using the strategy specific priority
02472 // status function.
02473 
02474 template <ACE_SYNCH_DECL> int
02475 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::refresh_late_queue (const ACE_Time_Value &current_time)
02476 {
02477   ACE_Dynamic_Message_Strategy::Priority_Status current_status;
02478 
02479   if (this->late_head_)
02480     {
02481       current_status = message_strategy_.priority_status (*this->late_head_,
02482                                                           current_time);
02483       switch (current_status)
02484         {
02485         case ACE_Dynamic_Message_Strategy::BEYOND_LATE:
02486 
02487           // make sure the head of the beyond late queue is set
02488           // (there may not have been any beyond late messages previously)
02489           this->beyond_late_head_ = this->head_;
02490 
02491           // advance through the beyond late messages in the late queue
02492           do
02493             {
02494               this->late_head_ = this->late_head_->next ();
02495 
02496               if (this->late_head_)
02497                 current_status = message_strategy_.priority_status (*this->late_head_,
02498                                                                     current_time);
02499               else
02500                 break;  // do while
02501 
02502             }
02503           while (current_status == ACE_Dynamic_Message_Strategy::BEYOND_LATE);
02504 
02505           if (this->late_head_)
02506             {
02507               // point tail of beyond late sublist to previous item
02508               this->beyond_late_tail_ = this->late_head_->prev ();
02509 
02510               if (current_status == ACE_Dynamic_Message_Strategy::PENDING)
02511                 {
02512                   // there are no late messages left in the queue
02513                   this->late_head_ = 0;
02514                   this->late_tail_ = 0;
02515                 }
02516               else if (current_status != ACE_Dynamic_Message_Strategy::LATE)
02517                 // if we got here, something is *seriously* wrong with the queue
02518                 ACE_ERROR_RETURN ((LM_ERROR,
02519                                    ACE_LIB_TEXT ("Unexpected message priority status [%d] (expected LATE)"),
02520                                    (int) current_status),
02521                                   -1);
02522             }
02523           else
02524             {
02525               // there are no late messages left in the queue
02526               this->beyond_late_tail_ = this->tail_;
02527               this->late_head_ = 0;
02528               this->late_tail_ = 0;
02529             }
02530 
02531           break;  // switch
02532 
02533         case ACE_Dynamic_Message_Strategy::LATE:
02534           // do nothing - the late queue is unchanged
02535           break; // switch
02536 
02537         case ACE_Dynamic_Message_Strategy::PENDING:
02538           // if we got here, something is *seriously* wrong with the queue
02539           ACE_ERROR_RETURN ((LM_ERROR,
02540                              ACE_LIB_TEXT ("Unexpected message priority status ")
02541                              ACE_LIB_TEXT ("[%d] (expected LATE or BEYOND_LATE)"),
02542                              (int) current_status),
02543                             -1);
02544         default:
02545           // if we got here, something is *seriously* wrong with the queue
02546           ACE_ERROR_RETURN ((LM_ERROR,
02547                              ACE_LIB_TEXT ("Unknown message priority status [%d]"),
02548                              (int) current_status),
02549                             -1);
02550         }
02551     }
02552 
02553   return 0;
02554 }
02555 
02556 // Refresh the late queue using the strategy specific priority status
02557 // function.
02558 
02559 template <ACE_SYNCH_DECL> int
02560 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head (ACE_Message_Block *&first_item,
02561                                                              ACE_Time_Value *timeout)
02562 {
02563   return ACE_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head (first_item,
02564                                                               timeout);
02565 }
02566 
02567 // Private method to hide public base class method: just calls base
02568 // class method.
02569 
02570 template <ACE_SYNCH_DECL> int
02571 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_tail (ACE_Message_Block *new_item,
02572                                                         ACE_Time_Value *timeout)
02573 {
02574   ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_tail");
02575   return this->enqueue_prio (new_item, timeout);
02576 }
02577 
02578 // Just call priority enqueue method: tail enqueue semantics for
02579 // dynamic message queues are unstable: the message may or may not be
02580 // where it was placed after the queue is refreshed prior to the next
02581 // enqueue or dequeue operation.
02582 
02583 template <ACE_SYNCH_DECL> int
02584 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_head (ACE_Message_Block *new_item,
02585                                                         ACE_Time_Value *timeout)
02586 {
02587   ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_head");
02588   return this->enqueue_prio (new_item, timeout);
02589 }
02590 
02591 // Just call priority enqueue method: head enqueue semantics for
02592 // dynamic message queues are unstable: the message may or may not be
02593 // where it was placed after the queue is refreshed prior to the next
02594 // enqueue or dequeue operation.
02595 
02596 template <ACE_SYNCH_DECL>
02597 ACE_Message_Queue<ACE_SYNCH_USE> *
02598 ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_static_message_queue (size_t hwm,
02599                                                                        size_t lwm,
02600                                                                        ACE_Notification_Strategy *ns)
02601 {
02602   ACE_Message_Queue<ACE_SYNCH_USE> *tmp = 0;
02603 
02604   ACE_NEW_RETURN (tmp,
02605                   ACE_Message_Queue<ACE_SYNCH_USE> (hwm, lwm, ns),
02606                   0);
02607   return tmp;
02608 }
02609 
02610 // Factory method for a statically prioritized ACE_Message_Queue.
02611 
02612 template <ACE_SYNCH_DECL>
02613 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> *
02614 ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_deadline_message_queue (size_t hwm,
02615                                                                          size_t lwm,
02616                                                                          ACE_Notification_Strategy *ns,
02617                                                                          u_long static_bit_field_mask,
02618                                                                          u_long static_bit_field_shift,
02619                                                                          u_long dynamic_priority_max,
02620                                                                          u_long dynamic_priority_offset)
02621 {
02622   ACE_Deadline_Message_Strategy *adms = 0;
02623 
02624   ACE_NEW_RETURN (adms,
02625                   ACE_Deadline_Message_Strategy (static_bit_field_mask,
02626                                                  static_bit_field_shift,
02627                                                  dynamic_priority_max,
02628                                                  dynamic_priority_offset),
02629                   0);
02630 
02631   ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> *tmp = 0;
02632   ACE_NEW_RETURN (tmp,
02633                   ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> (*adms, hwm, lwm, ns),
02634                   0);
02635   return tmp;
02636 }
02637 
02638 // Factory method for a dynamically prioritized (by time to deadline)
02639 // ACE_Dynamic_Message_Queue.
02640 
02641 template <ACE_SYNCH_DECL>
02642 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> *
02643 ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_laxity_message_queue (size_t hwm,
02644                                                                        size_t lwm,
02645                                                                        ACE_Notification_Strategy *ns,
02646                                                                        u_long static_bit_field_mask,
02647                                                                        u_long static_bit_field_shift,
02648                                                                        u_long dynamic_priority_max,
02649                                                                        u_long dynamic_priority_offset)
02650 {
02651   ACE_Laxity_Message_Strategy *alms = 0;
02652 
02653   ACE_NEW_RETURN (alms,
02654                   ACE_Laxity_Message_Strategy (static_bit_field_mask,
02655                                                static_bit_field_shift,
02656                                                dynamic_priority_max,
02657                                                dynamic_priority_offset),
02658                   0);
02659 
02660   ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> *tmp = 0;
02661   ACE_NEW_RETURN (tmp,
02662                   ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> (*alms, hwm, lwm, ns),
02663                   0);
02664   return tmp;
02665 }
02666 
02667 // Factory method for a dynamically prioritized (by laxity)
02668 // <ACE_Dynamic_Message_Queue>.
02669 
02670 #if defined (ACE_VXWORKS)
02671 
02672 template <ACE_SYNCH_DECL>
02673 ACE_Message_Queue_Vx *
02674 ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_Vx_message_queue (size_t max_messages,
02675                                                                    size_t max_message_length,
02676                                                                    ACE_Notification_Strategy *ns)
02677 {
02678   ACE_Message_Queue_Vx *tmp = 0;
02679 
02680   ACE_NEW_RETURN (tmp,
02681                   ACE_Message_Queue_Vx (max_messages, max_message_length, ns),
02682                   0);
02683   return tmp;
02684 }
02685   // factory method for a wrapped VxWorks message queue
02686 
02687 #if defined (ACE_WIN32) && (ACE_HAS_WINNT4 != 0)
02688 
02689 template <ACE_SYNCH_DECL>
02690 ACE_Message_Queue_NT *
02691 ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_NT_message_queue (size_t max_threads)
02692 {
02693   ACE_Message_Queue_NT *tmp = 0;
02694 
02695   ACE_NEW_RETURN (tmp,
02696                   ACE_Message_Queue_NT (max_threads);
02697                   0);
02698   return tmp;
02699 }
02700 
02701 #endif /* ACE_WIN32 && ACE_HAS_WINNT4 != 0 */
02702 #endif /* defined (ACE_VXWORKS) */
02703 
02704 ACE_END_VERSIONED_NAMESPACE_DECL
02705 
02706 #endif /* !ACE_MESSAGE_QUEUE_T_CPP */

Generated on Thu Nov 9 09:41:56 2006 for ACE by doxygen 1.3.6