00001 // -*- C++ -*- 00002 00003 //============================================================================= 00004 /** 00005 * @file Message_Queue_NT.h 00006 * 00007 * $Id: Message_Queue_NT.h 82723 2008-09-16 09:35:44Z johnnyw $ 00008 * 00009 * @author Douglas C. Schmidt <schmidt@cs.wustl.edu> 00010 */ 00011 //============================================================================= 00012 00013 #ifndef ACE_MESSAGE_QUEUE_NT_H 00014 #define ACE_MESSAGE_QUEUE_NT_H 00015 #include /**/ "ace/pre.h" 00016 00017 #include "ace/Message_Queue.h" 00018 00019 #if !defined (ACE_LACKS_PRAGMA_ONCE) 00020 # pragma once 00021 #endif /* ACE_LACKS_PRAGMA_ONCE */ 00022 00023 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO) 00024 # include "ace/Synch_Traits.h" /* Needed in ACE_Message_Queue_NT */ 00025 # include "ace/Thread_Mutex.h" /* Needed in ACE_Message_Queue_NT */ 00026 #endif 00027 00028 ACE_BEGIN_VERSIONED_NAMESPACE_DECL 00029 00030 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO) 00031 /** 00032 * @class ACE_Message_Queue_NT 00033 * 00034 * @brief Message Queue implementation using IO completion port on NT. 00035 * 00036 * Implementation of a strip-downed ACE_Message_Queue using NT's 00037 * IO completion port mechanism. 00038 * @note *Many* ACE_Message_Queue features are not supported with 00039 * this implementation, including: 00040 * * <open> method have different signatures. 00041 * * <dequeue_head> *requires* that the ACE_Message_Block 00042 * pointer argument point to an ACE_Message_Block that was 00043 * allocated by the caller. 00044 * * <peek_dequeue_head>. 00045 * * <ACE_Message_Queue_Iterators>. 00046 * * No flow control. 00047 */ 00048 class ACE_Export ACE_Message_Queue_NT : public ACE_Message_Queue_Base 00049 { 00050 public: 00051 // = Initialization and termination methods. 00052 ACE_Message_Queue_NT (DWORD max_threads = ACE_Message_Queue_Base::DEFAULT_HWM); 00053 00054 /** 00055 * Initialize the Message Queue by creating a new NT I/O completion 00056 * port. The first arguemnt specifies the number of threads 00057 * released by the MQ that are allowed to run concurrently. Return 00058 * 0 when succeeds, -1 otherwise. 00059 */ 00060 virtual int open (DWORD max_threads = ACE_Message_Queue_Base::DEFAULT_HWM); 00061 00062 /// Close down the underlying I/O completion port. You need to 00063 /// re-open the MQ after this function is executed. 00064 virtual int close (void); 00065 00066 /// Close down the message queue and release all resources. 00067 virtual ~ACE_Message_Queue_NT (void); 00068 00069 // = Enqueue and dequeue methods. 00070 00071 /** 00072 * Enqueue an ACE_Message_Block * at the end of the queue. 00073 * Returns -1 on failure, else the number of items still on the 00074 * queue. 00075 */ 00076 virtual int enqueue_tail (ACE_Message_Block *new_item, 00077 ACE_Time_Value *timeout = 0); 00078 virtual int enqueue (ACE_Message_Block *new_item, 00079 ACE_Time_Value *timeout = 0); 00080 00081 /** 00082 * Dequeue and return the ACE_Message_Block * at the head of the 00083 * queue. Returns -1 on failure, else the number of items still on 00084 * the queue. 00085 */ 00086 virtual int dequeue_head (ACE_Message_Block *&first_item, 00087 ACE_Time_Value *timeout = 0); 00088 virtual int dequeue (ACE_Message_Block *&first_item, 00089 ACE_Time_Value *timeout = 0); 00090 00091 // = Check if queue is full/empty. 00092 /** 00093 * Always return false. 00094 */ 00095 00096 virtual bool is_full (void); 00097 /** 00098 * True if queue is empty, else false. Notice the return value is 00099 * only transient. 00100 */ 00101 virtual bool is_empty (void); 00102 00103 // = Queue statistic methods (transient.) 00104 /** 00105 * Number of total bytes on the queue, i.e., sum of the message 00106 * block sizes. 00107 */ 00108 virtual size_t message_bytes (void); 00109 00110 /** 00111 * Number of total length on the queue, i.e., sum of the message 00112 * block lengths. 00113 */ 00114 virtual size_t message_length (void); 00115 00116 /** 00117 * Number of total messages on the queue. 00118 */ 00119 virtual size_t message_count (void); 00120 00121 // = Manual changes to these stats (used when queued message blocks 00122 // change size or lengths). 00123 /** 00124 * New value of the number of total bytes on the queue, i.e., sum of 00125 * the message block sizes. 00126 */ 00127 virtual void message_bytes (size_t new_size); 00128 00129 /** 00130 * New value of the number of total length on the queue, i.e., sum 00131 * of the message block lengths. 00132 */ 00133 virtual void message_length (size_t new_length); 00134 00135 /// Get the max concurrent thread number. 00136 virtual DWORD max_threads (void); 00137 00138 // = Activation control methods. 00139 00140 /** 00141 * Deactivate the queue and wake up all threads waiting on the queue 00142 * so they can continue. No messages are removed from the queue, 00143 * however. Any other operations called until the queue is 00144 * activated again will immediately return -1 with @c errno 00145 * ESHUTDOWN. 00146 * 00147 * @retval The queue's state before this call. 00148 */ 00149 virtual int deactivate (void); 00150 00151 /** 00152 * Reactivate the queue so that threads can enqueue and dequeue 00153 * messages again. Returns the state of the queue before the call. 00154 */ 00155 virtual int activate (void); 00156 00157 /** 00158 * Pulse the queue to wake up any waiting threads. Changes the 00159 * queue state to PULSED; future enqueue/dequeue operations proceed 00160 * as in ACTIVATED state. 00161 * 00162 * @retval The queue's state before this call. 00163 */ 00164 virtual int pulse (void); 00165 00166 /// Returns true if the state of the queue is <DEACTIVATED>, 00167 /// but false if the queue's is <ACTIVATED> or <PULSED>. 00168 virtual int deactivated (void); 00169 00170 // = Not currently implemented... 00171 int peek_dequeue_head (ACE_Message_Block *&first_item, 00172 ACE_Time_Value *timeout = 0); 00173 ACE_Notification_Strategy *notification_strategy (void); 00174 void notification_strategy (ACE_Notification_Strategy *s); 00175 00176 // = Notification hook. 00177 00178 /// Dump the state of an object. 00179 virtual void dump (void) const; 00180 00181 /// Get the handle to the underlying completion port. 00182 virtual ACE_HANDLE completion_port (void); 00183 00184 /// Declare the dynamic allocation hooks. 00185 ACE_ALLOC_HOOK_DECLARE; 00186 00187 private: 00188 00189 // Disallow copying and assignment. 00190 ACE_Message_Queue_NT (const ACE_Message_Queue_NT &); 00191 void operator= (const ACE_Message_Queue_NT &); 00192 00193 private: 00194 // = Internal states. 00195 00196 /// Maximum threads that can be released (and run) concurrently. 00197 DWORD max_cthrs_; 00198 00199 /// Current number of threads waiting to dequeue messages. 00200 DWORD cur_thrs_; 00201 00202 /// Current number of bytes in queue. 00203 size_t cur_bytes_; 00204 00205 /// Current length of messages in queue. 00206 size_t cur_length_; 00207 00208 /// Current number of messages in the queue. 00209 size_t cur_count_; 00210 00211 /** 00212 * Synchronizer. This should really be an ACE_Recursive_Thread_Mutex 00213 * but since this class is only supported on NT, it's okay to use 00214 * ACE_Thread_Mutex here. 00215 */ 00216 ACE_SYNCH_MUTEX lock_; 00217 00218 /// Underlying NT IoCompletionPort. 00219 ACE_HANDLE completion_port_; 00220 00221 }; 00222 #endif /* ACE_HAS_WIN32_OVERLAPPED_IO */ 00223 00224 ACE_END_VERSIONED_NAMESPACE_DECL 00225 00226 #if defined (__ACE_INLINE__) 00227 #include "ace/Message_Queue_NT.inl" 00228 #endif /* __ACE_INLINE__ */ 00229 00230 #include /**/ "ace/post.h" 00231 #endif /* ACE_MESSAGE_QUEUE_NT_H */