Message_Queue_T.h

Go to the documentation of this file.
00001 /* -*- C++ -*- */
00002 
00003 //=============================================================================
00004 /**
00005  *  @file    Message_Queue_T.h
00006  *
00007  *  $Id: Message_Queue_T.h 81691 2008-05-14 11:09:21Z johnnyw $
00008  *
00009  *  @author Douglas C. Schmidt <schmidt@cs.wustl.edu>
00010  */
00011 //=============================================================================
00012 
00013 #ifndef ACE_MESSAGE_QUEUE_T_H
00014 #define ACE_MESSAGE_QUEUE_T_H
00015 
00016 #include /**/ "ace/pre.h"
00017 
00018 #include "ace/Message_Queue.h"
00019 #include "ace/Dynamic_Message_Strategy.h"
00020 #include "ace/Synch_Traits.h"
00021 #include "ace/Guard_T.h"
00022 
00023 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00024 # pragma once
00025 #endif /* ACE_LACKS_PRAGMA_ONCE */
00026 
00027 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
00028 
00029 #if defined (ACE_VXWORKS)
00030 class ACE_Message_Queue_Vx;
00031 #endif /* defined (ACE_VXWORKS) */
00032 
00033 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO)
00034 class ACE_Message_Queue_NT;
00035 #endif /* ACE_HAS_WIN32_OVERLAPPED_IO*/
00036 
00037 #if defined (ACE_HAS_MONITOR_POINTS) && ACE_HAS_MONITOR_POINTS == 1
00038 namespace ACE
00039 {
00040   namespace Monitor_Control
00041   {
00042     class Size_Monitor;
00043   }
00044 }
00045 #endif /* ACE_HAS_MONITOR_POINTS==1 */
00046 
00047 /**
00048  * @class ACE_Message_Queue
00049  *
00050  * @brief A message queueing facility with parameterized synchronization
00051  * capability. ACE_Message_Queue is modeled after the queueing facilities
00052  * in System V STREAMs.
00053  *
00054  * ACE_Message_Queue is the primary queueing facility for
00055  * messages in the ACE framework.  It's one template argument parameterizes
00056  * the queue's synchronization. The argument specifies a synchronization
00057  * strategy. The two main strategies available for ACE_SYNCH_DECL are:
00058  *   -# ACE_MT_SYNCH: all operations are thread-safe
00059  *   -# ACE_NULL_SYNCH: no synchronization and no locking overhead
00060  *
00061  * All data passing through ACE_Message_Queue is in the form of
00062  * ACE_Message_Block objects. @sa ACE_Message_Block.
00063  */
00064 template <ACE_SYNCH_DECL>
00065 class ACE_Message_Queue : public ACE_Message_Queue_Base
00066 {
00067 public:
00068   friend class ACE_Message_Queue_Iterator<ACE_SYNCH_USE>;
00069   friend class ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>;
00070 
00071   // = Traits
00072   typedef ACE_Message_Queue_Iterator<ACE_SYNCH_USE>
00073           ITERATOR;
00074   typedef ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>
00075           REVERSE_ITERATOR;
00076 
00077   /**
00078    * @name Initialization methods
00079    */
00080   //@{
00081   /**
00082    * Initialize an ACE_Message_Queue.
00083    *
00084    * @param hwm High water mark. Determines how many bytes can be stored in a
00085    *        queue before it's considered full.  Supplier threads must block
00086    *        until the queue is no longer full.
00087    * @param lwm Low water mark. Determines how many bytes must be in the queue
00088    *        before supplier threads are allowed to enqueue additional
00089    *        data.  By default, the @a hwm equals @a lwm, which means
00090    *        that suppliers will be able to enqueue new messages as soon as
00091    *        a consumer removes any message from the queue.  Making the low
00092    *        water mark smaller than the high water mark forces consumers to
00093    *        drain more messages from the queue before suppliers can enqueue
00094    *        new messages, which can minimize the "silly window syndrome."
00095    * @param ns Notification strategy. Pointer to an object conforming to the
00096    *        ACE_Notification_Strategy interface. If set, the object's
00097    *        notify(void) method will be called each time data is added to
00098    *        this ACE_Message_Queue. @see ACE_Reactor_Notification_Strategy.
00099    */
00100   ACE_Message_Queue (size_t hwm = ACE_Message_Queue_Base::DEFAULT_HWM,
00101                      size_t lwm = ACE_Message_Queue_Base::DEFAULT_LWM,
00102                      ACE_Notification_Strategy *ns = 0);
00103   virtual int open (size_t hwm = ACE_Message_Queue_Base::DEFAULT_HWM,
00104                     size_t lwm = ACE_Message_Queue_Base::DEFAULT_LWM,
00105                     ACE_Notification_Strategy *ns = 0);
00106   //@}
00107 
00108   /// Releases all resources from the message queue and marks it deactivated.
00109   /// @sa flush().
00110   ///
00111   /// @retval The number of messages released from the queue; -1 on error.
00112   virtual int close (void);
00113 
00114   /// Releases all resources from the message queue and marks it deactivated.
00115   virtual ~ACE_Message_Queue (void);
00116 
00117   /**
00118    * Releases all resources from the message queue but does not mark it
00119    * deactivated.  This method holds the queue lock during this operation.
00120    * @sa close().
00121    *
00122    * @return The number of messages flushed; -1 on error.
00123    */
00124   virtual int flush (void);
00125 
00126   /**
00127    * Release all resources from the message queue but do not mark it
00128    * as deactivated.
00129    *
00130    * @pre The caller must be holding the queue lock before calling this
00131    * method.
00132    *
00133    * @return The number of messages flushed.
00134    */
00135   virtual int flush_i (void);
00136 
00137   /** @name Enqueue and dequeue methods
00138    *
00139    * The enqueue and dequeue methods accept a timeout value passed as
00140    * an ACE_Time_Value *. In all cases, if the timeout pointer is 0,
00141    * the caller will block until action is possible. If the timeout pointer
00142    * is non-zero, the call will wait (if needed, subject to water mark
00143    * settings) until the absolute time specified in the referenced
00144    * ACE_Time_Value object is reached. If the time is reached before the
00145    * desired action is possible, the method will return -1 with errno set
00146    * to @c EWOULDBLOCK. Regardless of the timeout setting, however,
00147    * these methods will also fail and return -1 when the queue is closed,
00148    * deactivated, pulsed, or when a signal occurs.
00149    *
00150    * See C++NPv2 Section 6.2 and APG Section 12.3 for a fuller treatment of
00151    * ACE_Message_Queue, enqueueing, dequeueing, and how these operations are
00152    * affected by queue state transitions.
00153    */
00154   //@{
00155   /**
00156    * Retrieve a pointer to the first ACE_Message_Block in the queue
00157    * without removing it.
00158    *
00159    * @note Because the block whose pointer is returned is still on the queue,
00160    *       another thread may dequeue the referenced block at any time,
00161    *       including before the calling thread examines the peeked-at block.
00162    *       Be very careful with this method in multithreaded queueing
00163    *       situations.
00164    *
00165    * @param first_item  Reference to an ACE_Message_Block * that will
00166    *                    point to the first block on the queue.  The block
00167    *                    remains on the queue until this or another thread
00168    *                    dequeues it.
00169    * @param timeout     The absolute time the caller will wait until
00170    *                    for a block to be queued.
00171    *
00172    * @retval >0 The number of ACE_Message_Blocks on the queue.
00173    * @retval -1 On failure.  errno holds the reason. Common errno values are:
00174    *            - EWOULDBLOCK: the timeout elapsed
00175    *            - ESHUTDOWN: the queue was deactivated or pulsed
00176    */
00177   virtual int peek_dequeue_head (ACE_Message_Block *&first_item,
00178                                  ACE_Time_Value *timeout = 0);
00179 
00180   /**
00181    * Enqueue an ACE_Message_Block into the queue in accordance with
00182    * the ACE_Message_Block's priority (0 is lowest priority).  FIFO
00183    * order is maintained when messages of the same priority are
00184    * inserted consecutively.
00185    *
00186    * @param new_item Pointer to an ACE_Message_Block that will be
00187    *                 added to the queue.  The block's @c msg_priority()
00188    *                 method will be called to obtain the queueing priority.
00189    * @param timeout  The absolute time the caller will wait until
00190    *                 for the block to be queued.
00191    *
00192    * @retval >0 The number of ACE_Message_Blocks on the queue after adding
00193    *             the specified block.
00194    * @retval -1 On failure.  errno holds the reason. Common errno values are:
00195    *            - EWOULDBLOCK: the timeout elapsed
00196    *            - ESHUTDOWN: the queue was deactivated or pulsed
00197    */
00198   virtual int enqueue_prio (ACE_Message_Block *new_item,
00199                             ACE_Time_Value *timeout = 0);
00200 
00201   /**
00202    * Enqueue an ACE_Message_Block into the queue in accordance with the
00203    * block's deadline time. FIFO order is maintained when messages of
00204    * the same deadline time are inserted consecutively.
00205    *
00206    * @param new_item Pointer to an ACE_Message_Block that will be
00207    *                 added to the queue.  The block's @c msg_deadline_time()
00208    *                 method will be called to obtain the relative queueing
00209    *                 position.
00210    * @param timeout  The absolute time the caller will wait until
00211    *                 for the block to be queued.
00212    *
00213    * @retval >0 The number of ACE_Message_Blocks on the queue after adding
00214    *             the specified block.
00215    * @retval -1 On failure.  errno holds the reason. Common errno values are:
00216    *            - EWOULDBLOCK: the timeout elapsed
00217    *            - ESHUTDOWN: the queue was deactivated or pulsed
00218    */
00219   virtual int enqueue_deadline (ACE_Message_Block *new_item,
00220                                 ACE_Time_Value *timeout = 0);
00221 
00222   /**
00223    * @deprecated This is an alias for enqueue_prio().  It's only here for
00224    * backwards compatibility and will go away in a subsequent release.
00225    * Please use enqueue_prio() instead.
00226    */
00227   virtual int enqueue (ACE_Message_Block *new_item,
00228                        ACE_Time_Value *timeout = 0);
00229 
00230   /**
00231    * Enqueue one or more ACE_Message_Block objects at the tail of the queue.
00232    * If the @a new_item @c next() pointer is non-zero, it is assumed to be the
00233    * start of a series of ACE_Message_Block objects connected via their
00234    * @c next() pointers. The series of blocks will be added to the queue in
00235    * the same order they are passed in as.
00236    *
00237    * @param new_item Pointer to an ACE_Message_Block that will be
00238    *                 added to the queue. If the block's @c next() pointer
00239    *                 is non-zero, all blocks chained from the @c next()
00240    *                 pointer are enqueued as well.
00241    * @param timeout  The absolute time the caller will wait until
00242    *                 for the block to be queued.
00243    *
00244    * @retval >0 The number of ACE_Message_Blocks on the queue after adding
00245    *             the specified block(s).
00246    * @retval -1 On failure.  errno holds the reason. Common errno values are:
00247    *            - EWOULDBLOCK: the timeout elapsed
00248    *            - ESHUTDOWN: the queue was deactivated or pulsed
00249    */
00250   virtual int enqueue_tail (ACE_Message_Block *new_item,
00251                             ACE_Time_Value *timeout = 0);
00252 
00253   /**
00254    * Enqueue one or more ACE_Message_Block objects at the head of the queue.
00255    * If the @a new_item @c next() pointer is non-zero, it is assumed to be the
00256    * start of a series of ACE_Message_Block objects connected via their
00257    * @c next() pointers. The series of blocks will be added to the queue in
00258    * the same order they are passed in as.
00259    *
00260    * @param new_item Pointer to an ACE_Message_Block that will be
00261    *                 added to the queue. If the block's @c next() pointer
00262    *                 is non-zero, all blocks chained from the @c next()
00263    *                 pointer are enqueued as well.
00264    * @param timeout  The absolute time the caller will wait until
00265    *                 for the block to be queued.
00266    *
00267    * @retval >0 The number of ACE_Message_Blocks on the queue after adding
00268    *             the specified block(s).
00269    * @retval -1 On failure.  errno holds the reason. Common errno values are:
00270    *            - EWOULDBLOCK: the timeout elapsed
00271    *            - ESHUTDOWN: the queue was deactivated or pulsed
00272    */
00273   virtual int enqueue_head (ACE_Message_Block *new_item,
00274                             ACE_Time_Value *timeout = 0);
00275 
00276   /// This method is an alias for the dequeue_head() method.
00277   virtual int dequeue (ACE_Message_Block *&first_item,
00278                        ACE_Time_Value *timeout = 0);
00279 
00280   /**
00281    * Dequeue the ACE_Message_Block at the head of the queue and return
00282    * a pointer to the dequeued block.
00283    *
00284    * @param first_item  Reference to an ACE_Message_Block * that will
00285    *                    be set to the address of the dequeued block.
00286    * @param timeout     The absolute time the caller will wait until
00287    *                    for a block to be dequeued.
00288    *
00289    * @retval >=0 The number of ACE_Message_Blocks remaining in the queue.
00290    * @retval -1 On failure.  errno holds the reason. Common errno values are:
00291    *            - EWOULDBLOCK: the timeout elapsed
00292    *            - ESHUTDOWN: the queue was deactivated or pulsed
00293    */
00294   virtual int dequeue_head (ACE_Message_Block *&first_item,
00295                             ACE_Time_Value *timeout = 0);
00296 
00297   /**
00298    * Dequeue the ACE_Message_Block that has the lowest priority (preserves
00299    * FIFO order for messages with the same priority) and return a pointer
00300    * to the dequeued block.
00301    *
00302    * @param first_item  Reference to an ACE_Message_Block * that will
00303    *                    be set to the address of the dequeued block.
00304    * @param timeout     The absolute time the caller will wait until
00305    *                    for a block to be dequeued.
00306    *
00307    * @retval >=0 The number of ACE_Message_Blocks remaining in the queue.
00308    * @retval -1 On failure.  errno holds the reason. Common errno values are:
00309    *            - EWOULDBLOCK: the timeout elapsed
00310    *            - ESHUTDOWN: the queue was deactivated or pulsed
00311    */
00312   virtual int dequeue_prio (ACE_Message_Block *&first_item,
00313                             ACE_Time_Value *timeout = 0);
00314 
00315   /**
00316    * Dequeue the ACE_Message_Block at the tail of the queue and return
00317    * a pointer to the dequeued block.
00318    *
00319    * @param dequeued  Reference to an ACE_Message_Block * that will
00320    *                  be set to the address of the dequeued block.
00321    * @param timeout   The absolute time the caller will wait until
00322    *                  for a block to be dequeued.
00323    *
00324    * @retval >=0 The number of ACE_Message_Blocks remaining in the queue.
00325    * @retval -1 On failure.  errno holds the reason. Common errno values are:
00326    *            - EWOULDBLOCK: the timeout elapsed
00327    *            - ESHUTDOWN: the queue was deactivated or pulsed
00328    */
00329   virtual int dequeue_tail (ACE_Message_Block *&dequeued,
00330                             ACE_Time_Value *timeout = 0);
00331 
00332   /**
00333    * Dequeue the ACE_Message_Block with the earliest deadline time and return
00334    * a pointer to the dequeued block.
00335    *
00336    * @param dequeued  Reference to an ACE_Message_Block * that will
00337    *                  be set to the address of the dequeued block.
00338    * @param timeout   The absolute time the caller will wait until
00339    *                  for a block to be dequeued.
00340    *
00341    * @retval >=0 The number of ACE_Message_Blocks remaining in the queue.
00342    * @retval -1 On failure.  errno holds the reason. Common errno values are:
00343    *            - EWOULDBLOCK: the timeout elapsed
00344    *            - ESHUTDOWN: the queue was deactivated or pulsed
00345    */
00346   virtual int dequeue_deadline (ACE_Message_Block *&dequeued,
00347                                 ACE_Time_Value *timeout = 0);
00348   //@}
00349 
00350   /** @name Queue statistics methods
00351    */
00352   //@{
00353 
00354   /// True if queue is full, else false.
00355   virtual bool is_full (void);
00356   /// True if queue is empty, else false.
00357   virtual bool is_empty (void);
00358 
00359   /**
00360    * Number of total bytes on the queue, i.e., sum of the message
00361    * block sizes.
00362    */
00363   virtual size_t message_bytes (void);
00364 
00365   /**
00366    * Number of total length on the queue, i.e., sum of the message
00367    * block lengths.
00368    */
00369   virtual size_t message_length (void);
00370 
00371   /**
00372    * Number of total messages on the queue.
00373    */
00374   virtual size_t message_count (void);
00375 
00376   // = Manual changes to these stats (used when queued message blocks
00377   // change size or lengths).
00378   /**
00379    * New value of the number of total bytes on the queue, i.e., sum of
00380    * the message block sizes.
00381    */
00382   virtual void message_bytes (size_t new_size);
00383   /**
00384    * New value of the number of total length on the queue, i.e., sum
00385    * of the message block lengths.
00386    */
00387   virtual void message_length (size_t new_length);
00388 
00389   //@}
00390 
00391 
00392   /** @name Water mark (flow control) methods
00393    */
00394   //@{
00395 
00396   /**
00397    * Get high watermark.
00398    */
00399   virtual size_t high_water_mark (void);
00400   /**
00401    * Set the high watermark, which determines how many bytes can be
00402    * stored in a queue before it's considered "full."
00403    */
00404   virtual void high_water_mark (size_t hwm);
00405 
00406   /**
00407    * Get low watermark.
00408    */
00409   virtual size_t low_water_mark (void);
00410   /**
00411    * Set the low watermark, which determines how many bytes must be in
00412    * the queue before supplier threads are allowed to enqueue
00413    * additional ACE_Message_Blocks.
00414    */
00415   virtual void low_water_mark (size_t lwm);
00416   //@}
00417 
00418   /** @name Activation and queue state methods
00419    * See C++NPv2 Section 6.2 and APG Section 12.3 for a fuller treatment of
00420    * queue states and transitions and how the transitions affect message
00421    * enqueueing and dequeueing operations.
00422    */
00423   //@{
00424 
00425   /**
00426    * Deactivate the queue and wakeup all threads waiting on the queue
00427    * so they can continue.  No messages are removed from the queue,
00428    * however.  Any other operations called until the queue is
00429    * activated again will immediately return -1 with @c errno ==
00430    * ESHUTDOWN.  Returns WAS_INACTIVE if queue was inactive before the
00431    * call and WAS_ACTIVE if queue was active before the call.
00432    */
00433   virtual int deactivate (void);
00434 
00435   /**
00436    * Reactivate the queue so that threads can enqueue and dequeue
00437    * messages again.  Returns the state of the queue before the call.
00438    */
00439   virtual int activate (void);
00440 
00441   /**
00442    * Pulse the queue to wake up any waiting threads.  Changes the
00443    * queue state to PULSED; future enqueue/dequeue operations proceed
00444    * as in ACTIVATED state.
00445    *
00446    * @return The queue's state before this call.
00447    */
00448   virtual int pulse (void);
00449 
00450   /// Returns the current state of the queue, which can be one of
00451   /// ACTIVATED, DEACTIVATED, or PULSED.
00452   virtual int state (void);
00453 
00454   /// Returns true if the state of the queue is <DEACTIVATED>,
00455   /// but false if the queue's is <ACTIVATED> or <PULSED>.
00456   virtual int deactivated (void);
00457   //@}
00458 
00459   /** @name Notification strategy methods
00460    */
00461   //@{
00462 
00463   /**
00464    * This hook is automatically invoked by <enqueue_head>,
00465    * <enqueue_tail>, and <enqueue_prio> when a new item is inserted
00466    * into the queue.  Subclasses can override this method to perform
00467    * specific notification strategies (e.g., signaling events for a
00468    * <WFMO_Reactor>, notifying a <Reactor>, etc.).  In a
00469    * multi-threaded application with concurrent consumers, there is no
00470    * guarantee that the queue will be still be non-empty by the time
00471    * the notification occurs.
00472    */
00473   virtual int notify (void);
00474 
00475   /// Get the notification strategy for the <Message_Queue>
00476   virtual ACE_Notification_Strategy *notification_strategy (void);
00477 
00478   /// Set the notification strategy for the <Message_Queue>
00479   virtual void notification_strategy (ACE_Notification_Strategy *s);
00480   //@}
00481 
00482   /// Returns a reference to the lock used by the ACE_Message_Queue.
00483   virtual ACE_SYNCH_MUTEX_T &lock (void);
00484 
00485   /// Dump the state of an object.
00486   virtual void dump (void) const;
00487 
00488   /// Declare the dynamic allocation hooks.
00489   ACE_ALLOC_HOOK_DECLARE;
00490 
00491 protected:
00492   // = Routines that actually do the enqueueing and dequeueing.
00493 
00494   // These routines assume that locks are held by the corresponding
00495   // public methods.  Since they are virtual, you can change the
00496   // queueing mechanism by subclassing from ACE_Message_Queue.
00497 
00498   /// Enqueue an <ACE_Message_Block *> in accordance with its priority.
00499   virtual int enqueue_i (ACE_Message_Block *new_item);
00500 
00501   /// Enqueue an <ACE_Message_Block *> in accordance with its deadline time.
00502   virtual int enqueue_deadline_i (ACE_Message_Block *new_item);
00503 
00504   /// Enqueue an <ACE_Message_Block *> at the end of the queue.
00505   virtual int enqueue_tail_i (ACE_Message_Block *new_item);
00506 
00507   /// Enqueue an <ACE_Message_Block *> at the head of the queue.
00508   virtual int enqueue_head_i (ACE_Message_Block *new_item);
00509 
00510   /// Dequeue and return the <ACE_Message_Block *> at the head of the
00511   /// queue.
00512   virtual int dequeue_head_i (ACE_Message_Block *&first_item);
00513 
00514   /// Dequeue and return the <ACE_Message_Block *> with the lowest
00515   /// priority.
00516   virtual int dequeue_prio_i (ACE_Message_Block *&dequeued);
00517 
00518   /// Dequeue and return the <ACE_Message_Block *> at the tail of the
00519   /// queue.
00520   virtual int dequeue_tail_i (ACE_Message_Block *&first_item);
00521 
00522   /// Dequeue and return the <ACE_Message_Block *> with the lowest
00523   /// deadline time.
00524   virtual int dequeue_deadline_i (ACE_Message_Block *&first_item);
00525 
00526   // = Check the boundary conditions (assumes locks are held).
00527 
00528   /// True if queue is full, else false.
00529   virtual bool is_full_i (void);
00530 
00531   /// True if queue is empty, else false.
00532   virtual bool is_empty_i (void);
00533 
00534   // = Implementation of the public <activate> and <deactivate> methods.
00535 
00536   // These methods assume locks are held.
00537 
00538   /**
00539    * Notifies all waiting threads that the queue has been deactivated
00540    * so they can wakeup and continue other processing.
00541    * No messages are removed from the queue.
00542    *
00543    * @param pulse  If 0, the queue's state is changed to DEACTIVATED
00544    *               and any other operations called until the queue is
00545    *               reactivated will immediately return -1 with
00546    *               errno == ESHUTDOWN.
00547    *               If not zero, only the waiting threads are notified and
00548    *               the queue's state changes to PULSED.
00549    *
00550    * @return The state of the queue before the call.
00551    */
00552   virtual int deactivate_i (int pulse = 0);
00553 
00554   /// Activate the queue.
00555   virtual int activate_i (void);
00556 
00557   // = Helper methods to factor out common #ifdef code.
00558 
00559   /// Wait for the queue to become non-full.
00560   virtual int wait_not_full_cond (ACE_Guard<ACE_SYNCH_MUTEX_T> &mon,
00561                                   ACE_Time_Value *timeout);
00562 
00563   /// Wait for the queue to become non-empty.
00564   virtual int wait_not_empty_cond (ACE_Guard<ACE_SYNCH_MUTEX_T> &mon,
00565                                    ACE_Time_Value *timeout);
00566 
00567   /// Inform any threads waiting to enqueue that they can procede.
00568   virtual int signal_enqueue_waiters (void);
00569 
00570   /// Inform any threads waiting to dequeue that they can procede.
00571   virtual int signal_dequeue_waiters (void);
00572 
00573   /// Pointer to head of ACE_Message_Block list.
00574   ACE_Message_Block *head_;
00575 
00576   /// Pointer to tail of ACE_Message_Block list.
00577   ACE_Message_Block *tail_;
00578 
00579   /// Lowest number before unblocking occurs.
00580   size_t low_water_mark_;
00581 
00582   /// Greatest number of bytes before blocking.
00583   size_t high_water_mark_;
00584 
00585   /// Current number of bytes in the queue.
00586   size_t cur_bytes_;
00587 
00588   /// Current length of messages in the queue.
00589   size_t cur_length_;
00590 
00591   /// Current number of messages in the queue.
00592   size_t cur_count_;
00593 
00594   /// The notification strategy used when a new message is enqueued.
00595   ACE_Notification_Strategy *notification_strategy_;
00596 
00597   // = Synchronization primitives for controlling concurrent access.
00598   /// Protect queue from concurrent access.
00599   ACE_SYNCH_MUTEX_T lock_;
00600 
00601   /// Used to make threads sleep until the queue is no longer empty.
00602   ACE_SYNCH_CONDITION_T not_empty_cond_;
00603 
00604   /// Used to make threads sleep until the queue is no longer full.
00605   ACE_SYNCH_CONDITION_T not_full_cond_;
00606 
00607   /// Sends the size of the queue whenever it changes.
00608 #if defined (ACE_HAS_MONITOR_POINTS) && (ACE_HAS_MONITOR_POINTS == 1)
00609   ACE::Monitor_Control::Size_Monitor *monitor_;
00610 #endif
00611 
00612 private:
00613 
00614   // = Disallow these operations.
00615   ACE_UNIMPLEMENTED_FUNC (void operator= (const ACE_Message_Queue<ACE_SYNCH_USE> &))
00616   ACE_UNIMPLEMENTED_FUNC (ACE_Message_Queue (const ACE_Message_Queue<ACE_SYNCH_USE> &))
00617 };
00618 
00619 // This typedef is used to get around a compiler bug in g++/vxworks.
00620 typedef ACE_Message_Queue<ACE_SYNCH> ACE_DEFAULT_MESSAGE_QUEUE_TYPE;
00621 
00622 
00623 /**
00624  * @class ACE_Message_Queue_Iterator
00625  *
00626  * @brief Iterator for the ACE_Message_Queue.
00627  */
00628 template <ACE_SYNCH_DECL>
00629 class ACE_Message_Queue_Iterator
00630 {
00631 public:
00632   // = Initialization method.
00633   ACE_Message_Queue_Iterator (ACE_Message_Queue <ACE_SYNCH_USE> &queue);
00634 
00635   // = Iteration methods.
00636   /// Pass back the @a entry that hasn't been seen in the queue.
00637   /// Returns 0 when all items have been seen, else 1.
00638   int next (ACE_Message_Block *&entry);
00639 
00640   /// Returns 1 when all items have been seen, else 0.
00641   int done (void) const;
00642 
00643   /// Move forward by one element in the queue.  Returns 0 when all the
00644   /// items in the set have been seen, else 1.
00645   int advance (void);
00646 
00647   /// Dump the state of an object.
00648   void dump (void) const;
00649 
00650   /// Declare the dynamic allocation hooks.
00651   ACE_ALLOC_HOOK_DECLARE;
00652 
00653 private:
00654   /// Message_Queue we are iterating over.
00655   ACE_Message_Queue <ACE_SYNCH_USE> &queue_;
00656 
00657   /// Keeps track of how far we've advanced...
00658   ACE_Message_Block *curr_;
00659 };
00660 
00661 /**
00662  * @class ACE_Message_Queue_Reverse_Iterator
00663  *
00664  * @brief Reverse Iterator for the ACE_Message_Queue.
00665  */
00666 template <ACE_SYNCH_DECL>
00667 class ACE_Message_Queue_Reverse_Iterator
00668 {
00669 public:
00670   // = Initialization method.
00671   ACE_Message_Queue_Reverse_Iterator (ACE_Message_Queue <ACE_SYNCH_USE> &queue);
00672 
00673   // = Iteration methods.
00674   /// Pass back the @a entry that hasn't been seen in the queue.
00675   /// Returns 0 when all items have been seen, else 1.
00676   int next (ACE_Message_Block *&entry);
00677 
00678   /// Returns 1 when all items have been seen, else 0.
00679   int done (void) const;
00680 
00681   /// Move forward by one element in the queue.  Returns 0 when all the
00682   /// items in the set have been seen, else 1.
00683   int advance (void);
00684 
00685   /// Dump the state of an object.
00686   void dump (void) const;
00687 
00688   /// Declare the dynamic allocation hooks.
00689   ACE_ALLOC_HOOK_DECLARE;
00690 
00691 private:
00692   /// Message_Queue we are iterating over.
00693   ACE_Message_Queue <ACE_SYNCH_USE> &queue_;
00694 
00695   /// Keeps track of how far we've advanced...
00696   ACE_Message_Block *curr_;
00697 };
00698 
00699 /**
00700  * @class ACE_Dynamic_Message_Queue
00701  *
00702  * @brief A derived class which adapts the ACE_Message_Queue
00703  * class in order to maintain dynamic priorities for enqueued
00704  * <ACE_Message_Blocks> and manage the queue order according
00705  * to these dynamic priorities.
00706  *
00707  * The messages in the queue are managed so as to preserve
00708  * a logical ordering with minimal overhead per enqueue and
00709  * dequeue operation.  For this reason, the actual order of
00710  * messages in the linked list of the queue may differ from
00711  * their priority order.  As time passes, a message may change
00712  * from pending status to late status, and eventually to beyond
00713  * late status.  To minimize reordering overhead under this
00714  * design force, three separate boundaries are maintained
00715  * within the linked list of messages.  Messages are dequeued
00716  * preferentially from the head of the pending portion, then
00717  * the head of the late portion, and finally from the head
00718  * of the beyond late portion.  In this way, only the boundaries
00719  * need to be maintained (which can be done efficiently, as
00720  * aging messages maintain the same linked list order as they
00721  * progress from one status to the next), with no reordering
00722  * of the messages themselves, while providing correct priority
00723  * ordered dequeueing semantics.
00724  * Head and tail enqueue methods inherited from ACE_Message_Queue
00725  * are made private to prevent out-of-order messages from confusing
00726  * management of the various portions of the queue.  Messages in
00727  * the pending portion of the queue whose priority becomes late
00728  * (according to the specific dynamic strategy) advance into
00729  * the late portion of the queue.  Messages in the late portion
00730  * of the queue whose priority becomes later than can be represented
00731  * advance to the beyond_late portion of the queue.  These behaviors
00732  * support a limited schedule overrun, with pending messages prioritized
00733  * ahead of late messages, and late messages ahead of beyond late
00734  * messages.  These behaviors can be modified in derived classes by
00735  * providing alternative definitions for the appropriate virtual methods.
00736  * When filled with messages, the queue's linked list should look like:
00737  * H                                           T
00738  * |                                           |
00739  * B - B - B - B - L - L - L - P - P - P - P - P
00740  * |           |   |       |   |               |
00741  * BH          BT   LH     LT   PH             PT
00742  * Where the symbols are as follows:
00743  * H  = Head of the entire list
00744  * T  = Tail of the entire list
00745  * B  = Beyond late message
00746  * BH = Beyond late messages Head
00747  * BT = Beyond late messages Tail
00748  * L  = Late message
00749  * LH = Late messages Head
00750  * LT = Late messages Tail
00751  * P  = Pending message
00752  * PH = Pending messages Head
00753  * PT = Pending messages Tail
00754  * Caveat: the virtual methods enqueue_tail, enqueue_head,
00755  * and peek_dequeue_head have semantics for the static
00756  * message queues that cannot be guaranteed for dynamic
00757  * message queues.  The peek_dequeue_head method just
00758  * calls the base class method, while the two enqueue
00759  * methods call the priority enqueue method.  The
00760  * order of messages in the dynamic queue is a function
00761  * of message deadlines and how long they are in the
00762  * queues.  You can manipulate these in some cases to
00763  * ensure the correct semantics, but that is not a
00764  * very stable or portable approach (discouraged).
00765  */
00766 template <ACE_SYNCH_DECL>
00767 class ACE_Dynamic_Message_Queue : public ACE_Message_Queue<ACE_SYNCH_USE>
00768 {
00769 public:
00770   // = Initialization and termination methods.
00771   ACE_Dynamic_Message_Queue (ACE_Dynamic_Message_Strategy & message_strategy,
00772                              size_t hwm = ACE_Message_Queue_Base::DEFAULT_HWM,
00773                              size_t lwm = ACE_Message_Queue_Base::DEFAULT_LWM,
00774                              ACE_Notification_Strategy * = 0);
00775 
00776   /// Close down the message queue and release all resources.
00777   virtual ~ACE_Dynamic_Message_Queue (void);
00778 
00779   /**
00780    * Detach all messages with status given in the passed flags from
00781    * the queue and return them by setting passed head and tail pointers
00782    * to the linked list they comprise.  This method is intended primarily
00783    * as a means of periodically harvesting messages that have missed
00784    * their deadlines, but is available in its most general form.  All
00785    * messages are returned in priority order, from head to tail, as of
00786    * the time this method was called.
00787    */
00788   virtual int remove_messages (ACE_Message_Block *&list_head,
00789                                ACE_Message_Block *&list_tail,
00790                                u_int status_flags);
00791 
00792   /**
00793    * Dequeue and return the <ACE_Message_Block *> at the head of the
00794    * queue.  Returns -1 on failure, else the number of items still on
00795    * the queue.
00796    */
00797   virtual int dequeue_head (ACE_Message_Block *&first_item,
00798                             ACE_Time_Value *timeout = 0);
00799 
00800   /// Dump the state of the queue.
00801   virtual void dump (void) const;
00802 
00803   /**
00804    * Just call priority enqueue method: tail enqueue semantics for dynamic
00805    * message queues are unstable: the message may or may not be where
00806    * it was placed after the queue is refreshed prior to the next
00807    * enqueue or dequeue operation.
00808    */
00809   virtual int enqueue_tail (ACE_Message_Block *new_item,
00810                             ACE_Time_Value *timeout = 0);
00811 
00812   /**
00813    * Just call priority enqueue method: head enqueue semantics for dynamic
00814    * message queues are unstable: the message may or may not be where
00815    * it was placed after the queue is refreshed prior to the next
00816    * enqueue or dequeue operation.
00817    */
00818   virtual int enqueue_head (ACE_Message_Block *new_item,
00819                             ACE_Time_Value *timeout = 0);
00820 
00821 
00822   /// Declare the dynamic allocation hooks.
00823   ACE_ALLOC_HOOK_DECLARE;
00824 
00825 protected:
00826 
00827   /**
00828    * Enqueue an <ACE_Message_Block *> in accordance with its priority.
00829    * priority may be *dynamic* or *static* or a combination or *both*
00830    * It calls the priority evaluation function passed into the Dynamic
00831    * Message Queue constructor to update the priorities of all
00832    * enqueued messages.
00833    */
00834   virtual int enqueue_i (ACE_Message_Block *new_item);
00835 
00836   /// Enqueue a message in priority order within a given priority status sublist
00837   virtual int sublist_enqueue_i (ACE_Message_Block *new_item,
00838                                  const ACE_Time_Value &current_time,
00839                                  ACE_Message_Block *&sublist_head,
00840                                  ACE_Message_Block *&sublist_tail,
00841                                  ACE_Dynamic_Message_Strategy::Priority_Status status);
00842 
00843   /**
00844    * Dequeue and return the <ACE_Message_Block *> at the head of the
00845    * logical queue.  Attempts first to dequeue from the pending
00846    * portion of the queue, or if that is empty from the late portion,
00847    * or if that is empty from the beyond late portion, or if that is
00848    * empty just sets the passed pointer to zero and returns -1.
00849    */
00850   virtual int dequeue_head_i (ACE_Message_Block *&first_item);
00851 
00852   /// Refresh the queue using the strategy
00853   /// specific priority status function.
00854   virtual int refresh_queue (const ACE_Time_Value & current_time);
00855 
00856   /// Refresh the pending queue using the strategy
00857   /// specific priority status function.
00858   virtual int refresh_pending_queue (const ACE_Time_Value & current_time);
00859 
00860   /// Refresh the late queue using the strategy
00861   /// specific priority status function.
00862   virtual int refresh_late_queue (const ACE_Time_Value & current_time);
00863 
00864   /// Pointer to head of the pending messages
00865   ACE_Message_Block *pending_head_;
00866 
00867   /// Pointer to tail of the pending messages
00868   ACE_Message_Block *pending_tail_;
00869 
00870   /// Pointer to head of the late messages
00871   ACE_Message_Block *late_head_;
00872 
00873   /// Pointer to tail of the late messages
00874   ACE_Message_Block *late_tail_;
00875 
00876   /// Pointer to head of the beyond late messages
00877   ACE_Message_Block *beyond_late_head_;
00878 
00879   /// Pointer to tail of the beyond late messages
00880   ACE_Message_Block *beyond_late_tail_;
00881 
00882   /// Pointer to a dynamic priority evaluation function.
00883   ACE_Dynamic_Message_Strategy &message_strategy_;
00884 
00885 private:
00886   // = Disallow public access to these operations.
00887 
00888   ACE_UNIMPLEMENTED_FUNC (void operator= (const ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> &))
00889   ACE_UNIMPLEMENTED_FUNC (ACE_Dynamic_Message_Queue (const ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> &))
00890 
00891   // provide definitions for these (just call base class method),
00892   // but make them private so they're not accessible outside the class
00893 
00894   /// Private method to hide public base class method: just calls base class method
00895   virtual int peek_dequeue_head (ACE_Message_Block *&first_item,
00896                                  ACE_Time_Value *timeout = 0);
00897 
00898 };
00899 
00900 /**
00901  * @class ACE_Message_Queue_Factory
00902  *
00903  * @brief ACE_Message_Queue_Factory is a static factory class template which
00904  * provides a separate factory method for each of the major kinds of
00905  * priority based message dispatching: static, earliest deadline first
00906  * (EDF), and minimum laxity first (MLF).
00907  *
00908  * The ACE_Dynamic_Message_Queue class assumes responsibility for
00909  * releasing the resources of the strategy with which it was
00910  * constructed: the user of a message queue constructed by
00911  * any of these factory methods is only responsible for
00912  * ensuring destruction of the message queue itself.
00913  */
00914 template <ACE_SYNCH_DECL>
00915 class ACE_Message_Queue_Factory
00916 {
00917 public:
00918   /// Factory method for a statically prioritized ACE_Message_Queue
00919   static ACE_Message_Queue<ACE_SYNCH_USE> *
00920     create_static_message_queue (size_t hwm = ACE_Message_Queue_Base::DEFAULT_HWM,
00921                                  size_t lwm = ACE_Message_Queue_Base::DEFAULT_LWM,
00922                                  ACE_Notification_Strategy * = 0);
00923 
00924   /// Factory method for a dynamically prioritized (by time to deadline) ACE_Dynamic_Message_Queue
00925   static ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> *
00926     create_deadline_message_queue (size_t hwm = ACE_Message_Queue_Base::DEFAULT_HWM,
00927                                    size_t lwm = ACE_Message_Queue_Base::DEFAULT_LWM,
00928                                    ACE_Notification_Strategy * = 0,
00929                                    u_long static_bit_field_mask = 0x3FFUL,        // 2^(10) - 1
00930                                    u_long static_bit_field_shift = 10,            // 10 low order bits
00931                                    u_long dynamic_priority_max = 0x3FFFFFUL,      // 2^(22)-1
00932                                    u_long dynamic_priority_offset =  0x200000UL); // 2^(22-1)
00933 
00934   /// Factory method for a dynamically prioritized (by laxity) ACE_Dynamic_Message_Queue
00935   static ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> *
00936     create_laxity_message_queue (size_t hwm = ACE_Message_Queue_Base::DEFAULT_HWM,
00937                                  size_t lwm = ACE_Message_Queue_Base::DEFAULT_LWM,
00938                                  ACE_Notification_Strategy * = 0,
00939                                  u_long static_bit_field_mask = 0x3FFUL,        // 2^(10) - 1
00940                                  u_long static_bit_field_shift = 10,            // 10 low order bits
00941                                  u_long dynamic_priority_max = 0x3FFFFFUL,      // 2^(22)-1
00942                                  u_long dynamic_priority_offset =  0x200000UL); // 2^(22-1)
00943 
00944 
00945 #if defined (ACE_VXWORKS)
00946 
00947   /// Factory method for a wrapped VxWorks message queue
00948   static ACE_Message_Queue_Vx *
00949     create_Vx_message_queue (size_t max_messages, size_t max_message_length,
00950                              ACE_Notification_Strategy *ns = 0);
00951 
00952 #endif /* defined (ACE_VXWORKS) */
00953 
00954 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO)
00955 
00956   /// Factory method for a NT message queue.
00957   static ACE_Message_Queue_NT *
00958   create_NT_message_queue (size_t max_threads);
00959 
00960 #endif /* ACE_HAS_WIN32_OVERLAPPED_IO */
00961 };
00962 
00963 /**
00964  * @class ACE_Message_Queue_Ex
00965  *
00966  * @brief A threaded message queueing facility, modeled after the
00967  *        queueing facilities in System V STREAMs.
00968  *
00969  * ACE_Message_Queue_Ex is a strongly-typed version of the
00970  * ACE_Message_Queue class. Rather than queueing in terms of ACE_Message_Block
00971  * objects, ACE_Message_Queue_Ex has a template argument to specify the
00972  * type of objects that are queued.
00973  *
00974  * The second template argument parameterizes the queue's synchronization.
00975  * The argument specifies a synchronization strategy. The two main
00976  * strategies available for ACE_SYNCH_DECL are:
00977  *   -# ACE_MT_SYNCH: all operations are thread-safe
00978  *   -# ACE_NULL_SYNCH: no synchronization and no locking overhead
00979  */
00980 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL>
00981 class ACE_Message_Queue_Ex
00982 {
00983 public:
00984 
00985   enum
00986   {
00987     /// Default priority value. This is the lowest priority.
00988     DEFAULT_PRIORITY = 0
00989   };
00990 
00991 #if 0
00992   //  @@ Iterators are not implemented yet...
00993 
00994   friend class ACE_Message_Queue_Iterator<ACE_SYNCH_USE>;
00995   friend class ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>;
00996 
00997   // = Traits
00998   typedef ACE_Message_Queue_Iterator<ACE_SYNCH_USE>
00999           ITERATOR;
01000   typedef ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>
01001           REVERSE_ITERATOR;
01002 #endif /* 0 */
01003 
01004   /**
01005    * @name Initialization methods
01006    */
01007   //@{
01008   /**
01009    * Initialize an ACE_Message_Queue_Ex.
01010    *
01011    * @param high_water_mark High water mark. Determines how many bytes can be
01012    *        stored in a queue before it's considered full.  Supplier threads
01013    *        must block until the queue is no longer full.
01014    * @param low_water_mark Low water mark. Determines how many bytes must be in
01015    *        the queue before supplier threads are allowed to enqueue additional
01016    *        data.  By default, the @a hwm equals @a lwm, which means
01017    *        that suppliers will be able to enqueue new messages as soon as
01018    *        a consumer removes any message from the queue.  Making the low
01019    *        water mark smaller than the high water mark forces consumers to
01020    *        drain more messages from the queue before suppliers can enqueue
01021    *        new messages, which can minimize the "silly window syndrome."
01022    * @param ns Notification strategy. Pointer to an object conforming to the
01023    *        ACE_Notification_Strategy interface. If set, the object's
01024    *        notify(void) method will be called each time data is added to
01025    *        this ACE_Message_Queue. @see ACE_Reactor_Notification_Strategy.
01026    */
01027   ACE_Message_Queue_Ex (size_t high_water_mark = ACE_Message_Queue_Base::DEFAULT_HWM,
01028                         size_t low_water_mark = ACE_Message_Queue_Base::DEFAULT_LWM,
01029                         ACE_Notification_Strategy * ns = 0);
01030   virtual int open (size_t hwm = ACE_Message_Queue_Base::DEFAULT_HWM,
01031                     size_t lwm = ACE_Message_Queue_Base::DEFAULT_LWM,
01032                     ACE_Notification_Strategy * = 0);
01033   //@}
01034 
01035   /// Releases all resources from the message queue and marks it deactivated.
01036   /// @sa flush().
01037   ///
01038   /// @retval The number of messages released from the queue; -1 on error.
01039   virtual int close (void);
01040 
01041   /// Releases all resources from the message queue and marks it deactivated.
01042   virtual ~ACE_Message_Queue_Ex (void);
01043 
01044   /**
01045    * Releases all resources from the message queue but does not mark it
01046    * deactivated.  This method holds the queue lock during this operation.
01047    * @sa close().
01048    *
01049    * @return The number of messages flushed; -1 on error.
01050    */
01051   virtual int flush (void);
01052 
01053   /**
01054    * Release all resources from the message queue but do not mark it
01055    * as deactivated.
01056    *
01057    * @pre The caller must be holding the queue lock before calling this
01058    * method.
01059    *
01060    * @return The number of messages flushed.
01061    */
01062   virtual int flush_i (void);
01063 
01064   /** @name Enqueue and dequeue methods
01065    *
01066    * The enqueue and dequeue methods accept a timeout value passed as
01067    * an ACE_Time_Value *. In all cases, if the timeout pointer is 0,
01068    * the caller will block until action is possible. If the timeout pointer
01069    * is non-zero, the call will wait (if needed, subject to water mark
01070    * settings) until the absolute time specified in the referenced
01071    * ACE_Time_Value object is reached. If the time is reached before the
01072    * desired action is possible, the method will return -1 with errno set
01073    * to @c EWOULDBLOCK. Regardless of the timeout setting, however,
01074    * these methods will also fail and return -1 when the queue is closed,
01075    * deactivated, pulsed, or when a signal occurs.
01076    *
01077    * The time parameters are handled the same as in ACE_Message_Queue, so
01078    * you can see C++NPv2 Section 6.2 and APG Section 12.3 for a fuller
01079    * treatment of ACE_Message_Queue, enqueueing, dequeueing, and how these
01080    * operations are affected by queue state transitions.
01081    */
01082   //@{
01083   /**
01084    * Retrieve a pointer to the first item in the queue without removing it.
01085    *
01086    * @note Because the item whose pointer is returned is still on the queue,
01087    *       another thread may dequeue that item at any time,
01088    *       including before the calling thread examines the peeked-at item.
01089    *       Be very careful with this method in multithreaded queueing
01090    *       situations.
01091    *
01092    * @param first_item  Reference to an ACE_MESSAGE_TYPE * that will
01093    *                    point to the first item on the queue.  The item
01094    *                    remains on the queue until this or another thread
01095    *                    dequeues it.
01096    * @param timeout     The absolute time the caller will wait until
01097    *                    for an item to be queued.
01098    *
01099    * @retval >0 The number of items on the queue.
01100    * @retval -1 On failure.  errno holds the reason. Common errno values are:
01101    *            - EWOULDBLOCK: the timeout elapsed
01102    *            - ESHUTDOWN: the queue was deactivated or pulsed
01103    */
01104   virtual int peek_dequeue_head (ACE_MESSAGE_TYPE *&first_item,
01105                                  ACE_Time_Value *timeout = 0);
01106 
01107   /**
01108    * Enqueue an ACE_MESSAGE TYPE into the queue in accordance with
01109    * the specified priority (0 is lowest priority).  FIFO
01110    * order is maintained when items of the same priority are
01111    * inserted consecutively.
01112    *
01113    * @param new_item Pointer to an item that will be added to the queue.
01114    * @param timeout  The absolute time the caller will wait until
01115    *                 for the block to be queued.
01116    * @param priority The priority to use when enqueueing the item.
01117    *
01118    * @retval >0 The number of items on the queue after adding
01119    *             the specified item.
01120    * @retval -1 On failure.  errno holds the reason. Common errno values are:
01121    *            - EWOULDBLOCK: the timeout elapsed
01122    *            - ESHUTDOWN: the queue was deactivated or pulsed
01123    */
01124   virtual int enqueue_prio (ACE_MESSAGE_TYPE *new_item,
01125                             ACE_Time_Value *timeout = 0,
01126                             unsigned long priority = DEFAULT_PRIORITY);
01127 
01128   /**
01129    * This method acts just like enqueue_tail(). There's no deadline
01130    * time associated with items.
01131    */
01132   virtual int enqueue_deadline (ACE_MESSAGE_TYPE *new_item,
01133                                 ACE_Time_Value *timeout = 0);
01134 
01135   /**
01136    * @deprecated This is an alias for enqueue_prio().  It's only here for
01137    * backwards compatibility and will go away in a subsequent release.
01138    * Please use enqueue_prio() instead.
01139    */
01140   virtual int enqueue (ACE_MESSAGE_TYPE *new_item,
01141                        ACE_Time_Value *timeout = 0);
01142 
01143   /**
01144    * Enqueue an item at the tail of the queue.
01145    *
01146    * @param new_item Pointer to an item that will be added to the queue.
01147    * @param timeout  The absolute time the caller will wait until
01148    *                 for the item to be queued.
01149    *
01150    * @retval >0 The number of items on the queue after adding
01151    *             the specified item.
01152    * @retval -1 On failure.  errno holds the reason. Common errno values are:
01153    *            - EWOULDBLOCK: the timeout elapsed
01154    *            - ESHUTDOWN: the queue was deactivated or pulsed
01155    */
01156   virtual int enqueue_tail (ACE_MESSAGE_TYPE *new_item,
01157                             ACE_Time_Value *timeout = 0);
01158 
01159   /**
01160    * Enqueue an item at the head of the queue.
01161    *
01162    * @param new_item Pointer to an item that will be added to the queue.
01163    * @param timeout  The absolute time the caller will wait until
01164    *                 for the item to be queued.
01165    *
01166    * @retval >0 The number of items on the queue after adding
01167    *             the specified item.
01168    * @retval -1 On failure.  errno holds the reason. Common errno values are:
01169    *            - EWOULDBLOCK: the timeout elapsed
01170    *            - ESHUTDOWN: the queue was deactivated or pulsed
01171    */
01172   virtual int enqueue_head (ACE_MESSAGE_TYPE *new_item,
01173                             ACE_Time_Value *timeout = 0);
01174 
01175   /// This method is an alias for the following <dequeue_head> method.
01176   virtual int dequeue (ACE_MESSAGE_TYPE *&first_item,
01177                        ACE_Time_Value *timeout = 0);
01178 
01179   /**
01180    * Dequeue the item at the head of the queue and return a pointer to it.
01181    *
01182    * @param first_item  Reference to an ACE_MESSAGE_TYPE * that will
01183    *                    be set to the address of the dequeued item.
01184    * @param timeout     The absolute time the caller will wait until
01185    *                    for an item to be dequeued.
01186    *
01187    * @retval >=0 The number of items remaining in the queue.
01188    * @retval -1 On failure.  errno holds the reason. Common errno values are:
01189    *            - EWOULDBLOCK: the timeout elapsed
01190    *            - ESHUTDOWN: the queue was deactivated or pulsed
01191    */
01192   virtual int dequeue_head (ACE_MESSAGE_TYPE *&first_item,
01193                             ACE_Time_Value *timeout = 0);
01194 
01195   /**
01196    * Dequeue the item that has the lowest priority (preserves
01197    * FIFO order for items with the same priority) and return a pointer
01198    * to it.
01199    *
01200    * @param dequeued  Reference to an ACE_MESSAGE_TYPE * that will
01201    *                  be set to the address of the dequeued item.
01202    * @param timeout     The absolute time the caller will wait until
01203    *                    for an item to be dequeued.
01204    *
01205    * @retval >=0 The number of items remaining in the queue.
01206    * @retval -1 On failure.  errno holds the reason. Common errno values are:
01207    *            - EWOULDBLOCK: the timeout elapsed
01208    *            - ESHUTDOWN: the queue was deactivated or pulsed
01209    */
01210   virtual int dequeue_prio (ACE_MESSAGE_TYPE *&dequeued,
01211                             ACE_Time_Value *timeout = 0);
01212 
01213   /**
01214    * Dequeue the item at the tail of the queue and return a pointer to it.
01215    *
01216    * @param dequeued  Reference to an ACE_MESSAGE_TYPE * that will
01217    *                  be set to the address of the dequeued item.
01218    * @param timeout   The absolute time the caller will wait until
01219    *                  for an item to be dequeued.
01220    *
01221    * @retval >=0 The number of items remaining in the queue.
01222    * @retval -1 On failure.  errno holds the reason. Common errno values are:
01223    *            - EWOULDBLOCK: the timeout elapsed
01224    *            - ESHUTDOWN: the queue was deactivated or pulsed
01225    */
01226   virtual int dequeue_tail (ACE_MESSAGE_TYPE *&dequeued,
01227                             ACE_Time_Value *timeout = 0);
01228 
01229   /**
01230    * Because there's deadline associated with enqueue_deadline(), this
01231    * method will behave just as dequeue_head().
01232    */
01233   virtual int dequeue_deadline (ACE_MESSAGE_TYPE *&dequeued,
01234                                 ACE_Time_Value *timeout = 0);
01235   //@}
01236 
01237   /** @name Queue statistics methods
01238    */
01239   //@{
01240 
01241   /// True if queue is full, else false.
01242   virtual bool is_full (void);
01243 
01244   /// True if queue is empty, else false.
01245   virtual bool is_empty (void);
01246 
01247   /**
01248    * Number of total bytes on the queue, i.e., sum of the message
01249    * block sizes.
01250    */
01251   virtual size_t message_bytes (void);
01252   /**
01253    * Number of total length on the queue, i.e., sum of the message
01254    * block lengths.
01255    */
01256   virtual size_t message_length (void);
01257   /**
01258    * Number of total messages on the queue.
01259    */
01260   virtual size_t message_count (void);
01261 
01262   // = Manual changes to these stats (used when queued message blocks
01263   // change size or lengths).
01264   /**
01265    * New value of the number of total bytes on the queue, i.e., sum of
01266    * the message block sizes.
01267    */
01268   virtual void message_bytes (size_t new_size);
01269   /**
01270    * New value of the number of total length on the queue, i.e., sum
01271    * of the message block lengths.
01272    */
01273   virtual void message_length (size_t new_length);
01274 
01275   //@}
01276 
01277   /** @name Water mark (flow control) methods
01278    */
01279   //@{
01280 
01281   /**
01282    * Get high watermark.
01283    */
01284   virtual size_t high_water_mark (void);
01285   /**
01286    * Set the high watermark, which determines how many bytes can be
01287    * stored in a queue before it's considered "full."
01288    */
01289   virtual void high_water_mark (size_t hwm);
01290 
01291   /**
01292    * Get low watermark.
01293    */
01294   virtual size_t low_water_mark (void);
01295   /**
01296    * Set the low watermark, which determines how many bytes must be in
01297    * the queue before supplier threads are allowed to enqueue
01298    * additional <ACE_MESSAGE_TYPE>s.
01299    */
01300   virtual void low_water_mark (size_t lwm);
01301   //@}
01302 
01303   /** @name Activation and queue state methods
01304    * See C++NPv2 Section 6.2 and APG Section 12.3 for a fuller treatment of
01305    * queue states and transitions and how the transitions affect message
01306    * enqueueing and dequeueing operations.
01307    */
01308   //@{
01309 
01310   /**
01311    * Deactivate the queue and wakeup all threads waiting on the queue
01312    * so they can continue.  No messages are removed from the queue,
01313    * however.  Any other operations called until the queue is
01314    * activated again will immediately return -1 with @c errno ==
01315    * ESHUTDOWN.  Returns WAS_INACTIVE if queue was inactive before the
01316    * call and WAS_ACTIVE if queue was active before the call.
01317    */
01318   virtual int deactivate (void);
01319 
01320   /**
01321    * Reactivate the queue so that threads can enqueue and dequeue
01322    * messages again.  Returns the state of the queue before the call.
01323    */
01324   virtual int activate (void);
01325 
01326   /**
01327    * Pulse the queue to wake up any waiting threads.  Changes the
01328    * queue state to PULSED; future enqueue/dequeue operations proceed
01329    * as in ACTIVATED state.
01330    *
01331    * @retval  The queue's state before this call.
01332    */
01333   virtual int pulse (void);
01334 
01335   /// Returns the current state of the queue, which can be one of
01336   /// ACTIVATED, DEACTIVATED, or PULSED.
01337   virtual int state (void);
01338 
01339   /// Returns true if the state of the queue is DEACTIVATED,
01340   /// but false if the queue's state is ACTIVATED or PULSED.
01341   virtual int deactivated (void);
01342   //@}
01343 
01344   /** @name Notification strategy methods
01345    */
01346   //@{
01347 
01348   /**
01349    * This hook is automatically invoked by <enqueue_head>,
01350    * <enqueue_tail>, and <enqueue_prio> when a new item is inserted
01351    * into the queue.  Subclasses can override this method to perform
01352    * specific notification strategies (e.g., signaling events for a
01353    * <WFMO_Reactor>, notifying a <Reactor>, etc.).  In a
01354    * multi-threaded application with concurrent consumers, there is no
01355    * guarantee that the queue will be still be non-empty by the time
01356    * the notification occurs.
01357    */
01358   virtual int notify (void);
01359 
01360   /// Get the notification strategy for the <Message_Queue>
01361   virtual ACE_Notification_Strategy *notification_strategy (void);
01362 
01363   /// Set the notification strategy for the <Message_Queue>
01364   virtual void notification_strategy (ACE_Notification_Strategy *s);
01365   //@}
01366 
01367   /// Returns a reference to the lock used by the ACE_Message_Queue_Ex.
01368   virtual ACE_SYNCH_MUTEX_T &lock (void);
01369 
01370   /// Dump the state of an object.
01371   virtual void dump (void) const;
01372 
01373   /// Declare the dynamic allocation hooks.
01374   ACE_ALLOC_HOOK_DECLARE;
01375 
01376 protected:
01377   /// Implement this via an ACE_Message_Queue.
01378   ACE_Message_Queue<ACE_SYNCH_USE> queue_;
01379 };
01380 
01381 /**
01382  * @class ACE_Message_Queue_Ex_N
01383  *
01384  * @brief A threaded message queueing facility, modeled after the
01385  *        queueing facilities in System V STREAMs which can enqueue
01386  *        multiple messages in one call.
01387  *
01388  * As ACE_Message_Queue_Ex, ACE_Message_Queue_Ex_N is a strongly-typed
01389  * version of the ACE_Message_Queue. If @c ACE_SYNCH_DECL is @c ACE_MT_SYNCH
01390  * then all operations are thread-safe. Otherwise, if it's @c ACE_NULL_SYNCH
01391  * then there's no locking overhead.
01392  *
01393  * The @c ACE_MESSAGE_TYPE messages that are sent to this
01394  * queue can be chained. Messages are expected to have a
01395  * @c next method that returns the next message in the chain;
01396  * ACE_Message_Queue_Ex_N uses this method to run through
01397  * all the incoming messages and enqueue them in one call.
01398  */
01399 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL>
01400 class ACE_Message_Queue_Ex_N : public ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>
01401 {
01402 public:
01403   // = Initialization and termination methods.
01404 
01405   /**
01406    * Initialize an ACE_Message_Queue_Ex_N.  The @a high_water_mark
01407    * determines how many bytes can be stored in a queue before it's
01408    * considered "full."  Supplier threads must block until the queue
01409    * is no longer full.  The @a low_water_mark determines how many
01410    * bytes must be in the queue before supplier threads are allowed to
01411    * enqueue additional messages.  By default, the @a high_water_mark
01412    * equals the @a low_water_mark, which means that suppliers will be
01413    * able to enqueue new messages as soon as a consumer removes any message
01414    * from the queue.  Making the @a low_water_mark smaller than the
01415    * @a high_water_mark forces consumers to drain more messages from the
01416    * queue before suppliers can enqueue new messages, which can minimize
01417    * the "silly window syndrome."
01418    */
01419   ACE_Message_Queue_Ex_N (size_t high_water_mark = ACE_Message_Queue_Base::DEFAULT_HWM,
01420                           size_t low_water_mark = ACE_Message_Queue_Base::DEFAULT_LWM,
01421                           ACE_Notification_Strategy * ns = 0);
01422 
01423   /// Close down the message queue and release all resources.
01424   virtual ~ACE_Message_Queue_Ex_N (void);
01425 
01426   /**
01427    * Enqueue one or more @c ACE_MESSAGE_TYPE objects at the head of the queue.
01428    * If the @a new_item @c next() pointer is non-zero, it is assumed to be the
01429    * start of a series of @c ACE_MESSAGE_TYPE objects connected via their
01430    * @c next() pointers. The series of blocks will be added to the queue in
01431    * the same order they are passed in as.
01432    *
01433    * @param new_item Pointer to an @c ACE_MESSAGE_TYPE that will be
01434    *                 added to the queue. If the block's @c next() pointer
01435    *                 is non-zero, all blocks chained from the @c next()
01436    *                 pointer are enqueued as well.
01437    * @param tv       The absolute time the caller will wait until
01438    *                 for the block to be queued.
01439    *
01440    * @retval >0 The number of @c ACE_MESSAGE_TYPE objects on the queue after
01441    *             adding the specified block(s).
01442    * @retval -1 On failure.  errno holds the reason. Common errno values are:
01443    *            - EWOULDBLOCK: the timeout elapsed
01444    *            - ESHUTDOWN: the queue was deactivated or pulsed
01445    */
01446   virtual int enqueue_head (ACE_MESSAGE_TYPE *new_item, ACE_Time_Value *tv = 0);
01447 
01448   /**
01449    * Enqueue one or more @c ACE_MESSAGE_TYPE objects at the tail of the queue.
01450    * If the @a new_item @c next() pointer is non-zero, it is assumed to be the
01451    * start of a series of @c ACE_MESSAGE_TYPE objects connected via their
01452    * @c next() pointers. The series of blocks will be added to the queue in
01453    * the same order they are passed in as.
01454    *
01455    * @param new_item Pointer to an @c ACE_MESSAGE_TYPE that will be
01456    *                 added to the queue. If the block's @c next() pointer
01457    *                 is non-zero, all blocks chained from the @c next()
01458    *                 pointer are enqueued as well.
01459    * @param tv       The absolute time the caller will wait until
01460    *                 for the block to be queued.
01461    *
01462    * @retval >0 The number of @c ACE_MESSAGE_TYPE objects on the queue after
01463    *             adding the specified block(s).
01464    * @retval -1 On failure.  errno holds the reason. Common errno values are:
01465    *            - EWOULDBLOCK: the timeout elapsed
01466    *            - ESHUTDOWN: the queue was deactivated or pulsed
01467    */
01468   virtual int enqueue_tail (ACE_MESSAGE_TYPE *new_item, ACE_Time_Value *tv = 0);
01469 
01470   /// Declare the dynamic allocation hooks.
01471   ACE_ALLOC_HOOK_DECLARE;
01472 
01473 protected:
01474   /**
01475    * An helper method that wraps the incoming chain messages
01476    * with ACE_Message_Blocks.
01477    */
01478   ACE_Message_Block *wrap_with_mbs_i (ACE_MESSAGE_TYPE *new_item);
01479 };
01480 
01481 ACE_END_VERSIONED_NAMESPACE_DECL
01482 
01483 #if defined (ACE_TEMPLATES_REQUIRE_SOURCE)
01484 #include "ace/Message_Queue_T.cpp"
01485 #endif /* ACE_TEMPLATES_REQUIRE_SOURCE */
01486 
01487 #if defined (ACE_TEMPLATES_REQUIRE_PRAGMA)
01488 #pragma implementation ("Message_Queue_T.cpp")
01489 #endif /* ACE_TEMPLATES_REQUIRE_PRAGMA */
01490 
01491 #include /**/ "ace/post.h"
01492 
01493 #endif /* ACE_MESSAGE_QUEUE_T_H */

Generated on Tue Feb 2 17:18:40 2010 for ACE by  doxygen 1.4.7