Message_Queue_T.h

Go to the documentation of this file.
00001 /* -*- C++ -*- */
00002 
00003 //=============================================================================
00004 /**
00005  *  @file    Message_Queue_T.h
00006  *
00007  *  $Id: Message_Queue_T.h 79384 2007-08-17 13:13:33Z johnnyw $
00008  *
00009  *  @author Douglas C. Schmidt <schmidt@cs.wustl.edu>
00010  */
00011 //=============================================================================
00012 
00013 #ifndef ACE_MESSAGE_QUEUE_T_H
00014 #define ACE_MESSAGE_QUEUE_T_H
00015 #include /**/ "ace/pre.h"
00016 
00017 #include "ace/Message_Queue.h"
00018 #include "ace/Synch_Traits.h"
00019 #include "ace/Guard_T.h"
00020 
00021 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00022 # pragma once
00023 #endif /* ACE_LACKS_PRAGMA_ONCE */
00024 
00025 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
00026 
00027 #if defined (ACE_VXWORKS)
00028 class ACE_Message_Queue_Vx;
00029 #endif /* defined (ACE_VXWORKS) */
00030 
00031 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO)
00032 class ACE_Message_Queue_NT;
00033 #endif /* ACE_HAS_WIN32_OVERLAPPED_IO*/
00034 
00035 /**
00036  * @class ACE_Message_Queue
00037  *
00038  * @brief A message queueing facility with parameterized synchronization
00039  * capability. ACE_Message_Queue is modeled after the queueing facilities
00040  * in System V STREAMs.
00041  *
00042  * ACE_Message_Queue is the primary queueing facility for
00043  * messages in the ACE framework.  It's one template argument parameterizes
00044  * the queue's synchronization. The argument specifies a synchronization
00045  * strategy. The two main strategies available for ACE_SYNCH_DECL are:
00046  *   -# ACE_MT_SYNCH: all operations are thread-safe
00047  *   -# ACE_NULL_SYNCH: no synchronization and no locking overhead
00048  *
00049  * All data passing through ACE_Message_Queue is in the form of
00050  * ACE_Message_Block objects. @sa ACE_Message_Block.
00051  */
00052 template <ACE_SYNCH_DECL>
00053 class ACE_Message_Queue : public ACE_Message_Queue_Base
00054 {
00055 public:
00056   friend class ACE_Message_Queue_Iterator<ACE_SYNCH_USE>;
00057   friend class ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>;
00058 
00059   // = Traits
00060   typedef ACE_Message_Queue_Iterator<ACE_SYNCH_USE>
00061           ITERATOR;
00062   typedef ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>
00063           REVERSE_ITERATOR;
00064 
00065   /**
00066    * @name Initialization methods
00067    */
00068   //@{
00069   /**
00070    * Initialize an ACE_Message_Queue.
00071    *
00072    * @param hwm High water mark. Determines how many bytes can be stored in a
00073    *        queue before it's considered full.  Supplier threads must block
00074    *        until the queue is no longer full.
00075    * @param lwm Low water mark. Determines how many bytes must be in the queue
00076    *        before supplier threads are allowed to enqueue additional
00077    *        data.  By default, the @a hwm equals @a lwm, which means
00078    *        that suppliers will be able to enqueue new messages as soon as
00079    *        a consumer removes any message from the queue.  Making the low
00080    *        water mark smaller than the high water mark forces consumers to
00081    *        drain more messages from the queue before suppliers can enqueue
00082    *        new messages, which can minimize the "silly window syndrome."
00083    * @param ns Notification strategy. Pointer to an object conforming to the
00084    *        ACE_Notification_Strategy interface. If set, the object's
00085    *        notify(void) method will be called each time data is added to
00086    *        this ACE_Message_Queue. @see ACE_Reactor_Notification_Strategy.
00087    */
00088   ACE_Message_Queue (size_t hwm = ACE_Message_Queue_Base::DEFAULT_HWM,
00089                      size_t lwm = ACE_Message_Queue_Base::DEFAULT_LWM,
00090                      ACE_Notification_Strategy *ns = 0);
00091   virtual int open (size_t hwm = ACE_Message_Queue_Base::DEFAULT_HWM,
00092                     size_t lwm = ACE_Message_Queue_Base::DEFAULT_LWM,
00093                     ACE_Notification_Strategy *ns = 0);
00094   //@}
00095 
00096   /// Releases all resources from the message queue and marks it deactivated.
00097   /// @sa flush().
00098   ///
00099   /// @retval The number of messages released from the queue; -1 on error.
00100   virtual int close (void);
00101 
00102   /// Releases all resources from the message queue and marks it deactivated.
00103   virtual ~ACE_Message_Queue (void);
00104 
00105   /**
00106    * Releases all resources from the message queue but does not mark it
00107    * deactivated.  This method holds the queue lock during this operation.
00108    * @sa close().
00109    *
00110    * @return The number of messages flushed; -1 on error.
00111    */
00112   virtual int flush (void);
00113 
00114   /**
00115    * Release all resources from the message queue but do not mark it
00116    * as deactivated.
00117    *
00118    * @pre The caller must be holding the queue lock before calling this
00119    * method.
00120    *
00121    * @return The number of messages flushed.
00122    */
00123   virtual int flush_i (void);
00124 
00125   /** @name Enqueue and dequeue methods
00126    *
00127    * The enqueue and dequeue methods accept a timeout value passed as
00128    * an ACE_Time_Value *. In all cases, if the timeout pointer is 0,
00129    * the caller will block until action is possible. If the timeout pointer
00130    * is non-zero, the call will wait (if needed, subject to water mark
00131    * settings) until the absolute time specified in the referenced
00132    * ACE_Time_Value object is reached. If the time is reached before the
00133    * desired action is possible, the method will return -1 with errno set
00134    * to @c EWOULDBLOCK. Regardless of the timeout setting, however,
00135    * these methods will also fail and return -1 when the queue is closed,
00136    * deactivated, pulsed, or when a signal occurs.
00137    *
00138    * See C++NPv2 Section 6.2 and APG Section 12.3 for a fuller treatment of
00139    * ACE_Message_Queue, enqueueing, dequeueing, and how these operations are
00140    * affected by queue state transitions.
00141    */
00142   //@{
00143   /**
00144    * Retrieve a pointer to the first ACE_Message_Block in the queue
00145    * without removing it.
00146    *
00147    * @note Because the block whose pointer is returned is still on the queue,
00148    *       another thread may dequeue the referenced block at any time,
00149    *       including before the calling thread examines the peeked-at block.
00150    *       Be very careful with this method in multithreaded queueing
00151    *       situations.
00152    *
00153    * @param first_item  Reference to an ACE_Message_Block * that will
00154    *                    point to the first block on the queue.  The block
00155    *                    remains on the queue until this or another thread
00156    *                    dequeues it.
00157    * @param timeout     The absolute time the caller will wait until
00158    *                    for a block to be queued.
00159    *
00160    * @retval >0 The number of ACE_Message_Blocks on the queue.
00161    * @retval -1 On failure.  errno holds the reason. Common errno values are:
00162    *            - EWOULDBLOCK: the timeout elapsed
00163    *            - ESHUTDOWN: the queue was deactivated or pulsed
00164    */
00165   virtual int peek_dequeue_head (ACE_Message_Block *&first_item,
00166                                  ACE_Time_Value *timeout = 0);
00167 
00168   /**
00169    * Enqueue an ACE_Message_Block into the queue in accordance with
00170    * the ACE_Message_Block's priority (0 is lowest priority).  FIFO
00171    * order is maintained when messages of the same priority are
00172    * inserted consecutively.
00173    *
00174    * @param new_item Pointer to an ACE_Message_Block that will be
00175    *                 added to the queue.  The block's @c msg_priority()
00176    *                 method will be called to obtain the queueing priority.
00177    * @param timeout  The absolute time the caller will wait until
00178    *                 for the block to be queued.
00179    *
00180    * @retval >0 The number of ACE_Message_Blocks on the queue after adding
00181    *             the specified block.
00182    * @retval -1 On failure.  errno holds the reason. Common errno values are:
00183    *            - EWOULDBLOCK: the timeout elapsed
00184    *            - ESHUTDOWN: the queue was deactivated or pulsed
00185    */
00186   virtual int enqueue_prio (ACE_Message_Block *new_item,
00187                             ACE_Time_Value *timeout = 0);
00188 
00189   /**
00190    * Enqueue an ACE_Message_Block into the queue in accordance with the
00191    * block's deadline time. FIFO order is maintained when messages of
00192    * the same deadline time are inserted consecutively.
00193    *
00194    * @param new_item Pointer to an ACE_Message_Block that will be
00195    *                 added to the queue.  The block's @c msg_deadline_time()
00196    *                 method will be called to obtain the relative queueing
00197    *                 position.
00198    * @param timeout  The absolute time the caller will wait until
00199    *                 for the block to be queued.
00200    *
00201    * @retval >0 The number of ACE_Message_Blocks on the queue after adding
00202    *             the specified block.
00203    * @retval -1 On failure.  errno holds the reason. Common errno values are:
00204    *            - EWOULDBLOCK: the timeout elapsed
00205    *            - ESHUTDOWN: the queue was deactivated or pulsed
00206    */
00207   virtual int enqueue_deadline (ACE_Message_Block *new_item,
00208                                 ACE_Time_Value *timeout = 0);
00209 
00210   /**
00211    * @deprecated This is an alias for enqueue_prio().  It's only here for
00212    * backwards compatibility and will go away in a subsequent release.
00213    * Please use enqueue_prio() instead.
00214    */
00215   virtual int enqueue (ACE_Message_Block *new_item,
00216                        ACE_Time_Value *timeout = 0);
00217 
00218   /**
00219    * Enqueue one or more ACE_Message_Block objects at the tail of the queue.
00220    * If the @a new_item @c next() pointer is non-zero, it is assumed to be the
00221    * start of a series of ACE_Message_Block objects connected via their
00222    * @c next() pointers. The series of blocks will be added to the queue in
00223    * the same order they are passed in as.
00224    *
00225    * @param new_item Pointer to an ACE_Message_Block that will be
00226    *                 added to the queue. If the block's @c next() pointer
00227    *                 is non-zero, all blocks chained from the @c next()
00228    *                 pointer are enqueued as well.
00229    * @param timeout  The absolute time the caller will wait until
00230    *                 for the block to be queued.
00231    *
00232    * @retval >0 The number of ACE_Message_Blocks on the queue after adding
00233    *             the specified block(s).
00234    * @retval -1 On failure.  errno holds the reason. Common errno values are:
00235    *            - EWOULDBLOCK: the timeout elapsed
00236    *            - ESHUTDOWN: the queue was deactivated or pulsed
00237    */
00238   virtual int enqueue_tail (ACE_Message_Block *new_item,
00239                             ACE_Time_Value *timeout = 0);
00240 
00241   /**
00242    * Enqueue one or more ACE_Message_Block objects at the head of the queue.
00243    * If the @a new_item @c next() pointer is non-zero, it is assumed to be the
00244    * start of a series of ACE_Message_Block objects connected via their
00245    * @c next() pointers. The series of blocks will be added to the queue in
00246    * the same order they are passed in as.
00247    *
00248    * @param new_item Pointer to an ACE_Message_Block that will be
00249    *                 added to the queue. If the block's @c next() pointer
00250    *                 is non-zero, all blocks chained from the @c next()
00251    *                 pointer are enqueued as well.
00252    * @param timeout  The absolute time the caller will wait until
00253    *                 for the block to be queued.
00254    *
00255    * @retval >0 The number of ACE_Message_Blocks on the queue after adding
00256    *             the specified block(s).
00257    * @retval -1 On failure.  errno holds the reason. Common errno values are:
00258    *            - EWOULDBLOCK: the timeout elapsed
00259    *            - ESHUTDOWN: the queue was deactivated or pulsed
00260    */
00261   virtual int enqueue_head (ACE_Message_Block *new_item,
00262                             ACE_Time_Value *timeout = 0);
00263 
00264   /// This method is an alias for the dequeue_head() method.
00265   virtual int dequeue (ACE_Message_Block *&first_item,
00266                        ACE_Time_Value *timeout = 0);
00267 
00268   /**
00269    * Dequeue the ACE_Message_Block at the head of the queue and return
00270    * a pointer to the dequeued block.
00271    *
00272    * @param first_item  Reference to an ACE_Message_Block * that will
00273    *                    be set to the address of the dequeued block.
00274    * @param timeout     The absolute time the caller will wait until
00275    *                    for a block to be dequeued.
00276    *
00277    * @retval >=0 The number of ACE_Message_Blocks remaining in the queue.
00278    * @retval -1 On failure.  errno holds the reason. Common errno values are:
00279    *            - EWOULDBLOCK: the timeout elapsed
00280    *            - ESHUTDOWN: the queue was deactivated or pulsed
00281    */
00282   virtual int dequeue_head (ACE_Message_Block *&first_item,
00283                             ACE_Time_Value *timeout = 0);
00284 
00285   /**
00286    * Dequeue the ACE_Message_Block that has the lowest priority (preserves
00287    * FIFO order for messages with the same priority) and return a pointer
00288    * to the dequeued block.
00289    *
00290    * @param first_item  Reference to an ACE_Message_Block * that will
00291    *                    be set to the address of the dequeued block.
00292    * @param timeout     The absolute time the caller will wait until
00293    *                    for a block to be dequeued.
00294    *
00295    * @retval >=0 The number of ACE_Message_Blocks remaining in the queue.
00296    * @retval -1 On failure.  errno holds the reason. Common errno values are:
00297    *            - EWOULDBLOCK: the timeout elapsed
00298    *            - ESHUTDOWN: the queue was deactivated or pulsed
00299    */
00300   virtual int dequeue_prio (ACE_Message_Block *&first_item,
00301                             ACE_Time_Value *timeout = 0);
00302 
00303   /**
00304    * Dequeue the ACE_Message_Block at the tail of the queue and return
00305    * a pointer to the dequeued block.
00306    *
00307    * @param dequeued  Reference to an ACE_Message_Block * that will
00308    *                  be set to the address of the dequeued block.
00309    * @param timeout   The absolute time the caller will wait until
00310    *                  for a block to be dequeued.
00311    *
00312    * @retval >=0 The number of ACE_Message_Blocks remaining in the queue.
00313    * @retval -1 On failure.  errno holds the reason. Common errno values are:
00314    *            - EWOULDBLOCK: the timeout elapsed
00315    *            - ESHUTDOWN: the queue was deactivated or pulsed
00316    */
00317   virtual int dequeue_tail (ACE_Message_Block *&dequeued,
00318                             ACE_Time_Value *timeout = 0);
00319 
00320   /**
00321    * Dequeue the ACE_Message_Block with the earliest deadline time and return
00322    * a pointer to the dequeued block.
00323    *
00324    * @param dequeued  Reference to an ACE_Message_Block * that will
00325    *                  be set to the address of the dequeued block.
00326    * @param timeout   The absolute time the caller will wait until
00327    *                  for a block to be dequeued.
00328    *
00329    * @retval >=0 The number of ACE_Message_Blocks remaining in the queue.
00330    * @retval -1 On failure.  errno holds the reason. Common errno values are:
00331    *            - EWOULDBLOCK: the timeout elapsed
00332    *            - ESHUTDOWN: the queue was deactivated or pulsed
00333    */
00334   virtual int dequeue_deadline (ACE_Message_Block *&dequeued,
00335                                 ACE_Time_Value *timeout = 0);
00336   //@}
00337 
00338   /** @name Queue statistics methods
00339    */
00340   //@{
00341 
00342   /// True if queue is full, else false.
00343   virtual int is_full (void);
00344   /// True if queue is empty, else false.
00345   virtual int is_empty (void);
00346 
00347   /**
00348    * Number of total bytes on the queue, i.e., sum of the message
00349    * block sizes.
00350    */
00351   virtual size_t message_bytes (void);
00352 
00353   /**
00354    * Number of total length on the queue, i.e., sum of the message
00355    * block lengths.
00356    */
00357   virtual size_t message_length (void);
00358 
00359   /**
00360    * Number of total messages on the queue.
00361    */
00362   virtual size_t message_count (void);
00363 
00364   // = Manual changes to these stats (used when queued message blocks
00365   // change size or lengths).
00366   /**
00367    * New value of the number of total bytes on the queue, i.e., sum of
00368    * the message block sizes.
00369    */
00370   virtual void message_bytes (size_t new_size);
00371   /**
00372    * New value of the number of total length on the queue, i.e., sum
00373    * of the message block lengths.
00374    */
00375   virtual void message_length (size_t new_length);
00376 
00377   //@}
00378 
00379 
00380   /** @name Water mark (flow control) methods
00381    */
00382   //@{
00383 
00384   /**
00385    * Get high watermark.
00386    */
00387   virtual size_t high_water_mark (void);
00388   /**
00389    * Set the high watermark, which determines how many bytes can be
00390    * stored in a queue before it's considered "full."
00391    */
00392   virtual void high_water_mark (size_t hwm);
00393 
00394   /**
00395    * Get low watermark.
00396    */
00397   virtual size_t low_water_mark (void);
00398   /**
00399    * Set the low watermark, which determines how many bytes must be in
00400    * the queue before supplier threads are allowed to enqueue
00401    * additional ACE_Message_Blocks.
00402    */
00403   virtual void low_water_mark (size_t lwm);
00404   //@}
00405 
00406   /** @name Activation and queue state methods
00407    * See C++NPv2 Section 6.2 and APG Section 12.3 for a fuller treatment of
00408    * queue states and transitions and how the transitions affect message
00409    * enqueueing and dequeueing operations.
00410    */
00411   //@{
00412 
00413   /**
00414    * Deactivate the queue and wakeup all threads waiting on the queue
00415    * so they can continue.  No messages are removed from the queue,
00416    * however.  Any other operations called until the queue is
00417    * activated again will immediately return -1 with @c errno ==
00418    * ESHUTDOWN.  Returns WAS_INACTIVE if queue was inactive before the
00419    * call and WAS_ACTIVE if queue was active before the call.
00420    */
00421   virtual int deactivate (void);
00422 
00423   /**
00424    * Reactivate the queue so that threads can enqueue and dequeue
00425    * messages again.  Returns the state of the queue before the call.
00426    */
00427   virtual int activate (void);
00428 
00429   /**
00430    * Pulse the queue to wake up any waiting threads.  Changes the
00431    * queue state to PULSED; future enqueue/dequeue operations proceed
00432    * as in ACTIVATED state.
00433    *
00434    * @return The queue's state before this call.
00435    */
00436   virtual int pulse (void);
00437 
00438   /// Returns the current state of the queue, which can be one of
00439   /// ACTIVATED, DEACTIVATED, or PULSED.
00440   virtual int state (void);
00441 
00442   /// Returns true if the state of the queue is <DEACTIVATED>,
00443   /// but false if the queue's is <ACTIVATED> or <PULSED>.
00444   virtual int deactivated (void);
00445   //@}
00446 
00447   /** @name Notification strategy methods
00448    */
00449   //@{
00450 
00451   /**
00452    * This hook is automatically invoked by <enqueue_head>,
00453    * <enqueue_tail>, and <enqueue_prio> when a new item is inserted
00454    * into the queue.  Subclasses can override this method to perform
00455    * specific notification strategies (e.g., signaling events for a
00456    * <WFMO_Reactor>, notifying a <Reactor>, etc.).  In a
00457    * multi-threaded application with concurrent consumers, there is no
00458    * guarantee that the queue will be still be non-empty by the time
00459    * the notification occurs.
00460    */
00461   virtual int notify (void);
00462 
00463   /// Get the notification strategy for the <Message_Queue>
00464   virtual ACE_Notification_Strategy *notification_strategy (void);
00465 
00466   /// Set the notification strategy for the <Message_Queue>
00467   virtual void notification_strategy (ACE_Notification_Strategy *s);
00468   //@}
00469 
00470   /// Returns a reference to the lock used by the ACE_Message_Queue.
00471   virtual ACE_SYNCH_MUTEX_T &lock (void);
00472 
00473   /// Dump the state of an object.
00474   virtual void dump (void) const;
00475 
00476   /// Declare the dynamic allocation hooks.
00477   ACE_ALLOC_HOOK_DECLARE;
00478 
00479 protected:
00480   // = Routines that actually do the enqueueing and dequeueing.
00481 
00482   // These routines assume that locks are held by the corresponding
00483   // public methods.  Since they are virtual, you can change the
00484   // queueing mechanism by subclassing from ACE_Message_Queue.
00485 
00486   /// Enqueue an <ACE_Message_Block *> in accordance with its priority.
00487   virtual int enqueue_i (ACE_Message_Block *new_item);
00488 
00489   /// Enqueue an <ACE_Message_Block *> in accordance with its deadline time.
00490   virtual int enqueue_deadline_i (ACE_Message_Block *new_item);
00491 
00492   /// Enqueue an <ACE_Message_Block *> at the end of the queue.
00493   virtual int enqueue_tail_i (ACE_Message_Block *new_item);
00494 
00495   /// Enqueue an <ACE_Message_Block *> at the head of the queue.
00496   virtual int enqueue_head_i (ACE_Message_Block *new_item);
00497 
00498   /// Dequeue and return the <ACE_Message_Block *> at the head of the
00499   /// queue.
00500   virtual int dequeue_head_i (ACE_Message_Block *&first_item);
00501 
00502   /// Dequeue and return the <ACE_Message_Block *> with the lowest
00503   /// priority.
00504   virtual int dequeue_prio_i (ACE_Message_Block *&dequeued);
00505 
00506   /// Dequeue and return the <ACE_Message_Block *> at the tail of the
00507   /// queue.
00508   virtual int dequeue_tail_i (ACE_Message_Block *&first_item);
00509 
00510   /// Dequeue and return the <ACE_Message_Block *> with the lowest
00511   /// deadline time.
00512   virtual int dequeue_deadline_i (ACE_Message_Block *&first_item);
00513 
00514   // = Check the boundary conditions (assumes locks are held).
00515 
00516   /// True if queue is full, else false.
00517   virtual int is_full_i (void);
00518 
00519   /// True if queue is empty, else false.
00520   virtual int is_empty_i (void);
00521 
00522   // = Implementation of the public <activate> and <deactivate> methods.
00523 
00524   // These methods assume locks are held.
00525 
00526   /**
00527    * Notifies all waiting threads that the queue has been deactivated
00528    * so they can wakeup and continue other processing.
00529    * No messages are removed from the queue.
00530    *
00531    * @param pulse  If 0, the queue's state is changed to DEACTIVATED
00532    *               and any other operations called until the queue is
00533    *               reactivated will immediately return -1 with
00534    *               errno == ESHUTDOWN.
00535    *               If not zero, only the waiting threads are notified and
00536    *               the queue's state changes to PULSED.
00537    *
00538    * @return The state of the queue before the call.
00539    */
00540   virtual int deactivate_i (int pulse = 0);
00541 
00542   /// Activate the queue.
00543   virtual int activate_i (void);
00544 
00545   // = Helper methods to factor out common #ifdef code.
00546 
00547   /// Wait for the queue to become non-full.
00548   virtual int wait_not_full_cond (ACE_Guard<ACE_SYNCH_MUTEX_T> &mon,
00549                                   ACE_Time_Value *timeout);
00550 
00551   /// Wait for the queue to become non-empty.
00552   virtual int wait_not_empty_cond (ACE_Guard<ACE_SYNCH_MUTEX_T> &mon,
00553                                    ACE_Time_Value *timeout);
00554 
00555   /// Inform any threads waiting to enqueue that they can procede.
00556   virtual int signal_enqueue_waiters (void);
00557 
00558   /// Inform any threads waiting to dequeue that they can procede.
00559   virtual int signal_dequeue_waiters (void);
00560 
00561   /// Pointer to head of ACE_Message_Block list.
00562   ACE_Message_Block *head_;
00563 
00564   /// Pointer to tail of ACE_Message_Block list.
00565   ACE_Message_Block *tail_;
00566 
00567   /// Lowest number before unblocking occurs.
00568   size_t low_water_mark_;
00569 
00570   /// Greatest number of bytes before blocking.
00571   size_t high_water_mark_;
00572 
00573   /// Current number of bytes in the queue.
00574   size_t cur_bytes_;
00575 
00576   /// Current length of messages in the queue.
00577   size_t cur_length_;
00578 
00579   /// Current number of messages in the queue.
00580   size_t cur_count_;
00581 
00582   /// The notification strategy used when a new message is enqueued.
00583   ACE_Notification_Strategy *notification_strategy_;
00584 
00585   // = Synchronization primitives for controlling concurrent access.
00586   /// Protect queue from concurrent access.
00587   ACE_SYNCH_MUTEX_T lock_;
00588 
00589   /// Used to make threads sleep until the queue is no longer empty.
00590   ACE_SYNCH_CONDITION_T not_empty_cond_;
00591 
00592   /// Used to make threads sleep until the queue is no longer full.
00593   ACE_SYNCH_CONDITION_T not_full_cond_;
00594 
00595 private:
00596 
00597   // = Disallow these operations.
00598   ACE_UNIMPLEMENTED_FUNC (void operator= (const ACE_Message_Queue<ACE_SYNCH_USE> &))
00599   ACE_UNIMPLEMENTED_FUNC (ACE_Message_Queue (const ACE_Message_Queue<ACE_SYNCH_USE> &))
00600 };
00601 
00602 // This typedef is used to get around a compiler bug in g++/vxworks.
00603 typedef ACE_Message_Queue<ACE_SYNCH> ACE_DEFAULT_MESSAGE_QUEUE_TYPE;
00604 
00605 
00606 /**
00607  * @class ACE_Message_Queue_Iterator
00608  *
00609  * @brief Iterator for the ACE_Message_Queue.
00610  */
00611 template <ACE_SYNCH_DECL>
00612 class ACE_Message_Queue_Iterator
00613 {
00614 public:
00615   // = Initialization method.
00616   ACE_Message_Queue_Iterator (ACE_Message_Queue <ACE_SYNCH_USE> &queue);
00617 
00618   // = Iteration methods.
00619   /// Pass back the <entry> that hasn't been seen in the queue.
00620   /// Returns 0 when all items have been seen, else 1.
00621   int next (ACE_Message_Block *&entry);
00622 
00623   /// Returns 1 when all items have been seen, else 0.
00624   int done (void) const;
00625 
00626   /// Move forward by one element in the queue.  Returns 0 when all the
00627   /// items in the set have been seen, else 1.
00628   int advance (void);
00629 
00630   /// Dump the state of an object.
00631   void dump (void) const;
00632 
00633   /// Declare the dynamic allocation hooks.
00634   ACE_ALLOC_HOOK_DECLARE;
00635 
00636 private:
00637   /// Message_Queue we are iterating over.
00638   ACE_Message_Queue <ACE_SYNCH_USE> &queue_;
00639 
00640   /// Keeps track of how far we've advanced...
00641   ACE_Message_Block *curr_;
00642 };
00643 
00644 /**
00645  * @class ACE_Message_Queue_Reverse_Iterator
00646  *
00647  * @brief Reverse Iterator for the ACE_Message_Queue.
00648  */
00649 template <ACE_SYNCH_DECL>
00650 class ACE_Message_Queue_Reverse_Iterator
00651 {
00652 public:
00653   // = Initialization method.
00654   ACE_Message_Queue_Reverse_Iterator (ACE_Message_Queue <ACE_SYNCH_USE> &queue);
00655 
00656   // = Iteration methods.
00657   /// Pass back the <entry> that hasn't been seen in the queue.
00658   /// Returns 0 when all items have been seen, else 1.
00659   int next (ACE_Message_Block *&entry);
00660 
00661   /// Returns 1 when all items have been seen, else 0.
00662   int done (void) const;
00663 
00664   /// Move forward by one element in the queue.  Returns 0 when all the
00665   /// items in the set have been seen, else 1.
00666   int advance (void);
00667 
00668   /// Dump the state of an object.
00669   void dump (void) const;
00670 
00671   /// Declare the dynamic allocation hooks.
00672   ACE_ALLOC_HOOK_DECLARE;
00673 
00674 private:
00675   /// Message_Queue we are iterating over.
00676   ACE_Message_Queue <ACE_SYNCH_USE> &queue_;
00677 
00678   /// Keeps track of how far we've advanced...
00679   ACE_Message_Block *curr_;
00680 };
00681 
00682 /**
00683  * @class ACE_Dynamic_Message_Queue
00684  *
00685  * @brief A derived class which adapts the ACE_Message_Queue
00686  * class in order to maintain dynamic priorities for enqueued
00687  * <ACE_Message_Blocks> and manage the queue order according
00688  * to these dynamic priorities.
00689  *
00690  * The messages in the queue are managed so as to preserve
00691  * a logical ordering with minimal overhead per enqueue and
00692  * dequeue operation.  For this reason, the actual order of
00693  * messages in the linked list of the queue may differ from
00694  * their priority order.  As time passes, a message may change
00695  * from pending status to late status, and eventually to beyond
00696  * late status.  To minimize reordering overhead under this
00697  * design force, three separate boundaries are maintained
00698  * within the linked list of messages.  Messages are dequeued
00699  * preferentially from the head of the pending portion, then
00700  * the head of the late portion, and finally from the head
00701  * of the beyond late portion.  In this way, only the boundaries
00702  * need to be maintained (which can be done efficiently, as
00703  * aging messages maintain the same linked list order as they
00704  * progress from one status to the next), with no reordering
00705  * of the messages themselves, while providing correct priority
00706  * ordered dequeueing semantics.
00707  * Head and tail enqueue methods inherited from ACE_Message_Queue
00708  * are made private to prevent out-of-order messages from confusing
00709  * management of the various portions of the queue.  Messages in
00710  * the pending portion of the queue whose priority becomes late
00711  * (according to the specific dynamic strategy) advance into
00712  * the late portion of the queue.  Messages in the late portion
00713  * of the queue whose priority becomes later than can be represented
00714  * advance to the beyond_late portion of the queue.  These behaviors
00715  * support a limited schedule overrun, with pending messages prioritized
00716  * ahead of late messages, and late messages ahead of beyond late
00717  * messages.  These behaviors can be modified in derived classes by
00718  * providing alternative definitions for the appropriate virtual methods.
00719  * When filled with messages, the queue's linked list should look like:
00720  * H                                           T
00721  * |                                           |
00722  * B - B - B - B - L - L - L - P - P - P - P - P
00723  * |           |   |       |   |               |
00724  * BH          BT   LH     LT   PH             PT
00725  * Where the symbols are as follows:
00726  * H  = Head of the entire list
00727  * T  = Tail of the entire list
00728  * B  = Beyond late message
00729  * BH = Beyond late messages Head
00730  * BT = Beyond late messages Tail
00731  * L  = Late message
00732  * LH = Late messages Head
00733  * LT = Late messages Tail
00734  * P  = Pending message
00735  * PH = Pending messages Head
00736  * PT = Pending messages Tail
00737  * Caveat: the virtual methods enqueue_tail, enqueue_head,
00738  * and peek_dequeue_head have semantics for the static
00739  * message queues that cannot be guaranteed for dynamic
00740  * message queues.  The peek_dequeue_head method just
00741  * calls the base class method, while the two enqueue
00742  * methods call the priority enqueue method.  The
00743  * order of messages in the dynamic queue is a function
00744  * of message deadlines and how long they are in the
00745  * queues.  You can manipulate these in some cases to
00746  * ensure the correct semantics, but that is not a
00747  * very stable or portable approach (discouraged).
00748  */
00749 template <ACE_SYNCH_DECL>
00750 class ACE_Dynamic_Message_Queue : public ACE_Message_Queue<ACE_SYNCH_USE>
00751 {
00752 public:
00753   // = Initialization and termination methods.
00754   ACE_Dynamic_Message_Queue (ACE_Dynamic_Message_Strategy & message_strategy,
00755                              size_t hwm = ACE_Message_Queue_Base::DEFAULT_HWM,
00756                              size_t lwm = ACE_Message_Queue_Base::DEFAULT_LWM,
00757                              ACE_Notification_Strategy * = 0);
00758 
00759   /// Close down the message queue and release all resources.
00760   virtual ~ACE_Dynamic_Message_Queue (void);
00761 
00762   /**
00763    * Detach all messages with status given in the passed flags from
00764    * the queue and return them by setting passed head and tail pointers
00765    * to the linked list they comprise.  This method is intended primarily
00766    * as a means of periodically harvesting messages that have missed
00767    * their deadlines, but is available in its most general form.  All
00768    * messages are returned in priority order, from head to tail, as of
00769    * the time this method was called.
00770    */
00771   virtual int remove_messages (ACE_Message_Block *&list_head,
00772                                ACE_Message_Block *&list_tail,
00773                                u_int status_flags);
00774 
00775   /**
00776    * Dequeue and return the <ACE_Message_Block *> at the head of the
00777    * queue.  Returns -1 on failure, else the number of items still on
00778    * the queue.
00779    */
00780   virtual int dequeue_head (ACE_Message_Block *&first_item,
00781                             ACE_Time_Value *timeout = 0);
00782 
00783   /// Dump the state of the queue.
00784   virtual void dump (void) const;
00785 
00786   /**
00787    * Just call priority enqueue method: tail enqueue semantics for dynamic
00788    * message queues are unstable: the message may or may not be where
00789    * it was placed after the queue is refreshed prior to the next
00790    * enqueue or dequeue operation.
00791    */
00792   virtual int enqueue_tail (ACE_Message_Block *new_item,
00793                             ACE_Time_Value *timeout = 0);
00794 
00795   /**
00796    * Just call priority enqueue method: head enqueue semantics for dynamic
00797    * message queues are unstable: the message may or may not be where
00798    * it was placed after the queue is refreshed prior to the next
00799    * enqueue or dequeue operation.
00800    */
00801   virtual int enqueue_head (ACE_Message_Block *new_item,
00802                             ACE_Time_Value *timeout = 0);
00803 
00804 
00805   /// Declare the dynamic allocation hooks.
00806   ACE_ALLOC_HOOK_DECLARE;
00807 
00808 protected:
00809 
00810   /**
00811    * Enqueue an <ACE_Message_Block *> in accordance with its priority.
00812    * priority may be *dynamic* or *static* or a combination or *both*
00813    * It calls the priority evaluation function passed into the Dynamic
00814    * Message Queue constructor to update the priorities of all
00815    * enqueued messages.
00816    */
00817   virtual int enqueue_i (ACE_Message_Block *new_item);
00818 
00819   /// Enqueue a message in priority order within a given priority status sublist
00820   virtual int sublist_enqueue_i (ACE_Message_Block *new_item,
00821                                  const ACE_Time_Value &current_time,
00822                                  ACE_Message_Block *&sublist_head,
00823                                  ACE_Message_Block *&sublist_tail,
00824                                  ACE_Dynamic_Message_Strategy::Priority_Status status);
00825 
00826   /**
00827    * Dequeue and return the <ACE_Message_Block *> at the head of the
00828    * logical queue.  Attempts first to dequeue from the pending
00829    * portion of the queue, or if that is empty from the late portion,
00830    * or if that is empty from the beyond late portion, or if that is
00831    * empty just sets the passed pointer to zero and returns -1.
00832    */
00833   virtual int dequeue_head_i (ACE_Message_Block *&first_item);
00834 
00835   /// Refresh the queue using the strategy
00836   /// specific priority status function.
00837   virtual int refresh_queue (const ACE_Time_Value & current_time);
00838 
00839   /// Refresh the pending queue using the strategy
00840   /// specific priority status function.
00841   virtual int refresh_pending_queue (const ACE_Time_Value & current_time);
00842 
00843   /// Refresh the late queue using the strategy
00844   /// specific priority status function.
00845   virtual int refresh_late_queue (const ACE_Time_Value & current_time);
00846 
00847   /// Pointer to head of the pending messages
00848   ACE_Message_Block *pending_head_;
00849 
00850   /// Pointer to tail of the pending messages
00851   ACE_Message_Block *pending_tail_;
00852 
00853   /// Pointer to head of the late messages
00854   ACE_Message_Block *late_head_;
00855 
00856   /// Pointer to tail of the late messages
00857   ACE_Message_Block *late_tail_;
00858 
00859   /// Pointer to head of the beyond late messages
00860   ACE_Message_Block *beyond_late_head_;
00861 
00862   /// Pointer to tail of the beyond late messages
00863   ACE_Message_Block *beyond_late_tail_;
00864 
00865   /// Pointer to a dynamic priority evaluation function.
00866   ACE_Dynamic_Message_Strategy &message_strategy_;
00867 
00868 private:
00869   // = Disallow public access to these operations.
00870 
00871   ACE_UNIMPLEMENTED_FUNC (void operator= (const ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> &))
00872   ACE_UNIMPLEMENTED_FUNC (ACE_Dynamic_Message_Queue (const ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> &))
00873 
00874   // provide definitions for these (just call base class method),
00875   // but make them private so they're not accessible outside the class
00876 
00877   /// Private method to hide public base class method: just calls base class method
00878   virtual int peek_dequeue_head (ACE_Message_Block *&first_item,
00879                                  ACE_Time_Value *timeout = 0);
00880 
00881 };
00882 
00883 /**
00884  * @class ACE_Message_Queue_Factory
00885  *
00886  * @brief ACE_Message_Queue_Factory is a static factory class template which
00887  * provides a separate factory method for each of the major kinds of
00888  * priority based message dispatching: static, earliest deadline first
00889  * (EDF), and minimum laxity first (MLF).
00890  *
00891  * The ACE_Dynamic_Message_Queue class assumes responsibility for
00892  * releasing the resources of the strategy with which it was
00893  * constructed: the user of a message queue constructed by
00894  * any of these factory methods is only responsible for
00895  * ensuring destruction of the message queue itself.
00896  */
00897 template <ACE_SYNCH_DECL>
00898 class ACE_Message_Queue_Factory
00899 {
00900 public:
00901   /// Factory method for a statically prioritized ACE_Message_Queue
00902   static ACE_Message_Queue<ACE_SYNCH_USE> *
00903     create_static_message_queue (size_t hwm = ACE_Message_Queue_Base::DEFAULT_HWM,
00904                                  size_t lwm = ACE_Message_Queue_Base::DEFAULT_LWM,
00905                                  ACE_Notification_Strategy * = 0);
00906 
00907   /// Factory method for a dynamically prioritized (by time to deadline) ACE_Dynamic_Message_Queue
00908   static ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> *
00909     create_deadline_message_queue (size_t hwm = ACE_Message_Queue_Base::DEFAULT_HWM,
00910                                    size_t lwm = ACE_Message_Queue_Base::DEFAULT_LWM,
00911                                    ACE_Notification_Strategy * = 0,
00912                                    u_long static_bit_field_mask = 0x3FFUL,        // 2^(10) - 1
00913                                    u_long static_bit_field_shift = 10,            // 10 low order bits
00914                                    u_long dynamic_priority_max = 0x3FFFFFUL,      // 2^(22)-1
00915                                    u_long dynamic_priority_offset =  0x200000UL); // 2^(22-1)
00916 
00917   /// Factory method for a dynamically prioritized (by laxity) ACE_Dynamic_Message_Queue
00918   static ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> *
00919     create_laxity_message_queue (size_t hwm = ACE_Message_Queue_Base::DEFAULT_HWM,
00920                                  size_t lwm = ACE_Message_Queue_Base::DEFAULT_LWM,
00921                                  ACE_Notification_Strategy * = 0,
00922                                  u_long static_bit_field_mask = 0x3FFUL,        // 2^(10) - 1
00923                                  u_long static_bit_field_shift = 10,            // 10 low order bits
00924                                  u_long dynamic_priority_max = 0x3FFFFFUL,      // 2^(22)-1
00925                                  u_long dynamic_priority_offset =  0x200000UL); // 2^(22-1)
00926 
00927 
00928 #if defined (ACE_VXWORKS)
00929 
00930   /// Factory method for a wrapped VxWorks message queue
00931   static ACE_Message_Queue_Vx *
00932     create_Vx_message_queue (size_t max_messages, size_t max_message_length,
00933                              ACE_Notification_Strategy *ns = 0);
00934 
00935 #endif /* defined (ACE_VXWORKS) */
00936 
00937 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO)
00938 
00939   /// Factory method for a NT message queue.
00940   static ACE_Message_Queue_NT *
00941   create_NT_message_queue (size_t max_threads);
00942 
00943 #endif /* ACE_HAS_WIN32_OVERLAPPED_IO */
00944 };
00945 
00946 /**
00947  * @class ACE_Message_Queue_Ex
00948  *
00949  * @brief A threaded message queueing facility, modeled after the
00950  *        queueing facilities in System V STREAMs.
00951  *
00952  * ACE_Message_Queue_Ex is a strongly-typed version of the
00953  * ACE_Message_Queue class. Rather than queueing in terms of ACE_Message_Block
00954  * objects, ACE_Message_Queue_Ex has a template argument to specify the
00955  * type of objects that are queued.
00956  *
00957  * The second template argument parameterizes the queue's synchronization.
00958  * The argument specifies a synchronization strategy. The two main
00959  * strategies available for ACE_SYNCH_DECL are:
00960  *   -# ACE_MT_SYNCH: all operations are thread-safe
00961  *   -# ACE_NULL_SYNCH: no synchronization and no locking overhead
00962  */
00963 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL>
00964 class ACE_Message_Queue_Ex
00965 {
00966 public:
00967 
00968   enum
00969   {
00970     /// Default priority value. This is the lowest priority.
00971     DEFAULT_PRIORITY = 0
00972   };
00973 
00974 #if 0
00975   //  @@ Iterators are not implemented yet...
00976 
00977   friend class ACE_Message_Queue_Iterator<ACE_SYNCH_USE>;
00978   friend class ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>;
00979 
00980   // = Traits
00981   typedef ACE_Message_Queue_Iterator<ACE_SYNCH_USE>
00982           ITERATOR;
00983   typedef ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>
00984           REVERSE_ITERATOR;
00985 #endif /* 0 */
00986 
00987   /**
00988    * @name Initialization methods
00989    */
00990   //@{
00991   /**
00992    * Initialize an ACE_Message_Queue_Ex.
00993    *
00994    * @param high_water_mark High water mark. Determines how many bytes can be
00995    *        stored in a queue before it's considered full.  Supplier threads
00996    *        must block until the queue is no longer full.
00997    * @param low_water_mark Low water mark. Determines how many bytes must be in
00998    *        the queue before supplier threads are allowed to enqueue additional
00999    *        data.  By default, the @a hwm equals @a lwm, which means
01000    *        that suppliers will be able to enqueue new messages as soon as
01001    *        a consumer removes any message from the queue.  Making the low
01002    *        water mark smaller than the high water mark forces consumers to
01003    *        drain more messages from the queue before suppliers can enqueue
01004    *        new messages, which can minimize the "silly window syndrome."
01005    * @param ns Notification strategy. Pointer to an object conforming to the
01006    *        ACE_Notification_Strategy interface. If set, the object's
01007    *        notify(void) method will be called each time data is added to
01008    *        this ACE_Message_Queue. @see ACE_Reactor_Notification_Strategy.
01009    */
01010   ACE_Message_Queue_Ex (size_t high_water_mark = ACE_Message_Queue_Base::DEFAULT_HWM,
01011                         size_t low_water_mark = ACE_Message_Queue_Base::DEFAULT_LWM,
01012                         ACE_Notification_Strategy * ns = 0);
01013   virtual int open (size_t hwm = ACE_Message_Queue_Base::DEFAULT_HWM,
01014                     size_t lwm = ACE_Message_Queue_Base::DEFAULT_LWM,
01015                     ACE_Notification_Strategy * = 0);
01016   //@}
01017 
01018   /// Releases all resources from the message queue and marks it deactivated.
01019   /// @sa flush().
01020   ///
01021   /// @retval The number of messages released from the queue; -1 on error.
01022   virtual int close (void);
01023 
01024   /// Releases all resources from the message queue and marks it deactivated.
01025   virtual ~ACE_Message_Queue_Ex (void);
01026 
01027   /**
01028    * Releases all resources from the message queue but does not mark it
01029    * deactivated.  This method holds the queue lock during this operation.
01030    * @sa close().
01031    *
01032    * @return The number of messages flushed; -1 on error.
01033    */
01034   virtual int flush (void);
01035 
01036   /**
01037    * Release all resources from the message queue but do not mark it
01038    * as deactivated.
01039    *
01040    * @pre The caller must be holding the queue lock before calling this
01041    * method.
01042    *
01043    * @return The number of messages flushed.
01044    */
01045   virtual int flush_i (void);
01046 
01047   /** @name Enqueue and dequeue methods
01048    *
01049    * The enqueue and dequeue methods accept a timeout value passed as
01050    * an ACE_Time_Value *. In all cases, if the timeout pointer is 0,
01051    * the caller will block until action is possible. If the timeout pointer
01052    * is non-zero, the call will wait (if needed, subject to water mark
01053    * settings) until the absolute time specified in the referenced
01054    * ACE_Time_Value object is reached. If the time is reached before the
01055    * desired action is possible, the method will return -1 with errno set
01056    * to @c EWOULDBLOCK. Regardless of the timeout setting, however,
01057    * these methods will also fail and return -1 when the queue is closed,
01058    * deactivated, pulsed, or when a signal occurs.
01059    *
01060    * The time parameters are handled the same as in ACE_Message_Queue, so
01061    * you can see C++NPv2 Section 6.2 and APG Section 12.3 for a fuller
01062    * treatment of ACE_Message_Queue, enqueueing, dequeueing, and how these
01063    * operations are affected by queue state transitions.
01064    */
01065   //@{
01066   /**
01067    * Retrieve a pointer to the first item in the queue without removing it.
01068    *
01069    * @note Because the item whose pointer is returned is still on the queue,
01070    *       another thread may dequeue that item at any time,
01071    *       including before the calling thread examines the peeked-at item.
01072    *       Be very careful with this method in multithreaded queueing
01073    *       situations.
01074    *
01075    * @param first_item  Reference to an ACE_MESSAGE_TYPE * that will
01076    *                    point to the first item on the queue.  The item
01077    *                    remains on the queue until this or another thread
01078    *                    dequeues it.
01079    * @param timeout     The absolute time the caller will wait until
01080    *                    for an item to be queued.
01081    *
01082    * @retval >0 The number of items on the queue.
01083    * @retval -1 On failure.  errno holds the reason. Common errno values are:
01084    *            - EWOULDBLOCK: the timeout elapsed
01085    *            - ESHUTDOWN: the queue was deactivated or pulsed
01086    */
01087   virtual int peek_dequeue_head (ACE_MESSAGE_TYPE *&first_item,
01088                                  ACE_Time_Value *timeout = 0);
01089 
01090   /**
01091    * Enqueue an ACE_MESSAGE TYPE into the queue in accordance with
01092    * the specified priority (0 is lowest priority).  FIFO
01093    * order is maintained when items of the same priority are
01094    * inserted consecutively.
01095    *
01096    * @param new_item Pointer to an item that will be added to the queue.
01097    * @param timeout  The absolute time the caller will wait until
01098    *                 for the block to be queued.
01099    * @param priority The priority to use when enqueueing the item.
01100    *
01101    * @retval >0 The number of items on the queue after adding
01102    *             the specified item.
01103    * @retval -1 On failure.  errno holds the reason. Common errno values are:
01104    *            - EWOULDBLOCK: the timeout elapsed
01105    *            - ESHUTDOWN: the queue was deactivated or pulsed
01106    */
01107   virtual int enqueue_prio (ACE_MESSAGE_TYPE *new_item,
01108                             ACE_Time_Value *timeout = 0,
01109                             unsigned long priority = DEFAULT_PRIORITY);
01110 
01111   /**
01112    * This method acts just like enqueue_tail(). There's no deadline
01113    * time associated with items.
01114    */
01115   virtual int enqueue_deadline (ACE_MESSAGE_TYPE *new_item,
01116                                 ACE_Time_Value *timeout = 0);
01117 
01118   /**
01119    * @deprecated This is an alias for enqueue_prio().  It's only here for
01120    * backwards compatibility and will go away in a subsequent release.
01121    * Please use enqueue_prio() instead.
01122    */
01123   virtual int enqueue (ACE_MESSAGE_TYPE *new_item,
01124                        ACE_Time_Value *timeout = 0);
01125 
01126   /**
01127    * Enqueue an item at the tail of the queue.
01128    *
01129    * @param new_item Pointer to an item that will be added to the queue.
01130    * @param timeout  The absolute time the caller will wait until
01131    *                 for the item to be queued.
01132    *
01133    * @retval >0 The number of items on the queue after adding
01134    *             the specified item.
01135    * @retval -1 On failure.  errno holds the reason. Common errno values are:
01136    *            - EWOULDBLOCK: the timeout elapsed
01137    *            - ESHUTDOWN: the queue was deactivated or pulsed
01138    */
01139   virtual int enqueue_tail (ACE_MESSAGE_TYPE *new_item,
01140                             ACE_Time_Value *timeout = 0);
01141 
01142   /**
01143    * Enqueue an item at the head of the queue.
01144    *
01145    * @param new_item Pointer to an item that will be added to the queue.
01146    * @param timeout  The absolute time the caller will wait until
01147    *                 for the item to be queued.
01148    *
01149    * @retval >0 The number of items on the queue after adding
01150    *             the specified item.
01151    * @retval -1 On failure.  errno holds the reason. Common errno values are:
01152    *            - EWOULDBLOCK: the timeout elapsed
01153    *            - ESHUTDOWN: the queue was deactivated or pulsed
01154    */
01155   virtual int enqueue_head (ACE_MESSAGE_TYPE *new_item,
01156                             ACE_Time_Value *timeout = 0);
01157 
01158   /// This method is an alias for the following <dequeue_head> method.
01159   virtual int dequeue (ACE_MESSAGE_TYPE *&first_item,
01160                        ACE_Time_Value *timeout = 0);
01161 
01162   /**
01163    * Dequeue the item at the head of the queue and return a pointer to it.
01164    *
01165    * @param first_item  Reference to an ACE_MESSAGE_TYPE * that will
01166    *                    be set to the address of the dequeued item.
01167    * @param timeout     The absolute time the caller will wait until
01168    *                    for an item to be dequeued.
01169    *
01170    * @retval >=0 The number of items remaining in the queue.
01171    * @retval -1 On failure.  errno holds the reason. Common errno values are:
01172    *            - EWOULDBLOCK: the timeout elapsed
01173    *            - ESHUTDOWN: the queue was deactivated or pulsed
01174    */
01175   virtual int dequeue_head (ACE_MESSAGE_TYPE *&first_item,
01176                             ACE_Time_Value *timeout = 0);
01177 
01178   /**
01179    * Dequeue the item that has the lowest priority (preserves
01180    * FIFO order for items with the same priority) and return a pointer
01181    * to it.
01182    *
01183    * @param dequeued  Reference to an ACE_MESSAGE_TYPE * that will
01184    *                  be set to the address of the dequeued item.
01185    * @param timeout     The absolute time the caller will wait until
01186    *                    for an item to be dequeued.
01187    *
01188    * @retval >=0 The number of items remaining in the queue.
01189    * @retval -1 On failure.  errno holds the reason. Common errno values are:
01190    *            - EWOULDBLOCK: the timeout elapsed
01191    *            - ESHUTDOWN: the queue was deactivated or pulsed
01192    */
01193   virtual int dequeue_prio (ACE_MESSAGE_TYPE *&dequeued,
01194                             ACE_Time_Value *timeout = 0);
01195 
01196   /**
01197    * Dequeue the item at the tail of the queue and return a pointer to it.
01198    *
01199    * @param dequeued  Reference to an ACE_MESSAGE_TYPE * that will
01200    *                  be set to the address of the dequeued item.
01201    * @param timeout   The absolute time the caller will wait until
01202    *                  for an item to be dequeued.
01203    *
01204    * @retval >=0 The number of items remaining in the queue.
01205    * @retval -1 On failure.  errno holds the reason. Common errno values are:
01206    *            - EWOULDBLOCK: the timeout elapsed
01207    *            - ESHUTDOWN: the queue was deactivated or pulsed
01208    */
01209   virtual int dequeue_tail (ACE_MESSAGE_TYPE *&dequeued,
01210                             ACE_Time_Value *timeout = 0);
01211 
01212   /**
01213    * Because there's deadline associated with enqueue_deadline(), this
01214    * method will behave just as dequeue_head().
01215    */
01216   virtual int dequeue_deadline (ACE_MESSAGE_TYPE *&dequeued,
01217                                 ACE_Time_Value *timeout = 0);
01218   //@}
01219 
01220   /** @name Queue statistics methods
01221    */
01222   //@{
01223 
01224   /// True if queue is full, else false.
01225   virtual int is_full (void);
01226   /// True if queue is empty, else false.
01227   virtual int is_empty (void);
01228 
01229   /**
01230    * Number of total bytes on the queue, i.e., sum of the message
01231    * block sizes.
01232    */
01233   virtual size_t message_bytes (void);
01234   /**
01235    * Number of total length on the queue, i.e., sum of the message
01236    * block lengths.
01237    */
01238   virtual size_t message_length (void);
01239   /**
01240    * Number of total messages on the queue.
01241    */
01242   virtual size_t message_count (void);
01243 
01244   // = Manual changes to these stats (used when queued message blocks
01245   // change size or lengths).
01246   /**
01247    * New value of the number of total bytes on the queue, i.e., sum of
01248    * the message block sizes.
01249    */
01250   virtual void message_bytes (size_t new_size);
01251   /**
01252    * New value of the number of total length on the queue, i.e., sum
01253    * of the message block lengths.
01254    */
01255   virtual void message_length (size_t new_length);
01256 
01257   //@}
01258 
01259   /** @name Water mark (flow control) methods
01260    */
01261   //@{
01262 
01263   /**
01264    * Get high watermark.
01265    */
01266   virtual size_t high_water_mark (void);
01267   /**
01268    * Set the high watermark, which determines how many bytes can be
01269    * stored in a queue before it's considered "full."
01270    */
01271   virtual void high_water_mark (size_t hwm);
01272 
01273   /**
01274    * Get low watermark.
01275    */
01276   virtual size_t low_water_mark (void);
01277   /**
01278    * Set the low watermark, which determines how many bytes must be in
01279    * the queue before supplier threads are allowed to enqueue
01280    * additional <ACE_MESSAGE_TYPE>s.
01281    */
01282   virtual void low_water_mark (size_t lwm);
01283   //@}
01284 
01285   /** @name Activation and queue state methods
01286    * See C++NPv2 Section 6.2 and APG Section 12.3 for a fuller treatment of
01287    * queue states and transitions and how the transitions affect message
01288    * enqueueing and dequeueing operations.
01289    */
01290   //@{
01291 
01292   /**
01293    * Deactivate the queue and wakeup all threads waiting on the queue
01294    * so they can continue.  No messages are removed from the queue,
01295    * however.  Any other operations called until the queue is
01296    * activated again will immediately return -1 with @c errno ==
01297    * ESHUTDOWN.  Returns WAS_INACTIVE if queue was inactive before the
01298    * call and WAS_ACTIVE if queue was active before the call.
01299    */
01300   virtual int deactivate (void);
01301 
01302   /**
01303    * Reactivate the queue so that threads can enqueue and dequeue
01304    * messages again.  Returns the state of the queue before the call.
01305    */
01306   virtual int activate (void);
01307 
01308   /**
01309    * Pulse the queue to wake up any waiting threads.  Changes the
01310    * queue state to PULSED; future enqueue/dequeue operations proceed
01311    * as in ACTIVATED state.
01312    *
01313    * @retval  The queue's state before this call.
01314    */
01315   virtual int pulse (void);
01316 
01317   /// Returns the current state of the queue, which can be one of
01318   /// ACTIVATED, DEACTIVATED, or PULSED.
01319   virtual int state (void);
01320 
01321   /// Returns true if the state of the queue is DEACTIVATED,
01322   /// but false if the queue's state is ACTIVATED or PULSED.
01323   virtual int deactivated (void);
01324   //@}
01325 
01326   /** @name Notification strategy methods
01327    */
01328   //@{
01329 
01330   /**
01331    * This hook is automatically invoked by <enqueue_head>,
01332    * <enqueue_tail>, and <enqueue_prio> when a new item is inserted
01333    * into the queue.  Subclasses can override this method to perform
01334    * specific notification strategies (e.g., signaling events for a
01335    * <WFMO_Reactor>, notifying a <Reactor>, etc.).  In a
01336    * multi-threaded application with concurrent consumers, there is no
01337    * guarantee that the queue will be still be non-empty by the time
01338    * the notification occurs.
01339    */
01340   virtual int notify (void);
01341 
01342   /// Get the notification strategy for the <Message_Queue>
01343   virtual ACE_Notification_Strategy *notification_strategy (void);
01344 
01345   /// Set the notification strategy for the <Message_Queue>
01346   virtual void notification_strategy (ACE_Notification_Strategy *s);
01347   //@}
01348 
01349   /// Returns a reference to the lock used by the ACE_Message_Queue_Ex.
01350   virtual ACE_SYNCH_MUTEX_T &lock (void);
01351 
01352   /// Dump the state of an object.
01353   virtual void dump (void) const;
01354 
01355   /// Declare the dynamic allocation hooks.
01356   ACE_ALLOC_HOOK_DECLARE;
01357 
01358 protected:
01359   /// Implement this via an ACE_Message_Queue.
01360   ACE_Message_Queue<ACE_SYNCH_USE> queue_;
01361 };
01362 
01363 /**
01364  * @class ACE_Message_Queue_Ex_N
01365  *
01366  * @brief A threaded message queueing facility, modeled after the
01367  *        queueing facilities in System V STREAMs which can enqueue
01368  *        multiple messages in one call.
01369  *
01370  * As ACE_Message_Queue_Ex, ACE_Message_Queue_Ex_N is a strongly-typed
01371  * version of the ACE_Message_Queue. If @c ACE_SYNCH_DECL is @c ACE_MT_SYNCH
01372  * then all operations are thread-safe. Otherwise, if it's @c ACE_NULL_SYNCH
01373  * then there's no locking overhead.
01374  *
01375  * The @c ACE_MESSAGE_TYPE messages that are sent to this
01376  * queue can be chained. Messages are expected to have a
01377  * @c next method that returns the next message in the chain;
01378  * ACE_Message_Queue_Ex_N uses this method to run through
01379  * all the incoming messages and enqueue them in one call.
01380  */
01381 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL>
01382 class ACE_Message_Queue_Ex_N : public ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>
01383 {
01384 public:
01385   // = Initialization and termination methods.
01386 
01387   /**
01388    * Initialize an ACE_Message_Queue_Ex_N.  The @a high_water_mark
01389    * determines how many bytes can be stored in a queue before it's
01390    * considered "full."  Supplier threads must block until the queue
01391    * is no longer full.  The @a low_water_mark determines how many
01392    * bytes must be in the queue before supplier threads are allowed to
01393    * enqueue additional messages.  By default, the @a high_water_mark
01394    * equals the @a low_water_mark, which means that suppliers will be
01395    * able to enqueue new messages as soon as a consumer removes any message
01396    * from the queue.  Making the @a low_water_mark smaller than the
01397    * @a high_water_mark forces consumers to drain more messages from the
01398    * queue before suppliers can enqueue new messages, which can minimize
01399    * the "silly window syndrome."
01400    */
01401   ACE_Message_Queue_Ex_N (size_t high_water_mark = ACE_Message_Queue_Base::DEFAULT_HWM,
01402                           size_t low_water_mark = ACE_Message_Queue_Base::DEFAULT_LWM,
01403                           ACE_Notification_Strategy * ns = 0);
01404 
01405   /// Close down the message queue and release all resources.
01406   virtual ~ACE_Message_Queue_Ex_N (void);
01407 
01408   /**
01409    * Enqueue one or more @c ACE_MESSAGE_TYPE objects at the head of the queue.
01410    * If the @a new_item @c next() pointer is non-zero, it is assumed to be the
01411    * start of a series of @c ACE_MESSAGE_TYPE objects connected via their
01412    * @c next() pointers. The series of blocks will be added to the queue in
01413    * the same order they are passed in as.
01414    *
01415    * @param new_item Pointer to an @c ACE_MESSAGE_TYPE that will be
01416    *                 added to the queue. If the block's @c next() pointer
01417    *                 is non-zero, all blocks chained from the @c next()
01418    *                 pointer are enqueued as well.
01419    * @param tv       The absolute time the caller will wait until
01420    *                 for the block to be queued.
01421    *
01422    * @retval >0 The number of @c ACE_MESSAGE_TYPE objects on the queue after
01423    *             adding the specified block(s).
01424    * @retval -1 On failure.  errno holds the reason. Common errno values are:
01425    *            - EWOULDBLOCK: the timeout elapsed
01426    *            - ESHUTDOWN: the queue was deactivated or pulsed
01427    */
01428   virtual int enqueue_head (ACE_MESSAGE_TYPE *new_item, ACE_Time_Value *tv = 0);
01429 
01430   /**
01431    * Enqueue one or more @c ACE_MESSAGE_TYPE objects at the tail of the queue.
01432    * If the @a new_item @c next() pointer is non-zero, it is assumed to be the
01433    * start of a series of @c ACE_MESSAGE_TYPE objects connected via their
01434    * @c next() pointers. The series of blocks will be added to the queue in
01435    * the same order they are passed in as.
01436    *
01437    * @param new_item Pointer to an @c ACE_MESSAGE_TYPE that will be
01438    *                 added to the queue. If the block's @c next() pointer
01439    *                 is non-zero, all blocks chained from the @c next()
01440    *                 pointer are enqueued as well.
01441    * @param tv       The absolute time the caller will wait until
01442    *                 for the block to be queued.
01443    *
01444    * @retval >0 The number of @c ACE_MESSAGE_TYPE objects on the queue after
01445    *             adding the specified block(s).
01446    * @retval -1 On failure.  errno holds the reason. Common errno values are:
01447    *            - EWOULDBLOCK: the timeout elapsed
01448    *            - ESHUTDOWN: the queue was deactivated or pulsed
01449    */
01450   virtual int enqueue_tail (ACE_MESSAGE_TYPE *new_item, ACE_Time_Value *tv = 0);
01451 
01452   /// Declare the dynamic allocation hooks.
01453   ACE_ALLOC_HOOK_DECLARE;
01454 
01455 protected:
01456   /**
01457    * An helper method that wraps the incoming chain messages
01458    * with ACE_Message_Blocks.
01459    */
01460   ACE_Message_Block *wrap_with_mbs_i (ACE_MESSAGE_TYPE *new_item);
01461 };
01462 
01463 ACE_END_VERSIONED_NAMESPACE_DECL
01464 
01465 #if defined (ACE_TEMPLATES_REQUIRE_SOURCE)
01466 #include "ace/Message_Queue_T.cpp"
01467 #endif /* ACE_TEMPLATES_REQUIRE_SOURCE */
01468 
01469 #if defined (ACE_TEMPLATES_REQUIRE_PRAGMA)
01470 #pragma implementation ("Message_Queue_T.cpp")
01471 #endif /* ACE_TEMPLATES_REQUIRE_PRAGMA */
01472 
01473 #include /**/ "ace/post.h"
01474 #endif /* ACE_MESSAGE_QUEUE_T_H */

Generated on Sun Jan 27 12:05:31 2008 for ACE by doxygen 1.3.6