Message_Queue.h

Go to the documentation of this file.
00001 // -*- C++ -*-
00002 
00003 //=============================================================================
00004 /**
00005  *  @file    Message_Queue.h
00006  *
00007  *  $Id: Message_Queue.h 78651 2007-06-29 12:14:31Z johnnyw $
00008  *
00009  *  @author Douglas C. Schmidt <schmidt@cs.wustl.edu>
00010  */
00011 //=============================================================================
00012 
00013 #ifndef ACE_MESSAGE_QUEUE_H
00014 #define ACE_MESSAGE_QUEUE_H
00015 #include /**/ "ace/pre.h"
00016 
00017 #include "ace/Message_Block.h"
00018 
00019 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00020 # pragma once
00021 #endif /* ACE_LACKS_PRAGMA_ONCE */
00022 
00023 #include "ace/IO_Cntl_Msg.h"
00024 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO)
00025 # include "ace/Synch_Traits.h"   /* Needed in ACE_Message_Queue_NT */
00026 # include "ace/Thread_Mutex.h"   /* Needed in ACE_Message_Queue_NT */
00027 #endif
00028 
00029 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
00030 
00031 // Forward decls.
00032 class ACE_Notification_Strategy;
00033 template <ACE_SYNCH_DECL> class ACE_Message_Queue_Iterator;
00034 template <ACE_SYNCH_DECL> class ACE_Message_Queue_Reverse_Iterator;
00035 
00036 /**
00037  * @class ACE_Message_Queue_Base
00038  *
00039  * @brief Base class for ACE_Message_Queue, which is the central
00040  * queueing facility for messages in the ACE framework.
00041  *
00042  * For all the ACE_Time_Value pointer parameters the caller will
00043  * block until action is possible if @a timeout == 0.  Otherwise, it
00044  * will wait until the absolute time specified in *@a timeout
00045  * elapses.
00046  *
00047  * A queue is always in one of three states:
00048  * . ACTIVATED
00049  * . DEACTIVATED
00050  * . PULSED
00051  */
00052 class ACE_Export ACE_Message_Queue_Base
00053 {
00054 public:
00055   enum
00056   {
00057     // Default high and low watermarks.
00058 
00059     /// Default high watermark (16 K).
00060     DEFAULT_HWM = 16 * 1024,
00061     /// Default low watermark (same as high water mark).
00062     DEFAULT_LWM = 16 * 1024,
00063 
00064     // Queue states.  Before PULSED state was added, the activate()
00065     // and deactivate() methods returned WAS_INACTIVE or WAS_ACTIVE
00066     // to indicate the previous condition.  Now those methods
00067     // return the state the queue was previously in.  WAS_ACTIVE
00068     // and WAS_INACTIVE are defined to match previous semantics for
00069     // applications that don't use the PULSED state.
00070 
00071     /// @deprecated Use ACTIVATED instead.
00072     WAS_ACTIVE = 1,
00073     /// Message queue is active and processing normally
00074     ACTIVATED = 1,
00075 
00076     /// @deprecated Use DEACTIVATED instead.
00077     WAS_INACTIVE = 2,
00078     /// Queue is deactivated; no enqueue or dequeue operations allowed.
00079     DEACTIVATED = 2,
00080 
00081     /// Message queue was pulsed; enqueue and dequeue may proceed normally.
00082     PULSED = 3
00083 
00084   };
00085 
00086   ACE_Message_Queue_Base (void);
00087 
00088   /// Close down the message queue and release all resources.
00089   virtual int close (void) = 0;
00090 
00091   /// Virtual destructor; allows deletion through a base-class pointer.
00092   virtual ~ACE_Message_Queue_Base (void);
00093 
00094   // = Enqueue and dequeue methods.
00095 
00096   /**
00097    * Retrieve the first ACE_Message_Block without removing it.  Note
00098    * that @a timeout uses absolute time rather than relative
00099    * time.  If the @a timeout elapses without receiving a message -1 is
00100    * returned and @c errno is set to @c EWOULDBLOCK.  If the queue is
00101    * deactivated -1 is returned and @c errno is set to @c ESHUTDOWN.
00102    * Otherwise, returns -1 on failure, else the number of items still
00103    * on the queue.
00104    */
00105   virtual int peek_dequeue_head (ACE_Message_Block *&first_item,
00106                                  ACE_Time_Value *timeout = 0) = 0;
00107 
00108   /**
00109    * Enqueue a <ACE_Message_Block *> into the tail of the queue.
00110    * Returns number of items in queue if the call succeeds or -1
00111    * otherwise.  These calls return -1 when queue is closed,
00112    * deactivated (in which case @c errno == @c ESHUTDOWN), when a signal
00113    * occurs (in which case @c errno == @c EINTR), or if the time
00114    * specified in timeout elapses (in which case @c errno ==
00115    * @c EWOULDBLOCK).
00116    */
00117   virtual int enqueue_tail (ACE_Message_Block *new_item,
00118                             ACE_Time_Value *timeout = 0) = 0;
00119   virtual int enqueue (ACE_Message_Block *new_item,
00120                        ACE_Time_Value *timeout = 0) = 0;
00121 
00122   /**
00123    * Dequeue and return the <ACE_Message_Block *> at the head of the
00124    * queue.  Returns number of items in queue if the call succeeds or
00125    * -1 otherwise.  These calls return -1 when queue is closed,
00126    * deactivated (in which case @c errno == @c ESHUTDOWN), when a signal
00127    * occurs (in which case @c errno == @c EINTR), or if the time
00128    * specified in timeout elapses (in which case @c errno ==
00129    * @c EWOULDBLOCK).
00130    */
00131   virtual int dequeue_head (ACE_Message_Block *&first_item,
00132                             ACE_Time_Value *timeout = 0) = 0;
00133   virtual int dequeue (ACE_Message_Block *&first_item,
00134                        ACE_Time_Value *timeout = 0) = 0;
00135 
00136   // = Check if queue is full/empty.
00137   /// True if queue is full, else false.
00138   virtual int is_full (void) = 0;
00139 
00140   /// True if queue is empty, else false.
00141   virtual int is_empty (void) = 0;
00142 
00143   // = Queue statistic methods.
00144 
00145   /// Total number of bytes on the queue, i.e., sum of the message
00146   /// block sizes.
00147   virtual size_t message_bytes (void) = 0;
00148 
00149   /// Total length on the queue, i.e., sum of the message
00150   /// block lengths.
00151   virtual size_t message_length (void) = 0;
00152 
00153   /// Total number of messages on the queue.
00154   virtual size_t message_count (void) = 0;
00155 
00156   /// New value for the total number of bytes on the queue, i.e.,
00157   /// sum of the message block sizes.
00158   virtual void message_bytes (size_t new_size) = 0;
00159 
00160   /// New value for the total length on the queue, i.e.,
00161   /// sum of the message block lengths.
00162   virtual void message_length (size_t new_length) = 0;
00163 
00164   // = Activation control methods.
00165 
00166   /**
00167    * Deactivate the queue and wake up all threads waiting on the queue
00168    * so they can continue.  No messages are removed from the queue,
00169    * however.  Any other operations called until the queue is
00170    * activated again will immediately return -1 with @c errno
00171    * ESHUTDOWN.
00172    *
00173    * @return  The queue's state before this call.
00174    */
00175   virtual int deactivate (void) = 0;
00176 
00177   /**
00178    * Reactivate the queue so that threads can enqueue and dequeue
00179    * messages again.
00180    *
00181    * @return  The queue's state before this call.
00182    */
00183   virtual int activate (void) = 0;
00184 
00185   /**
00186    * Pulse the queue to wake up any waiting threads.  Changes the
00187    * queue state to PULSED; future enqueue/dequeue operations proceed
00188    * as in ACTIVATED state.
00189    *
00190    * @return  The queue's state before this call.
00191    */
00192   virtual int pulse (void) = 0;
00193 
00194   /// Returns the current state of the queue.
00195   virtual int state (void);
00196 
00197   /// Returns 1 if the state of the queue is DEACTIVATED,
00198   /// and 0 if the queue's state is ACTIVATED or PULSED.
00199   virtual int deactivated (void) = 0;
00200 
00201   /// Get the notification strategy for the <Message_Queue>
00202   virtual ACE_Notification_Strategy *notification_strategy (void) = 0;
00203 
00204   /// Set the notification strategy for the <Message_Queue>
00205   virtual void notification_strategy (ACE_Notification_Strategy *s) = 0;
00206 
00207   // = Notification hook.
00208 
00209   /// Dump the state of an object.
00210   virtual void dump (void) const = 0;
00211 
00212   /// Declare the dynamic allocation hooks.
00213   ACE_ALLOC_HOOK_DECLARE;
00214 
00215 private:
00216   // = Disallow copying and assignment.
00217   ACE_Message_Queue_Base (const ACE_Message_Queue_Base &);
00218   void operator= (const ACE_Message_Queue_Base &);
00219 
00220 protected:
00221   /// Indicates the state of the queue, which can be
00222   /// <ACTIVATED>, <DEACTIVATED>, or <PULSED>.
00223   int state_;
00224 
00225 };
00226 
00227 ACE_END_VERSIONED_NAMESPACE_DECL
00228 
00229 // Include the templates here.
00230 #include "ace/Message_Queue_T.h"
00231 
00232 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
00233 
00234 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO)
00235 /**
00236  * @class ACE_Message_Queue_NT
00237  *
00238  * @brief Message Queue implementation using IO completion port on NT.
00239  *
00240  * Implementation of a stripped-down ACE_Message_Queue using NT's
00241  * IO completion port mechanism.
00242  * @note *Many* ACE_Message_Queue features are not supported with
00243  * this implementation, including:
00244  * * <open> method has a different signature.
00245  * * <dequeue_head> *requires* that the ACE_Message_Block
00246  * pointer argument point to an ACE_Message_Block that was
00247  * allocated by the caller.
00248  * * <peek_dequeue_head>.
00249  * * <ACE_Message_Queue_Iterators>.
00250  * * No flow control.
00251  */
00252 class ACE_Export ACE_Message_Queue_NT : public ACE_Message_Queue_Base
00253 {
00254 public:
00255   // = Initialization and termination methods.
00256   ACE_Message_Queue_NT (DWORD max_threads = ACE_Message_Queue_Base::DEFAULT_HWM);
00257 
00258   /**
00259    * Initialize the Message Queue by creating a new NT I/O completion
00260    * port.  The first argument specifies the number of threads
00261    * released by the MQ that are allowed to run concurrently.  Returns
00262    * 0 on success, -1 otherwise.
00263    */
00264   virtual int open (DWORD max_threads = ACE_Message_Queue_Base::DEFAULT_HWM);
00265 
00266   /// Close down the underlying I/O completion port.  You need to
00267   /// re-open the MQ after this function is executed.
00268   virtual int close (void);
00269 
00270   /// Close down the message queue and release all resources.
00271   virtual ~ACE_Message_Queue_NT (void);
00272 
00273   // = Enqueue and dequeue methods.
00274 
00275   /**
00276    * Enqueue an <ACE_Message_Block *> at the end of the queue.
00277    * Returns -1 on failure, else the number of items still on the
00278    * queue.
00279    */
00280   virtual int enqueue_tail (ACE_Message_Block *new_item,
00281                             ACE_Time_Value *timeout = 0);
00282   virtual int enqueue (ACE_Message_Block *new_item,
00283                        ACE_Time_Value *timeout = 0);
00284 
00285   /**
00286    * Dequeue and return the <ACE_Message_Block *> at the head of the
00287    * queue.  Returns -1 on failure, else the number of items still on
00288    * the queue.
00289    */
00290   virtual int dequeue_head (ACE_Message_Block *&first_item,
00291                             ACE_Time_Value *timeout = 0);
00292   virtual int dequeue (ACE_Message_Block *&first_item,
00293                        ACE_Time_Value *timeout = 0);
00294 
00295   // = Check if queue is full/empty.
00296   /**
00297    * Always return false.
00298    */
00299 
00300   virtual int is_full (void);
00301   /**
00302    * True if queue is empty, else false.  Notice the return value is
00303    * only transient.
00304    */
00305   virtual int is_empty (void);
00306 
00307   // = Queue statistic methods (transient.)
00308   /**
00309    * Total number of bytes on the queue, i.e., sum of the message
00310    * block sizes.
00311    */
00312   virtual size_t message_bytes (void);
00313 
00314   /**
00315    * Total length on the queue, i.e., sum of the message
00316    * block lengths.
00317    */
00318   virtual size_t message_length (void);
00319 
00320   /**
00321    * Total number of messages on the queue.
00322    */
00323   virtual size_t message_count (void);
00324 
00325   // = Manual changes to these stats (used when queued message blocks
00326   // change size or lengths).
00327   /**
00328    * New value for the total number of bytes on the queue, i.e., sum of
00329    * the message block sizes.
00330    */
00331   virtual void message_bytes (size_t new_size);
00332 
00333   /**
00334    * New value for the total length on the queue, i.e., sum
00335    * of the message block lengths.
00336    */
00337   virtual void message_length (size_t new_length);
00338 
00339   /// Get the max concurrent thread number.
00340   virtual DWORD max_threads (void);
00341 
00342   // = Activation control methods.
00343 
00344   /**
00345    * Deactivate the queue and wake up all threads waiting on the queue
00346    * so they can continue.  No messages are removed from the queue,
00347    * however.  Any other operations called until the queue is
00348    * activated again will immediately return -1 with @c errno
00349    * ESHUTDOWN.
00350    *
00351    * @return  The queue's state before this call.
00352    */
00353   virtual int deactivate (void);
00354 
00355   /**
00356    * Reactivate the queue so that threads can enqueue and dequeue
00357    * messages again.  Returns the state of the queue before the call.
00358    */
00359   virtual int activate (void);
00360 
00361   /**
00362    * Pulse the queue to wake up any waiting threads.  Changes the
00363    * queue state to PULSED; future enqueue/dequeue operations proceed
00364    * as in ACTIVATED state.
00365    *
00366    * @return  The queue's state before this call.
00367    */
00368   virtual int pulse (void);
00369 
00370   /// Returns true if the state of the queue is <DEACTIVATED>,
00371   /// but false if the queue's state is <ACTIVATED> or <PULSED>.
00372   virtual int deactivated (void);
00373 
00374   // = Not currently implemented...
00375   int peek_dequeue_head (ACE_Message_Block *&first_item,
00376                          ACE_Time_Value *timeout = 0);
00377   ACE_Notification_Strategy *notification_strategy (void);
00378   void notification_strategy (ACE_Notification_Strategy *s);
00379 
00380   // = Notification hook.
00381 
00382   /// Dump the state of an object.
00383   virtual void dump (void) const;
00384 
00385   /// Get the handle to the underlying completion port.
00386   virtual ACE_HANDLE completion_port (void);
00387 
00388   /// Declare the dynamic allocation hooks.
00389   ACE_ALLOC_HOOK_DECLARE;
00390 
00391 private:
00392 
00393   // Disallow copying and assignment.
00394   ACE_Message_Queue_NT (const ACE_Message_Queue_NT &);
00395   void operator= (const ACE_Message_Queue_NT &);
00396 
00397 private:
00398   // = Internal states.
00399 
00400   /// Maximum threads that can be released (and run) concurrently.
00401   DWORD max_cthrs_;
00402 
00403   /// Current number of threads waiting to dequeue messages.
00404   DWORD cur_thrs_;
00405 
00406   /// Current number of bytes in queue.
00407   size_t cur_bytes_;
00408 
00409   /// Current length of messages in queue.
00410   size_t cur_length_;
00411 
00412   /// Current number of messages in the queue.
00413   size_t cur_count_;
00414 
00415   /**
00416    * Synchronizer.  This should really be an ACE_Recursive_Thread_Mutex
00417    * but since this class is only supported on NT, it's okay to use
00418    * ACE_Thread_Mutex here.
00419    */
00420   ACE_SYNCH_MUTEX lock_;
00421 
00422   /// Underlying NT IoCompletionPort.
00423   ACE_HANDLE completion_port_;
00424 
00425 };
00426 #endif /* ACE_HAS_WIN32_OVERLAPPED_IO */
00427 
00428 ACE_END_VERSIONED_NAMESPACE_DECL
00429 
00430 #if defined (__ACE_INLINE__)
00431 #include "ace/Message_Queue.inl"
00432 #endif /* __ACE_INLINE__ */
00433 
00434 #include /**/ "ace/post.h"
00435 #endif /* ACE_MESSAGE_QUEUE_H */

Generated on Sun Jan 27 12:05:31 2008 for ACE by doxygen 1.3.6