Message_Queue_T.cpp

Go to the documentation of this file.
00001 // $Id: Message_Queue_T.cpp 79134 2007-07-31 18:23:50Z johnnyw $
00002 
00003 #ifndef ACE_MESSAGE_QUEUE_T_CPP
00004 #define ACE_MESSAGE_QUEUE_T_CPP
00005 
00006 // #include Message_Queue.h instead of Message_Queue_T.h to avoid
00007 // circular include problems.
00008 #include "ace/Message_Queue.h"
00009 #include "ace/Log_Msg.h"
00010 #include "ace/OS_NS_sys_time.h"
00011 
00012 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00013 # pragma once
00014 #endif /* ACE_LACKS_PRAGMA_ONCE */
00015 
00016 #include "ace/Notification_Strategy.h"
00017 #include "ace/Truncate.h"
00018 
00019 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
00020 
// NOTE(review): ACE_ALLOC_HOOK_DEFINE is defined outside this file.
// These invocations presumably emit the allocation-hook definitions
// for the queue classes implemented below -- confirm against the ACE
// headers before relying on that.
00021 ACE_ALLOC_HOOK_DEFINE(ACE_Message_Queue)
00022 ACE_ALLOC_HOOK_DEFINE(ACE_Dynamic_Message_Queue)
00023 ACE_ALLOC_HOOK_DEFINE(ACE_Message_Queue_Ex)
00024 ACE_ALLOC_HOOK_DEFINE(ACE_Message_Queue_Ex_N)
00025 
00026 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> void
00027 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dump (void) const
00028 {
00029 #if defined (ACE_HAS_DUMP)
00030   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dump");
00031 
00032   this->queue_.dump ();
00033 #endif /* ACE_HAS_DUMP */
00034 }
00035 
00036 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> void
00037 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::message_bytes (size_t new_value)
00038 {
00039   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::message_bytes");
00040 
00041   this->queue_.message_bytes (new_value);
00042 }
00043 
00044 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> void
00045 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::message_length (size_t new_value)
00046 {
00047   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::message_length");
00048 
00049   this->queue_.message_length (new_value);
00050 }
00051 
00052 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL>
00053 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::ACE_Message_Queue_Ex (size_t hwm,
00054                                                                              size_t lwm,
00055                                                                              ACE_Notification_Strategy *ns)
00056 {
00057   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::ACE_Message_Queue_Ex");
00058 
00059   if (this->queue_.open (hwm, lwm, ns) == -1)
00060     ACE_ERROR ((LM_ERROR,
00061                 ACE_TEXT ("ACE_Message_Queue_Ex")));
00062 }
00063 
00064 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL>
00065 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::~ACE_Message_Queue_Ex (void)
00066 {
00067   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::~ACE_Message_Queue_Ex");
00068 }
00069 
00070 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00071 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::open (size_t hwm,
00072                                                              size_t lwm,
00073                                                              ACE_Notification_Strategy *ns)
00074 {
00075   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::open");
00076 
00077   return this->queue_.open (hwm, lwm, ns);
00078 }
00079 
00080 // Clean up the queue if we have not already done so!
00081 
00082 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00083 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::close (void)
00084 {
00085   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::close");
00086 
00087   return this->queue_.close ();
00088 }
00089 
00090 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00091 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::flush (void)
00092 {
00093   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::flush");
00094 
00095   return this->queue_.flush ();
00096 }
00097 
00098 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00099 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::flush_i (void)
00100 {
00101   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::flush_i");
00102 
00103   return this->queue_.flush_i ();
00104 }
00105 
00106 // Take a look at the first item without removing it.
00107 
00108 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00109 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::peek_dequeue_head (ACE_MESSAGE_TYPE *&first_item,
00110                                                                           ACE_Time_Value *timeout)
00111 {
00112   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::peek_dequeue_head");
00113 
00114   ACE_Message_Block *mb = 0;
00115 
00116   int const cur_count = this->queue_.peek_dequeue_head (mb, timeout);
00117 
00118   if (cur_count != -1)
00119     first_item  = reinterpret_cast<ACE_MESSAGE_TYPE *> (mb->base ());
00120 
00121   return cur_count;
00122 }
00123 
00124 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00125 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_head (ACE_MESSAGE_TYPE *new_item,
00126                                                                      ACE_Time_Value *timeout)
00127 {
00128   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_head");
00129 
00130   ACE_Message_Block *mb = 0;
00131 
00132   ACE_NEW_RETURN (mb,
00133                   ACE_Message_Block ((char *) new_item,
00134                                      sizeof (*new_item),
00135                                      ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::DEFAULT_PRIORITY),
00136                   -1);
00137 
00138   int const result = this->queue_.enqueue_head (mb, timeout);
00139   if (result == -1)
00140     // Zap the message.
00141     mb->release ();
00142   return result;
00143 }
00144 
00145 // Enqueue an <ACE_MESSAGE_TYPE *> into the <Message_Queue> in
00146 // accordance with its <msg_priority> (0 is lowest priority).  Returns
00147 // -1 on failure, else the number of items still on the queue.
00148 
00149 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00150 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue (ACE_MESSAGE_TYPE *new_item,
00151                                                                 ACE_Time_Value *timeout)
00152 {
00153   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue");
00154 
00155   return this->enqueue_prio (new_item, timeout);
00156 }
00157 
00158 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00159 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_prio (ACE_MESSAGE_TYPE *new_item,
00160                                                                      ACE_Time_Value *timeout,
00161                                                                      unsigned long priority)
00162 {
00163   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_prio");
00164 
00165   ACE_Message_Block *mb = 0;
00166 
00167   ACE_NEW_RETURN (mb,
00168                   ACE_Message_Block ((char *) new_item,
00169                                      sizeof (*new_item),
00170                                      priority),
00171                   -1);
00172 
00173   int const result = this->queue_.enqueue_prio (mb, timeout);
00174   if (result == -1)
00175     // Zap the message.
00176     mb->release ();
00177 
00178   return result;
00179 }
00180 
00181 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00182 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_deadline (ACE_MESSAGE_TYPE *new_item,
00183                                                                          ACE_Time_Value *timeout)
00184 {
00185   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_deadline");
00186 
00187   ACE_Message_Block *mb = 0;
00188 
00189   ACE_NEW_RETURN (mb,
00190                   ACE_Message_Block ((char *) new_item,
00191                                      sizeof (*new_item),
00192                                      ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::DEFAULT_PRIORITY ),
00193                   -1);
00194 
00195   int const result = this->queue_.enqueue_deadline (mb, timeout);
00196   if (result == -1)
00197     // Zap the message.
00198     mb->release ();
00199 
00200   return result;
00201 }
00202 
00203 // Block indefinitely waiting for an item to arrive,
00204 // does not ignore alerts (e.g., signals).
00205 
00206 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00207 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_tail (ACE_MESSAGE_TYPE *new_item,
00208                                                                      ACE_Time_Value *timeout)
00209 {
00210   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_tail");
00211 
00212   ACE_Message_Block *mb = 0;
00213 
00214   ACE_NEW_RETURN (mb,
00215                   ACE_Message_Block ((char *) new_item,
00216                                      sizeof (*new_item),
00217                                      ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::DEFAULT_PRIORITY),
00218                   -1);
00219 
00220   int const result = this->queue_.enqueue_tail (mb, timeout);
00221   if (result == -1)
00222     // Zap the message.
00223     mb->release ();
00224   return result;
00225 }
00226 
00227 // Remove an item from the front of the queue.  If timeout == 0 block
00228 // indefinitely (or until an alert occurs).  Otherwise, block for upto
00229 // the amount of time specified by timeout.
00230 
00231 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00232 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dequeue_head (ACE_MESSAGE_TYPE *&first_item,
00233                                                                      ACE_Time_Value *timeout)
00234 {
00235   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dequeue_head");
00236 
00237   ACE_Message_Block *mb = 0;
00238 
00239   int const cur_count = this->queue_.dequeue_head (mb, timeout);
00240 
00241   // Dequeue the message.
00242   if (cur_count != -1)
00243     {
00244       first_item = reinterpret_cast<ACE_MESSAGE_TYPE *> (mb->base ());
00245       // Delete the message block.
00246       mb->release ();
00247     }
00248 
00249   return cur_count;
00250 }
00251 
00252 // Remove the item with the lowest priority from the queue.  If timeout == 0
00253 // block indefinitely (or until an alert occurs).  Otherwise, block for upto
00254 // the amount of time specified by timeout.
00255 
00256 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00257 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dequeue_prio (ACE_MESSAGE_TYPE *&dequeued,
00258                                                                      ACE_Time_Value *timeout)
00259 {
00260   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dequeue_prio");
00261 
00262   ACE_Message_Block *mb = 0;
00263 
00264   int const cur_count = this->queue_.dequeue_prio (mb, timeout);
00265 
00266   // Dequeue the message.
00267   if (cur_count != -1)
00268     {
00269       dequeued = reinterpret_cast<ACE_MESSAGE_TYPE *> (mb->base ());
00270       // Delete the message block.
00271       mb->release ();
00272     }
00273 
00274   return cur_count;
00275 }
00276 
00277 // Remove an item from the end of the queue.  If timeout == 0 block
00278 // indefinitely (or until an alert occurs).  Otherwise, block for upto
00279 // the amount of time specified by timeout.
00280 
00281 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00282 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dequeue_tail (ACE_MESSAGE_TYPE *&dequeued,
00283                                                                      ACE_Time_Value *timeout)
00284 {
00285   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dequeue_tail");
00286 
00287   ACE_Message_Block *mb = 0;
00288 
00289   int const cur_count = this->queue_.dequeue_tail (mb, timeout);
00290 
00291   // Dequeue the message.
00292   if (cur_count != -1)
00293     {
00294       dequeued = reinterpret_cast<ACE_MESSAGE_TYPE *> (mb->base ());
00295       // Delete the message block.
00296       mb->release ();
00297     }
00298 
00299   return cur_count;
00300 }
00301 
00302 // Remove an item with the lowest deadline time.  If timeout == 0 block
00303 // indefinitely (or until an alert occurs).  Otherwise, block for upto
00304 // the amount of time specified by timeout.
00305 
00306 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00307 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dequeue_deadline (ACE_MESSAGE_TYPE *&dequeued,
00308                                                                          ACE_Time_Value *timeout)
00309 {
00310   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dequeue_deadline");
00311 
00312   ACE_Message_Block *mb = 0;
00313 
00314   int const cur_count = this->queue_.dequeue_deadline (mb, timeout);
00315 
00316   // Dequeue the message.
00317   if (cur_count != -1)
00318     {
00319       dequeued = reinterpret_cast<ACE_MESSAGE_TYPE *> (mb->base ());
00320       // Delete the message block.
00321       mb->release ();
00322     }
00323 
00324   return cur_count;
00325 }
00326 
00327 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00328 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::notify (void)
00329 {
00330   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::notify");
00331 
00332   return this->queue_.notify ();
00333 }
00334 
00335 
00336 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL>
00337 ACE_Message_Queue_Ex_N<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::ACE_Message_Queue_Ex_N
00338   (size_t high_water_mark,
00339    size_t low_water_mark,
00340    ACE_Notification_Strategy *ns):
00341     ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE> (high_water_mark,
00342                                                            low_water_mark,
00343                                                            ns)
00344 {
00345   ACE_TRACE ("ACE_Message_Queue_Ex_N<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::ACE_Message_Queue_Ex_N");
00346 }
00347 
00348 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL>
00349 ACE_Message_Queue_Ex_N<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::~ACE_Message_Queue_Ex_N (void)
00350 {
00351   ACE_TRACE ("ACE_Message_Queue_Ex_N<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::~ACE_Message_Queue_Ex_N");
00352 }
00353 
00354 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00355 ACE_Message_Queue_Ex_N<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_head
00356   (ACE_MESSAGE_TYPE *new_item,
00357    ACE_Time_Value   *timeout)
00358 {
00359   ACE_TRACE ("ACE_Message_Queue_Ex_N<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_head");
00360 
00361   // Create a chained ACE_Message_Blocks wrappers around the 'chained'
00362   // ACE_MESSAGE_TYPES.
00363   ACE_Message_Block *mb = this->wrap_with_mbs_i (new_item);
00364   if (0 == mb)
00365     {
00366       return -1;
00367     }
00368 
00369   int result = this->queue_.enqueue_head (mb, timeout);
00370   if (-1 == result)
00371     {
00372       // Zap the messages.
00373       mb->release ();
00374     }
00375   return result;
00376 }
00377 
00378 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00379 ACE_Message_Queue_Ex_N<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_tail
00380   (ACE_MESSAGE_TYPE *new_item,
00381    ACE_Time_Value   *timeout)
00382 {
00383   ACE_TRACE ("ACE_Message_Queue_Ex_N<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_tail");
00384 
00385   // Create a chained ACE_Message_Blocks wrappers around the 'chained'
00386   // ACE_MESSAGE_TYPES.
00387   ACE_Message_Block *mb = this->wrap_with_mbs_i (new_item);
00388   if (0 == mb)
00389     {
00390       return -1;
00391     }
00392 
00393   int result = this->queue_.enqueue_tail (mb, timeout);
00394   if (-1 == result)
00395     {
00396       // Zap the message.
00397       mb->release ();
00398     }
00399   return result;
00400 }
00401 
00402 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> ACE_Message_Block *
00403 ACE_Message_Queue_Ex_N<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::wrap_with_mbs_i
00404   (ACE_MESSAGE_TYPE *new_item)
00405 {
00406   ACE_TRACE ("ACE_Message_Queue_Ex_N<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::wrap_with_mbs_i");
00407 
00408   // We need to keep a reference to the head of the chain
00409   ACE_Message_Block *mb_head = 0;
00410 
00411   ACE_NEW_RETURN (mb_head,
00412                   ACE_Message_Block ((char *) new_item,
00413                                      sizeof (*new_item),
00414                                      ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::DEFAULT_PRIORITY),
00415                   0);
00416 
00417   // mb_tail will point to the last ACE_Message_Block
00418   ACE_Message_Block *mb_tail = mb_head;
00419 
00420   // Run through rest of the messages and wrap them
00421   for (ACE_MESSAGE_TYPE *pobj = new_item->next (); pobj; pobj = pobj->next ())
00422     {
00423       ACE_Message_Block *mb_temp = 0;
00424       ACE_NEW_NORETURN (mb_temp,
00425                         ACE_Message_Block ((char *) pobj,
00426                                            sizeof (*pobj),
00427                                            ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::DEFAULT_PRIORITY));
00428       if (mb_temp == 0)
00429         {
00430           mb_head->release ();
00431           mb_head = 0;
00432           break;
00433         }
00434 
00435       mb_tail->next (mb_temp);
00436       mb_tail = mb_temp;
00437     }
00438 
00439   return mb_head;
00440 }
00441 
// NOTE(review): ACE_ALLOC_HOOK_DEFINE is defined outside this file;
// presumably this emits the allocation hooks for the reverse iterator,
// whose member functions are implemented further down -- confirm.
00442 ACE_ALLOC_HOOK_DEFINE(ACE_Message_Queue_Reverse_Iterator)
00443 
00444 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00445 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dequeue (ACE_MESSAGE_TYPE *&first_item,
00446                                                                 ACE_Time_Value *timeout)
00447 {
00448   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dequeue");
00449 
00450   return this->dequeue_head (first_item, timeout);
00451 }
00452 
00453 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> ACE_Notification_Strategy *
00454 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::notification_strategy (void)
00455 {
00456   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::notification_strategy");
00457 
00458   return this->queue_.notification_strategy ();
00459 }
00460 
00461 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> void
00462 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::notification_strategy (ACE_Notification_Strategy *s)
00463 {
00464   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::notification_strategy");
00465 
00466   this->queue_.notification_strategy (s);
00467 }
00468 
00469 // Check if queue is empty (holds locks).
00470 
00471 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00472 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::is_empty (void)
00473 {
00474   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::is_empty");
00475 
00476   return this->queue_.is_empty ();
00477 }
00478 
00479 // Check if queue is full (holds locks).
00480 
00481 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00482 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::is_full (void)
00483 {
00484   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::is_full");
00485 
00486   return this->queue_.is_full ();
00487 }
00488 
00489 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> size_t
00490 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::high_water_mark (void)
00491 {
00492   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::high_water_mark");
00493 
00494   return this->queue_.high_water_mark ();
00495 }
00496 
00497 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> void
00498 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::high_water_mark (size_t hwm)
00499 {
00500   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::high_water_mark");
00501 
00502   this->queue_.high_water_mark (hwm);
00503 }
00504 
00505 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> size_t
00506 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::low_water_mark (void)
00507 {
00508   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::low_water_mark");
00509 
00510   return this->queue_.low_water_mark ();
00511 }
00512 
00513 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> void
00514 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::low_water_mark (size_t lwm)
00515 {
00516   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::low_water_mark");
00517 
00518   this->queue_.low_water_mark (lwm);
00519 }
00520 
00521 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> size_t
00522 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::message_bytes (void)
00523 {
00524   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::message_bytes");
00525 
00526   return this->queue_.message_bytes ();
00527 }
00528 
00529 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> size_t
00530 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::message_length (void)
00531 {
00532   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::message_length");
00533 
00534   return this->queue_.message_length ();
00535 }
00536 
00537 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> size_t
00538 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::message_count (void)
00539 {
00540   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::message_count");
00541 
00542   return this->queue_.message_count ();
00543 }
00544 
00545 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00546 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::deactivate (void)
00547 {
00548   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::deactivate");
00549 
00550   return this->queue_.deactivate ();
00551 }
00552 
00553 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00554 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::activate (void)
00555 {
00556   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::activate");
00557 
00558   return this->queue_.activate ();
00559 }
00560 
00561 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00562 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::pulse (void)
00563 {
00564   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::pulse");
00565 
00566   return this->queue_.pulse ();
00567 }
00568 
00569 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00570 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::deactivated (void)
00571 {
00572   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::deactivated");
00573 
00574   return this->queue_.deactivated ();
00575 }
00576 
00577 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00578 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::state (void)
00579 {
00580   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::state");
00581 
00582   return this->queue_.state ();
00583 }
00584 
00585 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> ACE_SYNCH_MUTEX_T &
00586 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::lock (void)
00587 {
00588   return this->queue_.lock ();
00589 }
00590 
00591 template <ACE_SYNCH_DECL>
00592 ACE_Message_Queue_Iterator<ACE_SYNCH_USE>::ACE_Message_Queue_Iterator (ACE_Message_Queue <ACE_SYNCH_USE> &q)
00593   : queue_ (q),
00594     curr_ (q.head_)
00595 {
00596 }
00597 
00598 template <ACE_SYNCH_DECL> int
00599 ACE_Message_Queue_Iterator<ACE_SYNCH_USE>::next (ACE_Message_Block *&entry)
00600 {
00601   ACE_READ_GUARD_RETURN (ACE_SYNCH_MUTEX_T, m, this->queue_.lock_, -1)
00602 
00603   if (this->curr_ != 0)
00604     {
00605       entry = this->curr_;
00606       return 1;
00607     }
00608 
00609   return 0;
00610 }
00611 
00612 template <ACE_SYNCH_DECL> int
00613 ACE_Message_Queue_Iterator<ACE_SYNCH_USE>::done (void) const
00614 {
00615   ACE_READ_GUARD_RETURN (ACE_SYNCH_MUTEX_T, m, this->queue_.lock_, -1)
00616 
00617   return this->curr_ == 0;
00618 }
00619 
00620 template <ACE_SYNCH_DECL> int
00621 ACE_Message_Queue_Iterator<ACE_SYNCH_USE>::advance (void)
00622 {
00623   ACE_READ_GUARD_RETURN (ACE_SYNCH_MUTEX_T, m, this->queue_.lock_, -1)
00624 
00625   if (this->curr_)
00626     this->curr_ = this->curr_->next ();
00627   return this->curr_ != 0;
00628 }
00629 
00630 template <ACE_SYNCH_DECL> void
00631 ACE_Message_Queue_Iterator<ACE_SYNCH_USE>::dump (void) const
00632 {
00633 #if defined (ACE_HAS_DUMP)
00634 #endif /* ACE_HAS_DUMP */
00635 }
00636 
// NOTE(review): ACE_ALLOC_HOOK_DEFINE is defined outside this file;
// presumably this emits the allocation hooks for the forward iterator
// implemented just above -- confirm against the ACE headers.
00637 ACE_ALLOC_HOOK_DEFINE(ACE_Message_Queue_Iterator)
00638 
00639 template <ACE_SYNCH_DECL>
00640 ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>::ACE_Message_Queue_Reverse_Iterator (ACE_Message_Queue <ACE_SYNCH_USE> &q)
00641   : queue_ (q),
00642     curr_ (queue_.tail_)
00643 {
00644 }
00645 
00646 template <ACE_SYNCH_DECL> int
00647 ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>::next (ACE_Message_Block *&entry)
00648 {
00649   ACE_READ_GUARD_RETURN (ACE_SYNCH_MUTEX_T, m, this->queue_.lock_, -1)
00650 
00651   if (this->curr_ != 0)
00652     {
00653       entry = this->curr_;
00654       return 1;
00655     }
00656 
00657   return 0;
00658 }
00659 
00660 template <ACE_SYNCH_DECL> int
00661 ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>::done (void) const
00662 {
00663   ACE_READ_GUARD_RETURN (ACE_SYNCH_MUTEX_T, m, this->queue_.lock_, -1)
00664 
00665   return this->curr_ == 0;
00666 }
00667 
00668 template <ACE_SYNCH_DECL> int
00669 ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>::advance (void)
00670 {
00671   ACE_READ_GUARD_RETURN (ACE_SYNCH_MUTEX_T, m, this->queue_.lock_, -1)
00672 
00673   if (this->curr_)
00674     this->curr_ = this->curr_->prev ();
00675   return this->curr_ != 0;
00676 }
00677 
00678 template <ACE_SYNCH_DECL> void
00679 ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>::dump (void) const
00680 {
00681 #if defined (ACE_HAS_DUMP)
00682 #endif /* ACE_HAS_DUMP */
00683 }
00684 
00685 template <ACE_SYNCH_DECL> int
00686 ACE_Message_Queue<ACE_SYNCH_USE>::dequeue (ACE_Message_Block *&first_item,
00687                                            ACE_Time_Value *timeout)
00688 {
00689   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue");
00690   return this->dequeue_head (first_item, timeout);
00691 }
00692 
00693 template <ACE_SYNCH_DECL> ACE_Notification_Strategy *
00694 ACE_Message_Queue<ACE_SYNCH_USE>::notification_strategy (void)
00695 {
00696   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::notification_strategy");
00697 
00698   return this->notification_strategy_;
00699 }
00700 
00701 template <ACE_SYNCH_DECL> void
00702 ACE_Message_Queue<ACE_SYNCH_USE>::notification_strategy (ACE_Notification_Strategy *s)
00703 {
00704   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::notification_strategy");
00705 
00706   this->notification_strategy_ = s;
00707 }
00708 
00709 // Check if queue is empty (does not hold locks).
00710 
00711 template <ACE_SYNCH_DECL> int
00712 ACE_Message_Queue<ACE_SYNCH_USE>::is_empty_i (void)
00713 {
00714   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::is_empty_i");
00715   return this->tail_ == 0;
00716 }
00717 
00718 // Check if queue is full (does not hold locks).
00719 
00720 template <ACE_SYNCH_DECL> int
00721 ACE_Message_Queue<ACE_SYNCH_USE>::is_full_i (void)
00722 {
00723   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::is_full_i");
00724   return this->cur_bytes_ >= this->high_water_mark_;
00725 }
00726 
00727 // Check if queue is empty (holds locks).
00728 
00729 template <ACE_SYNCH_DECL> int
00730 ACE_Message_Queue<ACE_SYNCH_USE>::is_empty (void)
00731 {
00732   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::is_empty");
00733   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
00734 
00735   return this->is_empty_i ();
00736 }
00737 
00738 // Check if queue is full (holds locks).
00739 
00740 template <ACE_SYNCH_DECL> int
00741 ACE_Message_Queue<ACE_SYNCH_USE>::is_full (void)
00742 {
00743   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::is_full");
00744   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
00745 
00746   return this->is_full_i ();
00747 }
00748 
00749 template <ACE_SYNCH_DECL> size_t
00750 ACE_Message_Queue<ACE_SYNCH_USE>::high_water_mark (void)
00751 {
00752   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::high_water_mark");
00753   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, 0);
00754 
00755   return this->high_water_mark_;
00756 }
00757 
00758 template <ACE_SYNCH_DECL> void
00759 ACE_Message_Queue<ACE_SYNCH_USE>::high_water_mark (size_t hwm)
00760 {
00761   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::high_water_mark");
00762   ACE_GUARD (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_);
00763 
00764   this->high_water_mark_ = hwm;
00765 }
00766 
00767 template <ACE_SYNCH_DECL> size_t
00768 ACE_Message_Queue<ACE_SYNCH_USE>::low_water_mark (void)
00769 {
00770   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::low_water_mark");
00771   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, 0);
00772 
00773   return this->low_water_mark_;
00774 }
00775 
00776 template <ACE_SYNCH_DECL> void
00777 ACE_Message_Queue<ACE_SYNCH_USE>::low_water_mark (size_t lwm)
00778 {
00779   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::low_water_mark");
00780   ACE_GUARD (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_);
00781 
00782   this->low_water_mark_ = lwm;
00783 }
00784 
00785 template <ACE_SYNCH_DECL> size_t
00786 ACE_Message_Queue<ACE_SYNCH_USE>::message_bytes (void)
00787 {
00788   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::message_bytes");
00789   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, 0);
00790 
00791   return this->cur_bytes_;
00792 }
00793 
00794 template <ACE_SYNCH_DECL> size_t
00795 ACE_Message_Queue<ACE_SYNCH_USE>::message_length (void)
00796 {
00797   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::message_length");
00798   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, 0);
00799 
00800   return this->cur_length_;
00801 }
00802 
00803 template <ACE_SYNCH_DECL> size_t
00804 ACE_Message_Queue<ACE_SYNCH_USE>::message_count (void)
00805 {
00806   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::message_count");
00807   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, 0);
00808 
00809   return this->cur_count_;
00810 }
00811 
00812 template <ACE_SYNCH_DECL> int
00813 ACE_Message_Queue<ACE_SYNCH_USE>::deactivate ()
00814 {
00815   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::deactivate");
00816   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
00817 
00818   return this->deactivate_i (0);   // Not a pulse
00819 }
00820 
00821 template <ACE_SYNCH_DECL> int
00822 ACE_Message_Queue<ACE_SYNCH_USE>::activate (void)
00823 {
00824   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::activate");
00825   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
00826 
00827   return this->activate_i ();
00828 }
00829 
00830 template <ACE_SYNCH_DECL> int
00831 ACE_Message_Queue<ACE_SYNCH_USE>::pulse ()
00832 {
00833   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::pulse");
00834   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
00835 
00836   return this->deactivate_i (1);   // Just a pulse
00837 }
00838 
00839 template <ACE_SYNCH_DECL> int
00840 ACE_Message_Queue<ACE_SYNCH_USE>::deactivated (void)
00841 {
00842   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::deactivated");
00843 
00844   return this->state_ == ACE_Message_Queue_Base::DEACTIVATED;
00845 }
00846 
00847 template <ACE_SYNCH_DECL> int
00848 ACE_Message_Queue<ACE_SYNCH_USE>::state (void)
00849 {
00850   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::state");
00851 
00852   return this->state_;
00853 }
00854 
00855 template <ACE_SYNCH_DECL> ACE_SYNCH_MUTEX_T &
00856 ACE_Message_Queue<ACE_SYNCH_USE>::lock (void)
00857 {
00858   return this->lock_;
00859 }
00860 
00861 template <ACE_SYNCH_DECL> void
00862 ACE_Message_Queue<ACE_SYNCH_USE>::dump (void) const
00863 {
00864 #if defined (ACE_HAS_DUMP)
00865   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dump");
00866   ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
00867   switch (this->state_)
00868     {
00869     case ACE_Message_Queue_Base::ACTIVATED:
00870       ACE_DEBUG ((LM_DEBUG,
00871                   ACE_TEXT ("state = ACTIVATED\n")));
00872       break;
00873     case ACE_Message_Queue_Base::DEACTIVATED:
00874       ACE_DEBUG ((LM_DEBUG,
00875                   ACE_TEXT ("state = DEACTIVATED\n")));
00876       break;
00877     case ACE_Message_Queue_Base::PULSED:
00878       ACE_DEBUG ((LM_DEBUG,
00879                   ACE_TEXT ("state = PULSED\n")));
00880       break;
00881     }
00882   ACE_DEBUG ((LM_DEBUG,
00883               ACE_TEXT ("low_water_mark = %d\n")
00884               ACE_TEXT ("high_water_mark = %d\n")
00885               ACE_TEXT ("cur_bytes = %d\n")
00886               ACE_TEXT ("cur_length = %d\n")
00887               ACE_TEXT ("cur_count = %d\n")
00888               ACE_TEXT ("head_ = %u\n")
00889               ACE_TEXT ("tail_ = %u\n"),
00890               this->low_water_mark_,
00891               this->high_water_mark_,
00892               this->cur_bytes_,
00893               this->cur_length_,
00894               this->cur_count_,
00895               this->head_,
00896               this->tail_));
00897   ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("not_full_cond: \n")));
00898   not_full_cond_.dump ();
00899   ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("not_empty_cond: \n")));
00900   not_empty_cond_.dump ();
00901   ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
00902 #endif /* ACE_HAS_DUMP */
00903 }
00904 
00905 template <ACE_SYNCH_DECL> void
00906 ACE_Message_Queue<ACE_SYNCH_USE>::message_bytes (size_t new_value)
00907 {
00908   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::message_bytes");
00909   ACE_GUARD (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_);
00910 
00911   this->cur_bytes_ = new_value;
00912 }
00913 
00914 template <ACE_SYNCH_DECL> void
00915 ACE_Message_Queue<ACE_SYNCH_USE>::message_length (size_t new_value)
00916 {
00917   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::message_length");
00918   ACE_GUARD (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_);
00919 
00920   this->cur_length_ = new_value;
00921 }
00922 
00923 template <ACE_SYNCH_DECL>
00924 ACE_Message_Queue<ACE_SYNCH_USE>::ACE_Message_Queue (size_t hwm,
00925                                                      size_t lwm,
00926                                                      ACE_Notification_Strategy *ns)
00927   : not_empty_cond_ (lock_),
00928     not_full_cond_ (lock_)
00929 {
00930   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::ACE_Message_Queue");
00931 
00932   if (this->open (hwm, lwm, ns) == -1)
00933     ACE_ERROR ((LM_ERROR,
00934                 ACE_TEXT ("open")));
00935 }
00936 
00937 template <ACE_SYNCH_DECL>
00938 ACE_Message_Queue<ACE_SYNCH_USE>::~ACE_Message_Queue (void)
00939 {
00940   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::~ACE_Message_Queue");
00941   if (this->head_ != 0 && this->close () == -1)
00942     ACE_ERROR ((LM_ERROR,
00943                 ACE_TEXT ("close")));
00944 }
00945 
00946 template <ACE_SYNCH_DECL> int
00947 ACE_Message_Queue<ACE_SYNCH_USE>::flush_i (void)
00948 {
00949   int number_flushed = 0;
00950 
00951   // Remove all the <ACE_Message_Block>s in the <ACE_Message_Queue>
00952   // and <release> their memory.
00953   for (this->tail_ = 0; this->head_ != 0; )
00954     {
00955       ++number_flushed;
00956 
00957       size_t mb_bytes = 0;
00958       size_t mb_length = 0;
00959       this->head_->total_size_and_length (mb_bytes,
00960                                           mb_length);
00961       // Subtract off all of the bytes associated with this message.
00962       this->cur_bytes_ -= mb_bytes;
00963       this->cur_length_ -= mb_length;
00964       --this->cur_count_;
00965 
00966       ACE_Message_Block *temp = this->head_;
00967       this->head_ = this->head_->next ();
00968 
00969       // Make sure to use <release> rather than <delete> since this is
00970       // reference counted.
00971       temp->release ();
00972     }
00973 
00974   return number_flushed;
00975 }
00976 
00977 // Don't bother locking since if someone calls this function more than
00978 // once for the same queue, we're in bigger trouble than just
00979 // concurrency control!
00980 
00981 template <ACE_SYNCH_DECL> int
00982 ACE_Message_Queue<ACE_SYNCH_USE>::open (size_t hwm,
00983                                         size_t lwm,
00984                                         ACE_Notification_Strategy *ns)
00985 {
00986   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::open");
00987   this->high_water_mark_ = hwm;
00988   this->low_water_mark_  = lwm;
00989   this->state_ = ACE_Message_Queue_Base::ACTIVATED;
00990   this->cur_bytes_ = 0;
00991   this->cur_length_ = 0;
00992   this->cur_count_ = 0;
00993   this->tail_ = 0;
00994   this->head_ = 0;
00995   this->notification_strategy_ = ns;
00996   return 0;
00997 }
00998 
00999 // Implementation of the public deactivate() method
01000 // (assumes locks are held).
01001 
01002 template <ACE_SYNCH_DECL> int
01003 ACE_Message_Queue<ACE_SYNCH_USE>::deactivate_i (int pulse)
01004 {
01005   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::deactivate_i");
01006   int const previous_state = this->state_;
01007 
01008   if (previous_state != ACE_Message_Queue_Base::DEACTIVATED)
01009     {
01010       // Wakeup all waiters.
01011       this->not_empty_cond_.broadcast ();
01012       this->not_full_cond_.broadcast ();
01013 
01014       if (pulse)
01015         this->state_ = ACE_Message_Queue_Base::PULSED;
01016       else
01017         this->state_ = ACE_Message_Queue_Base::DEACTIVATED;
01018     }
01019   return previous_state;
01020 }
01021 
01022 template <ACE_SYNCH_DECL> int
01023 ACE_Message_Queue<ACE_SYNCH_USE>::activate_i (void)
01024 {
01025   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::activate_i");
01026   int const previous_state = this->state_;
01027   this->state_ = ACE_Message_Queue_Base::ACTIVATED;
01028   return previous_state;
01029 }
01030 
01031 template <ACE_SYNCH_DECL> int
01032 ACE_Message_Queue<ACE_SYNCH_USE>::flush (void)
01033 {
01034   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::flush");
01035   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01036 
01037   // Free up the remaining messages on the queue.
01038   return this->flush_i ();
01039 }
01040 
01041 // Clean up the queue if we have not already done so!
01042 
01043 template <ACE_SYNCH_DECL> int
01044 ACE_Message_Queue<ACE_SYNCH_USE>::close (void)
01045 {
01046   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::close");
01047   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01048 
01049   int const result = this->deactivate_i ();
01050 
01051   // Free up the remaining messages on the queue.
01052   this->flush_i ();
01053 
01054   return result;
01055 }
01056 
01057 template <ACE_SYNCH_DECL> int
01058 ACE_Message_Queue<ACE_SYNCH_USE>::signal_enqueue_waiters (void)
01059 {
01060   if (this->not_full_cond_.signal () != 0)
01061     return -1;
01062   return 0;
01063 }
01064 
01065 template <ACE_SYNCH_DECL> int
01066 ACE_Message_Queue<ACE_SYNCH_USE>::signal_dequeue_waiters (void)
01067 {
01068   // Tell any blocked threads that the queue has a new item!
01069   if (this->not_empty_cond_.signal () != 0)
01070     return -1;
01071   return 0;
01072 }
01073 
01074 // Actually put the node at the end (no locking so must be called with
01075 // locks held).
01076 
01077 template <ACE_SYNCH_DECL> int
01078 ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_tail_i (ACE_Message_Block *new_item)
01079 {
01080   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_tail_i");
01081 
01082   if (new_item == 0)
01083     return -1;
01084 
01085   // Update the queued size and length, taking into account any chained
01086   // blocks (total_size_and_length() counts all continuation blocks).
01087   // Keep count of how many blocks we're adding and, if there is a chain of
01088   // blocks, find the end in seq_tail and be sure they're properly
01089   // back-connected along the way.
01090   ACE_Message_Block *seq_tail = new_item;
01091   ++this->cur_count_;
01092   new_item->total_size_and_length (this->cur_bytes_,
01093                                    this->cur_length_);
01094   while (seq_tail->next () != 0)
01095     {
01096       seq_tail->next ()->prev (seq_tail);
01097       seq_tail = seq_tail->next ();
01098       ++this->cur_count_;
01099       seq_tail->total_size_and_length (this->cur_bytes_,
01100                                        this->cur_length_);
01101     }
01102 
01103   // List was empty, so build a new one.
01104   if (this->tail_ == 0)
01105     {
01106       this->head_ = new_item;
01107       this->tail_ = seq_tail;
01108       // seq_tail->next (0);   This is a condition of the while() loop above.
01109       new_item->prev (0);
01110     }
01111   // Link at the end.
01112   else
01113     {
01114       // seq_tail->next (0);   This is a condition of the while() loop above.
01115       this->tail_->next (new_item);
01116       new_item->prev (this->tail_);
01117       this->tail_ = seq_tail;
01118     }
01119 
01120   if (this->signal_dequeue_waiters () == -1)
01121     return -1;
01122   else
01123     return ACE_Utils::truncate_cast<int> (this->cur_count_);
01124 }
01125 
01126 // Actually put the node(s) at the head (no locking)
01127 
01128 template <ACE_SYNCH_DECL> int
01129 ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_head_i (ACE_Message_Block *new_item)
01130 {
01131   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_head_i");
01132 
01133   if (new_item == 0)
01134     return -1;
01135 
01136   // Update the queued size and length, taking into account any chained
01137   // blocks (total_size_and_length() counts all continuation blocks).
01138   // Keep count of how many blocks we're adding and, if there is a chain of
01139   // blocks, find the end in seq_tail and be sure they're properly
01140   // back-connected along the way.
01141   ACE_Message_Block *seq_tail = new_item;
01142   ++this->cur_count_;
01143   new_item->total_size_and_length (this->cur_bytes_,
01144                                    this->cur_length_);
01145   while (seq_tail->next () != 0)
01146     {
01147       seq_tail->next ()->prev (seq_tail);
01148       seq_tail = seq_tail->next ();
01149       ++this->cur_count_;
01150       seq_tail->total_size_and_length (this->cur_bytes_,
01151                                        this->cur_length_);
01152     }
01153 
01154   new_item->prev (0);
01155   seq_tail->next (this->head_);
01156 
01157   if (this->head_ != 0)
01158     this->head_->prev (seq_tail);
01159   else
01160     this->tail_ = seq_tail;
01161 
01162   this->head_ = new_item;
01163 
01164   if (this->signal_dequeue_waiters () == -1)
01165     return -1;
01166   else
01167     return ACE_Utils::truncate_cast<int> (this->cur_count_);
01168 }
01169 
01170 // Actually put the node at its proper position relative to its
01171 // priority.
01172 
01173 template <ACE_SYNCH_DECL> int
01174 ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_i (ACE_Message_Block *new_item)
01175 {
01176   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_i");
01177 
01178   if (new_item == 0)
01179     return -1;
01180 
01181   // Since this method uses enqueue_head_i() and enqueue_tail_i() for
01182   // special situations, and this method doesn't support enqueueing
01183   // chains of blocks off the 'next' pointer, make sure the new_item's
01184   // next pointer is 0.
01185   new_item->next (0);
01186 
01187   if (this->head_ == 0)
01188     // Check for simple case of an empty queue, where all we need to
01189     // do is insert <new_item> into the head.
01190     return this->enqueue_head_i (new_item);
01191   else
01192     {
01193       ACE_Message_Block *temp = 0;
01194 
01195       // Figure out where the new item goes relative to its priority.
01196       // We start looking from the lowest priority (at the tail) to
01197       // the highest priority (at the head).
01198 
01199       for (temp = this->tail_;
01200            temp != 0;
01201            temp = temp->prev ())
01202         if (temp->msg_priority () >= new_item->msg_priority ())
01203           // Break out when we've located an item that has
01204           // greater or equal priority.
01205           break;
01206 
01207       if (temp == 0)
01208         // Check for simple case of inserting at the head of the queue,
01209         // where all we need to do is insert <new_item> before the
01210         // current head.
01211         return this->enqueue_head_i (new_item);
01212       else if (temp->next () == 0)
01213         // Check for simple case of inserting at the tail of the
01214         // queue, where all we need to do is insert <new_item> after
01215         // the current tail.
01216         return this->enqueue_tail_i (new_item);
01217       else
01218         {
01219           // Insert the new message behind the message of greater or
01220           // equal priority.  This ensures that FIFO order is
01221           // maintained when messages of the same priority are
01222           // inserted consecutively.
01223           new_item->prev (temp);
01224           new_item->next (temp->next ());
01225           temp->next ()->prev (new_item);
01226           temp->next (new_item);
01227         }
01228     }
01229 
01230   // Make sure to count all the bytes in a composite message!!!
01231   new_item->total_size_and_length (this->cur_bytes_,
01232                                    this->cur_length_);
01233   ++this->cur_count_;
01234 
01235   if (this->signal_dequeue_waiters () == -1)
01236     return -1;
01237   else
01238     return ACE_Utils::truncate_cast<int> (this->cur_count_);
01239 }
01240 
01241 // Actually put the node at its proper position relative to its
01242 // deadline time.
01243 
01244 template <ACE_SYNCH_DECL> int
01245 ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_deadline_i (ACE_Message_Block *new_item)
01246 {
01247 #if defined (ACE_HAS_TIMED_MESSAGE_BLOCKS)
01248   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_deadline_i");
01249 
01250   if (new_item == 0)
01251     return -1;
01252 
01253   // Since this method uses enqueue_head_i() and enqueue_tail_i() for
01254   // special situations, and this method doesn't support enqueueing
01255   // chains of blocks off the 'next' pointer, make sure the new_item's
01256   // next pointer is 0.
01257   new_item->next (0);
01258 
01259   if (this->head_ == 0)
01260     // Check for simple case of an empty queue, where all we need to
01261     // do is insert <new_item> into the head.
01262     return this->enqueue_head_i (new_item);
01263   else
01264     {
01265       ACE_Message_Block *temp = 0;
01266 
01267       // Figure out where the new item goes relative to its priority.
01268       // We start looking from the smallest deadline to the highest
01269       // deadline.
01270 
01271       for (temp = this->head_;
01272            temp != 0;
01273            temp = temp->next ())
01274         if (new_item->msg_deadline_time () < temp->msg_deadline_time ())
01275           // Break out when we've located an item that has
01276           // greater or equal priority.
01277           break;
01278 
01279       if (temp == 0 || temp->next () == 0)
01280         // Check for simple case of inserting at the tail of the queue,
01281         // where all we need to do is insert <new_item> after the
01282         // current tail.
01283         return this->enqueue_tail_i (new_item);
01284       else
01285         {
01286           // Insert the new message behind the message of
01287           // lesser or equal deadline time.  This ensures that FIFO order is
01288           // maintained when messages of the same priority are
01289           // inserted consecutively.
01290           new_item->prev (temp);
01291           new_item->next (temp->next ());
01292           temp->next ()->prev (new_item);
01293           temp->next (new_item);
01294         }
01295     }
01296 
01297   // Make sure to count all the bytes in a composite message!!!
01298   new_item->total_size_and_length (this->cur_bytes_,
01299                                    this->cur_length_);
01300   ++this->cur_count_;
01301 
01302   if (this->signal_dequeue_waiters () == -1)
01303     return -1;
01304   else
01305     return this->cur_count_;
01306 #else
01307   return this->enqueue_tail_i (new_item);
01308 #endif /* ACE_HAS_TIMED_MESSAGE_BLOCKS */
01309 }
01310 
01311 // Actually get the first ACE_Message_Block (no locking, so must be
01312 // called with locks held).  This method assumes that the queue has at
01313 // least one item in it when it is called.
01314 
01315 template <ACE_SYNCH_DECL> int
01316 ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i (ACE_Message_Block *&first_item)
01317 {
01318   if (this->head_ ==0)
01319     ACE_ERROR_RETURN ((LM_ERROR,
01320                        ACE_TEXT ("Attempting to dequeue from empty queue")),
01321                       -1);
01322   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i");
01323   first_item = this->head_;
01324   this->head_ = this->head_->next ();
01325 
01326   if (this->head_ == 0)
01327     this->tail_ = 0;
01328   else
01329     // The prev pointer of first message block must point to 0...
01330     this->head_->prev (0);
01331 
01332   size_t mb_bytes = 0;
01333   size_t mb_length = 0;
01334   first_item->total_size_and_length (mb_bytes,
01335                                      mb_length);
01336   // Subtract off all of the bytes associated with this message.
01337   this->cur_bytes_ -= mb_bytes;
01338   this->cur_length_ -= mb_length;
01339   --this->cur_count_;
01340 
01341   if (this->cur_count_ == 0 && this->head_ == this->tail_)
01342     this->head_ = this->tail_ = 0;
01343 
01344   // Make sure that the prev and next fields are 0!
01345   first_item->prev (0);
01346   first_item->next (0);
01347 
01348   // Only signal enqueueing threads if we've fallen below the low
01349   // water mark.
01350   if (this->cur_bytes_ <= this->low_water_mark_
01351       && this->signal_enqueue_waiters () == -1)
01352     return -1;
01353   else
01354     return ACE_Utils::truncate_cast<int> (this->cur_count_);
01355 }
01356 
01357 // Get the earliest (i.e., FIFO) ACE_Message_Block with the lowest
01358 // priority (no locking, so must be called with locks held).  This
01359 // method assumes that the queue has at least one item in it when it
01360 // is called.
01361 
01362 template <ACE_SYNCH_DECL> int
01363 ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_prio_i (ACE_Message_Block *&dequeued)
01364 {
01365   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_prio_i");
01366 
01367   if (this->head_ == 0)
01368     return -1;
01369 
01370   // Find the earliest (i.e., FIFO) message enqueued with the lowest
01371   // priority.
01372   ACE_Message_Block *chosen = 0;
01373   u_long priority = ULONG_MAX;
01374 
01375   for (ACE_Message_Block *temp = this->tail_;
01376        temp != 0;
01377        temp = temp->prev ())
01378     {
01379       // Find the first version of the earliest message (i.e.,
01380       // preserve FIFO order for messages at the same priority).
01381       if (temp->msg_priority () <= priority)
01382         {
01383           priority = temp->msg_priority ();
01384           chosen = temp;
01385         }
01386     }
01387 
01388   // If every message block is the same priority, pass back the first
01389   // one.
01390   if (chosen == 0)
01391     chosen = this->head_;
01392 
01393   // Patch up the queue.  If we don't have a previous then we are at
01394   // the head of the queue.
01395   if (chosen->prev () == 0)
01396     this->head_ = chosen->next ();
01397   else
01398     chosen->prev ()->next (chosen->next ());
01399 
01400   if (chosen->next () == 0)
01401     this->tail_ = chosen->prev ();
01402   else
01403     chosen->next ()->prev (chosen->prev ());
01404 
01405   // Pass back the chosen block
01406   dequeued = chosen;
01407 
01408   size_t mb_bytes = 0;
01409   size_t mb_length = 0;
01410   dequeued->total_size_and_length (mb_bytes,
01411                                    mb_length);
01412   // Subtract off all of the bytes associated with this message.
01413   this->cur_bytes_ -= mb_bytes;
01414   this->cur_length_ -= mb_length;
01415   --this->cur_count_;
01416 
01417   if (this->cur_count_ == 0 && this->head_ == this->tail_)
01418     this->head_ = this->tail_ = 0;
01419 
01420   // Make sure that the prev and next fields are 0!
01421   dequeued->prev (0);
01422   dequeued->next (0);
01423 
01424   // Only signal enqueueing threads if we've fallen below the low
01425   // water mark.
01426   if (this->cur_bytes_ <= this->low_water_mark_
01427       && this->signal_enqueue_waiters () == -1)
01428     return -1;
01429   else
01430     return ACE_Utils::truncate_cast<int> (this->cur_count_);
01431 }
01432 
01433 // Actually get the last ACE_Message_Block (no locking, so must be
01434 // called with locks held).  This method assumes that the queue has at
01435 // least one item in it when it is called.
01436 
01437 template <ACE_SYNCH_DECL> int
01438 ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_tail_i (ACE_Message_Block *&dequeued)
01439 {
01440   if (this->head_ == 0)
01441     ACE_ERROR_RETURN ((LM_ERROR,
01442                        ACE_TEXT ("Attempting to dequeue from empty queue")),
01443                       -1);
01444   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_tail_i");
01445   dequeued = this->tail_;
01446   if (this->tail_->prev () == 0)
01447     {
01448       this->head_ = 0;
01449       this->tail_ = 0;
01450     }
01451   else
01452     {
01453       this->tail_->prev ()->next (0);
01454       this->tail_ = this->tail_->prev ();
01455     }
01456 
01457   size_t mb_bytes = 0;
01458   size_t mb_length = 0;
01459   dequeued->total_size_and_length (mb_bytes,
01460                                    mb_length);
01461   // Subtract off all of the bytes associated with this message.
01462   this->cur_bytes_ -= mb_bytes;
01463   this->cur_length_ -= mb_length;
01464   --this->cur_count_;
01465 
01466   if (this->cur_count_ == 0 && this->head_ == this->tail_)
01467     this->head_ = this->tail_ = 0;
01468 
01469   // Make sure that the prev and next fields are 0!
01470   dequeued->prev (0);
01471   dequeued->next (0);
01472 
01473   // Only signal enqueueing threads if we've fallen below the low
01474   // water mark.
01475   if (this->cur_bytes_ <= this->low_water_mark_
01476       && this->signal_enqueue_waiters () == -1)
01477     return -1;
01478   else
01479     return ACE_Utils::truncate_cast<int> (this->cur_count_);
01480 }
01481 
01482 // Actually get the ACE_Message_Block with the lowest deadline time
01483 // (no locking, so must be called with locks held).  This method assumes
01484 // that the queue has at least one item in it when it is called.
01485 
01486 template <ACE_SYNCH_DECL> int
01487 ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_deadline_i (ACE_Message_Block *&dequeued)
01488 {
01489 #if defined (ACE_HAS_TIMED_MESSAGE_BLOCKS)
01490   if (this->head_ == 0)
01491     ACE_ERROR_RETURN ((LM_ERROR,
01492                        ACE_TEXT ("Attempting to dequeue from empty queue")),
01493                       -1);
01494   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_deadline_i");
01495 
01496   // Find the last message enqueued with the lowest deadline time
01497   ACE_Message_Block* chosen = 0;
01498   ACE_Time_Value deadline = ACE_Time_Value::max_time;
01499   for (ACE_Message_Block *temp = this->head_; temp != 0; temp = temp->next ())
01500     if (temp->msg_deadline_time () < deadline)
01501       {
01502         deadline = temp->msg_deadline_time ();
01503         chosen = temp;
01504       }
01505 
01506   // If every message block is the same deadline time,
01507   // pass back the first one
01508   if (chosen == 0)
01509     chosen = this->head_;
01510 
01511   // Patch up the queue.  If we don't have a previous
01512   // then we are at the head of the queue.
01513   if (chosen->prev () == 0)
01514     this->head_ = chosen->next ();
01515   else
01516     chosen->prev ()->next (chosen->next ());
01517 
01518   if (chosen->next () == 0)
01519     this->tail_ = chosen->prev ();
01520   else
01521     chosen->next ()->prev (chosen->prev ());
01522 
01523   // Pass back the chosen block
01524   dequeued = chosen;
01525 
01526   size_t mb_bytes = 0;
01527   size_t mb_length = 0;
01528   dequeued->total_size_and_length (mb_bytes,
01529                                    mb_length);
01530   // Subtract off all of the bytes associated with this message.
01531   this->cur_bytes_ -= mb_bytes;
01532   this->cur_length_ -= mb_length;
01533   --this->cur_count_;
01534 
01535   if (this->cur_count_ == 0 && this->head_ == this->tail_)
01536     this->head_ = this->tail_ = 0;
01537 
01538   // Make sure that the prev and next fields are 0!
01539   dequeued->prev (0);
01540   dequeued->next (0);
01541 
01542   // Only signal enqueueing threads if we've fallen below the low
01543   // water mark.
01544   if (this->cur_bytes_ <= this->low_water_mark_
01545       && this->signal_enqueue_waiters () == -1)
01546     return -1;
01547   else
01548     return this->cur_count_;
01549 #else
01550   return this->dequeue_head_i (dequeued);
01551 #endif /* ACE_HAS_TIMED_MESSAGE_BLOCKS */
01552 }
01553 
01554 // Take a look at the first item without removing it.
01555 
01556 template <ACE_SYNCH_DECL> int
01557 ACE_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head (ACE_Message_Block *&first_item,
01558                                                      ACE_Time_Value *timeout)
01559 {
01560   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head");
01561   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01562 
01563   if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
01564     {
01565       errno = ESHUTDOWN;
01566       return -1;
01567     }
01568 
01569   // Wait for at least one item to become available.
01570 
01571   if (this->wait_not_empty_cond (ace_mon, timeout) == -1)
01572     return -1;
01573 
01574   first_item = this->head_;
01575   return ACE_Utils::truncate_cast<int> (this->cur_count_);
01576 }
01577 
01578 template <ACE_SYNCH_DECL> int
01579 ACE_Message_Queue<ACE_SYNCH_USE>::wait_not_full_cond (ACE_Guard<ACE_SYNCH_MUTEX_T> &,
01580                                                       ACE_Time_Value *timeout)
01581 {
01582   int result = 0;
01583 
01584   // Wait while the queue is full.
01585 
01586   while (this->is_full_i ())
01587     {
01588       if (this->not_full_cond_.wait (timeout) == -1)
01589         {
01590           if (errno == ETIME)
01591             errno = EWOULDBLOCK;
01592           result = -1;
01593           break;
01594         }
01595       if (this->state_ != ACE_Message_Queue_Base::ACTIVATED)
01596         {
01597           errno = ESHUTDOWN;
01598           result = -1;
01599           break;
01600         }
01601     }
01602   return result;
01603 }
01604 
01605 template <ACE_SYNCH_DECL> int
01606 ACE_Message_Queue<ACE_SYNCH_USE>::wait_not_empty_cond
01607     (ACE_Guard<ACE_SYNCH_MUTEX_T> &, ACE_Time_Value *timeout)
01608 {
01609   int result = 0;
01610 
01611   // Wait while the queue is empty.
01612 
01613   while (this->is_empty_i ())
01614     {
01615       if (this->not_empty_cond_.wait (timeout) == -1)
01616         {
01617           if (errno == ETIME)
01618             errno = EWOULDBLOCK;
01619           result = -1;
01620           break;
01621         }
01622       if (this->state_ != ACE_Message_Queue_Base::ACTIVATED)
01623         {
01624           errno = ESHUTDOWN;
01625           result = -1;
01626           break;
01627         }
01628     }
01629   return result;
01630 }
01631 
01632 // Block indefinitely waiting for an item to arrive, does not ignore
01633 // alerts (e.g., signals).
01634 
01635 template <ACE_SYNCH_DECL> int
01636 ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_head (ACE_Message_Block *new_item,
01637                                                 ACE_Time_Value *timeout)
01638 {
01639   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_head");
01640   int queue_count = 0;
01641   {
01642     ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01643 
01644     if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
01645       {
01646         errno = ESHUTDOWN;
01647         return -1;
01648       }
01649 
01650     if (this->wait_not_full_cond (ace_mon, timeout) == -1)
01651       return -1;
01652 
01653     queue_count = this->enqueue_head_i (new_item);
01654 
01655     if (queue_count == -1)
01656       return -1;
01657 
01658     this->notify ();
01659   }
01660   return queue_count;
01661 }
01662 
01663 // Enqueue an <ACE_Message_Block *> into the <Message_Queue> in
01664 // accordance with its <msg_priority> (0 is lowest priority).  Returns
01665 // -1 on failure, else the number of items still on the queue.
01666 
01667 template <ACE_SYNCH_DECL> int
01668 ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_prio (ACE_Message_Block *new_item,
01669                                                 ACE_Time_Value *timeout)
01670 {
01671   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_prio");
01672   int queue_count = 0;
01673   {
01674     ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01675 
01676     if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
01677       {
01678         errno = ESHUTDOWN;
01679         return -1;
01680       }
01681 
01682     if (this->wait_not_full_cond (ace_mon, timeout) == -1)
01683       return -1;
01684 
01685     queue_count = this->enqueue_i (new_item);
01686 
01687     if (queue_count == -1)
01688       return -1;
01689 
01690     this->notify ();
01691   }
01692   return queue_count;
01693 }
01694 
01695 // Enqueue an <ACE_Message_Block *> into the <Message_Queue> in
01696 // accordance with its <msg_deadline_time>.  Returns
01697 // -1 on failure, else the number of items still on the queue.
01698 
01699 template <ACE_SYNCH_DECL> int
01700 ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_deadline (ACE_Message_Block *new_item,
01701                                                     ACE_Time_Value *timeout)
01702 {
01703   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_deadline");
01704   int queue_count = 0;
01705   {
01706     ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01707 
01708     if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
01709       {
01710         errno = ESHUTDOWN;
01711         return -1;
01712       }
01713 
01714     if (this->wait_not_full_cond (ace_mon, timeout) == -1)
01715       return -1;
01716 
01717     queue_count = this->enqueue_deadline_i (new_item);
01718 
01719     if (queue_count == -1)
01720       return -1;
01721 
01722     this->notify ();
01723   }
01724   return queue_count;
01725 }
01726 
01727 template <ACE_SYNCH_DECL> int
01728 ACE_Message_Queue<ACE_SYNCH_USE>::enqueue (ACE_Message_Block *new_item,
01729                                            ACE_Time_Value *timeout)
01730 {
01731   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue");
01732   return this->enqueue_prio (new_item, timeout);
01733 }
01734 
01735 // Block indefinitely waiting for an item to arrive,
01736 // does not ignore alerts (e.g., signals).
01737 
01738 template <ACE_SYNCH_DECL> int
01739 ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_tail (ACE_Message_Block *new_item,
01740                                               ACE_Time_Value *timeout)
01741 {
01742   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_tail");
01743   int queue_count = 0;
01744   {
01745     ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01746 
01747     if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
01748       {
01749         errno = ESHUTDOWN;
01750         return -1;
01751       }
01752 
01753     if (this->wait_not_full_cond (ace_mon, timeout) == -1)
01754       return -1;
01755 
01756     queue_count = this->enqueue_tail_i (new_item);
01757 
01758     if (queue_count == -1)
01759       return -1;
01760 
01761     this->notify ();
01762   }
01763   return queue_count;
01764 }
01765 
01766 // Remove an item from the front of the queue.  If timeout == 0 block
01767 // indefinitely (or until an alert occurs).  Otherwise, block for upto
01768 // the amount of time specified by timeout.
01769 
01770 template <ACE_SYNCH_DECL> int
01771 ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head (ACE_Message_Block *&first_item,
01772                                                 ACE_Time_Value *timeout)
01773 {
01774   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head");
01775   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01776 
01777   if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
01778     {
01779       errno = ESHUTDOWN;
01780       return -1;
01781     }
01782 
01783   if (this->wait_not_empty_cond (ace_mon, timeout) == -1)
01784     return -1;
01785 
01786   return this->dequeue_head_i (first_item);
01787 }
01788 
01789 // Remove item with the lowest priority from the queue.  If timeout == 0 block
01790 // indefinitely (or until an alert occurs).  Otherwise, block for upto
01791 // the amount of time specified by timeout.
01792 
01793 template <ACE_SYNCH_DECL> int
01794 ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_prio (ACE_Message_Block *&dequeued,
01795                                                 ACE_Time_Value *timeout)
01796 {
01797   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_prio");
01798   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01799 
01800   if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
01801     {
01802       errno = ESHUTDOWN;
01803       return -1;
01804     }
01805 
01806   if (this->wait_not_empty_cond (ace_mon, timeout) == -1)
01807     return -1;
01808 
01809   return this->dequeue_prio_i (dequeued);
01810 }
01811 
01812 // Remove an item from the end of the queue.  If timeout == 0 block
01813 // indefinitely (or until an alert occurs).  Otherwise, block for upto
01814 // the amount of time specified by timeout.
01815 
01816 template <ACE_SYNCH_DECL> int
01817 ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_tail (ACE_Message_Block *&dequeued,
01818                                                 ACE_Time_Value *timeout)
01819 {
01820   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_tail");
01821   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01822 
01823   if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
01824     {
01825       errno = ESHUTDOWN;
01826       return -1;
01827     }
01828 
01829   if (this->wait_not_empty_cond (ace_mon, timeout) == -1)
01830     return -1;
01831 
01832   return this->dequeue_tail_i (dequeued);
01833 }
01834 
01835 // Remove an item with the lowest deadline time.  If timeout == 0 block
01836 // indefinitely (or until an alert occurs).  Otherwise, block for upto
01837 // the amount of time specified by timeout.
01838 
01839 template <ACE_SYNCH_DECL> int
01840 ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_deadline (ACE_Message_Block *&dequeued,
01841                                                     ACE_Time_Value *timeout)
01842 {
01843   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_deadline");
01844   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01845 
01846   if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
01847     {
01848       errno = ESHUTDOWN;
01849       return -1;
01850     }
01851 
01852   if (this->wait_not_empty_cond (ace_mon, timeout) == -1)
01853     return -1;
01854 
01855   return this->dequeue_deadline_i (dequeued);
01856 }
01857 
01858 template <ACE_SYNCH_DECL> int
01859 ACE_Message_Queue<ACE_SYNCH_USE>::notify (void)
01860 {
01861   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::notify");
01862 
01863   // By default, don't do anything.
01864   if (this->notification_strategy_ == 0)
01865     return 0;
01866   else
01867     return this->notification_strategy_->notify ();
01868 }
01869 
01870 
01871 // = Initialization and termination methods.
01872 template <ACE_SYNCH_DECL>
01873 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::ACE_Dynamic_Message_Queue (ACE_Dynamic_Message_Strategy & message_strategy,
01874                                                                      size_t hwm,
01875                                                                      size_t lwm,
01876                                                                      ACE_Notification_Strategy *ns)
01877   : ACE_Message_Queue<ACE_SYNCH_USE> (hwm, lwm, ns),
01878     pending_head_ (0),
01879     pending_tail_ (0),
01880     late_head_ (0),
01881     late_tail_ (0),
01882     beyond_late_head_ (0),
01883     beyond_late_tail_ (0),
01884     message_strategy_ (message_strategy)
01885 {
01886   // Note, the ACE_Dynamic_Message_Queue assumes full responsibility
01887   // for the passed ACE_Dynamic_Message_Strategy object, and deletes
01888   // it in its own dtor
01889 }
01890 
01891 // dtor: free message strategy and let base class dtor do the rest.
01892 
01893 template <ACE_SYNCH_DECL>
01894 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::~ACE_Dynamic_Message_Queue (void)
01895 {
01896   delete &this->message_strategy_;
01897 }
01898 
01899 template <ACE_SYNCH_DECL> int
01900 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::remove_messages (ACE_Message_Block *&list_head,
01901                                                            ACE_Message_Block *&list_tail,
01902                                                            u_int status_flags)
01903 {
01904   // start with an empty list
01905   list_head = 0;
01906   list_tail = 0;
01907 
01908   // Get the current time
01909   ACE_Time_Value current_time = ACE_OS::gettimeofday ();
01910 
01911   // Refresh priority status boundaries in the queue.
01912   int result = this->refresh_queue (current_time);
01913   if (result < 0)
01914     return result;
01915 
01916   if (ACE_BIT_ENABLED (status_flags,
01917                        (u_int) ACE_Dynamic_Message_Strategy::PENDING)
01918       && this->pending_head_
01919       && this->pending_tail_)
01920     {
01921       // patch up pointers for the new tail of the queue
01922       if (this->pending_head_->prev ())
01923         {
01924           this->tail_ = this->pending_head_->prev ();
01925           this->pending_head_->prev ()->next (0);
01926         }
01927       else
01928         {
01929           // the list has become empty
01930           this->head_ = 0;
01931           this->tail_ = 0;
01932         }
01933 
01934       // point to the head and tail of the list
01935       list_head = this->pending_head_;
01936       list_tail = this->pending_tail_;
01937 
01938       // cut the pending messages out of the queue entirely
01939       this->pending_head_->prev (0);
01940       this->pending_head_ = 0;
01941       this->pending_tail_ = 0;
01942     }
01943 
01944   if (ACE_BIT_ENABLED (status_flags,
01945                        (u_int) ACE_Dynamic_Message_Strategy::LATE)
01946       && this->late_head_
01947       && this->late_tail_)
01948     {
01949       // Patch up pointers for the (possibly) new head and tail of the
01950       // queue.
01951       if (this->late_tail_->next ())
01952         this->late_tail_->next ()->prev (this->late_head_->prev ());
01953       else
01954         this->tail_ = this->late_head_->prev ();
01955 
01956       if (this->late_head_->prev ())
01957         this->late_head_->prev ()->next (this->late_tail_->next ());
01958       else
01959         this->head_ = this->late_tail_->next ();
01960 
01961       // put late messages behind pending messages (if any) being returned
01962       this->late_head_->prev (list_tail);
01963       if (list_tail)
01964         list_tail->next (this->late_head_);
01965       else
01966         list_head = this->late_head_;
01967 
01968       list_tail = this->late_tail_;
01969 
01970       this->late_tail_->next (0);
01971       this->late_head_ = 0;
01972       this->late_tail_ = 0;
01973     }
01974 
01975   if (ACE_BIT_ENABLED (status_flags,
01976       (u_int) ACE_Dynamic_Message_Strategy::BEYOND_LATE)
01977       && this->beyond_late_head_
01978       && this->beyond_late_tail_)
01979     {
01980       // Patch up pointers for the new tail of the queue
01981       if (this->beyond_late_tail_->next ())
01982         {
01983           this->head_ = this->beyond_late_tail_->next ();
01984           this->beyond_late_tail_->next ()->prev (0);
01985         }
01986       else
01987         {
01988           // the list has become empty
01989           this->head_ = 0;
01990           this->tail_ = 0;
01991         }
01992 
01993       // Put beyond late messages at the end of the list being
01994       // returned.
01995       if (list_tail)
01996         {
01997           this->beyond_late_head_->prev (list_tail);
01998           list_tail->next (this->beyond_late_head_);
01999         }
02000       else
02001         list_head = this->beyond_late_head_;
02002 
02003       list_tail = this->beyond_late_tail_;
02004 
02005       this->beyond_late_tail_->next (0);
02006       this->beyond_late_head_ = 0;
02007       this->beyond_late_tail_ = 0;
02008     }
02009 
02010   // Decrement message and size counts for removed messages.
02011   ACE_Message_Block *temp1;
02012 
02013   for (temp1 = list_head;
02014        temp1 != 0;
02015        temp1 = temp1->next ())
02016     {
02017       --this->cur_count_;
02018 
02019       size_t mb_bytes = 0;
02020       size_t mb_length = 0;
02021       temp1->total_size_and_length (mb_bytes,
02022                                     mb_length);
02023       // Subtract off all of the bytes associated with this message.
02024       this->cur_bytes_ -= mb_bytes;
02025       this->cur_length_ -= mb_length;
02026     }
02027 
02028   return result;
02029 }
02030 
02031 // Detach all messages with status given in the passed flags from the
02032 // queue and return them by setting passed head and tail pointers to
02033 // the linked list they comprise.  This method is intended primarily
02034 // as a means of periodically harvesting messages that have missed
02035 // their deadlines, but is available in its most general form.  All
02036 // messages are returned in priority order, from head to tail, as of
02037 // the time this method was called.
02038 
02039 template <ACE_SYNCH_DECL> int
02040 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head (ACE_Message_Block *&first_item,
02041                                                         ACE_Time_Value *timeout)
02042 {
02043   ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head");
02044 
02045   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
02046 
02047   if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
02048     {
02049       errno = ESHUTDOWN;
02050       return -1;
02051     }
02052 
02053   int result;
02054 
02055   // get the current time
02056   ACE_Time_Value current_time = ACE_OS::gettimeofday ();
02057 
02058   // refresh priority status boundaries in the queue
02059   result = this->refresh_queue (current_time);
02060   if (result < 0)
02061     return result;
02062 
02063   // *now* it's appropriate to wait for an enqueued item
02064   result = this->wait_not_empty_cond (ace_mon, timeout);
02065   if (result == -1)
02066     return result;
02067 
02068   // call the internal dequeue method, which selects an item from the
02069   // highest priority status portion of the queue that has messages
02070   // enqueued.
02071   result = this->dequeue_head_i (first_item);
02072 
02073   return result;
02074 }
02075 
02076 // Dequeue and return the <ACE_Message_Block *> at the (logical) head
02077 // of the queue.
02078 
02079 template <ACE_SYNCH_DECL> void
02080 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dump (void) const
02081 {
02082 #if defined (ACE_HAS_DUMP)
02083   ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dump");
02084   ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
02085 
02086   ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("ACE_Message_Queue<ACE_SYNCH_USE> (base class): \n")));
02087   this->ACE_Message_Queue<ACE_SYNCH_USE>::dump ();
02088 
02089   ACE_DEBUG ((LM_DEBUG,
02090               ACE_TEXT ("pending_head_ = %u\n")
02091               ACE_TEXT ("pending_tail_ = %u\n")
02092               ACE_TEXT ("late_head_ = %u\n")
02093               ACE_TEXT ("late_tail_ = %u\n")
02094               ACE_TEXT ("beyond_late_head_ = %u\n")
02095               ACE_TEXT ("beyond_late_tail_ = %u\n"),
02096               this->pending_head_,
02097               this->pending_tail_,
02098               this->late_head_,
02099               this->late_tail_,
02100               this->beyond_late_head_,
02101               this->beyond_late_tail_));
02102 
02103   ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("message_strategy_ : \n")));
02104   message_strategy_.dump ();
02105 
02106   ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
02107 #endif /* ACE_HAS_DUMP */
02108 }
02109   // dump the state of the queue
02110 
02111 template <ACE_SYNCH_DECL> int
02112 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_i (ACE_Message_Block *new_item)
02113 {
02114   ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_i");
02115 
02116   if (new_item == 0)
02117     return -1;
02118 
02119   int result = 0;
02120 
02121   // Get the current time.
02122   ACE_Time_Value current_time = ACE_OS::gettimeofday ();
02123 
02124   // Refresh priority status boundaries in the queue.
02125 
02126   result = this->refresh_queue (current_time);
02127   if (result < 0)
02128     return result;
02129 
02130   // Where we enqueue depends on the message's priority status.
02131   switch (message_strategy_.priority_status (*new_item,
02132                                              current_time))
02133     {
02134     case ACE_Dynamic_Message_Strategy::PENDING:
02135       if (this->pending_tail_ == 0)
02136         {
02137           // Check for simple case of an empty pending queue, where
02138           // all we need to do is insert <new_item> into the tail of
02139           // the queue.
02140           pending_head_ = new_item;
02141           pending_tail_ = pending_head_;
02142           return this->enqueue_tail_i (new_item);
02143         }
02144       else
02145         {
02146           // Enqueue the new message in priority order in the pending
02147           // sublist
02148           result = sublist_enqueue_i (new_item,
02149                                       current_time,
02150                                       this->pending_head_,
02151                                       this->pending_tail_,
02152                                       ACE_Dynamic_Message_Strategy::PENDING);
02153         }
02154       break;
02155 
02156     case ACE_Dynamic_Message_Strategy::LATE:
02157       if (this->late_tail_ == 0)
02158         {
02159           late_head_ = new_item;
02160           late_tail_ = late_head_;
02161 
02162           if (this->pending_head_ == 0)
02163             // Check for simple case of an empty pending queue,
02164             // where all we need to do is insert <new_item> into the
02165             // tail of the queue.
02166             return this->enqueue_tail_i (new_item);
02167           else if (this->beyond_late_tail_ == 0)
02168             // Check for simple case of an empty beyond late queue, where all
02169             // we need to do is insert <new_item> into the head of the queue.
02170             return this->enqueue_head_i (new_item);
02171           else
02172             {
02173               // Otherwise, we can just splice the new message in
02174               // between the pending and beyond late portions of the
02175               // queue.
02176               this->beyond_late_tail_->next (new_item);
02177               new_item->prev (this->beyond_late_tail_);
02178               this->pending_head_->prev (new_item);
02179               new_item->next (this->pending_head_);
02180             }
02181         }
02182       else
02183         {
02184           // Enqueue the new message in priority order in the late
02185           // sublist
02186           result = sublist_enqueue_i (new_item,
02187                                       current_time,
02188                                       this->late_head_,
02189                                       this->late_tail_,
02190                                       ACE_Dynamic_Message_Strategy::LATE);
02191         }
02192       break;
02193 
02194     case ACE_Dynamic_Message_Strategy::BEYOND_LATE:
02195       if (this->beyond_late_tail_ == 0)
02196         {
02197           // Check for simple case of an empty beyond late queue,
02198           // where all we need to do is insert <new_item> into the
02199           // head of the queue.
02200           beyond_late_head_ = new_item;
02201           beyond_late_tail_ = beyond_late_head_;
02202           return this->enqueue_head_i (new_item);
02203         }
02204       else
02205         {
02206           // all beyond late messages have the same (zero) priority,
02207           // so just put the new one at the end of the beyond late
02208           // messages
02209           if (this->beyond_late_tail_->next ())
02210             this->beyond_late_tail_->next ()->prev (new_item);
02211           else
02212             this->tail_ = new_item;
02213 
02214           new_item->next (this->beyond_late_tail_->next ());
02215           this->beyond_late_tail_->next (new_item);
02216           new_item->prev (this->beyond_late_tail_);
02217           this->beyond_late_tail_ = new_item;
02218         }
02219 
02220       break;
02221 
02222       // should never get here, but just in case...
02223     default:
02224       result = -1;
02225       break;
02226     }
02227 
02228   if (result < 0)
02229     return result;
02230 
02231   size_t mb_bytes = 0;
02232   size_t mb_length = 0;
02233   new_item->total_size_and_length (mb_bytes,
02234                                    mb_length);
02235   this->cur_bytes_ += mb_bytes;
02236   this->cur_length_ += mb_length;
02237   ++this->cur_count_;
02238 
02239   if (this->signal_dequeue_waiters () == -1)
02240     return -1;
02241   else
02242     return this->cur_count_;
02243 }
02244 
02245 // Enqueue an <ACE_Message_Block *> in accordance with its priority.
02246 // priority may be *dynamic* or *static* or a combination or *both* It
02247 // calls the priority evaluation function passed into the Dynamic
02248 // Message Queue constructor to update the priorities of all enqueued
02249 // messages.
02250 
02251 template <ACE_SYNCH_DECL> int
02252 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::sublist_enqueue_i (ACE_Message_Block *new_item,
02253                                                              const ACE_Time_Value &current_time,
02254                                                              ACE_Message_Block *&sublist_head,
02255                                                              ACE_Message_Block *&sublist_tail,
02256                                                              ACE_Dynamic_Message_Strategy::Priority_Status status)
02257 {
02258   int result = 0;
02259   ACE_Message_Block *current_item = 0;
02260 
02261   // Find message after which to enqueue new item, based on message
02262   // priority and priority status.
02263   for (current_item = sublist_tail;
02264        current_item;
02265        current_item = current_item->prev ())
02266     {
02267       if (message_strategy_.priority_status (*current_item, current_time) == status)
02268         {
02269           if (current_item->msg_priority () >= new_item->msg_priority ())
02270             break;
02271         }
02272       else
02273         {
02274           sublist_head = new_item;
02275           break;
02276         }
02277     }
02278 
02279   if (current_item == 0)
02280     {
02281       // If the new message has highest priority of any, put it at the
02282       // head of the list (and sublist).
02283       new_item->prev (0);
02284       new_item->next (this->head_);
02285       if (this->head_ != 0)
02286         this->head_->prev (new_item);
02287       else
02288         {
02289           this->tail_ = new_item;
02290           sublist_tail = new_item;
02291         }
02292       this->head_ = new_item;
02293       sublist_head = new_item;
02294     }
02295   else
02296     {
02297       // insert the new item into the list
02298       new_item->next (current_item->next ());
02299       new_item->prev (current_item);
02300 
02301       if (current_item->next ())
02302         current_item->next ()->prev (new_item);
02303       else
02304         this->tail_ = new_item;
02305 
02306       current_item->next (new_item);
02307 
02308       // If the new item has lowest priority of any in the sublist,
02309       // move the tail pointer of the sublist back to the new item
02310       if (current_item == sublist_tail)
02311         sublist_tail = new_item;
02312     }
02313 
02314   return result;
02315 }
02316 
02317 // Enqueue a message in priority order within a given priority status
02318 // sublist.
02319 
02320 template <ACE_SYNCH_DECL> int
02321 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i (ACE_Message_Block *&first_item)
02322 {
02323   ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i");
02324 
02325   int result = 0;
02326   int last_in_subqueue = 0;
02327 
02328   // first, try to dequeue from the head of the pending list
02329   if (this->pending_head_)
02330     {
02331       first_item = this->pending_head_;
02332 
02333       if (0 == this->pending_head_->prev ())
02334         this->head_ = this->pending_head_->next ();
02335       else
02336         this->pending_head_->prev ()->next (this->pending_head_->next ());
02337 
02338       if (0 == this->pending_head_->next ())
02339         {
02340           this->tail_ = this->pending_head_->prev ();
02341           this->pending_head_ = 0;
02342           this->pending_tail_ = 0;
02343         }
02344       else
02345         {
02346           this->pending_head_->next ()->prev (this->pending_head_->prev ());
02347           this->pending_head_ = this->pending_head_->next ();
02348         }
02349 
02350       first_item->prev (0);
02351       first_item->next (0);
02352     }
02353 
02354   // Second, try to dequeue from the head of the late list
02355   else if (this->late_head_)
02356     {
02357       last_in_subqueue = this->late_head_ == this->late_tail_ ? 1 : 0;
02358 
02359       first_item = this->late_head_;
02360 
02361       if (0 == this->late_head_->prev ())
02362         this->head_ = this->late_head_->next ();
02363       else
02364         this->late_head_->prev ()->next (this->late_head_->next ());
02365 
02366       if (0 == this->late_head_->next ())
02367         this->tail_ = this->late_head_->prev ();
02368       else
02369         {
02370           this->late_head_->next ()->prev (this->late_head_->prev ());
02371           this->late_head_ = this->late_head_->next ();
02372         }
02373 
02374       if (last_in_subqueue)
02375         {
02376           this->late_head_ = 0;
02377           this->late_tail_ = 0;
02378         }
02379 
02380       first_item->prev (0);
02381       first_item->next (0);
02382     }
02383   // finally, try to dequeue from the head of the beyond late list
02384   else if (this->beyond_late_head_)
02385     {
02386       last_in_subqueue =
02387         (this->beyond_late_head_ == this->beyond_late_tail_) ? 1 : 0;
02388 
02389       first_item = this->beyond_late_head_;
02390       this->head_ = this->beyond_late_head_->next ();
02391 
02392       if (0 == this->beyond_late_head_->next ())
02393         this->tail_ = this->beyond_late_head_->prev ();
02394       else
02395         {
02396           this->beyond_late_head_->next ()->prev (this->beyond_late_head_->prev ());
02397           this->beyond_late_head_ = this->beyond_late_head_->next ();
02398         }
02399 
02400       if (last_in_subqueue)
02401         {
02402           this->beyond_late_head_ = 0;
02403           this->beyond_late_tail_ = 0;
02404         }
02405 
02406       first_item->prev (0);
02407       first_item->next (0);
02408     }
02409   else
02410     {
02411       // nothing to dequeue: set the pointer to zero and return an error code
02412       first_item = 0;
02413       result = -1;
02414     }
02415 
02416   if (result < 0)
02417     return result;
02418 
02419   size_t mb_bytes = 0;
02420   size_t mb_length = 0;
02421   first_item->total_size_and_length (mb_bytes,
02422                                      mb_length);
02423   // Subtract off all of the bytes associated with this message.
02424   this->cur_bytes_ -= mb_bytes;
02425   this->cur_length_ -= mb_length;
02426   --this->cur_count_;
02427 
02428   // Only signal enqueueing threads if we've fallen below the low
02429   // water mark.
02430   if (this->cur_bytes_ <= this->low_water_mark_
02431       && this->signal_enqueue_waiters () == -1)
02432     return -1;
02433   else
02434     return this->cur_count_;
02435 }
02436 
02437 // Dequeue and return the <ACE_Message_Block *> at the head of the
02438 // logical queue.  Attempts first to dequeue from the pending portion
02439 // of the queue, or if that is empty from the late portion, or if that
02440 // is empty from the beyond late portion, or if that is empty just
02441 // sets the passed pointer to zero and returns -1.
02442 
02443 template <ACE_SYNCH_DECL> int
02444 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::refresh_queue (const ACE_Time_Value &current_time)
02445 {
02446   int result;
02447 
02448   result = refresh_pending_queue (current_time);
02449 
02450   if (result != -1)
02451     result = refresh_late_queue (current_time);
02452 
02453   return result;
02454 }
02455 
02456 // Refresh the queue using the strategy specific priority status
02457 // function.
02458 
02459 template <ACE_SYNCH_DECL> int
02460 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::refresh_pending_queue (const ACE_Time_Value &current_time)
02461 {
02462   ACE_Dynamic_Message_Strategy::Priority_Status current_status;
02463 
02464   // refresh priority status boundaries in the queue
02465   if (this->pending_head_)
02466     {
02467       current_status = message_strategy_.priority_status (*this->pending_head_,
02468                                                           current_time);
02469       switch (current_status)
02470         {
02471         case ACE_Dynamic_Message_Strategy::BEYOND_LATE:
02472           // Make sure the head of the beyond late queue is set (there
02473           // may not have been any beyond late messages previously)
02474           this->beyond_late_head_ = this->head_;
02475 
02476           // Zero out the late queue pointers, and set them only if
02477           // there turn out to be late messages in the pending sublist
02478           this->late_head_ = 0;
02479           this->late_tail_ = 0;
02480 
02481           // Advance through the beyond late messages in the pending queue
02482           do
02483             {
02484               this->pending_head_ = this->pending_head_->next ();
02485 
02486               if (this->pending_head_)
02487                 current_status = message_strategy_.priority_status (*this->pending_head_,
02488                                                                     current_time);
02489               else
02490                 break;  // do while
02491 
02492             }
02493           while (current_status == ACE_Dynamic_Message_Strategy::BEYOND_LATE);
02494 
02495           if (this->pending_head_)
02496             {
02497               // point tail of beyond late sublist to previous item
02498               this->beyond_late_tail_ = this->pending_head_->prev ();
02499 
02500               if (current_status == ACE_Dynamic_Message_Strategy::PENDING)
02501                 // there are no late messages left in the queue
02502                 break; // switch
02503               else if (current_status != ACE_Dynamic_Message_Strategy::LATE)
02504                 {
02505                   // if we got here, something is *seriously* wrong with the queue
02506                   ACE_ERROR_RETURN ((LM_ERROR,
02507                                      ACE_TEXT ("Unexpected message priority status [%d] (expected LATE)"),
02508                                      (int) current_status),
02509                                     -1);
02510                 }
02511               /* FALLTHRU */
02512             }
02513           else
02514             {
02515               // There are no pending or late messages left in the
02516               // queue.
02517               this->beyond_late_tail_ = this->tail_;
02518               this->pending_head_ = 0;
02519               this->pending_tail_ = 0;
02520               break; // switch
02521             }
02522 
02523         case ACE_Dynamic_Message_Strategy::LATE:
02524           // Make sure the head of the late queue is set (there may
02525           // not have been any late messages previously, or they may
02526           // have all become beyond late).
02527           if (this->late_head_ == 0)
02528             this->late_head_ = this->pending_head_;
02529 
02530           // advance through the beyond late messages in the pending queue
02531           do
02532             {
02533               this->pending_head_ = this->pending_head_->next ();
02534 
02535               if (this->pending_head_)
02536                 current_status = message_strategy_.priority_status (*this->pending_head_,
02537                                                                     current_time);
02538               else
02539                 break;  // do while
02540 
02541             }
02542           while (current_status == ACE_Dynamic_Message_Strategy::LATE);
02543 
02544           if (this->pending_head_)
02545             {
02546               if (current_status != ACE_Dynamic_Message_Strategy::PENDING)
02547                 // if we got here, something is *seriously* wrong with the queue
02548                 ACE_ERROR_RETURN((LM_ERROR,
02549                                   ACE_TEXT ("Unexpected message priority status [%d] (expected PENDING)"),
02550                                   (int) current_status),
02551                                  -1);
02552 
02553               // Point tail of late sublist to previous item
02554               this->late_tail_ = this->pending_head_->prev ();
02555             }
02556           else
02557             {
02558               // there are no pending messages left in the queue
02559               this->late_tail_ = this->tail_;
02560               this->pending_head_ = 0;
02561               this->pending_tail_ = 0;
02562             }
02563 
02564           break; // switch
02565         case ACE_Dynamic_Message_Strategy::PENDING:
02566           // do nothing - the pending queue is unchanged
02567           break; // switch
02568         default:
02569           // if we got here, something is *seriously* wrong with the queue
02570           ACE_ERROR_RETURN((LM_ERROR,
02571                             ACE_TEXT ("Unknown message priority status [%d]"),
02572                             (int) current_status),
02573                            -1);
02574         }
02575     }
02576   return 0;
02577 }
02578 
02579 // Refresh the pending queue using the strategy specific priority
02580 // status function.
02581 
02582 template <ACE_SYNCH_DECL> int
02583 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::refresh_late_queue (const ACE_Time_Value &current_time)
02584 {
02585   ACE_Dynamic_Message_Strategy::Priority_Status current_status;
02586 
02587   if (this->late_head_)
02588     {
02589       current_status = message_strategy_.priority_status (*this->late_head_,
02590                                                           current_time);
02591       switch (current_status)
02592         {
02593         case ACE_Dynamic_Message_Strategy::BEYOND_LATE:
02594 
02595           // make sure the head of the beyond late queue is set
02596           // (there may not have been any beyond late messages previously)
02597           this->beyond_late_head_ = this->head_;
02598 
02599           // advance through the beyond late messages in the late queue
02600           do
02601             {
02602               this->late_head_ = this->late_head_->next ();
02603 
02604               if (this->late_head_)
02605                 current_status = message_strategy_.priority_status (*this->late_head_,
02606                                                                     current_time);
02607               else
02608                 break;  // do while
02609 
02610             }
02611           while (current_status == ACE_Dynamic_Message_Strategy::BEYOND_LATE);
02612 
02613           if (this->late_head_)
02614             {
02615               // point tail of beyond late sublist to previous item
02616               this->beyond_late_tail_ = this->late_head_->prev ();
02617 
02618               if (current_status == ACE_Dynamic_Message_Strategy::PENDING)
02619                 {
02620                   // there are no late messages left in the queue
02621                   this->late_head_ = 0;
02622                   this->late_tail_ = 0;
02623                 }
02624               else if (current_status != ACE_Dynamic_Message_Strategy::LATE)
02625                 // if we got here, something is *seriously* wrong with the queue
02626                 ACE_ERROR_RETURN ((LM_ERROR,
02627                                    ACE_TEXT ("Unexpected message priority status [%d] (expected LATE)"),
02628                                    (int) current_status),
02629                                   -1);
02630             }
02631           else
02632             {
02633               // there are no late messages left in the queue
02634               this->beyond_late_tail_ = this->tail_;
02635               this->late_head_ = 0;
02636               this->late_tail_ = 0;
02637             }
02638 
02639           break;  // switch
02640 
02641         case ACE_Dynamic_Message_Strategy::LATE:
02642           // do nothing - the late queue is unchanged
02643           break; // switch
02644 
02645         case ACE_Dynamic_Message_Strategy::PENDING:
02646           // if we got here, something is *seriously* wrong with the queue
02647           ACE_ERROR_RETURN ((LM_ERROR,
02648                              ACE_TEXT ("Unexpected message priority status ")
02649                              ACE_TEXT ("[%d] (expected LATE or BEYOND_LATE)"),
02650                              (int) current_status),
02651                             -1);
02652         default:
02653           // if we got here, something is *seriously* wrong with the queue
02654           ACE_ERROR_RETURN ((LM_ERROR,
02655                              ACE_TEXT ("Unknown message priority status [%d]"),
02656                              (int) current_status),
02657                             -1);
02658         }
02659     }
02660 
02661   return 0;
02662 }
02663 
02664 // Refresh the late queue using the strategy specific priority status
02665 // function.
02666 
02667 template <ACE_SYNCH_DECL> int
02668 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head (ACE_Message_Block *&first_item,
02669                                                              ACE_Time_Value *timeout)
02670 {
02671   return ACE_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head (first_item,
02672                                                               timeout);
02673 }
02674 
02675 // Private method to hide public base class method: just calls base
02676 // class method.
02677 
02678 template <ACE_SYNCH_DECL> int
02679 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_tail (ACE_Message_Block *new_item,
02680                                                         ACE_Time_Value *timeout)
02681 {
02682   ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_tail");
02683   return this->enqueue_prio (new_item, timeout);
02684 }
02685 
02686 // Just call priority enqueue method: tail enqueue semantics for
02687 // dynamic message queues are unstable: the message may or may not be
02688 // where it was placed after the queue is refreshed prior to the next
02689 // enqueue or dequeue operation.
02690 
02691 template <ACE_SYNCH_DECL> int
02692 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_head (ACE_Message_Block *new_item,
02693                                                         ACE_Time_Value *timeout)
02694 {
02695   ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_head");
02696   return this->enqueue_prio (new_item, timeout);
02697 }
02698 
02699 // Just call priority enqueue method: head enqueue semantics for
02700 // dynamic message queues are unstable: the message may or may not be
02701 // where it was placed after the queue is refreshed prior to the next
02702 // enqueue or dequeue operation.
02703 
02704 template <ACE_SYNCH_DECL>
02705 ACE_Message_Queue<ACE_SYNCH_USE> *
02706 ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_static_message_queue (size_t hwm,
02707                                                                        size_t lwm,
02708                                                                        ACE_Notification_Strategy *ns)
02709 {
02710   ACE_Message_Queue<ACE_SYNCH_USE> *tmp = 0;
02711 
02712   ACE_NEW_RETURN (tmp,
02713                   ACE_Message_Queue<ACE_SYNCH_USE> (hwm, lwm, ns),
02714                   0);
02715   return tmp;
02716 }
02717 
02718 // Factory method for a statically prioritized ACE_Message_Queue.
02719 
02720 template <ACE_SYNCH_DECL>
02721 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> *
02722 ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_deadline_message_queue (size_t hwm,
02723                                                                          size_t lwm,
02724                                                                          ACE_Notification_Strategy *ns,
02725                                                                          u_long static_bit_field_mask,
02726                                                                          u_long static_bit_field_shift,
02727                                                                          u_long dynamic_priority_max,
02728                                                                          u_long dynamic_priority_offset)
02729 {
02730   ACE_Deadline_Message_Strategy *adms = 0;
02731 
02732   ACE_NEW_RETURN (adms,
02733                   ACE_Deadline_Message_Strategy (static_bit_field_mask,
02734                                                  static_bit_field_shift,
02735                                                  dynamic_priority_max,
02736                                                  dynamic_priority_offset),
02737                   0);
02738 
02739   ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> *tmp = 0;
02740   ACE_NEW_RETURN (tmp,
02741                   ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> (*adms, hwm, lwm, ns),
02742                   0);
02743   return tmp;
02744 }
02745 
02746 // Factory method for a dynamically prioritized (by time to deadline)
02747 // ACE_Dynamic_Message_Queue.
02748 
02749 template <ACE_SYNCH_DECL>
02750 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> *
02751 ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_laxity_message_queue (size_t hwm,
02752                                                                        size_t lwm,
02753                                                                        ACE_Notification_Strategy *ns,
02754                                                                        u_long static_bit_field_mask,
02755                                                                        u_long static_bit_field_shift,
02756                                                                        u_long dynamic_priority_max,
02757                                                                        u_long dynamic_priority_offset)
02758 {
02759   ACE_Laxity_Message_Strategy *alms = 0;
02760 
02761   ACE_NEW_RETURN (alms,
02762                   ACE_Laxity_Message_Strategy (static_bit_field_mask,
02763                                                static_bit_field_shift,
02764                                                dynamic_priority_max,
02765                                                dynamic_priority_offset),
02766                   0);
02767 
02768   ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> *tmp = 0;
02769   ACE_NEW_RETURN (tmp,
02770                   ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> (*alms, hwm, lwm, ns),
02771                   0);
02772   return tmp;
02773 }
02774 
02775 // Factory method for a dynamically prioritized (by laxity)
02776 // <ACE_Dynamic_Message_Queue>.
02777 
02778 #if defined (ACE_VXWORKS)
02779 
02780 template <ACE_SYNCH_DECL>
02781 ACE_Message_Queue_Vx *
02782 ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_Vx_message_queue (size_t max_messages,
02783                                                                    size_t max_message_length,
02784                                                                    ACE_Notification_Strategy *ns)
02785 {
02786   ACE_Message_Queue_Vx *tmp = 0;
02787 
02788   ACE_NEW_RETURN (tmp,
02789                   ACE_Message_Queue_Vx (max_messages, max_message_length, ns),
02790                   0);
02791   return tmp;
02792 }
02793   // factory method for a wrapped VxWorks message queue
02794 
02795 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO)
02796 
02797 template <ACE_SYNCH_DECL>
02798 ACE_Message_Queue_NT *
02799 ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_NT_message_queue (size_t max_threads)
02800 {
02801   ACE_Message_Queue_NT *tmp = 0;
02802 
02803   ACE_NEW_RETURN (tmp,
02804                   ACE_Message_Queue_NT (max_threads);
02805                   0);
02806   return tmp;
02807 }
02808 
02809 #endif /* ACE_HAS_WIN32_OVERLAPPED_IO */
02810 #endif /* defined (ACE_VXWORKS) */
02811 
02812 ACE_END_VERSIONED_NAMESPACE_DECL
02813 
02814 #endif /* !ACE_MESSAGE_QUEUE_T_CPP */

Generated on Sun Jan 27 12:05:31 2008 for ACE by doxygen 1.3.6