00001 // -*- C++ -*- 00002 00003 //============================================================================= 00004 /** 00005 * @file Message_Queue.h 00006 * 00007 * $Id: Message_Queue.h 78651 2007-06-29 12:14:31Z johnnyw $ 00008 * 00009 * @author Douglas C. Schmidt <schmidt@cs.wustl.edu> 00010 */ 00011 //============================================================================= 00012 00013 #ifndef ACE_MESSAGE_QUEUE_H 00014 #define ACE_MESSAGE_QUEUE_H 00015 #include /**/ "ace/pre.h" 00016 00017 #include "ace/Message_Block.h" 00018 00019 #if !defined (ACE_LACKS_PRAGMA_ONCE) 00020 # pragma once 00021 #endif /* ACE_LACKS_PRAGMA_ONCE */ 00022 00023 #include "ace/IO_Cntl_Msg.h" 00024 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO) 00025 # include "ace/Synch_Traits.h" /* Needed in ACE_Message_Queue_NT */ 00026 # include "ace/Thread_Mutex.h" /* Needed in ACE_Message_Queue_NT */ 00027 #endif 00028 00029 ACE_BEGIN_VERSIONED_NAMESPACE_DECL 00030 00031 // Forward decls. 00032 class ACE_Notification_Strategy; 00033 template <ACE_SYNCH_DECL> class ACE_Message_Queue_Iterator; 00034 template <ACE_SYNCH_DECL> class ACE_Message_Queue_Reverse_Iterator; 00035 00036 /** 00037 * @class ACE_Message_Queue_Base 00038 * 00039 * @brief Base class for ACE_Message_Queue, which is the central 00040 * queueing facility for messages in the ACE framework. 00041 * 00042 * For all the ACE_Time_Value pointer parameters the caller will 00043 * block until action is possible if @a timeout == 0. Otherwise, it 00044 * will wait until the absolute time specified in *@a timeout 00045 * elapses. 00046 * 00047 * A queue is always in one of three states: 00048 * . ACTIVATED 00049 * . DEACTIVATED 00050 * . PULSED 00051 */ 00052 class ACE_Export ACE_Message_Queue_Base 00053 { 00054 public: 00055 enum 00056 { 00057 // Default high and low watermarks. 00058 00059 /// Default high watermark (16 K). 00060 DEFAULT_HWM = 16 * 1024, 00061 /// Default low watermark (same as high water mark). 00062 DEFAULT_LWM = 16 * 1024, 00063 00064 // Queue states. Before PULSED state was added, the activate() 00065 // and deactivate() methods returned WAS_INACTIVE or WAS_ACTIVE 00066 // to indicate the previous condition. Now those methods 00067 // return the state the queue was previously in. WAS_ACTIVE 00068 // and WAS_INACTIVE are defined to match previous semantics for 00069 // applications that don't use the PULSED state. 00070 00071 /// @deprecated Use ACTIVATED instead. 00072 WAS_ACTIVE = 1, 00073 /// Message queue is active and processing normally 00074 ACTIVATED = 1, 00075 00076 /// @deprecated Use DEACTIVATED instead. 00077 WAS_INACTIVE = 2, 00078 /// Queue is deactivated; no enqueue or dequeue operations allowed. 00079 DEACTIVATED = 2, 00080 00081 /// Message queue was pulsed; enqueue and dequeue may proceed normally. 00082 PULSED = 3 00083 00084 }; 00085 00086 ACE_Message_Queue_Base (void); 00087 00088 /// Close down the message queue and release all resources. 00089 virtual int close (void) = 0; 00090 00091 /// Close down the message queue and release all resources. 00092 virtual ~ACE_Message_Queue_Base (void); 00093 00094 // = Enqueue and dequeue methods. 00095 00096 /** 00097 * Retrieve the first ACE_Message_Block without removing it. Note 00098 * that @a timeout uses <{absolute}> time rather than <{relative}> 00099 * time. If the @a timeout elapses without receiving a message -1 is 00100 * returned and @c errno is set to @c EWOULDBLOCK. If the queue is 00101 * deactivated -1 is returned and @c errno is set to <ESHUTDOWN>. 00102 * Otherwise, returns -1 on failure, else the number of items still 00103 * on the queue. 00104 */ 00105 virtual int peek_dequeue_head (ACE_Message_Block *&first_item, 00106 ACE_Time_Value *timeout = 0) = 0; 00107 00108 /** 00109 * Enqueue a <ACE_Message_Block *> into the tail of the queue. 00110 * Returns number of items in queue if the call succeeds or -1 00111 * otherwise. These calls return -1 when queue is closed, 00112 * deactivated (in which case @c errno == <ESHUTDOWN>), when a signal 00113 * occurs (in which case @c errno == <EINTR>, or if the time 00114 * specified in timeout elapses (in which case @c errno == 00115 * @c EWOULDBLOCK). 00116 */ 00117 virtual int enqueue_tail (ACE_Message_Block *new_item, 00118 ACE_Time_Value *timeout = 0) = 0; 00119 virtual int enqueue (ACE_Message_Block *new_item, 00120 ACE_Time_Value *timeout = 0) = 0; 00121 00122 /** 00123 * Dequeue and return the <ACE_Message_Block *> at the head of the 00124 * queue. Returns number of items in queue if the call succeeds or 00125 * -1 otherwise. These calls return -1 when queue is closed, 00126 * deactivated (in which case @c errno == <ESHUTDOWN>), when a signal 00127 * occurs (in which case @c errno == <EINTR>, or if the time 00128 * specified in timeout elapses (in which case @c errno == 00129 * @c EWOULDBLOCK). 00130 */ 00131 virtual int dequeue_head (ACE_Message_Block *&first_item, 00132 ACE_Time_Value *timeout = 0) = 0; 00133 virtual int dequeue (ACE_Message_Block *&first_item, 00134 ACE_Time_Value *timeout = 0) = 0; 00135 00136 // = Check if queue is full/empty. 00137 /// True if queue is full, else false. 00138 virtual int is_full (void) = 0; 00139 00140 /// True if queue is empty, else false. 00141 virtual int is_empty (void) = 0; 00142 00143 // = Queue statistic methods. 00144 00145 /// Number of total bytes on the queue, i.e., sum of the message 00146 /// block sizes. 00147 virtual size_t message_bytes (void) = 0; 00148 00149 /// Number of total length on the queue, i.e., sum of the message 00150 /// block lengths. 00151 virtual size_t message_length (void) = 0; 00152 00153 /// Number of total messages on the queue. 00154 virtual size_t message_count (void) = 0; 00155 00156 /// New value of the number of total bytes on the queue, i.e., 00157 /// sum of the message block sizes. 00158 virtual void message_bytes (size_t new_size) = 0; 00159 00160 /// New value of the number of total length on the queue, i.e., 00161 /// sum of the message block lengths. 00162 virtual void message_length (size_t new_length) = 0; 00163 00164 // = Activation control methods. 00165 00166 /** 00167 * Deactivate the queue and wake up all threads waiting on the queue 00168 * so they can continue. No messages are removed from the queue, 00169 * however. Any other operations called until the queue is 00170 * activated again will immediately return -1 with @c errno 00171 * ESHUTDOWN. 00172 * 00173 * @retval The queue's state before this call. 00174 */ 00175 virtual int deactivate (void) = 0; 00176 00177 /** 00178 * Reactivate the queue so that threads can enqueue and dequeue 00179 * messages again. 00180 * 00181 * @retval The queue's state before this call. 00182 */ 00183 virtual int activate (void) = 0; 00184 00185 /** 00186 * Pulse the queue to wake up any waiting threads. Changes the 00187 * queue state to PULSED; future enqueue/dequeue operations proceed 00188 * as in ACTIVATED state. 00189 * 00190 * @retval The queue's state before this call. 00191 */ 00192 virtual int pulse (void) = 0; 00193 00194 /// Returns the current state of the queue. 00195 virtual int state (void); 00196 00197 /// Returns 1 if the state of the queue is DEACTIVATED, 00198 /// and 0 if the queue's state is ACTIVATED or PULSED. 00199 virtual int deactivated (void) = 0; 00200 00201 /// Get the notification strategy for the <Message_Queue> 00202 virtual ACE_Notification_Strategy *notification_strategy (void) = 0; 00203 00204 /// Set the notification strategy for the <Message_Queue> 00205 virtual void notification_strategy (ACE_Notification_Strategy *s) = 0; 00206 00207 // = Notification hook. 00208 00209 /// Dump the state of an object. 00210 virtual void dump (void) const = 0; 00211 00212 /// Declare the dynamic allocation hooks. 00213 ACE_ALLOC_HOOK_DECLARE; 00214 00215 private: 00216 // = Disallow copying and assignment. 00217 ACE_Message_Queue_Base (const ACE_Message_Queue_Base &); 00218 void operator= (const ACE_Message_Queue_Base &); 00219 00220 protected: 00221 /// Indicates the state of the queue, which can be 00222 /// <ACTIVATED>, <DEACTIVATED>, or <PULSED>. 00223 int state_; 00224 00225 }; 00226 00227 ACE_END_VERSIONED_NAMESPACE_DECL 00228 00229 // Include the templates here. 00230 #include "ace/Message_Queue_T.h" 00231 00232 ACE_BEGIN_VERSIONED_NAMESPACE_DECL 00233 00234 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO) 00235 /** 00236 * @class ACE_Message_Queue_NT 00237 * 00238 * @brief Message Queue implementation using IO completion port on NT. 00239 * 00240 * Implementation of a strip-downed ACE_Message_Queue using NT's 00241 * IO completion port mechanism. 00242 * @note *Many* ACE_Message_Queue features are not supported with 00243 * this implementation, including: 00244 * * <open> method have different signatures. 00245 * * <dequeue_head> *requires* that the ACE_Message_Block 00246 * pointer argument point to an ACE_Message_Block that was 00247 * allocated by the caller. 00248 * * <peek_dequeue_head>. 00249 * * <ACE_Message_Queue_Iterators>. 00250 * * No flow control. 00251 */ 00252 class ACE_Export ACE_Message_Queue_NT : public ACE_Message_Queue_Base 00253 { 00254 public: 00255 // = Initialization and termination methods. 00256 ACE_Message_Queue_NT (DWORD max_threads = ACE_Message_Queue_Base::DEFAULT_HWM); 00257 00258 /** 00259 * Initialize the Message Queue by creating a new NT I/O completion 00260 * port. The first arguemnt specifies the number of threads 00261 * released by the MQ that are allowed to run concurrently. Return 00262 * 0 when succeeds, -1 otherwise. 00263 */ 00264 virtual int open (DWORD max_threads = ACE_Message_Queue_Base::DEFAULT_HWM); 00265 00266 /// Close down the underlying I/O completion port. You need to 00267 /// re-open the MQ after this function is executed. 00268 virtual int close (void); 00269 00270 /// Close down the message queue and release all resources. 00271 virtual ~ACE_Message_Queue_NT (void); 00272 00273 // = Enqueue and dequeue methods. 00274 00275 /** 00276 * Enqueue an <ACE_Message_Block *> at the end of the queue. 00277 * Returns -1 on failure, else the number of items still on the 00278 * queue. 00279 */ 00280 virtual int enqueue_tail (ACE_Message_Block *new_item, 00281 ACE_Time_Value *timeout = 0); 00282 virtual int enqueue (ACE_Message_Block *new_item, 00283 ACE_Time_Value *timeout = 0); 00284 00285 /** 00286 * Dequeue and return the <ACE_Message_Block *> at the head of the 00287 * queue. Returns -1 on failure, else the number of items still on 00288 * the queue. 00289 */ 00290 virtual int dequeue_head (ACE_Message_Block *&first_item, 00291 ACE_Time_Value *timeout = 0); 00292 virtual int dequeue (ACE_Message_Block *&first_item, 00293 ACE_Time_Value *timeout = 0); 00294 00295 // = Check if queue is full/empty. 00296 /** 00297 * Always return false. 00298 */ 00299 00300 virtual int is_full (void); 00301 /** 00302 * True if queue is empty, else false. Notice the return value is 00303 * only transient. 00304 */ 00305 virtual int is_empty (void); 00306 00307 // = Queue statistic methods (transient.) 00308 /** 00309 * Number of total bytes on the queue, i.e., sum of the message 00310 * block sizes. 00311 */ 00312 virtual size_t message_bytes (void); 00313 00314 /** 00315 * Number of total length on the queue, i.e., sum of the message 00316 * block lengths. 00317 */ 00318 virtual size_t message_length (void); 00319 00320 /** 00321 * Number of total messages on the queue. 00322 */ 00323 virtual size_t message_count (void); 00324 00325 // = Manual changes to these stats (used when queued message blocks 00326 // change size or lengths). 00327 /** 00328 * New value of the number of total bytes on the queue, i.e., sum of 00329 * the message block sizes. 00330 */ 00331 virtual void message_bytes (size_t new_size); 00332 00333 /** 00334 * New value of the number of total length on the queue, i.e., sum 00335 * of the message block lengths. 00336 */ 00337 virtual void message_length (size_t new_length); 00338 00339 /// Get the max concurrent thread number. 00340 virtual DWORD max_threads (void); 00341 00342 // = Activation control methods. 00343 00344 /** 00345 * Deactivate the queue and wake up all threads waiting on the queue 00346 * so they can continue. No messages are removed from the queue, 00347 * however. Any other operations called until the queue is 00348 * activated again will immediately return -1 with @c errno 00349 * ESHUTDOWN. 00350 * 00351 * @retval The queue's state before this call. 00352 */ 00353 virtual int deactivate (void); 00354 00355 /** 00356 * Reactivate the queue so that threads can enqueue and dequeue 00357 * messages again. Returns the state of the queue before the call. 00358 */ 00359 virtual int activate (void); 00360 00361 /** 00362 * Pulse the queue to wake up any waiting threads. Changes the 00363 * queue state to PULSED; future enqueue/dequeue operations proceed 00364 * as in ACTIVATED state. 00365 * 00366 * @retval The queue's state before this call. 00367 */ 00368 virtual int pulse (void); 00369 00370 /// Returns true if the state of the queue is <DEACTIVATED>, 00371 /// but false if the queue's is <ACTIVATED> or <PULSED>. 00372 virtual int deactivated (void); 00373 00374 // = Not currently implemented... 00375 int peek_dequeue_head (ACE_Message_Block *&first_item, 00376 ACE_Time_Value *timeout = 0); 00377 ACE_Notification_Strategy *notification_strategy (void); 00378 void notification_strategy (ACE_Notification_Strategy *s); 00379 00380 // = Notification hook. 00381 00382 /// Dump the state of an object. 00383 virtual void dump (void) const; 00384 00385 /// Get the handle to the underlying completion port. 00386 virtual ACE_HANDLE completion_port (void); 00387 00388 /// Declare the dynamic allocation hooks. 00389 ACE_ALLOC_HOOK_DECLARE; 00390 00391 private: 00392 00393 // Disallow copying and assignment. 00394 ACE_Message_Queue_NT (const ACE_Message_Queue_NT &); 00395 void operator= (const ACE_Message_Queue_NT &); 00396 00397 private: 00398 // = Internal states. 00399 00400 /// Maximum threads that can be released (and run) concurrently. 00401 DWORD max_cthrs_; 00402 00403 /// Current number of threads waiting to dequeue messages. 00404 DWORD cur_thrs_; 00405 00406 /// Current number of bytes in queue. 00407 size_t cur_bytes_; 00408 00409 /// Current length of messages in queue. 00410 size_t cur_length_; 00411 00412 /// Current number of messages in the queue. 00413 size_t cur_count_; 00414 00415 /** 00416 * Synchronizer. This should really be an ACE_Recursive_Thread_Mutex 00417 * but since this class is only supported on NT, it's okay to use 00418 * ACE_Thread_Mutex here. 00419 */ 00420 ACE_SYNCH_MUTEX lock_; 00421 00422 /// Underlying NT IoCompletionPort. 00423 ACE_HANDLE completion_port_; 00424 00425 }; 00426 #endif /* ACE_HAS_WIN32_OVERLAPPED_IO */ 00427 00428 ACE_END_VERSIONED_NAMESPACE_DECL 00429 00430 #if defined (__ACE_INLINE__) 00431 #include "ace/Message_Queue.inl" 00432 #endif /* __ACE_INLINE__ */ 00433 00434 #include /**/ "ace/post.h" 00435 #endif /* ACE_MESSAGE_QUEUE_H */