Message_Queue_T.cpp

Go to the documentation of this file.
00001 // $Id: Message_Queue_T.cpp 81691 2008-05-14 11:09:21Z johnnyw $
00002 
00003 #ifndef ACE_MESSAGE_QUEUE_T_CPP
00004 #define ACE_MESSAGE_QUEUE_T_CPP
00005 
00006 // #include Message_Queue.h instead of Message_Queue_T.h to avoid
00007 // circular include problems.
00008 #include "ace/Message_Queue.h"
00009 #include "ace/Log_Msg.h"
00010 #include "ace/OS_NS_sys_time.h"
00011 
00012 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00013 # pragma once
00014 #endif /* ACE_LACKS_PRAGMA_ONCE */
00015 
00016 #include "ace/Notification_Strategy.h"
00017 #include "ace/Truncate.h"
00018 
00019 #if defined (ACE_HAS_MONITOR_POINTS) && (ACE_HAS_MONITOR_POINTS == 1)
00020 #include "ace/OS_NS_stdio.h"
00021 #include "ace/Monitor_Size.h"
00022 #endif /* ACE_HAS_MONITOR_POINTS==1 */
00023 
00024 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
00025 
// Allocation-hook definitions, one invocation per class template named
// in the argument.
// NOTE(review): ACE_ALLOC_HOOK_DEFINE is defined outside this file, so
// what each invocation expands to cannot be confirmed from here.
00026 ACE_ALLOC_HOOK_DEFINE(ACE_Message_Queue)
00027 ACE_ALLOC_HOOK_DEFINE(ACE_Dynamic_Message_Queue)
00028 ACE_ALLOC_HOOK_DEFINE(ACE_Message_Queue_Ex)
00029 ACE_ALLOC_HOOK_DEFINE(ACE_Message_Queue_Ex_N)
00030 
00031 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> void
00032 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dump (void) const
00033 {
00034 #if defined (ACE_HAS_DUMP)
00035   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dump");
00036 
00037   this->queue_.dump ();
00038 #endif /* ACE_HAS_DUMP */
00039 }
00040 
00041 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> void
00042 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::message_bytes (size_t new_value)
00043 {
00044   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::message_bytes");
00045 
00046   this->queue_.message_bytes (new_value);
00047 }
00048 
00049 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> void
00050 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::message_length (size_t new_value)
00051 {
00052   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::message_length");
00053 
00054   this->queue_.message_length (new_value);
00055 }
00056 
00057 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL>
00058 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::ACE_Message_Queue_Ex (size_t hwm,
00059                                                                              size_t lwm,
00060                                                                              ACE_Notification_Strategy *ns)
00061 {
00062   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::ACE_Message_Queue_Ex");
00063 
00064   if (this->queue_.open (hwm, lwm, ns) == -1)
00065     ACE_ERROR ((LM_ERROR,
00066                 ACE_TEXT ("ACE_Message_Queue_Ex")));
00067 }
00068 
00069 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL>
00070 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::~ACE_Message_Queue_Ex (void)
00071 {
00072   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::~ACE_Message_Queue_Ex");
00073 }
00074 
00075 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00076 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::open (size_t hwm,
00077                                                              size_t lwm,
00078                                                              ACE_Notification_Strategy *ns)
00079 {
00080   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::open");
00081 
00082   return this->queue_.open (hwm, lwm, ns);
00083 }
00084 
00085 // Clean up the queue if we have not already done so!
00086 
00087 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00088 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::close (void)
00089 {
00090   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::close");
00091 
00092   return this->queue_.close ();
00093 }
00094 
00095 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00096 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::flush (void)
00097 {
00098   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::flush");
00099 
00100   return this->queue_.flush ();
00101 }
00102 
00103 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00104 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::flush_i (void)
00105 {
00106   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::flush_i");
00107 
00108   return this->queue_.flush_i ();
00109 }
00110 
00111 // Take a look at the first item without removing it.
00112 
00113 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00114 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::peek_dequeue_head (ACE_MESSAGE_TYPE *&first_item,
00115                                                                           ACE_Time_Value *timeout)
00116 {
00117   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::peek_dequeue_head");
00118 
00119   ACE_Message_Block *mb = 0;
00120 
00121   int const cur_count = this->queue_.peek_dequeue_head (mb, timeout);
00122 
00123   if (cur_count != -1)
00124     first_item  = reinterpret_cast<ACE_MESSAGE_TYPE *> (mb->base ());
00125 
00126   return cur_count;
00127 }
00128 
00129 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00130 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_head (ACE_MESSAGE_TYPE *new_item,
00131                                                                      ACE_Time_Value *timeout)
00132 {
00133   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_head");
00134 
00135   ACE_Message_Block *mb = 0;
00136 
00137   ACE_NEW_RETURN (mb,
00138                   ACE_Message_Block ((char *) new_item,
00139                                      sizeof (*new_item),
00140                                      ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::DEFAULT_PRIORITY),
00141                   -1);
00142 
00143   int const result = this->queue_.enqueue_head (mb, timeout);
00144   if (result == -1)
00145     // Zap the message.
00146     mb->release ();
00147   return result;
00148 }
00149 
00150 // Enqueue an <ACE_MESSAGE_TYPE *> into the <Message_Queue> in
00151 // accordance with its <msg_priority> (0 is lowest priority).  Returns
00152 // -1 on failure, else the number of items still on the queue.
00153 
00154 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00155 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue (ACE_MESSAGE_TYPE *new_item,
00156                                                                 ACE_Time_Value *timeout)
00157 {
00158   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue");
00159 
00160   return this->enqueue_prio (new_item, timeout);
00161 }
00162 
00163 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00164 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_prio (ACE_MESSAGE_TYPE *new_item,
00165                                                                      ACE_Time_Value *timeout,
00166                                                                      unsigned long priority)
00167 {
00168   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_prio");
00169 
00170   ACE_Message_Block *mb = 0;
00171 
00172   ACE_NEW_RETURN (mb,
00173                   ACE_Message_Block ((char *) new_item,
00174                                      sizeof (*new_item),
00175                                      priority),
00176                   -1);
00177 
00178   int const result = this->queue_.enqueue_prio (mb, timeout);
00179   if (result == -1)
00180     // Zap the message.
00181     mb->release ();
00182 
00183   return result;
00184 }
00185 
00186 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00187 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_deadline (ACE_MESSAGE_TYPE *new_item,
00188                                                                          ACE_Time_Value *timeout)
00189 {
00190   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_deadline");
00191 
00192   ACE_Message_Block *mb = 0;
00193 
00194   ACE_NEW_RETURN (mb,
00195                   ACE_Message_Block ((char *) new_item,
00196                                      sizeof (*new_item),
00197                                      ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::DEFAULT_PRIORITY ),
00198                   -1);
00199 
00200   int const result = this->queue_.enqueue_deadline (mb, timeout);
00201   if (result == -1)
00202     // Zap the message.
00203     mb->release ();
00204 
00205   return result;
00206 }
00207 
00208 // Block indefinitely waiting for an item to arrive,
00209 // does not ignore alerts (e.g., signals).
00210 
00211 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00212 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_tail (ACE_MESSAGE_TYPE *new_item,
00213                                                                      ACE_Time_Value *timeout)
00214 {
00215   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_tail");
00216 
00217   ACE_Message_Block *mb = 0;
00218 
00219   ACE_NEW_RETURN (mb,
00220                   ACE_Message_Block ((char *) new_item,
00221                                      sizeof (*new_item),
00222                                      ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::DEFAULT_PRIORITY),
00223                   -1);
00224 
00225   int const result = this->queue_.enqueue_tail (mb, timeout);
00226   if (result == -1)
00227     // Zap the message.
00228     mb->release ();
00229   return result;
00230 }
00231 
00232 // Remove an item from the front of the queue.  If timeout == 0 block
00233 // indefinitely (or until an alert occurs).  Otherwise, block for upto
00234 // the amount of time specified by timeout.
00235 
00236 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00237 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dequeue_head (ACE_MESSAGE_TYPE *&first_item,
00238                                                                      ACE_Time_Value *timeout)
00239 {
00240   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dequeue_head");
00241 
00242   ACE_Message_Block *mb = 0;
00243 
00244   int const cur_count = this->queue_.dequeue_head (mb, timeout);
00245 
00246   // Dequeue the message.
00247   if (cur_count != -1)
00248     {
00249       first_item = reinterpret_cast<ACE_MESSAGE_TYPE *> (mb->base ());
00250       // Delete the message block.
00251       mb->release ();
00252     }
00253 
00254   return cur_count;
00255 }
00256 
00257 // Remove the item with the lowest priority from the queue.  If timeout == 0
00258 // block indefinitely (or until an alert occurs).  Otherwise, block for upto
00259 // the amount of time specified by timeout.
00260 
00261 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00262 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dequeue_prio (ACE_MESSAGE_TYPE *&dequeued,
00263                                                                      ACE_Time_Value *timeout)
00264 {
00265   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dequeue_prio");
00266 
00267   ACE_Message_Block *mb = 0;
00268 
00269   int const cur_count = this->queue_.dequeue_prio (mb, timeout);
00270 
00271   // Dequeue the message.
00272   if (cur_count != -1)
00273     {
00274       dequeued = reinterpret_cast<ACE_MESSAGE_TYPE *> (mb->base ());
00275       // Delete the message block.
00276       mb->release ();
00277     }
00278 
00279   return cur_count;
00280 }
00281 
00282 // Remove an item from the end of the queue.  If timeout == 0 block
00283 // indefinitely (or until an alert occurs).  Otherwise, block for upto
00284 // the amount of time specified by timeout.
00285 
00286 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00287 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dequeue_tail (ACE_MESSAGE_TYPE *&dequeued,
00288                                                                      ACE_Time_Value *timeout)
00289 {
00290   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dequeue_tail");
00291 
00292   ACE_Message_Block *mb = 0;
00293 
00294   int const cur_count = this->queue_.dequeue_tail (mb, timeout);
00295 
00296   // Dequeue the message.
00297   if (cur_count != -1)
00298     {
00299       dequeued = reinterpret_cast<ACE_MESSAGE_TYPE *> (mb->base ());
00300       // Delete the message block.
00301       mb->release ();
00302     }
00303 
00304   return cur_count;
00305 }
00306 
00307 // Remove an item with the lowest deadline time.  If timeout == 0 block
00308 // indefinitely (or until an alert occurs).  Otherwise, block for upto
00309 // the amount of time specified by timeout.
00310 
00311 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00312 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dequeue_deadline (ACE_MESSAGE_TYPE *&dequeued,
00313                                                                          ACE_Time_Value *timeout)
00314 {
00315   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dequeue_deadline");
00316 
00317   ACE_Message_Block *mb = 0;
00318 
00319   int const cur_count = this->queue_.dequeue_deadline (mb, timeout);
00320 
00321   // Dequeue the message.
00322   if (cur_count != -1)
00323     {
00324       dequeued = reinterpret_cast<ACE_MESSAGE_TYPE *> (mb->base ());
00325       // Delete the message block.
00326       mb->release ();
00327     }
00328 
00329   return cur_count;
00330 }
00331 
00332 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00333 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::notify (void)
00334 {
00335   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::notify");
00336 
00337   return this->queue_.notify ();
00338 }
00339 
00340 
00341 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL>
00342 ACE_Message_Queue_Ex_N<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::ACE_Message_Queue_Ex_N
00343   (size_t high_water_mark,
00344    size_t low_water_mark,
00345    ACE_Notification_Strategy *ns):
00346     ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE> (high_water_mark,
00347                                                            low_water_mark,
00348                                                            ns)
00349 {
00350   ACE_TRACE ("ACE_Message_Queue_Ex_N<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::ACE_Message_Queue_Ex_N");
00351 }
00352 
00353 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL>
00354 ACE_Message_Queue_Ex_N<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::~ACE_Message_Queue_Ex_N (void)
00355 {
00356   ACE_TRACE ("ACE_Message_Queue_Ex_N<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::~ACE_Message_Queue_Ex_N");
00357 }
00358 
00359 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00360 ACE_Message_Queue_Ex_N<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_head
00361   (ACE_MESSAGE_TYPE *new_item,
00362    ACE_Time_Value   *timeout)
00363 {
00364   ACE_TRACE ("ACE_Message_Queue_Ex_N<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_head");
00365 
00366   // Create a chained ACE_Message_Blocks wrappers around the 'chained'
00367   // ACE_MESSAGE_TYPES.
00368   ACE_Message_Block *mb = this->wrap_with_mbs_i (new_item);
00369   if (0 == mb)
00370     {
00371       return -1;
00372     }
00373 
00374   int result = this->queue_.enqueue_head (mb, timeout);
00375   if (-1 == result)
00376     {
00377       // Zap the messages.
00378       mb->release ();
00379     }
00380   return result;
00381 }
00382 
00383 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00384 ACE_Message_Queue_Ex_N<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_tail
00385   (ACE_MESSAGE_TYPE *new_item,
00386    ACE_Time_Value   *timeout)
00387 {
00388   ACE_TRACE ("ACE_Message_Queue_Ex_N<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_tail");
00389 
00390   // Create a chained ACE_Message_Blocks wrappers around the 'chained'
00391   // ACE_MESSAGE_TYPES.
00392   ACE_Message_Block *mb = this->wrap_with_mbs_i (new_item);
00393   if (0 == mb)
00394     {
00395       return -1;
00396     }
00397 
00398   int result = this->queue_.enqueue_tail (mb, timeout);
00399   if (-1 == result)
00400     {
00401       // Zap the message.
00402       mb->release ();
00403     }
00404   return result;
00405 }
00406 
00407 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> ACE_Message_Block *
00408 ACE_Message_Queue_Ex_N<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::wrap_with_mbs_i
00409   (ACE_MESSAGE_TYPE *new_item)
00410 {
00411   ACE_TRACE ("ACE_Message_Queue_Ex_N<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::wrap_with_mbs_i");
00412 
00413   // We need to keep a reference to the head of the chain
00414   ACE_Message_Block *mb_head = 0;
00415 
00416   ACE_NEW_RETURN (mb_head,
00417                   ACE_Message_Block ((char *) new_item,
00418                                      sizeof (*new_item),
00419                                      ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::DEFAULT_PRIORITY),
00420                   0);
00421 
00422   // mb_tail will point to the last ACE_Message_Block
00423   ACE_Message_Block *mb_tail = mb_head;
00424 
00425   // Run through rest of the messages and wrap them
00426   for (ACE_MESSAGE_TYPE *pobj = new_item->next (); pobj; pobj = pobj->next ())
00427     {
00428       ACE_Message_Block *mb_temp = 0;
00429       ACE_NEW_NORETURN (mb_temp,
00430                         ACE_Message_Block ((char *) pobj,
00431                                            sizeof (*pobj),
00432                                            ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::DEFAULT_PRIORITY));
00433       if (mb_temp == 0)
00434         {
00435           mb_head->release ();
00436           mb_head = 0;
00437           break;
00438         }
00439 
00440       mb_tail->next (mb_temp);
00441       mb_tail = mb_temp;
00442     }
00443 
00444   return mb_head;
00445 }
00446 
// Allocation-hook definition for ACE_Message_Queue_Reverse_Iterator.
// NOTE(review): ACE_ALLOC_HOOK_DEFINE is defined outside this file, so
// its expansion cannot be confirmed from here.
00447 ACE_ALLOC_HOOK_DEFINE(ACE_Message_Queue_Reverse_Iterator)
00448 
00449 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00450 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dequeue (ACE_MESSAGE_TYPE *&first_item,
00451                                                                 ACE_Time_Value *timeout)
00452 {
00453   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dequeue");
00454 
00455   return this->dequeue_head (first_item, timeout);
00456 }
00457 
00458 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> ACE_Notification_Strategy *
00459 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::notification_strategy (void)
00460 {
00461   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::notification_strategy");
00462 
00463   return this->queue_.notification_strategy ();
00464 }
00465 
00466 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> void
00467 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::notification_strategy (ACE_Notification_Strategy *s)
00468 {
00469   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::notification_strategy");
00470 
00471   this->queue_.notification_strategy (s);
00472 }
00473 
00474 // Check if queue is empty (holds locks).
00475 
00476 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> bool
00477 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::is_empty (void)
00478 {
00479   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::is_empty");
00480 
00481   return this->queue_.is_empty ();
00482 }
00483 
00484 // Check if queue is full (holds locks).
00485 
00486 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> bool
00487 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::is_full (void)
00488 {
00489   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::is_full");
00490 
00491   return this->queue_.is_full ();
00492 }
00493 
00494 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> size_t
00495 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::high_water_mark (void)
00496 {
00497   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::high_water_mark");
00498 
00499   return this->queue_.high_water_mark ();
00500 }
00501 
00502 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> void
00503 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::high_water_mark (size_t hwm)
00504 {
00505   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::high_water_mark");
00506 
00507   this->queue_.high_water_mark (hwm);
00508 }
00509 
00510 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> size_t
00511 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::low_water_mark (void)
00512 {
00513   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::low_water_mark");
00514 
00515   return this->queue_.low_water_mark ();
00516 }
00517 
00518 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> void
00519 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::low_water_mark (size_t lwm)
00520 {
00521   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::low_water_mark");
00522 
00523   this->queue_.low_water_mark (lwm);
00524 }
00525 
00526 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> size_t
00527 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::message_bytes (void)
00528 {
00529   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::message_bytes");
00530 
00531   return this->queue_.message_bytes ();
00532 }
00533 
00534 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> size_t
00535 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::message_length (void)
00536 {
00537   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::message_length");
00538 
00539   return this->queue_.message_length ();
00540 }
00541 
00542 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> size_t
00543 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::message_count (void)
00544 {
00545   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::message_count");
00546 
00547   return this->queue_.message_count ();
00548 }
00549 
00550 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00551 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::deactivate (void)
00552 {
00553   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::deactivate");
00554 
00555   return this->queue_.deactivate ();
00556 }
00557 
00558 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00559 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::activate (void)
00560 {
00561   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::activate");
00562 
00563   return this->queue_.activate ();
00564 }
00565 
00566 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00567 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::pulse (void)
00568 {
00569   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::pulse");
00570 
00571   return this->queue_.pulse ();
00572 }
00573 
00574 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00575 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::deactivated (void)
00576 {
00577   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::deactivated");
00578 
00579   return this->queue_.deactivated ();
00580 }
00581 
00582 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00583 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::state (void)
00584 {
00585   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::state");
00586 
00587   return this->queue_.state ();
00588 }
00589 
00590 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> ACE_SYNCH_MUTEX_T &
00591 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::lock (void)
00592 {
00593   return this->queue_.lock ();
00594 }
00595 
00596 template <ACE_SYNCH_DECL>
00597 ACE_Message_Queue_Iterator<ACE_SYNCH_USE>::ACE_Message_Queue_Iterator (ACE_Message_Queue <ACE_SYNCH_USE> &q)
00598   : queue_ (q),
00599     curr_ (q.head_)
00600 {
00601 }
00602 
00603 template <ACE_SYNCH_DECL> int
00604 ACE_Message_Queue_Iterator<ACE_SYNCH_USE>::next (ACE_Message_Block *&entry)
00605 {
00606   ACE_READ_GUARD_RETURN (ACE_SYNCH_MUTEX_T, m, this->queue_.lock_, -1)
00607 
00608   if (this->curr_ != 0)
00609     {
00610       entry = this->curr_;
00611       return 1;
00612     }
00613 
00614   return 0;
00615 }
00616 
00617 template <ACE_SYNCH_DECL> int
00618 ACE_Message_Queue_Iterator<ACE_SYNCH_USE>::done (void) const
00619 {
00620   ACE_READ_GUARD_RETURN (ACE_SYNCH_MUTEX_T, m, this->queue_.lock_, -1)
00621 
00622   return this->curr_ == 0;
00623 }
00624 
00625 template <ACE_SYNCH_DECL> int
00626 ACE_Message_Queue_Iterator<ACE_SYNCH_USE>::advance (void)
00627 {
00628   ACE_READ_GUARD_RETURN (ACE_SYNCH_MUTEX_T, m, this->queue_.lock_, -1)
00629 
00630   if (this->curr_)
00631     this->curr_ = this->curr_->next ();
00632   return this->curr_ != 0;
00633 }
00634 
00635 template <ACE_SYNCH_DECL> void
00636 ACE_Message_Queue_Iterator<ACE_SYNCH_USE>::dump (void) const
00637 {
00638 #if defined (ACE_HAS_DUMP)
00639 #endif /* ACE_HAS_DUMP */
00640 }
00641 
// Allocation-hook definition for ACE_Message_Queue_Iterator.
// NOTE(review): ACE_ALLOC_HOOK_DEFINE is defined outside this file, so
// its expansion cannot be confirmed from here.
00642 ACE_ALLOC_HOOK_DEFINE(ACE_Message_Queue_Iterator)
00643 
00644 template <ACE_SYNCH_DECL>
00645 ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>::ACE_Message_Queue_Reverse_Iterator (ACE_Message_Queue <ACE_SYNCH_USE> &q)
00646   : queue_ (q),
00647     curr_ (queue_.tail_)
00648 {
00649 }
00650 
00651 template <ACE_SYNCH_DECL> int
00652 ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>::next (ACE_Message_Block *&entry)
00653 {
00654   ACE_READ_GUARD_RETURN (ACE_SYNCH_MUTEX_T, m, this->queue_.lock_, -1)
00655 
00656   if (this->curr_ != 0)
00657     {
00658       entry = this->curr_;
00659       return 1;
00660     }
00661 
00662   return 0;
00663 }
00664 
00665 template <ACE_SYNCH_DECL> int
00666 ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>::done (void) const
00667 {
00668   ACE_READ_GUARD_RETURN (ACE_SYNCH_MUTEX_T, m, this->queue_.lock_, -1)
00669 
00670   return this->curr_ == 0;
00671 }
00672 
00673 template <ACE_SYNCH_DECL> int
00674 ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>::advance (void)
00675 {
00676   ACE_READ_GUARD_RETURN (ACE_SYNCH_MUTEX_T, m, this->queue_.lock_, -1)
00677 
00678   if (this->curr_)
00679     this->curr_ = this->curr_->prev ();
00680   return this->curr_ != 0;
00681 }
00682 
00683 template <ACE_SYNCH_DECL> void
00684 ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>::dump (void) const
00685 {
00686 #if defined (ACE_HAS_DUMP)
00687 #endif /* ACE_HAS_DUMP */
00688 }
00689 
00690 template <ACE_SYNCH_DECL> int
00691 ACE_Message_Queue<ACE_SYNCH_USE>::dequeue (ACE_Message_Block *&first_item,
00692                                            ACE_Time_Value *timeout)
00693 {
00694   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue");
00695   return this->dequeue_head (first_item, timeout);
00696 }
00697 
00698 template <ACE_SYNCH_DECL> ACE_Notification_Strategy *
00699 ACE_Message_Queue<ACE_SYNCH_USE>::notification_strategy (void)
00700 {
00701   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::notification_strategy");
00702 
00703   return this->notification_strategy_;
00704 }
00705 
00706 template <ACE_SYNCH_DECL> void
00707 ACE_Message_Queue<ACE_SYNCH_USE>::notification_strategy (ACE_Notification_Strategy *s)
00708 {
00709   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::notification_strategy");
00710 
00711   this->notification_strategy_ = s;
00712 }
00713 
00714 // Check if queue is empty (does not hold locks).
00715 
00716 template <ACE_SYNCH_DECL> bool
00717 ACE_Message_Queue<ACE_SYNCH_USE>::is_empty_i (void)
00718 {
00719   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::is_empty_i");
00720   return this->tail_ == 0;
00721 }
00722 
00723 // Check if queue is full (does not hold locks).
00724 
00725 template <ACE_SYNCH_DECL> bool
00726 ACE_Message_Queue<ACE_SYNCH_USE>::is_full_i (void)
00727 {
00728   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::is_full_i");
00729   return this->cur_bytes_ >= this->high_water_mark_;
00730 }
00731 
00732 // Check if queue is empty (holds locks).
00733 
00734 template <ACE_SYNCH_DECL> bool
00735 ACE_Message_Queue<ACE_SYNCH_USE>::is_empty (void)
00736 {
00737   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::is_empty");
00738   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, false);
00739 
00740   return this->is_empty_i ();
00741 }
00742 
00743 // Check if queue is full (holds locks).
00744 
00745 template <ACE_SYNCH_DECL> bool
00746 ACE_Message_Queue<ACE_SYNCH_USE>::is_full (void)
00747 {
00748   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::is_full");
00749   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, false);
00750 
00751   return this->is_full_i ();
00752 }
00753 
00754 template <ACE_SYNCH_DECL> size_t
00755 ACE_Message_Queue<ACE_SYNCH_USE>::high_water_mark (void)
00756 {
00757   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::high_water_mark");
00758   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, 0);
00759 
00760   return this->high_water_mark_;
00761 }
00762 
00763 template <ACE_SYNCH_DECL> void
00764 ACE_Message_Queue<ACE_SYNCH_USE>::high_water_mark (size_t hwm)
00765 {
00766   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::high_water_mark");
00767   ACE_GUARD (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_);
00768 
00769   this->high_water_mark_ = hwm;
00770 }
00771 
00772 template <ACE_SYNCH_DECL> size_t
00773 ACE_Message_Queue<ACE_SYNCH_USE>::low_water_mark (void)
00774 {
00775   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::low_water_mark");
00776   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, 0);
00777 
00778   return this->low_water_mark_;
00779 }
00780 
00781 template <ACE_SYNCH_DECL> void
00782 ACE_Message_Queue<ACE_SYNCH_USE>::low_water_mark (size_t lwm)
00783 {
00784   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::low_water_mark");
00785   ACE_GUARD (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_);
00786 
00787   this->low_water_mark_ = lwm;
00788 }
00789 
00790 template <ACE_SYNCH_DECL> size_t
00791 ACE_Message_Queue<ACE_SYNCH_USE>::message_bytes (void)
00792 {
00793   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::message_bytes");
00794   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, 0);
00795 
00796   return this->cur_bytes_;
00797 }
00798 
00799 template <ACE_SYNCH_DECL> size_t
00800 ACE_Message_Queue<ACE_SYNCH_USE>::message_length (void)
00801 {
00802   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::message_length");
00803   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, 0);
00804 
00805   return this->cur_length_;
00806 }
00807 
00808 template <ACE_SYNCH_DECL> size_t
00809 ACE_Message_Queue<ACE_SYNCH_USE>::message_count (void)
00810 {
00811   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::message_count");
00812   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, 0);
00813 
00814   return this->cur_count_;
00815 }
00816 
00817 template <ACE_SYNCH_DECL> int
00818 ACE_Message_Queue<ACE_SYNCH_USE>::deactivate ()
00819 {
00820   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::deactivate");
00821   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
00822 
00823   return this->deactivate_i (0);   // Not a pulse
00824 }
00825 
00826 template <ACE_SYNCH_DECL> int
00827 ACE_Message_Queue<ACE_SYNCH_USE>::activate (void)
00828 {
00829   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::activate");
00830   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
00831 
00832   return this->activate_i ();
00833 }
00834 
00835 template <ACE_SYNCH_DECL> int
00836 ACE_Message_Queue<ACE_SYNCH_USE>::pulse ()
00837 {
00838   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::pulse");
00839   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
00840 
00841   return this->deactivate_i (1);   // Just a pulse
00842 }
00843 
00844 template <ACE_SYNCH_DECL> int
00845 ACE_Message_Queue<ACE_SYNCH_USE>::deactivated (void)
00846 {
00847   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::deactivated");
00848 
00849   return this->state_ == ACE_Message_Queue_Base::DEACTIVATED;
00850 }
00851 
00852 template <ACE_SYNCH_DECL> int
00853 ACE_Message_Queue<ACE_SYNCH_USE>::state (void)
00854 {
00855   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::state");
00856 
00857   return this->state_;
00858 }
00859 
00860 template <ACE_SYNCH_DECL> ACE_SYNCH_MUTEX_T &
00861 ACE_Message_Queue<ACE_SYNCH_USE>::lock (void)
00862 {
00863   return this->lock_;
00864 }
00865 
00866 template <ACE_SYNCH_DECL> void
00867 ACE_Message_Queue<ACE_SYNCH_USE>::dump (void) const
00868 {
00869 #if defined (ACE_HAS_DUMP)
00870   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dump");
00871   ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
00872   switch (this->state_)
00873     {
00874     case ACE_Message_Queue_Base::ACTIVATED:
00875       ACE_DEBUG ((LM_DEBUG,
00876                   ACE_TEXT ("state = ACTIVATED\n")));
00877       break;
00878     case ACE_Message_Queue_Base::DEACTIVATED:
00879       ACE_DEBUG ((LM_DEBUG,
00880                   ACE_TEXT ("state = DEACTIVATED\n")));
00881       break;
00882     case ACE_Message_Queue_Base::PULSED:
00883       ACE_DEBUG ((LM_DEBUG,
00884                   ACE_TEXT ("state = PULSED\n")));
00885       break;
00886     }
00887   ACE_DEBUG ((LM_DEBUG,
00888               ACE_TEXT ("low_water_mark = %d\n")
00889               ACE_TEXT ("high_water_mark = %d\n")
00890               ACE_TEXT ("cur_bytes = %d\n")
00891               ACE_TEXT ("cur_length = %d\n")
00892               ACE_TEXT ("cur_count = %d\n")
00893               ACE_TEXT ("head_ = %u\n")
00894               ACE_TEXT ("tail_ = %u\n"),
00895               this->low_water_mark_,
00896               this->high_water_mark_,
00897               this->cur_bytes_,
00898               this->cur_length_,
00899               this->cur_count_,
00900               this->head_,
00901               this->tail_));
00902   ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("not_full_cond: \n")));
00903   not_full_cond_.dump ();
00904   ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("not_empty_cond: \n")));
00905   not_empty_cond_.dump ();
00906   ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
00907 #endif /* ACE_HAS_DUMP */
00908 }
00909 
00910 template <ACE_SYNCH_DECL> void
00911 ACE_Message_Queue<ACE_SYNCH_USE>::message_bytes (size_t new_value)
00912 {
00913   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::message_bytes");
00914   ACE_GUARD (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_);
00915 
00916   this->cur_bytes_ = new_value;
00917 }
00918 
00919 template <ACE_SYNCH_DECL> void
00920 ACE_Message_Queue<ACE_SYNCH_USE>::message_length (size_t new_value)
00921 {
00922   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::message_length");
00923   ACE_GUARD (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_);
00924 
00925   this->cur_length_ = new_value;
00926 }
00927 
00928 template <ACE_SYNCH_DECL>
00929 ACE_Message_Queue<ACE_SYNCH_USE>::ACE_Message_Queue (size_t hwm,
00930                                                      size_t lwm,
00931                                                      ACE_Notification_Strategy *ns)
00932   : not_empty_cond_ (lock_)
00933   , not_full_cond_ (lock_)
00934 {
00935   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::ACE_Message_Queue");
00936 
00937   if (this->open (hwm, lwm, ns) == -1)
00938     ACE_ERROR ((LM_ERROR,
00939                 ACE_TEXT ("open")));
00940 
00941 #if defined (ACE_HAS_MONITOR_POINTS) && (ACE_HAS_MONITOR_POINTS == 1)
00942   ACE_NEW (this->monitor_,
00943            ACE::Monitor_Control::Size_Monitor);
00944 
00945   /// Make a unique name using our hex address.
00946   const int nibbles = 2 * sizeof (ptrdiff_t);
00947   char buf[nibbles + 1];
00948   ACE_OS::sprintf (buf, "%p", this);
00949   buf[nibbles] = '\0';
00950   ACE_CString name_str ("Message_Queue_");
00951   name_str += buf;
00952   this->monitor_->name (name_str.c_str ());
00953   this->monitor_->add_to_registry ();
00954 #endif /* ACE_HAS_MONITOR_POINTS==1 */
00955 }
00956 
00957 template <ACE_SYNCH_DECL>
00958 ACE_Message_Queue<ACE_SYNCH_USE>::~ACE_Message_Queue (void)
00959 {
00960   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::~ACE_Message_Queue");
00961   if (this->head_ != 0 && this->close () == -1)
00962     ACE_ERROR ((LM_ERROR,
00963                 ACE_TEXT ("close")));
00964 
00965 #if defined (ACE_HAS_MONITOR_POINTS) && (ACE_HAS_MONITOR_POINTS == 1)
00966   this->monitor_->remove_from_registry ();
00967   this->monitor_->remove_ref ();
00968 #endif /* ACE_HAS_MONITOR_POINTS==1 */
00969 }
00970 
00971 template <ACE_SYNCH_DECL> int
00972 ACE_Message_Queue<ACE_SYNCH_USE>::flush_i (void)
00973 {
00974   int number_flushed = 0;
00975 
00976   // Remove all the <ACE_Message_Block>s in the <ACE_Message_Queue>
00977   // and <release> their memory.
00978   for (this->tail_ = 0; this->head_ != 0; )
00979     {
00980       ++number_flushed;
00981 
00982       size_t mb_bytes = 0;
00983       size_t mb_length = 0;
00984       this->head_->total_size_and_length (mb_bytes,
00985                                           mb_length);
00986       // Subtract off all of the bytes associated with this message.
00987       this->cur_bytes_ -= mb_bytes;
00988       this->cur_length_ -= mb_length;
00989       --this->cur_count_;
00990 
00991       ACE_Message_Block *temp = this->head_;
00992       this->head_ = this->head_->next ();
00993 
00994       // Make sure to use <release> rather than <delete> since this is
00995       // reference counted.
00996       temp->release ();
00997     }
00998 
00999 #if defined (ACE_HAS_MONITOR_POINTS) && (ACE_HAS_MONITOR_POINTS == 1)
01000   // The monitor should output only if the size has actually changed.
01001   if (number_flushed > 0)
01002     {
01003       this->monitor_->receive (this->cur_length_);
01004     }
01005 #endif
01006 
01007   return number_flushed;
01008 }
01009 
01010 // Don't bother locking since if someone calls this function more than
01011 // once for the same queue, we're in bigger trouble than just
01012 // concurrency control!
01013 
01014 template <ACE_SYNCH_DECL> int
01015 ACE_Message_Queue<ACE_SYNCH_USE>::open (size_t hwm,
01016                                         size_t lwm,
01017                                         ACE_Notification_Strategy *ns)
01018 {
01019   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::open");
01020   this->high_water_mark_ = hwm;
01021   this->low_water_mark_  = lwm;
01022   this->state_ = ACE_Message_Queue_Base::ACTIVATED;
01023   this->cur_bytes_ = 0;
01024   this->cur_length_ = 0;
01025   this->cur_count_ = 0;
01026   this->tail_ = 0;
01027   this->head_ = 0;
01028   this->notification_strategy_ = ns;
01029   return 0;
01030 }
01031 
01032 // Implementation of the public deactivate() method
01033 // (assumes locks are held).
01034 
01035 template <ACE_SYNCH_DECL> int
01036 ACE_Message_Queue<ACE_SYNCH_USE>::deactivate_i (int pulse)
01037 {
01038   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::deactivate_i");
01039   int const previous_state = this->state_;
01040 
01041   if (previous_state != ACE_Message_Queue_Base::DEACTIVATED)
01042     {
01043       // Wakeup all waiters.
01044       this->not_empty_cond_.broadcast ();
01045       this->not_full_cond_.broadcast ();
01046 
01047       if (pulse)
01048         this->state_ = ACE_Message_Queue_Base::PULSED;
01049       else
01050         this->state_ = ACE_Message_Queue_Base::DEACTIVATED;
01051     }
01052   return previous_state;
01053 }
01054 
01055 template <ACE_SYNCH_DECL> int
01056 ACE_Message_Queue<ACE_SYNCH_USE>::activate_i (void)
01057 {
01058   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::activate_i");
01059   int const previous_state = this->state_;
01060   this->state_ = ACE_Message_Queue_Base::ACTIVATED;
01061   return previous_state;
01062 }
01063 
01064 template <ACE_SYNCH_DECL> int
01065 ACE_Message_Queue<ACE_SYNCH_USE>::flush (void)
01066 {
01067   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::flush");
01068   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01069 
01070   // Free up the remaining messages on the queue.
01071   return this->flush_i ();
01072 }
01073 
01074 // Clean up the queue if we have not already done so!
01075 
01076 template <ACE_SYNCH_DECL> int
01077 ACE_Message_Queue<ACE_SYNCH_USE>::close (void)
01078 {
01079   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::close");
01080   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01081 
01082   int const result = this->deactivate_i ();
01083 
01084   // Free up the remaining messages on the queue.
01085   this->flush_i ();
01086 
01087   return result;
01088 }
01089 
01090 template <ACE_SYNCH_DECL> int
01091 ACE_Message_Queue<ACE_SYNCH_USE>::signal_enqueue_waiters (void)
01092 {
01093   if (this->not_full_cond_.signal () != 0)
01094     return -1;
01095   return 0;
01096 }
01097 
01098 template <ACE_SYNCH_DECL> int
01099 ACE_Message_Queue<ACE_SYNCH_USE>::signal_dequeue_waiters (void)
01100 {
01101   // Tell any blocked threads that the queue has a new item!
01102   if (this->not_empty_cond_.signal () != 0)
01103     return -1;
01104   return 0;
01105 }
01106 
01107 // Actually put the node at the end (no locking so must be called with
01108 // locks held).
01109 
01110 template <ACE_SYNCH_DECL> int
01111 ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_tail_i (ACE_Message_Block *new_item)
01112 {
01113   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_tail_i");
01114 
01115   if (new_item == 0)
01116     return -1;
01117 
01118   // Update the queued size and length, taking into account any chained
01119   // blocks (total_size_and_length() counts all continuation blocks).
01120   // Keep count of how many blocks we're adding and, if there is a chain of
01121   // blocks, find the end in seq_tail and be sure they're properly
01122   // back-connected along the way.
01123   ACE_Message_Block *seq_tail = new_item;
01124   ++this->cur_count_;
01125   new_item->total_size_and_length (this->cur_bytes_,
01126                                    this->cur_length_);
01127   while (seq_tail->next () != 0)
01128     {
01129       seq_tail->next ()->prev (seq_tail);
01130       seq_tail = seq_tail->next ();
01131       ++this->cur_count_;
01132       seq_tail->total_size_and_length (this->cur_bytes_,
01133                                        this->cur_length_);
01134     }
01135 
01136   // List was empty, so build a new one.
01137   if (this->tail_ == 0)
01138     {
01139       this->head_ = new_item;
01140       this->tail_ = seq_tail;
01141       // seq_tail->next (0);   This is a condition of the while() loop above.
01142       new_item->prev (0);
01143     }
01144   // Link at the end.
01145   else
01146     {
01147       // seq_tail->next (0);   This is a condition of the while() loop above.
01148       this->tail_->next (new_item);
01149       new_item->prev (this->tail_);
01150       this->tail_ = seq_tail;
01151     }
01152 
01153   if (this->signal_dequeue_waiters () == -1)
01154     return -1;
01155   else
01156     return ACE_Utils::truncate_cast<int> (this->cur_count_);
01157 }
01158 
01159 // Actually put the node(s) at the head (no locking)
01160 
01161 template <ACE_SYNCH_DECL> int
01162 ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_head_i (ACE_Message_Block *new_item)
01163 {
01164   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_head_i");
01165 
01166   if (new_item == 0)
01167     return -1;
01168 
01169   // Update the queued size and length, taking into account any chained
01170   // blocks (total_size_and_length() counts all continuation blocks).
01171   // Keep count of how many blocks we're adding and, if there is a chain of
01172   // blocks, find the end in seq_tail and be sure they're properly
01173   // back-connected along the way.
01174   ACE_Message_Block *seq_tail = new_item;
01175   ++this->cur_count_;
01176   new_item->total_size_and_length (this->cur_bytes_,
01177                                    this->cur_length_);
01178   while (seq_tail->next () != 0)
01179     {
01180       seq_tail->next ()->prev (seq_tail);
01181       seq_tail = seq_tail->next ();
01182       ++this->cur_count_;
01183       seq_tail->total_size_and_length (this->cur_bytes_,
01184                                        this->cur_length_);
01185     }
01186 
01187   new_item->prev (0);
01188   seq_tail->next (this->head_);
01189 
01190   if (this->head_ != 0)
01191     this->head_->prev (seq_tail);
01192   else
01193     this->tail_ = seq_tail;
01194 
01195   this->head_ = new_item;
01196 
01197   if (this->signal_dequeue_waiters () == -1)
01198     return -1;
01199   else
01200     return ACE_Utils::truncate_cast<int> (this->cur_count_);
01201 }
01202 
01203 // Actually put the node at its proper position relative to its
01204 // priority.
01205 
01206 template <ACE_SYNCH_DECL> int
01207 ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_i (ACE_Message_Block *new_item)
01208 {
01209   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_i");
01210 
01211   if (new_item == 0)
01212     return -1;
01213 
01214   // Since this method uses enqueue_head_i() and enqueue_tail_i() for
01215   // special situations, and this method doesn't support enqueueing
01216   // chains of blocks off the 'next' pointer, make sure the new_item's
01217   // next pointer is 0.
01218   new_item->next (0);
01219 
01220   if (this->head_ == 0)
01221     // Check for simple case of an empty queue, where all we need to
01222     // do is insert <new_item> into the head.
01223     return this->enqueue_head_i (new_item);
01224   else
01225     {
01226       ACE_Message_Block *temp = 0;
01227 
01228       // Figure out where the new item goes relative to its priority.
01229       // We start looking from the lowest priority (at the tail) to
01230       // the highest priority (at the head).
01231 
01232       for (temp = this->tail_;
01233            temp != 0;
01234            temp = temp->prev ())
01235         if (temp->msg_priority () >= new_item->msg_priority ())
01236           // Break out when we've located an item that has
01237           // greater or equal priority.
01238           break;
01239 
01240       if (temp == 0)
01241         // Check for simple case of inserting at the head of the queue,
01242         // where all we need to do is insert <new_item> before the
01243         // current head.
01244         return this->enqueue_head_i (new_item);
01245       else if (temp->next () == 0)
01246         // Check for simple case of inserting at the tail of the
01247         // queue, where all we need to do is insert <new_item> after
01248         // the current tail.
01249         return this->enqueue_tail_i (new_item);
01250       else
01251         {
01252           // Insert the new message behind the message of greater or
01253           // equal priority.  This ensures that FIFO order is
01254           // maintained when messages of the same priority are
01255           // inserted consecutively.
01256           new_item->prev (temp);
01257           new_item->next (temp->next ());
01258           temp->next ()->prev (new_item);
01259           temp->next (new_item);
01260         }
01261     }
01262 
01263   // Make sure to count all the bytes in a composite message!!!
01264   new_item->total_size_and_length (this->cur_bytes_,
01265                                    this->cur_length_);
01266   ++this->cur_count_;
01267 
01268   if (this->signal_dequeue_waiters () == -1)
01269     return -1;
01270   else
01271     return ACE_Utils::truncate_cast<int> (this->cur_count_);
01272 }
01273 
01274 // Actually put the node at its proper position relative to its
01275 // deadline time.
01276 
01277 template <ACE_SYNCH_DECL> int
01278 ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_deadline_i (ACE_Message_Block *new_item)
01279 {
01280 #if defined (ACE_HAS_TIMED_MESSAGE_BLOCKS)
01281   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_deadline_i");
01282 
01283   if (new_item == 0)
01284     return -1;
01285 
01286   // Since this method uses enqueue_head_i() and enqueue_tail_i() for
01287   // special situations, and this method doesn't support enqueueing
01288   // chains of blocks off the 'next' pointer, make sure the new_item's
01289   // next pointer is 0.
01290   new_item->next (0);
01291 
01292   if (this->head_ == 0)
01293     // Check for simple case of an empty queue, where all we need to
01294     // do is insert <new_item> into the head.
01295     return this->enqueue_head_i (new_item);
01296   else
01297     {
01298       ACE_Message_Block *temp = 0;
01299 
01300       // Figure out where the new item goes relative to its priority.
01301       // We start looking from the smallest deadline to the highest
01302       // deadline.
01303 
01304       for (temp = this->head_;
01305            temp != 0;
01306            temp = temp->next ())
01307         if (new_item->msg_deadline_time () < temp->msg_deadline_time ())
01308           // Break out when we've located an item that has
01309           // greater or equal priority.
01310           break;
01311 
01312       if (temp == 0 || temp->next () == 0)
01313         // Check for simple case of inserting at the tail of the queue,
01314         // where all we need to do is insert <new_item> after the
01315         // current tail.
01316         return this->enqueue_tail_i (new_item);
01317       else
01318         {
01319           // Insert the new message behind the message of
01320           // lesser or equal deadline time.  This ensures that FIFO order is
01321           // maintained when messages of the same priority are
01322           // inserted consecutively.
01323           new_item->prev (temp);
01324           new_item->next (temp->next ());
01325           temp->next ()->prev (new_item);
01326           temp->next (new_item);
01327         }
01328     }
01329 
01330   // Make sure to count all the bytes in a composite message!!!
01331   new_item->total_size_and_length (this->cur_bytes_,
01332                                    this->cur_length_);
01333   ++this->cur_count_;
01334 
01335   if (this->signal_dequeue_waiters () == -1)
01336     return -1;
01337   else
01338     return this->cur_count_;
01339 #else
01340   return this->enqueue_tail_i (new_item);
01341 #endif /* ACE_HAS_TIMED_MESSAGE_BLOCKS */
01342 }
01343 
01344 // Actually get the first ACE_Message_Block (no locking, so must be
01345 // called with locks held).  This method assumes that the queue has at
01346 // least one item in it when it is called.
01347 
01348 template <ACE_SYNCH_DECL> int
01349 ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i (ACE_Message_Block *&first_item)
01350 {
01351   if (this->head_ ==0)
01352     ACE_ERROR_RETURN ((LM_ERROR,
01353                        ACE_TEXT ("Attempting to dequeue from empty queue")),
01354                       -1);
01355   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i");
01356   first_item = this->head_;
01357   this->head_ = this->head_->next ();
01358 
01359   if (this->head_ == 0)
01360     this->tail_ = 0;
01361   else
01362     // The prev pointer of first message block must point to 0...
01363     this->head_->prev (0);
01364 
01365   size_t mb_bytes = 0;
01366   size_t mb_length = 0;
01367   first_item->total_size_and_length (mb_bytes,
01368                                      mb_length);
01369   // Subtract off all of the bytes associated with this message.
01370   this->cur_bytes_ -= mb_bytes;
01371   this->cur_length_ -= mb_length;
01372   --this->cur_count_;
01373 
01374   if (this->cur_count_ == 0 && this->head_ == this->tail_)
01375     this->head_ = this->tail_ = 0;
01376 
01377   // Make sure that the prev and next fields are 0!
01378   first_item->prev (0);
01379   first_item->next (0);
01380 
01381 #if defined (ACE_HAS_MONITOR_POINTS) && (ACE_HAS_MONITOR_POINTS == 1)
01382   this->monitor_->receive (this->cur_length_);
01383 #endif
01384 
01385   // Only signal enqueueing threads if we've fallen below the low
01386   // water mark.
01387   if (this->cur_bytes_ <= this->low_water_mark_
01388       && this->signal_enqueue_waiters () == -1)
01389     return -1;
01390   else
01391     return ACE_Utils::truncate_cast<int> (this->cur_count_);
01392 }
01393 
01394 // Get the earliest (i.e., FIFO) ACE_Message_Block with the lowest
01395 // priority (no locking, so must be called with locks held).  This
01396 // method assumes that the queue has at least one item in it when it
01397 // is called.
01398 
01399 template <ACE_SYNCH_DECL> int
01400 ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_prio_i (ACE_Message_Block *&dequeued)
01401 {
01402   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_prio_i");
01403 
01404   if (this->head_ == 0)
01405     return -1;
01406 
01407   // Find the earliest (i.e., FIFO) message enqueued with the lowest
01408   // priority.
01409   ACE_Message_Block *chosen = 0;
01410   u_long priority = ULONG_MAX;
01411 
01412   for (ACE_Message_Block *temp = this->tail_;
01413        temp != 0;
01414        temp = temp->prev ())
01415     {
01416       // Find the first version of the earliest message (i.e.,
01417       // preserve FIFO order for messages at the same priority).
01418       if (temp->msg_priority () <= priority)
01419         {
01420           priority = temp->msg_priority ();
01421           chosen = temp;
01422         }
01423     }
01424 
01425   // If every message block is the same priority, pass back the first
01426   // one.
01427   if (chosen == 0)
01428     chosen = this->head_;
01429 
01430   // Patch up the queue.  If we don't have a previous then we are at
01431   // the head of the queue.
01432   if (chosen->prev () == 0)
01433     this->head_ = chosen->next ();
01434   else
01435     chosen->prev ()->next (chosen->next ());
01436 
01437   if (chosen->next () == 0)
01438     this->tail_ = chosen->prev ();
01439   else
01440     chosen->next ()->prev (chosen->prev ());
01441 
01442   // Pass back the chosen block
01443   dequeued = chosen;
01444 
01445   size_t mb_bytes = 0;
01446   size_t mb_length = 0;
01447   dequeued->total_size_and_length (mb_bytes,
01448                                    mb_length);
01449   // Subtract off all of the bytes associated with this message.
01450   this->cur_bytes_ -= mb_bytes;
01451   this->cur_length_ -= mb_length;
01452   --this->cur_count_;
01453 
01454   if (this->cur_count_ == 0 && this->head_ == this->tail_)
01455     this->head_ = this->tail_ = 0;
01456 
01457   // Make sure that the prev and next fields are 0!
01458   dequeued->prev (0);
01459   dequeued->next (0);
01460 
01461   // Only signal enqueueing threads if we've fallen below the low
01462   // water mark.
01463   if (this->cur_bytes_ <= this->low_water_mark_
01464       && this->signal_enqueue_waiters () == -1)
01465     return -1;
01466   else
01467     return ACE_Utils::truncate_cast<int> (this->cur_count_);
01468 }
01469 
01470 // Actually get the last ACE_Message_Block (no locking, so must be
01471 // called with locks held).  This method assumes that the queue has at
01472 // least one item in it when it is called.
01473 
01474 template <ACE_SYNCH_DECL> int
01475 ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_tail_i (ACE_Message_Block *&dequeued)
01476 {
01477   if (this->head_ == 0)
01478     ACE_ERROR_RETURN ((LM_ERROR,
01479                        ACE_TEXT ("Attempting to dequeue from empty queue")),
01480                       -1);
01481   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_tail_i");
01482   dequeued = this->tail_;
01483   if (this->tail_->prev () == 0)
01484     {
01485       this->head_ = 0;
01486       this->tail_ = 0;
01487     }
01488   else
01489     {
01490       this->tail_->prev ()->next (0);
01491       this->tail_ = this->tail_->prev ();
01492     }
01493 
01494   size_t mb_bytes = 0;
01495   size_t mb_length = 0;
01496   dequeued->total_size_and_length (mb_bytes,
01497                                    mb_length);
01498   // Subtract off all of the bytes associated with this message.
01499   this->cur_bytes_ -= mb_bytes;
01500   this->cur_length_ -= mb_length;
01501   --this->cur_count_;
01502 
01503   if (this->cur_count_ == 0 && this->head_ == this->tail_)
01504     this->head_ = this->tail_ = 0;
01505 
01506   // Make sure that the prev and next fields are 0!
01507   dequeued->prev (0);
01508   dequeued->next (0);
01509 
01510   // Only signal enqueueing threads if we've fallen below the low
01511   // water mark.
01512   if (this->cur_bytes_ <= this->low_water_mark_
01513       && this->signal_enqueue_waiters () == -1)
01514     return -1;
01515   else
01516     return ACE_Utils::truncate_cast<int> (this->cur_count_);
01517 }
01518 
01519 // Actually get the ACE_Message_Block with the lowest deadline time
01520 // (no locking, so must be called with locks held).  This method assumes
01521 // that the queue has at least one item in it when it is called.
01522 
01523 template <ACE_SYNCH_DECL> int
01524 ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_deadline_i (ACE_Message_Block *&dequeued)
01525 {
01526 #if defined (ACE_HAS_TIMED_MESSAGE_BLOCKS)
01527   if (this->head_ == 0)
01528     ACE_ERROR_RETURN ((LM_ERROR,
01529                        ACE_TEXT ("Attempting to dequeue from empty queue")),
01530                       -1);
01531   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_deadline_i");
01532 
01533   // Find the last message enqueued with the lowest deadline time
01534   ACE_Message_Block* chosen = 0;
01535   ACE_Time_Value deadline = ACE_Time_Value::max_time;
01536   for (ACE_Message_Block *temp = this->head_; temp != 0; temp = temp->next ())
01537     if (temp->msg_deadline_time () < deadline)
01538       {
01539         deadline = temp->msg_deadline_time ();
01540         chosen = temp;
01541       }
01542 
01543   // If every message block is the same deadline time,
01544   // pass back the first one
01545   if (chosen == 0)
01546     chosen = this->head_;
01547 
01548   // Patch up the queue.  If we don't have a previous
01549   // then we are at the head of the queue.
01550   if (chosen->prev () == 0)
01551     this->head_ = chosen->next ();
01552   else
01553     chosen->prev ()->next (chosen->next ());
01554 
01555   if (chosen->next () == 0)
01556     this->tail_ = chosen->prev ();
01557   else
01558     chosen->next ()->prev (chosen->prev ());
01559 
01560   // Pass back the chosen block
01561   dequeued = chosen;
01562 
01563   size_t mb_bytes = 0;
01564   size_t mb_length = 0;
01565   dequeued->total_size_and_length (mb_bytes,
01566                                    mb_length);
01567   // Subtract off all of the bytes associated with this message.
01568   this->cur_bytes_ -= mb_bytes;
01569   this->cur_length_ -= mb_length;
01570   --this->cur_count_;
01571 
01572   if (this->cur_count_ == 0 && this->head_ == this->tail_)
01573     this->head_ = this->tail_ = 0;
01574 
01575   // Make sure that the prev and next fields are 0!
01576   dequeued->prev (0);
01577   dequeued->next (0);
01578 
01579   // Only signal enqueueing threads if we've fallen below the low
01580   // water mark.
01581   if (this->cur_bytes_ <= this->low_water_mark_
01582       && this->signal_enqueue_waiters () == -1)
01583     return -1;
01584   else
01585     return this->cur_count_;
01586 #else
01587   return this->dequeue_head_i (dequeued);
01588 #endif /* ACE_HAS_TIMED_MESSAGE_BLOCKS */
01589 }
01590 
01591 // Take a look at the first item without removing it.
01592 
01593 template <ACE_SYNCH_DECL> int
01594 ACE_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head (ACE_Message_Block *&first_item,
01595                                                      ACE_Time_Value *timeout)
01596 {
01597   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head");
01598   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01599 
01600   if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
01601     {
01602       errno = ESHUTDOWN;
01603       return -1;
01604     }
01605 
01606   // Wait for at least one item to become available.
01607 
01608   if (this->wait_not_empty_cond (ace_mon, timeout) == -1)
01609     return -1;
01610 
01611   first_item = this->head_;
01612   return ACE_Utils::truncate_cast<int> (this->cur_count_);
01613 }
01614 
01615 template <ACE_SYNCH_DECL> int
01616 ACE_Message_Queue<ACE_SYNCH_USE>::wait_not_full_cond (ACE_Guard<ACE_SYNCH_MUTEX_T> &,
01617                                                       ACE_Time_Value *timeout)
01618 {
01619   int result = 0;
01620 
01621   // Wait while the queue is full.
01622 
01623   while (this->is_full_i ())
01624     {
01625       if (this->not_full_cond_.wait (timeout) == -1)
01626         {
01627           if (errno == ETIME)
01628             errno = EWOULDBLOCK;
01629           result = -1;
01630           break;
01631         }
01632       if (this->state_ != ACE_Message_Queue_Base::ACTIVATED)
01633         {
01634           errno = ESHUTDOWN;
01635           result = -1;
01636           break;
01637         }
01638     }
01639   return result;
01640 }
01641 
01642 template <ACE_SYNCH_DECL> int
01643 ACE_Message_Queue<ACE_SYNCH_USE>::wait_not_empty_cond
01644     (ACE_Guard<ACE_SYNCH_MUTEX_T> &, ACE_Time_Value *timeout)
01645 {
01646   int result = 0;
01647 
01648   // Wait while the queue is empty.
01649 
01650   while (this->is_empty_i ())
01651     {
01652       if (this->not_empty_cond_.wait (timeout) == -1)
01653         {
01654           if (errno == ETIME)
01655             errno = EWOULDBLOCK;
01656           result = -1;
01657           break;
01658         }
01659       if (this->state_ != ACE_Message_Queue_Base::ACTIVATED)
01660         {
01661           errno = ESHUTDOWN;
01662           result = -1;
01663           break;
01664         }
01665     }
01666   return result;
01667 }
01668 
01669 // Block indefinitely waiting for an item to arrive, does not ignore
01670 // alerts (e.g., signals).
01671 
01672 template <ACE_SYNCH_DECL> int
01673 ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_head (ACE_Message_Block *new_item,
01674                                                 ACE_Time_Value *timeout)
01675 {
01676   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_head");
01677   int queue_count = 0;
01678   {
01679     ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01680 
01681     if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
01682       {
01683         errno = ESHUTDOWN;
01684         return -1;
01685       }
01686 
01687     if (this->wait_not_full_cond (ace_mon, timeout) == -1)
01688       return -1;
01689 
01690     queue_count = this->enqueue_head_i (new_item);
01691 
01692     if (queue_count == -1)
01693       return -1;
01694 
01695     this->notify ();
01696   }
01697   return queue_count;
01698 }
01699 
01700 // Enqueue an <ACE_Message_Block *> into the <Message_Queue> in
01701 // accordance with its <msg_priority> (0 is lowest priority).  Returns
01702 // -1 on failure, else the number of items still on the queue.
01703 
01704 template <ACE_SYNCH_DECL> int
01705 ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_prio (ACE_Message_Block *new_item,
01706                                                 ACE_Time_Value *timeout)
01707 {
01708   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_prio");
01709   int queue_count = 0;
01710   {
01711     ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01712 
01713     if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
01714       {
01715         errno = ESHUTDOWN;
01716         return -1;
01717       }
01718 
01719     if (this->wait_not_full_cond (ace_mon, timeout) == -1)
01720       return -1;
01721 
01722     queue_count = this->enqueue_i (new_item);
01723 
01724     if (queue_count == -1)
01725       return -1;
01726 
01727     this->notify ();
01728   }
01729   return queue_count;
01730 }
01731 
01732 // Enqueue an <ACE_Message_Block *> into the <Message_Queue> in
01733 // accordance with its <msg_deadline_time>.  Returns
01734 // -1 on failure, else the number of items still on the queue.
01735 
01736 template <ACE_SYNCH_DECL> int
01737 ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_deadline (ACE_Message_Block *new_item,
01738                                                     ACE_Time_Value *timeout)
01739 {
01740   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_deadline");
01741   int queue_count = 0;
01742   {
01743     ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01744 
01745     if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
01746       {
01747         errno = ESHUTDOWN;
01748         return -1;
01749       }
01750 
01751     if (this->wait_not_full_cond (ace_mon, timeout) == -1)
01752       return -1;
01753 
01754     queue_count = this->enqueue_deadline_i (new_item);
01755 
01756     if (queue_count == -1)
01757       return -1;
01758 
01759     this->notify ();
01760   }
01761   return queue_count;
01762 }
01763 
01764 template <ACE_SYNCH_DECL> int
01765 ACE_Message_Queue<ACE_SYNCH_USE>::enqueue (ACE_Message_Block *new_item,
01766                                            ACE_Time_Value *timeout)
01767 {
01768   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue");
01769   return this->enqueue_prio (new_item, timeout);
01770 }
01771 
01772 // Block indefinitely waiting for an item to arrive,
01773 // does not ignore alerts (e.g., signals).
01774 
01775 template <ACE_SYNCH_DECL> int
01776 ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_tail (ACE_Message_Block *new_item,
01777                                               ACE_Time_Value *timeout)
01778 {
01779   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_tail");
01780   int queue_count = 0;
01781   {
01782     ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01783 
01784     if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
01785       {
01786         errno = ESHUTDOWN;
01787         return -1;
01788       }
01789 
01790     if (this->wait_not_full_cond (ace_mon, timeout) == -1)
01791       return -1;
01792 
01793     queue_count = this->enqueue_tail_i (new_item);
01794 
01795     if (queue_count == -1)
01796       return -1;
01797 
01798     this->notify ();
01799   }
01800   return queue_count;
01801 }
01802 
01803 // Remove an item from the front of the queue.  If timeout == 0 block
01804 // indefinitely (or until an alert occurs).  Otherwise, block for up to
01805 // the amount of time specified by timeout.
01806 
01807 template <ACE_SYNCH_DECL> int
01808 ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head (ACE_Message_Block *&first_item,
01809                                                 ACE_Time_Value *timeout)
01810 {
01811   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head");
01812   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01813 
01814   if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
01815     {
01816       errno = ESHUTDOWN;
01817       return -1;
01818     }
01819 
01820   if (this->wait_not_empty_cond (ace_mon, timeout) == -1)
01821     return -1;
01822 
01823   return this->dequeue_head_i (first_item);
01824 }
01825 
01826 // Remove item with the lowest priority from the queue.  If timeout == 0 block
01827 // indefinitely (or until an alert occurs).  Otherwise, block for up to
01828 // the amount of time specified by timeout.
01829 
01830 template <ACE_SYNCH_DECL> int
01831 ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_prio (ACE_Message_Block *&dequeued,
01832                                                 ACE_Time_Value *timeout)
01833 {
01834   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_prio");
01835   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01836 
01837   if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
01838     {
01839       errno = ESHUTDOWN;
01840       return -1;
01841     }
01842 
01843   if (this->wait_not_empty_cond (ace_mon, timeout) == -1)
01844     return -1;
01845 
01846   return this->dequeue_prio_i (dequeued);
01847 }
01848 
01849 // Remove an item from the end of the queue.  If timeout == 0 block
01850 // indefinitely (or until an alert occurs).  Otherwise, block for up to
01851 // the amount of time specified by timeout.
01852 
01853 template <ACE_SYNCH_DECL> int
01854 ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_tail (ACE_Message_Block *&dequeued,
01855                                                 ACE_Time_Value *timeout)
01856 {
01857   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_tail");
01858   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01859 
01860   if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
01861     {
01862       errno = ESHUTDOWN;
01863       return -1;
01864     }
01865 
01866   if (this->wait_not_empty_cond (ace_mon, timeout) == -1)
01867     return -1;
01868 
01869   return this->dequeue_tail_i (dequeued);
01870 }
01871 
01872 // Remove an item with the lowest deadline time.  If timeout == 0 block
01873 // indefinitely (or until an alert occurs).  Otherwise, block for up to
01874 // the amount of time specified by timeout.
01875 
01876 template <ACE_SYNCH_DECL> int
01877 ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_deadline (ACE_Message_Block *&dequeued,
01878                                                     ACE_Time_Value *timeout)
01879 {
01880   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_deadline");
01881   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01882 
01883   if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
01884     {
01885       errno = ESHUTDOWN;
01886       return -1;
01887     }
01888 
01889   if (this->wait_not_empty_cond (ace_mon, timeout) == -1)
01890     return -1;
01891 
01892   return this->dequeue_deadline_i (dequeued);
01893 }
01894 
01895 template <ACE_SYNCH_DECL> int
01896 ACE_Message_Queue<ACE_SYNCH_USE>::notify (void)
01897 {
01898   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::notify");
01899 
01900 #if defined (ACE_HAS_MONITOR_POINTS) && (ACE_HAS_MONITOR_POINTS == 1)
01901   this->monitor_->receive (this->cur_length_);
01902 #endif
01903 
01904   // By default, don't do anything.
01905   if (this->notification_strategy_ == 0)
01906     return 0;
01907   else
01908     return this->notification_strategy_->notify ();
01909 }
01910 
01911 // = Initialization and termination methods.
01912 template <ACE_SYNCH_DECL>
01913 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::ACE_Dynamic_Message_Queue (ACE_Dynamic_Message_Strategy & message_strategy,
01914                                                                      size_t hwm,
01915                                                                      size_t lwm,
01916                                                                      ACE_Notification_Strategy *ns)
01917   : ACE_Message_Queue<ACE_SYNCH_USE> (hwm, lwm, ns),
01918     pending_head_ (0),
01919     pending_tail_ (0),
01920     late_head_ (0),
01921     late_tail_ (0),
01922     beyond_late_head_ (0),
01923     beyond_late_tail_ (0),
01924     message_strategy_ (message_strategy)
01925 {
01926   // Note, the ACE_Dynamic_Message_Queue assumes full responsibility
01927   // for the passed ACE_Dynamic_Message_Strategy object, and deletes
01928   // it in its own dtor
01929 }
01930 
01931 // dtor: free message strategy and let base class dtor do the rest.
01932 
// Destructor: deletes the message strategy object that was handed to
// the constructor.
01933 template <ACE_SYNCH_DECL>
01934 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::~ACE_Dynamic_Message_Queue (void)
01935 {
        // message_strategy_ is bound from a reference in the constructor, so
        // its address is taken to delete the object it refers to.
01936   delete &this->message_strategy_;
01937 }
01938 
// Refreshes the sublist boundaries for the current time, then unlinks
// the PENDING, LATE and/or BEYOND_LATE sublists selected by
// <status_flags> and hands them back as a single list through
// <list_head> / <list_tail> (pending messages first, then late, then
// beyond late).  The queue's message count, byte count and length are
// reduced for every message handed back.  Returns the result of
// refresh_queue(); if that is negative nothing is unlinked and both
// output pointers are left at 0.
01939 template <ACE_SYNCH_DECL> int
01940 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::remove_messages (ACE_Message_Block *&list_head,
01941                                                            ACE_Message_Block *&list_tail,
01942                                                            u_int status_flags)
01943 {
01944   // start with an empty list
01945   list_head = 0;
01946   list_tail = 0;
01947 
01948   // Get the current time
01949   ACE_Time_Value current_time = ACE_OS::gettimeofday ();
01950 
01951   // Refresh priority status boundaries in the queue.
01952   int result = this->refresh_queue (current_time);
01953   if (result < 0)
01954     return result;
01955 
        // Pending messages: this branch treats the pending sublist as the
        // tail end of the queue (whatever precedes it becomes the new tail).
01956   if (ACE_BIT_ENABLED (status_flags,
01957                        (u_int) ACE_Dynamic_Message_Strategy::PENDING)
01958       && this->pending_head_
01959       && this->pending_tail_)
01960     {
01961       // patch up pointers for the new tail of the queue
01962       if (this->pending_head_->prev ())
01963         {
01964           this->tail_ = this->pending_head_->prev ();
01965           this->pending_head_->prev ()->next (0);
01966         }
01967       else
01968         {
01969           // the list has become empty
01970           this->head_ = 0;
01971           this->tail_ = 0;
01972         }
01973 
01974       // point to the head and tail of the list
01975       list_head = this->pending_head_;
01976       list_tail = this->pending_tail_;
01977 
01978       // cut the pending messages out of the queue entirely
01979       this->pending_head_->prev (0);
01980       this->pending_head_ = 0;
01981       this->pending_tail_ = 0;
01982     }
01983 
        // Late messages: may have neighbours on either side, so both the
        // predecessor and the successor links are re-joined.
01984   if (ACE_BIT_ENABLED (status_flags,
01985                        (u_int) ACE_Dynamic_Message_Strategy::LATE)
01986       && this->late_head_
01987       && this->late_tail_)
01988     {
01989       // Patch up pointers for the (possibly) new head and tail of the
01990       // queue.
01991       if (this->late_tail_->next ())
01992         this->late_tail_->next ()->prev (this->late_head_->prev ());
01993       else
01994         this->tail_ = this->late_head_->prev ();
01995 
01996       if (this->late_head_->prev ())
01997         this->late_head_->prev ()->next (this->late_tail_->next ());
01998       else
01999         this->head_ = this->late_tail_->next ();
02000 
02001       // put late messages behind pending messages (if any) being returned
02002       this->late_head_->prev (list_tail);
02003       if (list_tail)
02004         list_tail->next (this->late_head_);
02005       else
02006         list_head = this->late_head_;
02007 
02008       list_tail = this->late_tail_;
02009 
02010       this->late_tail_->next (0);
02011       this->late_head_ = 0;
02012       this->late_tail_ = 0;
02013     }
02014 
        // Beyond late messages: this branch treats the beyond late sublist
        // as the front of the queue (whatever follows it becomes the new head).
02015   if (ACE_BIT_ENABLED (status_flags,
02016       (u_int) ACE_Dynamic_Message_Strategy::BEYOND_LATE)
02017       && this->beyond_late_head_
02018       && this->beyond_late_tail_)
02019     {
02020       // Patch up pointers for the new head of the queue
02021       if (this->beyond_late_tail_->next ())
02022         {
02023           this->head_ = this->beyond_late_tail_->next ();
02024           this->beyond_late_tail_->next ()->prev (0);
02025         }
02026       else
02027         {
02028           // the list has become empty
02029           this->head_ = 0;
02030           this->tail_ = 0;
02031         }
02032 
02033       // Put beyond late messages at the end of the list being
02034       // returned.
02035       if (list_tail)
02036         {
02037           this->beyond_late_head_->prev (list_tail);
02038           list_tail->next (this->beyond_late_head_);
02039         }
02040       else
02041         list_head = this->beyond_late_head_;
02042 
02043       list_tail = this->beyond_late_tail_;
02044 
02045       this->beyond_late_tail_->next (0);
02046       this->beyond_late_head_ = 0;
02047       this->beyond_late_tail_ = 0;
02048     }
02049 
02050   // Decrement message and size counts for removed messages.
02051   ACE_Message_Block *temp1;
02052 
        // Walk the detached list once, charging each message back against
        // the queue's running totals.
02053   for (temp1 = list_head;
02054        temp1 != 0;
02055        temp1 = temp1->next ())
02056     {
02057       --this->cur_count_;
02058 
02059       size_t mb_bytes = 0;
02060       size_t mb_length = 0;
02061       temp1->total_size_and_length (mb_bytes,
02062                                     mb_length);
02063       // Subtract off all of the bytes associated with this message.
02064       this->cur_bytes_ -= mb_bytes;
02065       this->cur_length_ -= mb_length;
02066     }
02067 
02068   return result;
02069 }
02070 
02071 // Detach all messages with status given in the passed flags from the
02072 // queue and return them by setting passed head and tail pointers to
02073 // the linked list they comprise.  This method is intended primarily
02074 // as a means of periodically harvesting messages that have missed
02075 // their deadlines, but is available in its most general form.  All
02076 // messages are returned in priority order, from head to tail, as of
02077 // the time this method was called.
02078 
02079 template <ACE_SYNCH_DECL> int
02080 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head (ACE_Message_Block *&first_item,
02081                                                         ACE_Time_Value *timeout)
02082 {
02083   ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head");
02084 
02085   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
02086 
02087   if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
02088     {
02089       errno = ESHUTDOWN;
02090       return -1;
02091     }
02092 
02093   int result;
02094 
02095   // get the current time
02096   ACE_Time_Value current_time = ACE_OS::gettimeofday ();
02097 
02098   // refresh priority status boundaries in the queue
02099   result = this->refresh_queue (current_time);
02100   if (result < 0)
02101     return result;
02102 
02103   // *now* it's appropriate to wait for an enqueued item
02104   result = this->wait_not_empty_cond (ace_mon, timeout);
02105   if (result == -1)
02106     return result;
02107 
02108   // call the internal dequeue method, which selects an item from the
02109   // highest priority status portion of the queue that has messages
02110   // enqueued.
02111   result = this->dequeue_head_i (first_item);
02112 
02113   return result;
02114 }
02115 
02116 // Dequeue and return the <ACE_Message_Block *> at the (logical) head
02117 // of the queue.
02118 
02119 template <ACE_SYNCH_DECL> void
02120 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dump (void) const
02121 {
02122 #if defined (ACE_HAS_DUMP)
02123   ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dump");
02124   ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
02125 
02126   ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("ACE_Message_Queue<ACE_SYNCH_USE> (base class): \n")));
02127   this->ACE_Message_Queue<ACE_SYNCH_USE>::dump ();
02128 
02129   ACE_DEBUG ((LM_DEBUG,
02130               ACE_TEXT ("pending_head_ = %u\n")
02131               ACE_TEXT ("pending_tail_ = %u\n")
02132               ACE_TEXT ("late_head_ = %u\n")
02133               ACE_TEXT ("late_tail_ = %u\n")
02134               ACE_TEXT ("beyond_late_head_ = %u\n")
02135               ACE_TEXT ("beyond_late_tail_ = %u\n"),
02136               this->pending_head_,
02137               this->pending_tail_,
02138               this->late_head_,
02139               this->late_tail_,
02140               this->beyond_late_head_,
02141               this->beyond_late_tail_));
02142 
02143   ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("message_strategy_ : \n")));
02144   message_strategy_.dump ();
02145 
02146   ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
02147 #endif /* ACE_HAS_DUMP */
02148 }
02149   // dump the state of the queue
02150 
// Inserts <new_item> into the sublist (pending, late or beyond late)
// that matches its priority status at the current time, after first
// refreshing the sublist boundaries.  Returns -1 if <new_item> is 0,
// if the status is not one of the three handled cases, or if
// signal_dequeue_waiters() fails; returns a negative refresh_queue()
// or sublist_enqueue_i() result unchanged; otherwise returns the
// result of the base enqueue_tail_i()/enqueue_head_i() call (for the
// simple empty-sublist cases) or the updated message count.
02151 template <ACE_SYNCH_DECL> int
02152 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_i (ACE_Message_Block *new_item)
02153 {
02154   ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_i");
02155 
02156   if (new_item == 0)
02157     return -1;
02158 
02159   int result = 0;
02160 
02161   // Get the current time.
02162   ACE_Time_Value current_time = ACE_OS::gettimeofday ();
02163 
02164   // Refresh priority status boundaries in the queue.
02165 
02166   result = this->refresh_queue (current_time);
02167   if (result < 0)
02168     return result;
02169 
02170   // Where we enqueue depends on the message's priority status.
02171   switch (message_strategy_.priority_status (*new_item,
02172                                              current_time))
02173     {
02174     case ACE_Dynamic_Message_Strategy::PENDING:
02175       if (this->pending_tail_ == 0)
02176         {
02177           // Check for simple case of an empty pending queue, where
02178           // all we need to do is insert <new_item> into the tail of
02179           // the queue.
                // NOTE(review): this path returns directly, skipping the
                // count updates and signalling at the bottom of this method;
                // presumably enqueue_tail_i() does its own -- confirm.
02180           pending_head_ = new_item;
02181           pending_tail_ = pending_head_;
02182           return this->enqueue_tail_i (new_item);
02183         }
02184       else
02185         {
02186           // Enqueue the new message in priority order in the pending
02187           // sublist
02188           result = sublist_enqueue_i (new_item,
02189                                       current_time,
02190                                       this->pending_head_,
02191                                       this->pending_tail_,
02192                                       ACE_Dynamic_Message_Strategy::PENDING);
02193         }
02194       break;
02195 
02196     case ACE_Dynamic_Message_Strategy::LATE:
02197       if (this->late_tail_ == 0)
02198         {
                // The late sublist was empty, so <new_item> becomes both its
                // head and its tail; what remains is linking it into the queue.
02199           late_head_ = new_item;
02200           late_tail_ = late_head_;
02201 
02202           if (this->pending_head_ == 0)
02203             // Check for simple case of an empty pending queue,
02204             // where all we need to do is insert <new_item> into the
02205             // tail of the queue.
02206             return this->enqueue_tail_i (new_item);
02207           else if (this->beyond_late_tail_ == 0)
02208             // Check for simple case of an empty beyond late queue, where all
02209             // we need to do is insert <new_item> into the head of the queue.
02210             return this->enqueue_head_i (new_item);
02211           else
02212             {
02213               // Otherwise, we can just splice the new message in
02214               // between the pending and beyond late portions of the
02215               // queue.
02216               this->beyond_late_tail_->next (new_item);
02217               new_item->prev (this->beyond_late_tail_);
02218               this->pending_head_->prev (new_item);
02219               new_item->next (this->pending_head_);
02220             }
02221         }
02222       else
02223         {
02224           // Enqueue the new message in priority order in the late
02225           // sublist
02226           result = sublist_enqueue_i (new_item,
02227                                       current_time,
02228                                       this->late_head_,
02229                                       this->late_tail_,
02230                                       ACE_Dynamic_Message_Strategy::LATE);
02231         }
02232       break;
02233 
02234     case ACE_Dynamic_Message_Strategy::BEYOND_LATE:
02235       if (this->beyond_late_tail_ == 0)
02236         {
02237           // Check for simple case of an empty beyond late queue,
02238           // where all we need to do is insert <new_item> into the
02239           // head of the queue.
02240           beyond_late_head_ = new_item;
02241           beyond_late_tail_ = beyond_late_head_;
02242           return this->enqueue_head_i (new_item);
02243         }
02244       else
02245         {
02246           // all beyond late messages have the same (zero) priority,
02247           // so just put the new one at the end of the beyond late
02248           // messages
02249           if (this->beyond_late_tail_->next ())
02250             this->beyond_late_tail_->next ()->prev (new_item);
02251           else
02252             this->tail_ = new_item;
02253 
02254           new_item->next (this->beyond_late_tail_->next ());
02255           this->beyond_late_tail_->next (new_item);
02256           new_item->prev (this->beyond_late_tail_);
02257           this->beyond_late_tail_ = new_item;
02258         }
02259 
02260       break;
02261 
02262       // should never get here, but just in case...
02263     default:
02264       result = -1;
02265       break;
02266     }
02267 
02268   if (result < 0)
02269     return result;
02270 
        // Reached only when the message was linked in by this method or by
        // sublist_enqueue_i(): account for it in the queue totals here.
02271   size_t mb_bytes = 0;
02272   size_t mb_length = 0;
02273   new_item->total_size_and_length (mb_bytes,
02274                                    mb_length);
02275   this->cur_bytes_ += mb_bytes;
02276   this->cur_length_ += mb_length;
02277   ++this->cur_count_;
02278 
02279   if (this->signal_dequeue_waiters () == -1)
02280     return -1;
02281   else
02282     return this->cur_count_;
02283 }
02284 
02285 // Enqueue an <ACE_Message_Block *> in accordance with its priority.
02286 // Priority may be *dynamic*, *static*, or a combination of *both*.  It
02287 // calls the priority evaluation function passed into the Dynamic
02288 // Message Queue constructor to update the priorities of all enqueued
02289 // messages.
02290 
02291 template <ACE_SYNCH_DECL> int
02292 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::sublist_enqueue_i (ACE_Message_Block *new_item,
02293                                                              const ACE_Time_Value &current_time,
02294                                                              ACE_Message_Block *&sublist_head,
02295                                                              ACE_Message_Block *&sublist_tail,
02296                                                              ACE_Dynamic_Message_Strategy::Priority_Status status)
02297 {
02298   int result = 0;
02299   ACE_Message_Block *current_item = 0;
02300 
02301   // Find message after which to enqueue new item, based on message
02302   // priority and priority status.
02303   for (current_item = sublist_tail;
02304        current_item;
02305        current_item = current_item->prev ())
02306     {
02307       if (message_strategy_.priority_status (*current_item, current_time) == status)
02308         {
02309           if (current_item->msg_priority () >= new_item->msg_priority ())
02310             break;
02311         }
02312       else
02313         {
02314           sublist_head = new_item;
02315           break;
02316         }
02317     }
02318 
02319   if (current_item == 0)
02320     {
02321       // If the new message has highest priority of any, put it at the
02322       // head of the list (and sublist).
02323       new_item->prev (0);
02324       new_item->next (this->head_);
02325       if (this->head_ != 0)
02326         this->head_->prev (new_item);
02327       else
02328         {
02329           this->tail_ = new_item;
02330           sublist_tail = new_item;
02331         }
02332       this->head_ = new_item;
02333       sublist_head = new_item;
02334     }
02335   else
02336     {
02337       // insert the new item into the list
02338       new_item->next (current_item->next ());
02339       new_item->prev (current_item);
02340 
02341       if (current_item->next ())
02342         current_item->next ()->prev (new_item);
02343       else
02344         this->tail_ = new_item;
02345 
02346       current_item->next (new_item);
02347 
02348       // If the new item has lowest priority of any in the sublist,
02349       // move the tail pointer of the sublist back to the new item
02350       if (current_item == sublist_tail)
02351         sublist_tail = new_item;
02352     }
02353 
02354   return result;
02355 }
02356 
02357 // Enqueue a message in priority order within a given priority status
02358 // sublist.
02359 
// Unlinks one message and returns it through <first_item>: the head of
// the pending sublist if there is one, else the head of the late
// sublist, else the head of the beyond late sublist.  The queue's byte,
// length and message counts are reduced accordingly.  If all three
// sublists are empty, <first_item> is set to 0 and -1 is returned.
// Also returns -1 if signal_enqueue_waiters() fails; otherwise returns
// the number of messages left in the queue.
02360 template <ACE_SYNCH_DECL> int
02361 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i (ACE_Message_Block *&first_item)
02362 {
02363   ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i");
02364 
02365   int result = 0;
        // Set only by the late and beyond late branches: non-zero when the
        // message being removed is the sole member of its sublist.
02366   int last_in_subqueue = 0;
02367 
02368   // first, try to dequeue from the head of the pending list
02369   if (this->pending_head_)
02370     {
02371       first_item = this->pending_head_;
02372 
02373       if (0 == this->pending_head_->prev ())
02374         this->head_ = this->pending_head_->next ();
02375       else
02376         this->pending_head_->prev ()->next (this->pending_head_->next ());
02377 
02378       if (0 == this->pending_head_->next ())
02379         {
                // No successor: this branch treats that as the pending
                // sublist becoming empty and clears both of its boundaries.
02380           this->tail_ = this->pending_head_->prev ();
02381           this->pending_head_ = 0;
02382           this->pending_tail_ = 0;
02383         }
02384       else
02385         {
02386           this->pending_head_->next ()->prev (this->pending_head_->prev ());
02387           this->pending_head_ = this->pending_head_->next ();
02388         }
02389 
            // Fully detach the returned message from the queue.
02390       first_item->prev (0);
02391       first_item->next (0);
02392     }
02393 
02394   // Second, try to dequeue from the head of the late list
02395   else if (this->late_head_)
02396     {
02397       last_in_subqueue = this->late_head_ == this->late_tail_ ? 1 : 0;
02398 
02399       first_item = this->late_head_;
02400 
02401       if (0 == this->late_head_->prev ())
02402         this->head_ = this->late_head_->next ();
02403       else
02404         this->late_head_->prev ()->next (this->late_head_->next ());
02405 
02406       if (0 == this->late_head_->next ())
02407         this->tail_ = this->late_head_->prev ();
02408       else
02409         {
02410           this->late_head_->next ()->prev (this->late_head_->prev ());
02411           this->late_head_ = this->late_head_->next ();
02412         }
02413 
            // If that was the only late message, the late sublist is now
            // empty regardless of where late_head_ was advanced to above.
02414       if (last_in_subqueue)
02415         {
02416           this->late_head_ = 0;
02417           this->late_tail_ = 0;
02418         }
02419 
02420       first_item->prev (0);
02421       first_item->next (0);
02422     }
02423   // finally, try to dequeue from the head of the beyond late list
02424   else if (this->beyond_late_head_)
02425     {
02426       last_in_subqueue =
02427         (this->beyond_late_head_ == this->beyond_late_tail_) ? 1 : 0;
02428 
02429       first_item = this->beyond_late_head_;
            // The queue head is replaced unconditionally here, i.e. this
            // branch treats beyond_late_head_ as the head of the whole queue.
02430       this->head_ = this->beyond_late_head_->next ();
02431 
02432       if (0 == this->beyond_late_head_->next ())
02433         this->tail_ = this->beyond_late_head_->prev ();
02434       else
02435         {
02436           this->beyond_late_head_->next ()->prev (this->beyond_late_head_->prev ());
02437           this->beyond_late_head_ = this->beyond_late_head_->next ();
02438         }
02439 
02440       if (last_in_subqueue)
02441         {
02442           this->beyond_late_head_ = 0;
02443           this->beyond_late_tail_ = 0;
02444         }
02445 
02446       first_item->prev (0);
02447       first_item->next (0);
02448     }
02449   else
02450     {
02451       // nothing to dequeue: set the pointer to zero and return an error code
02452       first_item = 0;
02453       result = -1;
02454     }
02455 
02456   if (result < 0)
02457     return result;
02458 
02459   size_t mb_bytes = 0;
02460   size_t mb_length = 0;
02461   first_item->total_size_and_length (mb_bytes,
02462                                      mb_length);
02463   // Subtract off all of the bytes associated with this message.
02464   this->cur_bytes_ -= mb_bytes;
02465   this->cur_length_ -= mb_length;
02466   --this->cur_count_;
02467 
02468   // Only signal enqueueing threads if we've fallen below the low
02469   // water mark.
02470   if (this->cur_bytes_ <= this->low_water_mark_
02471       && this->signal_enqueue_waiters () == -1)
02472     return -1;
02473   else
02474     return this->cur_count_;
02475 }
02476 
02477 // Dequeue and return the <ACE_Message_Block *> at the head of the
02478 // logical queue.  Attempts first to dequeue from the pending portion
02479 // of the queue, or if that is empty from the late portion, or if that
02480 // is empty from the beyond late portion, or if that is empty just
02481 // sets the passed pointer to zero and returns -1.
02482 
02483 template <ACE_SYNCH_DECL> int
02484 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::refresh_queue (const ACE_Time_Value &current_time)
02485 {
02486   int result;
02487 
02488   result = refresh_pending_queue (current_time);
02489 
02490   if (result != -1)
02491     result = refresh_late_queue (current_time);
02492 
02493   return result;
02494 }
02495 
02496 // Refresh the queue using the strategy specific priority status
02497 // function.
02498 
// Re-evaluates the priority status of the message at pending_head_ for
// <current_time>.  If it is no longer PENDING, pending_head_ is advanced
// past every message that has become BEYOND_LATE and/or LATE, and the
// beyond late / late sublist boundaries are moved to cover them.  Does
// nothing when the pending sublist is empty or its head is still
// PENDING.  Returns 0 on success; returns -1 through ACE_ERROR_RETURN
// when a message status is encountered out of the expected
// BEYOND_LATE -> LATE -> PENDING order or is not a known status.
02499 template <ACE_SYNCH_DECL> int
02500 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::refresh_pending_queue (const ACE_Time_Value &current_time)
02501 {
02502   ACE_Dynamic_Message_Strategy::Priority_Status current_status;
02503 
02504   // refresh priority status boundaries in the queue
02505   if (this->pending_head_)
02506     {
02507       current_status = message_strategy_.priority_status (*this->pending_head_,
02508                                                           current_time);
02509       switch (current_status)
02510         {
02511         case ACE_Dynamic_Message_Strategy::BEYOND_LATE:
02512           // Make sure the head of the beyond late queue is set (there
02513           // may not have been any beyond late messages previously)
02514           this->beyond_late_head_ = this->head_;
02515 
02516           // Zero out the late queue pointers, and set them only if
02517           // there turn out to be late messages in the pending sublist
02518           this->late_head_ = 0;
02519           this->late_tail_ = 0;
02520 
02521           // Advance through the beyond late messages in the pending queue
02522           do
02523             {
02524               this->pending_head_ = this->pending_head_->next ();
02525 
02526               if (this->pending_head_)
02527                 current_status = message_strategy_.priority_status (*this->pending_head_,
02528                                                                     current_time);
02529               else
02530                 break;  // do while
02531 
02532             }
02533           while (current_status == ACE_Dynamic_Message_Strategy::BEYOND_LATE);
02534 
02535           if (this->pending_head_)
02536             {
02537               // point tail of beyond late sublist to previous item
02538               this->beyond_late_tail_ = this->pending_head_->prev ();
02539 
02540               if (current_status == ACE_Dynamic_Message_Strategy::PENDING)
02541                 // there are no late messages left in the queue
02542                 break; // switch
02543               else if (current_status != ACE_Dynamic_Message_Strategy::LATE)
02544                 {
02545                   // if we got here, something is *seriously* wrong with the queue
02546                   ACE_ERROR_RETURN ((LM_ERROR,
02547                                      ACE_TEXT ("Unexpected message priority status [%d] (expected LATE)"),
02548                                      (int) current_status),
02549                                     -1);
02550                 }
                    // The message now at pending_head_ is LATE: continue into
                    // the LATE case below to collect the late messages.
02551               /* FALLTHRU */
02552             }
02553           else
02554             {
02555               // There are no pending or late messages left in the
02556               // queue.
02557               this->beyond_late_tail_ = this->tail_;
02558               this->pending_head_ = 0;
02559               this->pending_tail_ = 0;
02560               break; // switch
02561             }
02562 
02563         case ACE_Dynamic_Message_Strategy::LATE:
02564           // Make sure the head of the late queue is set (there may
02565           // not have been any late messages previously, or they may
02566           // have all become beyond late).
02567           if (this->late_head_ == 0)
02568             this->late_head_ = this->pending_head_;
02569 
02570           // advance through the late messages in the pending queue
02571           do
02572             {
02573               this->pending_head_ = this->pending_head_->next ();
02574 
02575               if (this->pending_head_)
02576                 current_status = message_strategy_.priority_status (*this->pending_head_,
02577                                                                     current_time);
02578               else
02579                 break;  // do while
02580 
02581             }
02582           while (current_status == ACE_Dynamic_Message_Strategy::LATE);
02583 
02584           if (this->pending_head_)
02585             {
02586               if (current_status != ACE_Dynamic_Message_Strategy::PENDING)
02587                 // if we got here, something is *seriously* wrong with the queue
02588                 ACE_ERROR_RETURN((LM_ERROR,
02589                                   ACE_TEXT ("Unexpected message priority status [%d] (expected PENDING)"),
02590                                   (int) current_status),
02591                                  -1);
02592 
02593               // Point tail of late sublist to previous item
02594               this->late_tail_ = this->pending_head_->prev ();
02595             }
02596           else
02597             {
02598               // there are no pending messages left in the queue
02599               this->late_tail_ = this->tail_;
02600               this->pending_head_ = 0;
02601               this->pending_tail_ = 0;
02602             }
02603 
02604           break; // switch
02605         case ACE_Dynamic_Message_Strategy::PENDING:
02606           // do nothing - the pending queue is unchanged
02607           break; // switch
02608         default:
02609           // if we got here, something is *seriously* wrong with the queue
02610           ACE_ERROR_RETURN((LM_ERROR,
02611                             ACE_TEXT ("Unknown message priority status [%d]"),
02612                             (int) current_status),
02613                            -1);
02614         }
02615     }
02616   return 0;
02617 }
02618 
02619 // Refresh the pending queue using the strategy specific priority
02620 // status function.
02621 
02622 template <ACE_SYNCH_DECL> int
02623 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::refresh_late_queue (const ACE_Time_Value &current_time)
02624 {
02625   ACE_Dynamic_Message_Strategy::Priority_Status current_status;
02626 
02627   if (this->late_head_)
02628     {
02629       current_status = message_strategy_.priority_status (*this->late_head_,
02630                                                           current_time);
02631       switch (current_status)
02632         {
02633         case ACE_Dynamic_Message_Strategy::BEYOND_LATE:
02634 
02635           // make sure the head of the beyond late queue is set
02636           // (there may not have been any beyond late messages previously)
02637           this->beyond_late_head_ = this->head_;
02638 
02639           // advance through the beyond late messages in the late queue
02640           do
02641             {
02642               this->late_head_ = this->late_head_->next ();
02643 
02644               if (this->late_head_)
02645                 current_status = message_strategy_.priority_status (*this->late_head_,
02646                                                                     current_time);
02647               else
02648                 break;  // do while
02649 
02650             }
02651           while (current_status == ACE_Dynamic_Message_Strategy::BEYOND_LATE);
02652 
02653           if (this->late_head_)
02654             {
02655               // point tail of beyond late sublist to previous item
02656               this->beyond_late_tail_ = this->late_head_->prev ();
02657 
02658               if (current_status == ACE_Dynamic_Message_Strategy::PENDING)
02659                 {
02660                   // there are no late messages left in the queue
02661                   this->late_head_ = 0;
02662                   this->late_tail_ = 0;
02663                 }
02664               else if (current_status != ACE_Dynamic_Message_Strategy::LATE)
02665                 // if we got here, something is *seriously* wrong with the queue
02666                 ACE_ERROR_RETURN ((LM_ERROR,
02667                                    ACE_TEXT ("Unexpected message priority status [%d] (expected LATE)"),
02668                                    (int) current_status),
02669                                   -1);
02670             }
02671           else
02672             {
02673               // there are no late messages left in the queue
02674               this->beyond_late_tail_ = this->tail_;
02675               this->late_head_ = 0;
02676               this->late_tail_ = 0;
02677             }
02678 
02679           break;  // switch
02680 
02681         case ACE_Dynamic_Message_Strategy::LATE:
02682           // do nothing - the late queue is unchanged
02683           break; // switch
02684 
02685         case ACE_Dynamic_Message_Strategy::PENDING:
02686           // if we got here, something is *seriously* wrong with the queue
02687           ACE_ERROR_RETURN ((LM_ERROR,
02688                              ACE_TEXT ("Unexpected message priority status ")
02689                              ACE_TEXT ("[%d] (expected LATE or BEYOND_LATE)"),
02690                              (int) current_status),
02691                             -1);
02692         default:
02693           // if we got here, something is *seriously* wrong with the queue
02694           ACE_ERROR_RETURN ((LM_ERROR,
02695                              ACE_TEXT ("Unknown message priority status [%d]"),
02696                              (int) current_status),
02697                             -1);
02698         }
02699     }
02700 
02701   return 0;
02702 }
02703 
02704 // Refresh the late queue using the strategy specific priority status
02705 // function.
02706 
02707 template <ACE_SYNCH_DECL> int
02708 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head (ACE_Message_Block *&first_item,
02709                                                              ACE_Time_Value *timeout)
02710 {
02711   return ACE_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head (first_item,
02712                                                               timeout);
02713 }
02714 
02715 // Private method to hide public base class method: just calls base
02716 // class method.
02717 
02718 template <ACE_SYNCH_DECL> int
02719 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_tail (ACE_Message_Block *new_item,
02720                                                         ACE_Time_Value *timeout)
02721 {
02722   ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_tail");
02723   return this->enqueue_prio (new_item, timeout);
02724 }
02725 
02726 // Just call priority enqueue method: tail enqueue semantics for
02727 // dynamic message queues are unstable: the message may or may not be
02728 // where it was placed after the queue is refreshed prior to the next
02729 // enqueue or dequeue operation.
02730 
02731 template <ACE_SYNCH_DECL> int
02732 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_head (ACE_Message_Block *new_item,
02733                                                         ACE_Time_Value *timeout)
02734 {
02735   ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_head");
02736   return this->enqueue_prio (new_item, timeout);
02737 }
02738 
02739 // Just call priority enqueue method: head enqueue semantics for
02740 // dynamic message queues are unstable: the message may or may not be
02741 // where it was placed after the queue is refreshed prior to the next
02742 // enqueue or dequeue operation.
02743 
02744 template <ACE_SYNCH_DECL>
02745 ACE_Message_Queue<ACE_SYNCH_USE> *
02746 ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_static_message_queue (size_t hwm,
02747                                                                        size_t lwm,
02748                                                                        ACE_Notification_Strategy *ns)
02749 {
02750   ACE_Message_Queue<ACE_SYNCH_USE> *tmp = 0;
02751 
02752   ACE_NEW_RETURN (tmp,
02753                   ACE_Message_Queue<ACE_SYNCH_USE> (hwm, lwm, ns),
02754                   0);
02755   return tmp;
02756 }
02757 
02758 // Factory method for a statically prioritized ACE_Message_Queue.
02759 
02760 template <ACE_SYNCH_DECL>
02761 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> *
02762 ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_deadline_message_queue (size_t hwm,
02763                                                                          size_t lwm,
02764                                                                          ACE_Notification_Strategy *ns,
02765                                                                          u_long static_bit_field_mask,
02766                                                                          u_long static_bit_field_shift,
02767                                                                          u_long dynamic_priority_max,
02768                                                                          u_long dynamic_priority_offset)
02769 {
02770   ACE_Deadline_Message_Strategy *adms = 0;
02771 
02772   ACE_NEW_RETURN (adms,
02773                   ACE_Deadline_Message_Strategy (static_bit_field_mask,
02774                                                  static_bit_field_shift,
02775                                                  dynamic_priority_max,
02776                                                  dynamic_priority_offset),
02777                   0);
02778 
02779   ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> *tmp = 0;
02780   ACE_NEW_RETURN (tmp,
02781                   ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> (*adms, hwm, lwm, ns),
02782                   0);
02783   return tmp;
02784 }
02785 
02786 // Factory method for a dynamically prioritized (by time to deadline)
02787 // ACE_Dynamic_Message_Queue.
02788 
02789 template <ACE_SYNCH_DECL>
02790 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> *
02791 ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_laxity_message_queue (size_t hwm,
02792                                                                        size_t lwm,
02793                                                                        ACE_Notification_Strategy *ns,
02794                                                                        u_long static_bit_field_mask,
02795                                                                        u_long static_bit_field_shift,
02796                                                                        u_long dynamic_priority_max,
02797                                                                        u_long dynamic_priority_offset)
02798 {
02799   ACE_Laxity_Message_Strategy *alms = 0;
02800 
02801   ACE_NEW_RETURN (alms,
02802                   ACE_Laxity_Message_Strategy (static_bit_field_mask,
02803                                                static_bit_field_shift,
02804                                                dynamic_priority_max,
02805                                                dynamic_priority_offset),
02806                   0);
02807 
02808   ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> *tmp = 0;
02809   ACE_NEW_RETURN (tmp,
02810                   ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> (*alms, hwm, lwm, ns),
02811                   0);
02812   return tmp;
02813 }
02814 
02815 // Factory method for a dynamically prioritized (by laxity)
02816 // <ACE_Dynamic_Message_Queue>.
02817 
02818 #if defined (ACE_VXWORKS)
02819 
02820 template <ACE_SYNCH_DECL>
02821 ACE_Message_Queue_Vx *
02822 ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_Vx_message_queue (size_t max_messages,
02823                                                                    size_t max_message_length,
02824                                                                    ACE_Notification_Strategy *ns)
02825 {
02826   ACE_Message_Queue_Vx *tmp = 0;
02827 
02828   ACE_NEW_RETURN (tmp,
02829                   ACE_Message_Queue_Vx (max_messages, max_message_length, ns),
02830                   0);
02831   return tmp;
02832 }
02833   // factory method for a wrapped VxWorks message queue
02834 
02835 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO)
02836 
02837 template <ACE_SYNCH_DECL>
02838 ACE_Message_Queue_NT *
02839 ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_NT_message_queue (size_t max_threads)
02840 {
02841   ACE_Message_Queue_NT *tmp = 0;
02842 
02843   ACE_NEW_RETURN (tmp,
02844                   ACE_Message_Queue_NT (max_threads);
02845                   0);
02846   return tmp;
02847 }
02848 
02849 #endif /* ACE_HAS_WIN32_OVERLAPPED_IO */
02850 #endif /* defined (ACE_VXWORKS) */
02851 
02852 ACE_END_VERSIONED_NAMESPACE_DECL
02853 
02854 #endif /* !ACE_MESSAGE_QUEUE_T_CPP */

Generated on Tue Feb 2 17:18:40 2010 for ACE by  doxygen 1.4.7