00001 // -*- C++ -*- 00002 00003 //============================================================================= 00004 /** 00005 * @file Message_Queue.h 00006 * 00007 * Message_Queue.h,v 4.87 2006/05/30 10:57:22 jwillemsen Exp 00008 * 00009 * @author Douglas C. Schmidt <schmidt@cs.wustl.edu> 00010 */ 00011 //============================================================================= 00012 00013 #ifndef ACE_MESSAGE_QUEUE_H 00014 #define ACE_MESSAGE_QUEUE_H 00015 #include /**/ "ace/pre.h" 00016 00017 #include "ace/Message_Block.h" 00018 00019 #if !defined (ACE_LACKS_PRAGMA_ONCE) 00020 # pragma once 00021 #endif /* ACE_LACKS_PRAGMA_ONCE */ 00022 00023 #include "ace/IO_Cntl_Msg.h" 00024 #if defined (ACE_WIN32) && (ACE_HAS_WINNT4 != 0) 00025 # include "ace/Thread_Mutex.h" /* Needed in ACE_Message_Queue_NT */ 00026 #endif 00027 00028 ACE_BEGIN_VERSIONED_NAMESPACE_DECL 00029 00030 // Forward decls. 00031 class ACE_Notification_Strategy; 00032 template <ACE_SYNCH_DECL> class ACE_Message_Queue_Iterator; 00033 template <ACE_SYNCH_DECL> class ACE_Message_Queue_Reverse_Iterator; 00034 00035 /** 00036 * @class ACE_Message_Queue_Base 00037 * 00038 * @brief Base class for ACE_Message_Queue, which is the central 00039 * queueing facility for messages in the ACE framework. 00040 * 00041 * For all the <ACE_Time_Value> pointer parameters the caller will 00042 * block until action is possible if <timeout> == 0. Otherwise, it 00043 * will wait until the absolute time specified in *<timeout> 00044 * elapses. 00045 * 00046 * A queue is always in one of three states: 00047 * . ACTIVATED 00048 * . DEACTIVATED 00049 * . PULSED 00050 */ 00051 class ACE_Export ACE_Message_Queue_Base 00052 { 00053 public: 00054 enum 00055 { 00056 // Default high and low watermarks. 00057 00058 /// Default high watermark (16 K). 00059 DEFAULT_HWM = 16 * 1024, 00060 /// Default low watermark (same as high water mark). 00061 DEFAULT_LWM = 16 * 1024, 00062 00063 // Queue states. Before PULSED state was added, the activate() 00064 // and deactivate() methods returned WAS_INACTIVE or WAS_ACTIVE 00065 // to indicate the previous condition. Now those methods 00066 // return the state the queue was previously in. WAS_ACTIVE 00067 // and WAS_INACTIVE are defined to match previous semantics for 00068 // applications that don't use the PULSED state. 00069 00070 /// @deprecated Use ACTIVATED instead. 00071 WAS_ACTIVE = 1, 00072 /// Message queue is active and processing normally 00073 ACTIVATED = 1, 00074 00075 /// @deprecated Use DEACTIVATED instead. 00076 WAS_INACTIVE = 2, 00077 /// Queue is deactivated; no enqueue or dequeue operations allowed. 00078 DEACTIVATED = 2, 00079 00080 /// Message queue was pulsed; enqueue and dequeue may proceed normally. 00081 PULSED = 3 00082 00083 }; 00084 00085 ACE_Message_Queue_Base (void); 00086 00087 /// Close down the message queue and release all resources. 00088 virtual int close (void) = 0; 00089 00090 /// Close down the message queue and release all resources. 00091 virtual ~ACE_Message_Queue_Base (void); 00092 00093 // = Enqueue and dequeue methods. 00094 00095 /** 00096 * Retrieve the first ACE_Message_Block without removing it. Note 00097 * that <timeout> uses <{absolute}> time rather than <{relative}> 00098 * time. If the <timeout> elapses without receiving a message -1 is 00099 * returned and <errno> is set to <EWOULDBLOCK>. If the queue is 00100 * deactivated -1 is returned and <errno> is set to <ESHUTDOWN>. 00101 * Otherwise, returns -1 on failure, else the number of items still 00102 * on the queue. 00103 */ 00104 virtual int peek_dequeue_head (ACE_Message_Block *&first_item, 00105 ACE_Time_Value *timeout = 0) = 0; 00106 00107 /** 00108 * Enqueue a <ACE_Message_Block *> into the tail of the queue. 00109 * Returns number of items in queue if the call succeeds or -1 00110 * otherwise. These calls return -1 when queue is closed, 00111 * deactivated (in which case <errno> == <ESHUTDOWN>), when a signal 00112 * occurs (in which case <errno> == <EINTR>, or if the time 00113 * specified in timeout elapses (in which case <errno> == 00114 * <EWOULDBLOCK>). 00115 */ 00116 virtual int enqueue_tail (ACE_Message_Block *new_item, 00117 ACE_Time_Value *timeout = 0) = 0; 00118 virtual int enqueue (ACE_Message_Block *new_item, 00119 ACE_Time_Value *timeout = 0) = 0; 00120 00121 /** 00122 * Dequeue and return the <ACE_Message_Block *> at the head of the 00123 * queue. Returns number of items in queue if the call succeeds or 00124 * -1 otherwise. These calls return -1 when queue is closed, 00125 * deactivated (in which case <errno> == <ESHUTDOWN>), when a signal 00126 * occurs (in which case <errno> == <EINTR>, or if the time 00127 * specified in timeout elapses (in which case <errno> == 00128 * <EWOULDBLOCK>). 00129 */ 00130 virtual int dequeue_head (ACE_Message_Block *&first_item, 00131 ACE_Time_Value *timeout = 0) = 0; 00132 virtual int dequeue (ACE_Message_Block *&first_item, 00133 ACE_Time_Value *timeout = 0) = 0; 00134 00135 // = Check if queue is full/empty. 00136 /// True if queue is full, else false. 00137 virtual int is_full (void) = 0; 00138 00139 /// True if queue is empty, else false. 00140 virtual int is_empty (void) = 0; 00141 00142 // = Queue statistic methods. 00143 00144 /// Number of total bytes on the queue, i.e., sum of the message 00145 /// block sizes. 00146 virtual size_t message_bytes (void) = 0; 00147 00148 /// Number of total length on the queue, i.e., sum of the message 00149 /// block lengths. 00150 virtual size_t message_length (void) = 0; 00151 00152 /// Number of total messages on the queue. 00153 virtual size_t message_count (void) = 0; 00154 00155 /// New value of the number of total bytes on the queue, i.e., 00156 /// sum of the message block sizes. 00157 virtual void message_bytes (size_t new_size) = 0; 00158 00159 /// New value of the number of total length on the queue, i.e., 00160 /// sum of the message block lengths. 00161 virtual void message_length (size_t new_length) = 0; 00162 00163 // = Activation control methods. 00164 00165 /** 00166 * Deactivate the queue and wake up all threads waiting on the queue 00167 * so they can continue. No messages are removed from the queue, 00168 * however. Any other operations called until the queue is 00169 * activated again will immediately return -1 with @c errno 00170 * ESHUTDOWN. 00171 * 00172 * @retval The queue's state before this call. 00173 */ 00174 virtual int deactivate (void) = 0; 00175 00176 /** 00177 * Reactivate the queue so that threads can enqueue and dequeue 00178 * messages again. 00179 * 00180 * @retval The queue's state before this call. 00181 */ 00182 virtual int activate (void) = 0; 00183 00184 /** 00185 * Pulse the queue to wake up any waiting threads. Changes the 00186 * queue state to PULSED; future enqueue/dequeue operations proceed 00187 * as in ACTIVATED state. 00188 * 00189 * @retval The queue's state before this call. 00190 */ 00191 virtual int pulse (void) = 0; 00192 00193 /// Returns the current state of the queue. 00194 virtual int state (void); 00195 00196 /// Returns 1 if the state of the queue is DEACTIVATED, 00197 /// and 0 if the queue's state is ACTIVATED or PULSED. 00198 virtual int deactivated (void) = 0; 00199 00200 /// Get the notification strategy for the <Message_Queue> 00201 virtual ACE_Notification_Strategy *notification_strategy (void) = 0; 00202 00203 /// Set the notification strategy for the <Message_Queue> 00204 virtual void notification_strategy (ACE_Notification_Strategy *s) = 0; 00205 00206 // = Notification hook. 00207 00208 /// Dump the state of an object. 00209 virtual void dump (void) const = 0; 00210 00211 /// Declare the dynamic allocation hooks. 00212 ACE_ALLOC_HOOK_DECLARE; 00213 00214 private: 00215 // = Disallow copying and assignment. 00216 ACE_Message_Queue_Base (const ACE_Message_Queue_Base &); 00217 void operator= (const ACE_Message_Queue_Base &); 00218 00219 protected: 00220 /// Indicates the state of the queue, which can be 00221 /// <ACTIVATED>, <DEACTIVATED>, or <PULSED>. 00222 int state_; 00223 00224 }; 00225 00226 ACE_END_VERSIONED_NAMESPACE_DECL 00227 00228 // Include the templates here. 00229 #include "ace/Message_Queue_T.h" 00230 00231 #if defined (ACE_VXWORKS) 00232 # include /**/ <msgQLib.h> 00233 # include "ace/Null_Mutex.h" 00234 # include "ace/Null_Condition.h" 00235 00236 ACE_BEGIN_VERSIONED_NAMESPACE_DECL 00237 00238 /** 00239 * @class ACE_Message_Queue_Vx 00240 * 00241 * @brief Wrapper for VxWorks message queues. 00242 * 00243 * Specialization of ACE_Message_Queue to simply wrap VxWorks 00244 * MsgQ. It does not use any synchronization, because it relies 00245 * on the native MsgQ implementation to take care of that. The 00246 * only system calls that it uses are VxWorks msgQLib calls, so 00247 * it is suitable for use in interrupt service routines. 00248 * @note *Many* ACE_Message_Queue features are not supported with 00249 * this specialization, including: 00250 * * The two size arguments to the constructor and <open> are 00251 * interpreted differently. The first is interpreted as the 00252 * maximum number of bytes in a message. The second is 00253 * interpreted as the maximum number of messages that can be 00254 * queued. 00255 * * <dequeue_head> *requires* that the ACE_Message_Block 00256 * pointer argument point to an ACE_Message_Block that was 00257 * allocated by the caller. It must be big enough to support 00258 * the received message, without using continuation. The 00259 * pointer argument is not modified. 00260 * * Message priority. MSG_Q_FIFO is hard-coded. 00261 * * enqueue method timeouts. 00262 * * <peek_dequeue_head>. 00263 * * <ACE_Message_Queue_Iterators>. 00264 * * The ability to change low and high water marks after creation. 00265 * * <Message_Block> chains. The continuation field of ACE_Message_Block 00266 * * is ignored; only the first block of a fragment chain is 00267 * * recognized. 00268 */ 00269 class ACE_Message_Queue_Vx : public ACE_Message_Queue<ACE_NULL_SYNCH> 00270 { 00271 public: 00272 // = Initialization and termination methods. 00273 ACE_Message_Queue_Vx (size_t max_messages, 00274 size_t max_message_length, 00275 ACE_Notification_Strategy * = 0); 00276 00277 // Create a message queue with all the defaults. 00278 /// Create a message queue with all the defaults. 00279 virtual int open (size_t max_messages, 00280 size_t max_message_length, 00281 ACE_Notification_Strategy * = 0); 00282 00283 /// Close down the message queue and release all resources. 00284 virtual int close (void); 00285 00286 /// Close down the message queue and release all resources. 00287 virtual ~ACE_Message_Queue_Vx (void); 00288 00289 // = Queue statistic methods. 00290 /** 00291 * Number of total bytes on the queue, i.e., sum of the message 00292 * block sizes. 00293 */ 00294 virtual size_t message_bytes (void); 00295 00296 /** 00297 * Number of total length on the queue, i.e., sum of the message 00298 * block lengths. 00299 */ 00300 virtual size_t message_length (void); 00301 00302 /** 00303 * Number of total messages on the queue. 00304 */ 00305 virtual size_t message_count (void); 00306 00307 // = Manual changes to these stats (used when queued message blocks 00308 // change size or lengths). 00309 /** 00310 * New value of the number of total bytes on the queue, i.e., sum of 00311 * the message block sizes. 00312 */ 00313 virtual void message_bytes (size_t new_size); 00314 /** 00315 * New value of the number of total length on the queue, i.e., sum 00316 * of the message block lengths. 00317 */ 00318 virtual void message_length (size_t new_length); 00319 00320 // = Flow control routines 00321 00322 /// Get high watermark. 00323 virtual size_t high_water_mark (void); 00324 00325 /// Set high watermark. 00326 virtual void high_water_mark (size_t hwm); 00327 00328 /// Get low watermark. 00329 virtual size_t low_water_mark (void); 00330 00331 /// Set low watermark. 00332 virtual void low_water_mark (size_t lwm); 00333 00334 // = Activation control methods. 00335 00336 /// Dump the state of an object. 00337 void dump (void) const; 00338 00339 /// Declare the dynamic allocation hooks. 00340 ACE_ALLOC_HOOK_DECLARE; 00341 00342 protected: 00343 /// Enqueue an <ACE_Message_Block *> in accordance with its priority. 00344 virtual int enqueue_i (ACE_Message_Block *new_item); 00345 00346 /// Enqueue an <ACE_Message_Block *> in accordance with its deadline time. 00347 virtual int enqueue_deadline_i (ACE_Message_Block *new_item); 00348 00349 /// Enqueue an <ACE_Message_Block *> at the end of the queue. 00350 virtual int enqueue_tail_i (ACE_Message_Block *new_item); 00351 00352 /// Enqueue an <ACE_Message_Block *> at the head of the queue. 00353 virtual int enqueue_head_i (ACE_Message_Block *new_item); 00354 00355 /// Dequeue and return the <ACE_Message_Block *> at the head of the 00356 /// queue. 00357 virtual int dequeue_head_i (ACE_Message_Block *&first_item); 00358 00359 /// Dequeue and return the <ACE_Message_Block *> with the lowest 00360 /// priority. 00361 virtual int dequeue_prio_i (ACE_Message_Block *&dequeued); 00362 00363 /// Dequeue and return the <ACE_Message_Block *> at the tail of the 00364 /// queue. 00365 virtual int dequeue_tail_i (ACE_Message_Block *&dequeued); 00366 00367 /// Dequeue and return the <ACE_Message_Block *> that has the lowest 00368 /// deadline time. 00369 virtual int dequeue_deadline_i (ACE_Message_Block *&dequeued); 00370 00371 // = Check the boundary conditions (assumes locks are held). 00372 /// True if queue is full, else false. 00373 virtual int is_full_i (void); 00374 00375 /// True if queue is empty, else false. 00376 virtual int is_empty_i (void); 00377 00378 // = Implementation of public <activate>/<deactivate> methods above. 00379 00380 // These methods assume locks are held. 00381 00382 // = Helper methods to factor out common #ifdef code. 00383 /// Wait for the queue to become non-full. 00384 virtual int wait_not_full_cond (ACE_Guard<ACE_Null_Mutex> &mon, 00385 ACE_Time_Value *tv); 00386 00387 /// Wait for the queue to become non-empty. 00388 virtual int wait_not_empty_cond (ACE_Guard<ACE_Null_Mutex> &mon, 00389 ACE_Time_Value *tv); 00390 00391 /// Inform any threads waiting to enqueue that they can procede. 00392 virtual int signal_enqueue_waiters (void); 00393 00394 /// Inform any threads waiting to dequeue that they can procede. 00395 virtual int signal_dequeue_waiters (void); 00396 00397 /// Access the underlying msgQ. 00398 MSG_Q_ID msgq (void); 00399 00400 private: 00401 00402 // Disallow copying and assignment. 00403 ACE_Message_Queue_Vx (const ACE_Message_Queue_Vx &); 00404 void operator= (const ACE_Message_Queue_Vx &); 00405 00406 ACE_UNIMPLEMENTED_FUNC (virtual int peek_dequeue_head 00407 (ACE_Message_Block *&first_item, 00408 ACE_Time_Value *tv = 0)) 00409 00410 private: 00411 /// Maximum number of messages that can be queued. 00412 int max_messages_; 00413 00414 /// Maximum message size, in bytes. 00415 int max_message_length_; 00416 00417 /// Native message queue options. 00418 int options_; 00419 00420 }; 00421 00422 ACE_END_VERSIONED_NAMESPACE_DECL 00423 00424 #endif /* ACE_VXWORKS */ 00425 00426 ACE_BEGIN_VERSIONED_NAMESPACE_DECL 00427 00428 #if defined (ACE_WIN32) && (ACE_HAS_WINNT4 != 0) 00429 /** 00430 * @class ACE_Message_Queue_NT 00431 * 00432 * @brief Message Queue implementation using IO completion port on NT. 00433 * 00434 * Implementation of a strip-downed ACE_Message_Queue using NT's 00435 * IO completion port mechanism. 00436 * @note *Many* ACE_Message_Queue features are not supported with 00437 * this implementation, including: 00438 * * <open> method have different signatures. 00439 * * <dequeue_head> *requires* that the ACE_Message_Block 00440 * pointer argument point to an ACE_Message_Block that was 00441 * allocated by the caller. 00442 * * <peek_dequeue_head>. 00443 * * <ACE_Message_Queue_Iterators>. 00444 * * No flow control. 00445 */ 00446 class ACE_Export ACE_Message_Queue_NT : public ACE_Message_Queue_Base 00447 { 00448 public: 00449 // = Initialization and termination methods. 00450 ACE_Message_Queue_NT (DWORD max_threads = ACE_Message_Queue_Base::DEFAULT_HWM); 00451 00452 /** 00453 * Initialize the Message Queue by creating a new NT I/O completion 00454 * port. The first arguemnt specifies the number of threads 00455 * released by the MQ that are allowed to run concurrently. Return 00456 * 0 when succeeds, -1 otherwise. 00457 */ 00458 virtual int open (DWORD max_threads = ACE_Message_Queue_Base::DEFAULT_HWM); 00459 00460 /// Close down the underlying I/O completion port. You need to 00461 /// re-open the MQ after this function is executed. 00462 virtual int close (void); 00463 00464 /// Close down the message queue and release all resources. 00465 virtual ~ACE_Message_Queue_NT (void); 00466 00467 // = Enqueue and dequeue methods. 00468 00469 /** 00470 * Enqueue an <ACE_Message_Block *> at the end of the queue. 00471 * Returns -1 on failure, else the number of items still on the 00472 * queue. 00473 */ 00474 virtual int enqueue_tail (ACE_Message_Block *new_item, 00475 ACE_Time_Value *timeout = 0); 00476 virtual int enqueue (ACE_Message_Block *new_item, 00477 ACE_Time_Value *timeout = 0); 00478 00479 /** 00480 * Dequeue and return the <ACE_Message_Block *> at the head of the 00481 * queue. Returns -1 on failure, else the number of items still on 00482 * the queue. 00483 */ 00484 virtual int dequeue_head (ACE_Message_Block *&first_item, 00485 ACE_Time_Value *timeout = 0); 00486 virtual int dequeue (ACE_Message_Block *&first_item, 00487 ACE_Time_Value *timeout = 0); 00488 00489 // = Check if queue is full/empty. 00490 /** 00491 * Always return false. 00492 */ 00493 00494 virtual int is_full (void); 00495 /** 00496 * True if queue is empty, else false. Notice the return value is 00497 * only transient. 00498 */ 00499 virtual int is_empty (void); 00500 00501 // = Queue statistic methods (transient.) 00502 /** 00503 * Number of total bytes on the queue, i.e., sum of the message 00504 * block sizes. 00505 */ 00506 virtual size_t message_bytes (void); 00507 00508 /** 00509 * Number of total length on the queue, i.e., sum of the message 00510 * block lengths. 00511 */ 00512 virtual size_t message_length (void); 00513 00514 /** 00515 * Number of total messages on the queue. 00516 */ 00517 virtual size_t message_count (void); 00518 00519 // = Manual changes to these stats (used when queued message blocks 00520 // change size or lengths). 00521 /** 00522 * New value of the number of total bytes on the queue, i.e., sum of 00523 * the message block sizes. 00524 */ 00525 virtual void message_bytes (size_t new_size); 00526 00527 /** 00528 * New value of the number of total length on the queue, i.e., sum 00529 * of the message block lengths. 00530 */ 00531 virtual void message_length (size_t new_length); 00532 00533 /// Get the max concurrent thread number. 00534 virtual DWORD max_threads (void); 00535 00536 // = Activation control methods. 00537 00538 /** 00539 * Deactivate the queue and wake up all threads waiting on the queue 00540 * so they can continue. No messages are removed from the queue, 00541 * however. Any other operations called until the queue is 00542 * activated again will immediately return -1 with @c errno 00543 * ESHUTDOWN. 00544 * 00545 * @retval The queue's state before this call. 00546 */ 00547 virtual int deactivate (void); 00548 00549 /** 00550 * Reactivate the queue so that threads can enqueue and dequeue 00551 * messages again. Returns the state of the queue before the call. 00552 */ 00553 virtual int activate (void); 00554 00555 /** 00556 * Pulse the queue to wake up any waiting threads. Changes the 00557 * queue state to PULSED; future enqueue/dequeue operations proceed 00558 * as in ACTIVATED state. 00559 * 00560 * @retval The queue's state before this call. 00561 */ 00562 virtual int pulse (void); 00563 00564 /// Returns true if the state of the queue is <DEACTIVATED>, 00565 /// but false if the queue's is <ACTIVATED> or <PULSED>. 00566 virtual int deactivated (void); 00567 00568 // = Not currently implemented... 00569 int peek_dequeue_head (ACE_Message_Block *&first_item, 00570 ACE_Time_Value *timeout = 0); 00571 ACE_Notification_Strategy *notification_strategy (void); 00572 void notification_strategy (ACE_Notification_Strategy *s); 00573 00574 // = Notification hook. 00575 00576 /// Dump the state of an object. 00577 virtual void dump (void) const; 00578 00579 /// Get the handle to the underlying completion port. 00580 virtual ACE_HANDLE completion_port (void); 00581 00582 /// Declare the dynamic allocation hooks. 00583 ACE_ALLOC_HOOK_DECLARE; 00584 00585 private: 00586 00587 // Disallow copying and assignment. 00588 ACE_Message_Queue_NT (const ACE_Message_Queue_NT &); 00589 void operator= (const ACE_Message_Queue_NT &); 00590 00591 private: 00592 // = Internal states. 00593 00594 /// Maximum threads that can be released (and run) concurrently. 00595 DWORD max_cthrs_; 00596 00597 /// Current number of threads waiting to dequeue messages. 00598 DWORD cur_thrs_; 00599 00600 /// Current number of bytes in queue. 00601 size_t cur_bytes_; 00602 00603 /// Current length of messages in queue. 00604 size_t cur_length_; 00605 00606 /// Current number of messages in the queue. 00607 size_t cur_count_; 00608 00609 /** 00610 * Synchronizer. This should really be an ACE_Recursive_Thread_Mutex 00611 * but since this class is only supported on NT, it's okay to use 00612 * ACE_Thread_Mutex here. 00613 */ 00614 ACE_Thread_Mutex lock_; 00615 00616 /// Underlying NT IoCompletionPort. 00617 ACE_HANDLE completion_port_; 00618 00619 }; 00620 #endif /* ACE_WIN32 && ACE_HAS_WINNT4 != 0 */ 00621 00622 ACE_END_VERSIONED_NAMESPACE_DECL 00623 00624 #if defined (__ACE_INLINE__) 00625 #include "ace/Message_Queue.inl" 00626 #endif /* __ACE_INLINE__ */ 00627 00628 #include /**/ "ace/post.h" 00629 #endif /* ACE_MESSAGE_QUEUE_H */