Message_Queue.h

Go to the documentation of this file.
00001 // -*- C++ -*-
00002 
00003 //=============================================================================
00004 /**
00005  *  @file    Message_Queue.h
00006  *
00007  *  Message_Queue.h,v 4.87 2006/05/30 10:57:22 jwillemsen Exp
00008  *
00009  *  @author Douglas C. Schmidt <schmidt@cs.wustl.edu>
00010  */
00011 //=============================================================================
00012 
00013 #ifndef ACE_MESSAGE_QUEUE_H
00014 #define ACE_MESSAGE_QUEUE_H
00015 #include /**/ "ace/pre.h"
00016 
00017 #include "ace/Message_Block.h"
00018 
00019 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00020 # pragma once
00021 #endif /* ACE_LACKS_PRAGMA_ONCE */
00022 
00023 #include "ace/IO_Cntl_Msg.h"
00024 #if defined (ACE_WIN32) && (ACE_HAS_WINNT4 != 0)
00025 # include "ace/Thread_Mutex.h"   /* Needed in ACE_Message_Queue_NT */
00026 #endif
00027 
00028 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
00029 
00030 // Forward decls.
00031 class ACE_Notification_Strategy;
00032 template <ACE_SYNCH_DECL> class ACE_Message_Queue_Iterator;
00033 template <ACE_SYNCH_DECL> class ACE_Message_Queue_Reverse_Iterator;
00034 
00035 /**
00036  * @class ACE_Message_Queue_Base
00037  *
00038  * @brief Base class for ACE_Message_Queue, which is the central
00039  * queueing facility for messages in the ACE framework.
00040  *
00041  * For all the <ACE_Time_Value> pointer parameters the caller will
00042  * block until action is possible if <timeout> == 0.  Otherwise, it
00043  * will wait until the absolute time specified in *<timeout>
00044  * elapses.
00045  *
00046  * A queue is always in one of three states:
00047  * . ACTIVATED
00048  * . DEACTIVATED
00049  * . PULSED
00050  */
00051 class ACE_Export ACE_Message_Queue_Base
00052 {
00053 public:
00054   enum
00055   {
00056     // Default high and low watermarks.
00057 
00058     /// Default high watermark (16 K).
00059     DEFAULT_HWM = 16 * 1024,
00060     /// Default low watermark (same as high water mark).
00061     DEFAULT_LWM = 16 * 1024,
00062 
00063     // Queue states.  Before PULSED state was added, the activate()
00064     // and deactivate() methods returned WAS_INACTIVE or WAS_ACTIVE
00065     // to indicate the previous condition.  Now those methods
00066     // return the state the queue was previously in.  WAS_ACTIVE
00067     // and WAS_INACTIVE are defined to match previous semantics for
00068     // applications that don't use the PULSED state.
00069 
00070     /// @deprecated Use ACTIVATED instead.
00071     WAS_ACTIVE = 1,
00072     /// Message queue is active and processing normally
00073     ACTIVATED = 1,
00074 
00075     /// @deprecated Use DEACTIVATED instead.
00076     WAS_INACTIVE = 2,
00077     /// Queue is deactivated; no enqueue or dequeue operations allowed.
00078     DEACTIVATED = 2,
00079 
00080     /// Message queue was pulsed; enqueue and dequeue may proceed normally.
00081     PULSED = 3
00082 
00083   };
00084 
00085   ACE_Message_Queue_Base (void);
00086 
00087   /// Close down the message queue and release all resources.
00088   virtual int close (void) = 0;
00089 
00090   /// Close down the message queue and release all resources.
00091   virtual ~ACE_Message_Queue_Base (void);
00092 
00093   // = Enqueue and dequeue methods.
00094 
00095   /**
00096    * Retrieve the first ACE_Message_Block without removing it.  Note
00097    * that <timeout> uses <{absolute}> time rather than <{relative}>
00098    * time.  If the <timeout> elapses without receiving a message -1 is
00099    * returned and <errno> is set to <EWOULDBLOCK>.  If the queue is
00100    * deactivated -1 is returned and <errno> is set to <ESHUTDOWN>.
00101    * Otherwise, returns -1 on failure, else the number of items still
00102    * on the queue.
00103    */
00104   virtual int peek_dequeue_head (ACE_Message_Block *&first_item,
00105                                  ACE_Time_Value *timeout = 0) = 0;
00106 
00107   /**
00108    * Enqueue a <ACE_Message_Block *> into the tail of the queue.
00109    * Returns number of items in queue if the call succeeds or -1
00110    * otherwise.  These calls return -1 when queue is closed,
00111    * deactivated (in which case <errno> == <ESHUTDOWN>), when a signal
00112    * occurs (in which case <errno> == <EINTR>, or if the time
00113    * specified in timeout elapses (in which case <errno> ==
00114    * <EWOULDBLOCK>).
00115    */
00116   virtual int enqueue_tail (ACE_Message_Block *new_item,
00117                             ACE_Time_Value *timeout = 0) = 0;
00118   virtual int enqueue (ACE_Message_Block *new_item,
00119                        ACE_Time_Value *timeout = 0) = 0;
00120 
00121   /**
00122    * Dequeue and return the <ACE_Message_Block *> at the head of the
00123    * queue.  Returns number of items in queue if the call succeeds or
00124    * -1 otherwise.  These calls return -1 when queue is closed,
00125    * deactivated (in which case <errno> == <ESHUTDOWN>), when a signal
00126    * occurs (in which case <errno> == <EINTR>, or if the time
00127    * specified in timeout elapses (in which case <errno> ==
00128    * <EWOULDBLOCK>).
00129    */
00130   virtual int dequeue_head (ACE_Message_Block *&first_item,
00131                             ACE_Time_Value *timeout = 0) = 0;
00132   virtual int dequeue (ACE_Message_Block *&first_item,
00133                        ACE_Time_Value *timeout = 0) = 0;
00134 
00135   // = Check if queue is full/empty.
00136   /// True if queue is full, else false.
00137   virtual int is_full (void) = 0;
00138 
00139   /// True if queue is empty, else false.
00140   virtual int is_empty (void) = 0;
00141 
00142   // = Queue statistic methods.
00143 
00144   /// Number of total bytes on the queue, i.e., sum of the message
00145   /// block sizes.
00146   virtual size_t message_bytes (void) = 0;
00147 
00148   /// Number of total length on the queue, i.e., sum of the message
00149   /// block lengths.
00150   virtual size_t message_length (void) = 0;
00151 
00152   /// Number of total messages on the queue.
00153   virtual size_t message_count (void) = 0;
00154 
00155   /// New value of the number of total bytes on the queue, i.e.,
00156   /// sum of the message block sizes.
00157   virtual void message_bytes (size_t new_size) = 0;
00158 
00159   /// New value of the number of total length on the queue, i.e.,
00160   /// sum of the message block lengths.
00161   virtual void message_length (size_t new_length) = 0;
00162 
00163   // = Activation control methods.
00164 
00165   /**
00166    * Deactivate the queue and wake up all threads waiting on the queue
00167    * so they can continue.  No messages are removed from the queue,
00168    * however.  Any other operations called until the queue is
00169    * activated again will immediately return -1 with @c errno
00170    * ESHUTDOWN.
00171    *
00172    * @retval  The queue's state before this call.
00173    */
00174   virtual int deactivate (void) = 0;
00175 
00176   /**
00177    * Reactivate the queue so that threads can enqueue and dequeue
00178    * messages again.
00179    *
00180    * @retval  The queue's state before this call.
00181    */
00182   virtual int activate (void) = 0;
00183 
00184   /**
00185    * Pulse the queue to wake up any waiting threads.  Changes the
00186    * queue state to PULSED; future enqueue/dequeue operations proceed
00187    * as in ACTIVATED state.
00188    *
00189    * @retval  The queue's state before this call.
00190    */
00191   virtual int pulse (void) = 0;
00192 
00193   /// Returns the current state of the queue.
00194   virtual int state (void);
00195 
00196   /// Returns 1 if the state of the queue is DEACTIVATED,
00197   /// and 0 if the queue's state is ACTIVATED or PULSED.
00198   virtual int deactivated (void) = 0;
00199 
00200   /// Get the notification strategy for the <Message_Queue>
00201   virtual ACE_Notification_Strategy *notification_strategy (void) = 0;
00202 
00203   /// Set the notification strategy for the <Message_Queue>
00204   virtual void notification_strategy (ACE_Notification_Strategy *s) = 0;
00205 
00206   // = Notification hook.
00207 
00208   /// Dump the state of an object.
00209   virtual void dump (void) const = 0;
00210 
00211   /// Declare the dynamic allocation hooks.
00212   ACE_ALLOC_HOOK_DECLARE;
00213 
00214 private:
00215   // = Disallow copying and assignment.
00216   ACE_Message_Queue_Base (const ACE_Message_Queue_Base &);
00217   void operator= (const ACE_Message_Queue_Base &);
00218 
00219 protected:
00220   /// Indicates the state of the queue, which can be
00221   /// <ACTIVATED>, <DEACTIVATED>, or <PULSED>.
00222   int state_;
00223 
00224 };
00225 
00226 ACE_END_VERSIONED_NAMESPACE_DECL
00227 
00228 // Include the templates here.
00229 #include "ace/Message_Queue_T.h"
00230 
00231 #if defined (ACE_VXWORKS)
00232 # include /**/ <msgQLib.h>
00233 # include "ace/Null_Mutex.h"
00234 # include "ace/Null_Condition.h"
00235 
00236 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
00237 
00238 /**
00239  * @class ACE_Message_Queue_Vx
00240  *
00241  * @brief Wrapper for VxWorks message queues.
00242  *
00243  * Specialization of ACE_Message_Queue to simply wrap VxWorks
00244  * MsgQ.  It does not use any synchronization, because it relies
00245  * on the native MsgQ implementation to take care of that.  The
00246  * only system calls that it uses are VxWorks msgQLib calls, so
00247  * it is suitable for use in interrupt service routines.
00248  * @note *Many* ACE_Message_Queue features are not supported with
00249  * this specialization, including:
00250  * * The two size arguments to the constructor and <open> are
00251  * interpreted differently.  The first is interpreted as the
00252  * maximum number of bytes in a message.  The second is
00253  * interpreted as the maximum number of messages that can be
00254  * queued.
00255  * * <dequeue_head> *requires* that the ACE_Message_Block
00256  * pointer argument point to an ACE_Message_Block that was
00257  * allocated by the caller.  It must be big enough to support
00258  * the received message, without using continuation. The
00259  * pointer argument is not modified.
00260  * * Message priority.  MSG_Q_FIFO is hard-coded.
00261  * * enqueue method timeouts.
00262  * * <peek_dequeue_head>.
00263  * * <ACE_Message_Queue_Iterators>.
00264  * * The ability to change low and high water marks after creation.
00265  * * <Message_Block> chains.  The continuation field of ACE_Message_Block
00266  * *   is ignored; only the first block of a fragment chain is
00267  * *   recognized.
00268  */
00269 class ACE_Message_Queue_Vx : public ACE_Message_Queue<ACE_NULL_SYNCH>
00270 {
00271 public:
00272   // = Initialization and termination methods.
00273   ACE_Message_Queue_Vx (size_t max_messages,
00274                         size_t max_message_length,
00275                         ACE_Notification_Strategy * = 0);
00276 
00277   // Create a message queue with all the defaults.
00278   /// Create a message queue with all the defaults.
00279   virtual int open (size_t max_messages,
00280                     size_t max_message_length,
00281                     ACE_Notification_Strategy * = 0);
00282 
00283   /// Close down the message queue and release all resources.
00284   virtual int close (void);
00285 
00286   /// Close down the message queue and release all resources.
00287   virtual ~ACE_Message_Queue_Vx (void);
00288 
00289   // = Queue statistic methods.
00290   /**
00291    * Number of total bytes on the queue, i.e., sum of the message
00292    * block sizes.
00293    */
00294   virtual size_t message_bytes (void);
00295 
00296   /**
00297    * Number of total length on the queue, i.e., sum of the message
00298    * block lengths.
00299    */
00300   virtual size_t message_length (void);
00301 
00302   /**
00303    * Number of total messages on the queue.
00304    */
00305   virtual size_t message_count (void);
00306 
00307   // = Manual changes to these stats (used when queued message blocks
00308   // change size or lengths).
00309   /**
00310    * New value of the number of total bytes on the queue, i.e., sum of
00311    * the message block sizes.
00312    */
00313   virtual void message_bytes (size_t new_size);
00314   /**
00315    * New value of the number of total length on the queue, i.e., sum
00316    * of the message block lengths.
00317    */
00318   virtual void message_length (size_t new_length);
00319 
00320   // = Flow control routines
00321 
00322   /// Get high watermark.
00323   virtual size_t high_water_mark (void);
00324 
00325   /// Set high watermark.
00326   virtual void high_water_mark (size_t hwm);
00327 
00328   /// Get low watermark.
00329   virtual size_t low_water_mark (void);
00330 
00331   /// Set low watermark.
00332   virtual void low_water_mark (size_t lwm);
00333 
00334   // = Activation control methods.
00335 
00336   /// Dump the state of an object.
00337   void dump (void) const;
00338 
00339   /// Declare the dynamic allocation hooks.
00340   ACE_ALLOC_HOOK_DECLARE;
00341 
00342 protected:
00343   /// Enqueue an <ACE_Message_Block *> in accordance with its priority.
00344   virtual int enqueue_i (ACE_Message_Block *new_item);
00345 
00346   /// Enqueue an <ACE_Message_Block *> in accordance with its deadline time.
00347   virtual int enqueue_deadline_i (ACE_Message_Block *new_item);
00348 
00349   /// Enqueue an <ACE_Message_Block *> at the end of the queue.
00350   virtual int enqueue_tail_i (ACE_Message_Block *new_item);
00351 
00352   /// Enqueue an <ACE_Message_Block *> at the head of the queue.
00353   virtual int enqueue_head_i (ACE_Message_Block *new_item);
00354 
00355   /// Dequeue and return the <ACE_Message_Block *> at the head of the
00356   /// queue.
00357   virtual int dequeue_head_i (ACE_Message_Block *&first_item);
00358 
00359   /// Dequeue and return the <ACE_Message_Block *> with the lowest
00360   /// priority.
00361   virtual int dequeue_prio_i (ACE_Message_Block *&dequeued);
00362 
00363   /// Dequeue and return the <ACE_Message_Block *> at the tail of the
00364   /// queue.
00365   virtual int dequeue_tail_i (ACE_Message_Block *&dequeued);
00366 
00367   /// Dequeue and return the <ACE_Message_Block *> that has the lowest
00368   /// deadline time.
00369   virtual int dequeue_deadline_i (ACE_Message_Block *&dequeued);
00370 
00371   // = Check the boundary conditions (assumes locks are held).
00372   /// True if queue is full, else false.
00373   virtual int is_full_i (void);
00374 
00375   /// True if queue is empty, else false.
00376   virtual int is_empty_i (void);
00377 
00378   // = Implementation of public <activate>/<deactivate> methods above.
00379 
00380   // These methods assume locks are held.
00381 
00382   // = Helper methods to factor out common #ifdef code.
00383   /// Wait for the queue to become non-full.
00384   virtual int wait_not_full_cond (ACE_Guard<ACE_Null_Mutex> &mon,
00385                                   ACE_Time_Value *tv);
00386 
00387   /// Wait for the queue to become non-empty.
00388   virtual int wait_not_empty_cond (ACE_Guard<ACE_Null_Mutex> &mon,
00389                                    ACE_Time_Value *tv);
00390 
00391   /// Inform any threads waiting to enqueue that they can procede.
00392   virtual int signal_enqueue_waiters (void);
00393 
00394   /// Inform any threads waiting to dequeue that they can procede.
00395   virtual int signal_dequeue_waiters (void);
00396 
00397   /// Access the underlying msgQ.
00398   MSG_Q_ID msgq (void);
00399 
00400 private:
00401 
00402   // Disallow copying and assignment.
00403   ACE_Message_Queue_Vx (const ACE_Message_Queue_Vx &);
00404   void operator= (const ACE_Message_Queue_Vx &);
00405 
00406   ACE_UNIMPLEMENTED_FUNC (virtual int peek_dequeue_head
00407                             (ACE_Message_Block *&first_item,
00408                              ACE_Time_Value *tv = 0))
00409 
00410 private:
00411   /// Maximum number of messages that can be queued.
00412   int max_messages_;
00413 
00414   /// Maximum message size, in bytes.
00415   int max_message_length_;
00416 
00417   /// Native message queue options.
00418   int options_;
00419 
00420 };
00421 
00422 ACE_END_VERSIONED_NAMESPACE_DECL
00423 
00424 #endif /* ACE_VXWORKS */
00425 
00426 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
00427 
00428 #if defined (ACE_WIN32) && (ACE_HAS_WINNT4 != 0)
00429 /**
00430  * @class ACE_Message_Queue_NT
00431  *
00432  * @brief Message Queue implementation using IO completion port on NT.
00433  *
00434  * Implementation of a strip-downed ACE_Message_Queue using NT's
00435  * IO completion port mechanism.
00436  * @note *Many* ACE_Message_Queue features are not supported with
00437  * this implementation, including:
00438  * * <open> method have different signatures.
00439  * * <dequeue_head> *requires* that the ACE_Message_Block
00440  * pointer argument point to an ACE_Message_Block that was
00441  * allocated by the caller.
00442  * * <peek_dequeue_head>.
00443  * * <ACE_Message_Queue_Iterators>.
00444  * * No flow control.
00445  */
00446 class ACE_Export ACE_Message_Queue_NT : public ACE_Message_Queue_Base
00447 {
00448 public:
00449   // = Initialization and termination methods.
00450   ACE_Message_Queue_NT (DWORD max_threads = ACE_Message_Queue_Base::DEFAULT_HWM);
00451 
00452   /**
00453    * Initialize the Message Queue by creating a new NT I/O completion
00454    * port.  The first arguemnt specifies the number of threads
00455    * released by the MQ that are allowed to run concurrently.  Return
00456    * 0 when succeeds, -1 otherwise.
00457    */
00458   virtual int open (DWORD max_threads = ACE_Message_Queue_Base::DEFAULT_HWM);
00459 
00460   /// Close down the underlying I/O completion port.  You need to
00461   /// re-open the MQ after this function is executed.
00462   virtual int close (void);
00463 
00464   /// Close down the message queue and release all resources.
00465   virtual ~ACE_Message_Queue_NT (void);
00466 
00467   // = Enqueue and dequeue methods.
00468 
00469   /**
00470    * Enqueue an <ACE_Message_Block *> at the end of the queue.
00471    * Returns -1 on failure, else the number of items still on the
00472    * queue.
00473    */
00474   virtual int enqueue_tail (ACE_Message_Block *new_item,
00475                             ACE_Time_Value *timeout = 0);
00476   virtual int enqueue (ACE_Message_Block *new_item,
00477                        ACE_Time_Value *timeout = 0);
00478 
00479   /**
00480    * Dequeue and return the <ACE_Message_Block *> at the head of the
00481    * queue.  Returns -1 on failure, else the number of items still on
00482    * the queue.
00483    */
00484   virtual int dequeue_head (ACE_Message_Block *&first_item,
00485                             ACE_Time_Value *timeout = 0);
00486   virtual int dequeue (ACE_Message_Block *&first_item,
00487                        ACE_Time_Value *timeout = 0);
00488 
00489   // = Check if queue is full/empty.
00490   /**
00491    * Always return false.
00492    */
00493 
00494   virtual int is_full (void);
00495   /**
00496    * True if queue is empty, else false.  Notice the return value is
00497    * only transient.
00498    */
00499   virtual int is_empty (void);
00500 
00501   // = Queue statistic methods (transient.)
00502   /**
00503    * Number of total bytes on the queue, i.e., sum of the message
00504    * block sizes.
00505    */
00506   virtual size_t message_bytes (void);
00507 
00508   /**
00509    * Number of total length on the queue, i.e., sum of the message
00510    * block lengths.
00511    */
00512   virtual size_t message_length (void);
00513 
00514   /**
00515    * Number of total messages on the queue.
00516    */
00517   virtual size_t message_count (void);
00518 
00519   // = Manual changes to these stats (used when queued message blocks
00520   // change size or lengths).
00521   /**
00522    * New value of the number of total bytes on the queue, i.e., sum of
00523    * the message block sizes.
00524    */
00525   virtual void message_bytes (size_t new_size);
00526 
00527   /**
00528    * New value of the number of total length on the queue, i.e., sum
00529    * of the message block lengths.
00530    */
00531   virtual void message_length (size_t new_length);
00532 
00533   /// Get the max concurrent thread number.
00534   virtual DWORD max_threads (void);
00535 
00536   // = Activation control methods.
00537 
00538   /**
00539    * Deactivate the queue and wake up all threads waiting on the queue
00540    * so they can continue.  No messages are removed from the queue,
00541    * however.  Any other operations called until the queue is
00542    * activated again will immediately return -1 with @c errno
00543    * ESHUTDOWN.
00544    *
00545    * @retval  The queue's state before this call.
00546    */
00547   virtual int deactivate (void);
00548 
00549   /**
00550    * Reactivate the queue so that threads can enqueue and dequeue
00551    * messages again.  Returns the state of the queue before the call.
00552    */
00553   virtual int activate (void);
00554 
00555   /**
00556    * Pulse the queue to wake up any waiting threads.  Changes the
00557    * queue state to PULSED; future enqueue/dequeue operations proceed
00558    * as in ACTIVATED state.
00559    *
00560    * @retval  The queue's state before this call.
00561    */
00562   virtual int pulse (void);
00563 
00564   /// Returns true if the state of the queue is <DEACTIVATED>,
00565   /// but false if the queue's is <ACTIVATED> or <PULSED>.
00566   virtual int deactivated (void);
00567 
00568   // = Not currently implemented...
00569   int peek_dequeue_head (ACE_Message_Block *&first_item,
00570                          ACE_Time_Value *timeout = 0);
00571   ACE_Notification_Strategy *notification_strategy (void);
00572   void notification_strategy (ACE_Notification_Strategy *s);
00573 
00574   // = Notification hook.
00575 
00576   /// Dump the state of an object.
00577   virtual void dump (void) const;
00578 
00579   /// Get the handle to the underlying completion port.
00580   virtual ACE_HANDLE completion_port (void);
00581 
00582   /// Declare the dynamic allocation hooks.
00583   ACE_ALLOC_HOOK_DECLARE;
00584 
00585 private:
00586 
00587   // Disallow copying and assignment.
00588   ACE_Message_Queue_NT (const ACE_Message_Queue_NT &);
00589   void operator= (const ACE_Message_Queue_NT &);
00590 
00591 private:
00592   // = Internal states.
00593 
00594   /// Maximum threads that can be released (and run) concurrently.
00595   DWORD max_cthrs_;
00596 
00597   /// Current number of threads waiting to dequeue messages.
00598   DWORD cur_thrs_;
00599 
00600   /// Current number of bytes in queue.
00601   size_t cur_bytes_;
00602 
00603   /// Current length of messages in queue.
00604   size_t cur_length_;
00605 
00606   /// Current number of messages in the queue.
00607   size_t cur_count_;
00608 
00609   /**
00610    * Synchronizer.  This should really be an ACE_Recursive_Thread_Mutex
00611    * but since this class is only supported on NT, it's okay to use
00612    * ACE_Thread_Mutex here.
00613    */
00614   ACE_Thread_Mutex lock_;
00615 
00616   /// Underlying NT IoCompletionPort.
00617   ACE_HANDLE completion_port_;
00618 
00619 };
00620 #endif /* ACE_WIN32 && ACE_HAS_WINNT4 != 0 */
00621 
00622 ACE_END_VERSIONED_NAMESPACE_DECL
00623 
00624 #if defined (__ACE_INLINE__)
00625 #include "ace/Message_Queue.inl"
00626 #endif /* __ACE_INLINE__ */
00627 
00628 #include /**/ "ace/post.h"
00629 #endif /* ACE_MESSAGE_QUEUE_H */

Generated on Thu Nov 9 09:41:56 2006 for ACE by doxygen 1.3.6