00001 /* -*- C++ -*- */ 00002 00003 //============================================================================= 00004 /** 00005 * @file Message_Queue_T.h 00006 * 00007 * $Id: Message_Queue_T.h 79384 2007-08-17 13:13:33Z johnnyw $ 00008 * 00009 * @author Douglas C. Schmidt <schmidt@cs.wustl.edu> 00010 */ 00011 //============================================================================= 00012 00013 #ifndef ACE_MESSAGE_QUEUE_T_H 00014 #define ACE_MESSAGE_QUEUE_T_H 00015 #include /**/ "ace/pre.h" 00016 00017 #include "ace/Message_Queue.h" 00018 #include "ace/Synch_Traits.h" 00019 #include "ace/Guard_T.h" 00020 00021 #if !defined (ACE_LACKS_PRAGMA_ONCE) 00022 # pragma once 00023 #endif /* ACE_LACKS_PRAGMA_ONCE */ 00024 00025 ACE_BEGIN_VERSIONED_NAMESPACE_DECL 00026 00027 #if defined (ACE_VXWORKS) 00028 class ACE_Message_Queue_Vx; 00029 #endif /* defined (ACE_VXWORKS) */ 00030 00031 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO) 00032 class ACE_Message_Queue_NT; 00033 #endif /* ACE_HAS_WIN32_OVERLAPPED_IO*/ 00034 00035 /** 00036 * @class ACE_Message_Queue 00037 * 00038 * @brief A message queueing facility with parameterized synchronization 00039 * capability. ACE_Message_Queue is modeled after the queueing facilities 00040 * in System V STREAMs. 00041 * 00042 * ACE_Message_Queue is the primary queueing facility for 00043 * messages in the ACE framework. It's one template argument parameterizes 00044 * the queue's synchronization. The argument specifies a synchronization 00045 * strategy. The two main strategies available for ACE_SYNCH_DECL are: 00046 * -# ACE_MT_SYNCH: all operations are thread-safe 00047 * -# ACE_NULL_SYNCH: no synchronization and no locking overhead 00048 * 00049 * All data passing through ACE_Message_Queue is in the form of 00050 * ACE_Message_Block objects. @sa ACE_Message_Block. 00051 */ 00052 template <ACE_SYNCH_DECL> 00053 class ACE_Message_Queue : public ACE_Message_Queue_Base 00054 { 00055 public: 00056 friend class ACE_Message_Queue_Iterator<ACE_SYNCH_USE>; 00057 friend class ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>; 00058 00059 // = Traits 00060 typedef ACE_Message_Queue_Iterator<ACE_SYNCH_USE> 00061 ITERATOR; 00062 typedef ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE> 00063 REVERSE_ITERATOR; 00064 00065 /** 00066 * @name Initialization methods 00067 */ 00068 //@{ 00069 /** 00070 * Initialize an ACE_Message_Queue. 00071 * 00072 * @param hwm High water mark. Determines how many bytes can be stored in a 00073 * queue before it's considered full. Supplier threads must block 00074 * until the queue is no longer full. 00075 * @param lwm Low water mark. Determines how many bytes must be in the queue 00076 * before supplier threads are allowed to enqueue additional 00077 * data. By default, the @a hwm equals @a lwm, which means 00078 * that suppliers will be able to enqueue new messages as soon as 00079 * a consumer removes any message from the queue. Making the low 00080 * water mark smaller than the high water mark forces consumers to 00081 * drain more messages from the queue before suppliers can enqueue 00082 * new messages, which can minimize the "silly window syndrome." 00083 * @param ns Notification strategy. Pointer to an object conforming to the 00084 * ACE_Notification_Strategy interface. If set, the object's 00085 * notify(void) method will be called each time data is added to 00086 * this ACE_Message_Queue. @see ACE_Reactor_Notification_Strategy. 00087 */ 00088 ACE_Message_Queue (size_t hwm = ACE_Message_Queue_Base::DEFAULT_HWM, 00089 size_t lwm = ACE_Message_Queue_Base::DEFAULT_LWM, 00090 ACE_Notification_Strategy *ns = 0); 00091 virtual int open (size_t hwm = ACE_Message_Queue_Base::DEFAULT_HWM, 00092 size_t lwm = ACE_Message_Queue_Base::DEFAULT_LWM, 00093 ACE_Notification_Strategy *ns = 0); 00094 //@} 00095 00096 /// Releases all resources from the message queue and marks it deactivated. 00097 /// @sa flush(). 00098 /// 00099 /// @retval The number of messages released from the queue; -1 on error. 00100 virtual int close (void); 00101 00102 /// Releases all resources from the message queue and marks it deactivated. 00103 virtual ~ACE_Message_Queue (void); 00104 00105 /** 00106 * Releases all resources from the message queue but does not mark it 00107 * deactivated. This method holds the queue lock during this operation. 00108 * @sa close(). 00109 * 00110 * @return The number of messages flushed; -1 on error. 00111 */ 00112 virtual int flush (void); 00113 00114 /** 00115 * Release all resources from the message queue but do not mark it 00116 * as deactivated. 00117 * 00118 * @pre The caller must be holding the queue lock before calling this 00119 * method. 00120 * 00121 * @return The number of messages flushed. 00122 */ 00123 virtual int flush_i (void); 00124 00125 /** @name Enqueue and dequeue methods 00126 * 00127 * The enqueue and dequeue methods accept a timeout value passed as 00128 * an ACE_Time_Value *. In all cases, if the timeout pointer is 0, 00129 * the caller will block until action is possible. If the timeout pointer 00130 * is non-zero, the call will wait (if needed, subject to water mark 00131 * settings) until the absolute time specified in the referenced 00132 * ACE_Time_Value object is reached. If the time is reached before the 00133 * desired action is possible, the method will return -1 with errno set 00134 * to @c EWOULDBLOCK. Regardless of the timeout setting, however, 00135 * these methods will also fail and return -1 when the queue is closed, 00136 * deactivated, pulsed, or when a signal occurs. 00137 * 00138 * See C++NPv2 Section 6.2 and APG Section 12.3 for a fuller treatment of 00139 * ACE_Message_Queue, enqueueing, dequeueing, and how these operations are 00140 * affected by queue state transitions. 00141 */ 00142 //@{ 00143 /** 00144 * Retrieve a pointer to the first ACE_Message_Block in the queue 00145 * without removing it. 00146 * 00147 * @note Because the block whose pointer is returned is still on the queue, 00148 * another thread may dequeue the referenced block at any time, 00149 * including before the calling thread examines the peeked-at block. 00150 * Be very careful with this method in multithreaded queueing 00151 * situations. 00152 * 00153 * @param first_item Reference to an ACE_Message_Block * that will 00154 * point to the first block on the queue. The block 00155 * remains on the queue until this or another thread 00156 * dequeues it. 00157 * @param timeout The absolute time the caller will wait until 00158 * for a block to be queued. 00159 * 00160 * @retval >0 The number of ACE_Message_Blocks on the queue. 00161 * @retval -1 On failure. errno holds the reason. Common errno values are: 00162 * - EWOULDBLOCK: the timeout elapsed 00163 * - ESHUTDOWN: the queue was deactivated or pulsed 00164 */ 00165 virtual int peek_dequeue_head (ACE_Message_Block *&first_item, 00166 ACE_Time_Value *timeout = 0); 00167 00168 /** 00169 * Enqueue an ACE_Message_Block into the queue in accordance with 00170 * the ACE_Message_Block's priority (0 is lowest priority). FIFO 00171 * order is maintained when messages of the same priority are 00172 * inserted consecutively. 00173 * 00174 * @param new_item Pointer to an ACE_Message_Block that will be 00175 * added to the queue. The block's @c msg_priority() 00176 * method will be called to obtain the queueing priority. 00177 * @param timeout The absolute time the caller will wait until 00178 * for the block to be queued. 00179 * 00180 * @retval >0 The number of ACE_Message_Blocks on the queue after adding 00181 * the specified block. 00182 * @retval -1 On failure. errno holds the reason. Common errno values are: 00183 * - EWOULDBLOCK: the timeout elapsed 00184 * - ESHUTDOWN: the queue was deactivated or pulsed 00185 */ 00186 virtual int enqueue_prio (ACE_Message_Block *new_item, 00187 ACE_Time_Value *timeout = 0); 00188 00189 /** 00190 * Enqueue an ACE_Message_Block into the queue in accordance with the 00191 * block's deadline time. FIFO order is maintained when messages of 00192 * the same deadline time are inserted consecutively. 00193 * 00194 * @param new_item Pointer to an ACE_Message_Block that will be 00195 * added to the queue. The block's @c msg_deadline_time() 00196 * method will be called to obtain the relative queueing 00197 * position. 00198 * @param timeout The absolute time the caller will wait until 00199 * for the block to be queued. 00200 * 00201 * @retval >0 The number of ACE_Message_Blocks on the queue after adding 00202 * the specified block. 00203 * @retval -1 On failure. errno holds the reason. Common errno values are: 00204 * - EWOULDBLOCK: the timeout elapsed 00205 * - ESHUTDOWN: the queue was deactivated or pulsed 00206 */ 00207 virtual int enqueue_deadline (ACE_Message_Block *new_item, 00208 ACE_Time_Value *timeout = 0); 00209 00210 /** 00211 * @deprecated This is an alias for enqueue_prio(). It's only here for 00212 * backwards compatibility and will go away in a subsequent release. 00213 * Please use enqueue_prio() instead. 00214 */ 00215 virtual int enqueue (ACE_Message_Block *new_item, 00216 ACE_Time_Value *timeout = 0); 00217 00218 /** 00219 * Enqueue one or more ACE_Message_Block objects at the tail of the queue. 00220 * If the @a new_item @c next() pointer is non-zero, it is assumed to be the 00221 * start of a series of ACE_Message_Block objects connected via their 00222 * @c next() pointers. The series of blocks will be added to the queue in 00223 * the same order they are passed in as. 00224 * 00225 * @param new_item Pointer to an ACE_Message_Block that will be 00226 * added to the queue. If the block's @c next() pointer 00227 * is non-zero, all blocks chained from the @c next() 00228 * pointer are enqueued as well. 00229 * @param timeout The absolute time the caller will wait until 00230 * for the block to be queued. 00231 * 00232 * @retval >0 The number of ACE_Message_Blocks on the queue after adding 00233 * the specified block(s). 00234 * @retval -1 On failure. errno holds the reason. Common errno values are: 00235 * - EWOULDBLOCK: the timeout elapsed 00236 * - ESHUTDOWN: the queue was deactivated or pulsed 00237 */ 00238 virtual int enqueue_tail (ACE_Message_Block *new_item, 00239 ACE_Time_Value *timeout = 0); 00240 00241 /** 00242 * Enqueue one or more ACE_Message_Block objects at the head of the queue. 00243 * If the @a new_item @c next() pointer is non-zero, it is assumed to be the 00244 * start of a series of ACE_Message_Block objects connected via their 00245 * @c next() pointers. The series of blocks will be added to the queue in 00246 * the same order they are passed in as. 00247 * 00248 * @param new_item Pointer to an ACE_Message_Block that will be 00249 * added to the queue. If the block's @c next() pointer 00250 * is non-zero, all blocks chained from the @c next() 00251 * pointer are enqueued as well. 00252 * @param timeout The absolute time the caller will wait until 00253 * for the block to be queued. 00254 * 00255 * @retval >0 The number of ACE_Message_Blocks on the queue after adding 00256 * the specified block(s). 00257 * @retval -1 On failure. errno holds the reason. Common errno values are: 00258 * - EWOULDBLOCK: the timeout elapsed 00259 * - ESHUTDOWN: the queue was deactivated or pulsed 00260 */ 00261 virtual int enqueue_head (ACE_Message_Block *new_item, 00262 ACE_Time_Value *timeout = 0); 00263 00264 /// This method is an alias for the dequeue_head() method. 00265 virtual int dequeue (ACE_Message_Block *&first_item, 00266 ACE_Time_Value *timeout = 0); 00267 00268 /** 00269 * Dequeue the ACE_Message_Block at the head of the queue and return 00270 * a pointer to the dequeued block. 00271 * 00272 * @param first_item Reference to an ACE_Message_Block * that will 00273 * be set to the address of the dequeued block. 00274 * @param timeout The absolute time the caller will wait until 00275 * for a block to be dequeued. 00276 * 00277 * @retval >=0 The number of ACE_Message_Blocks remaining in the queue. 00278 * @retval -1 On failure. errno holds the reason. Common errno values are: 00279 * - EWOULDBLOCK: the timeout elapsed 00280 * - ESHUTDOWN: the queue was deactivated or pulsed 00281 */ 00282 virtual int dequeue_head (ACE_Message_Block *&first_item, 00283 ACE_Time_Value *timeout = 0); 00284 00285 /** 00286 * Dequeue the ACE_Message_Block that has the lowest priority (preserves 00287 * FIFO order for messages with the same priority) and return a pointer 00288 * to the dequeued block. 00289 * 00290 * @param first_item Reference to an ACE_Message_Block * that will 00291 * be set to the address of the dequeued block. 00292 * @param timeout The absolute time the caller will wait until 00293 * for a block to be dequeued. 00294 * 00295 * @retval >=0 The number of ACE_Message_Blocks remaining in the queue. 00296 * @retval -1 On failure. errno holds the reason. Common errno values are: 00297 * - EWOULDBLOCK: the timeout elapsed 00298 * - ESHUTDOWN: the queue was deactivated or pulsed 00299 */ 00300 virtual int dequeue_prio (ACE_Message_Block *&first_item, 00301 ACE_Time_Value *timeout = 0); 00302 00303 /** 00304 * Dequeue the ACE_Message_Block at the tail of the queue and return 00305 * a pointer to the dequeued block. 00306 * 00307 * @param dequeued Reference to an ACE_Message_Block * that will 00308 * be set to the address of the dequeued block. 00309 * @param timeout The absolute time the caller will wait until 00310 * for a block to be dequeued. 00311 * 00312 * @retval >=0 The number of ACE_Message_Blocks remaining in the queue. 00313 * @retval -1 On failure. errno holds the reason. Common errno values are: 00314 * - EWOULDBLOCK: the timeout elapsed 00315 * - ESHUTDOWN: the queue was deactivated or pulsed 00316 */ 00317 virtual int dequeue_tail (ACE_Message_Block *&dequeued, 00318 ACE_Time_Value *timeout = 0); 00319 00320 /** 00321 * Dequeue the ACE_Message_Block with the earliest deadline time and return 00322 * a pointer to the dequeued block. 00323 * 00324 * @param dequeued Reference to an ACE_Message_Block * that will 00325 * be set to the address of the dequeued block. 00326 * @param timeout The absolute time the caller will wait until 00327 * for a block to be dequeued. 00328 * 00329 * @retval >=0 The number of ACE_Message_Blocks remaining in the queue. 00330 * @retval -1 On failure. errno holds the reason. Common errno values are: 00331 * - EWOULDBLOCK: the timeout elapsed 00332 * - ESHUTDOWN: the queue was deactivated or pulsed 00333 */ 00334 virtual int dequeue_deadline (ACE_Message_Block *&dequeued, 00335 ACE_Time_Value *timeout = 0); 00336 //@} 00337 00338 /** @name Queue statistics methods 00339 */ 00340 //@{ 00341 00342 /// True if queue is full, else false. 00343 virtual int is_full (void); 00344 /// True if queue is empty, else false. 00345 virtual int is_empty (void); 00346 00347 /** 00348 * Number of total bytes on the queue, i.e., sum of the message 00349 * block sizes. 00350 */ 00351 virtual size_t message_bytes (void); 00352 00353 /** 00354 * Number of total length on the queue, i.e., sum of the message 00355 * block lengths. 00356 */ 00357 virtual size_t message_length (void); 00358 00359 /** 00360 * Number of total messages on the queue. 00361 */ 00362 virtual size_t message_count (void); 00363 00364 // = Manual changes to these stats (used when queued message blocks 00365 // change size or lengths). 00366 /** 00367 * New value of the number of total bytes on the queue, i.e., sum of 00368 * the message block sizes. 00369 */ 00370 virtual void message_bytes (size_t new_size); 00371 /** 00372 * New value of the number of total length on the queue, i.e., sum 00373 * of the message block lengths. 00374 */ 00375 virtual void message_length (size_t new_length); 00376 00377 //@} 00378 00379 00380 /** @name Water mark (flow control) methods 00381 */ 00382 //@{ 00383 00384 /** 00385 * Get high watermark. 00386 */ 00387 virtual size_t high_water_mark (void); 00388 /** 00389 * Set the high watermark, which determines how many bytes can be 00390 * stored in a queue before it's considered "full." 00391 */ 00392 virtual void high_water_mark (size_t hwm); 00393 00394 /** 00395 * Get low watermark. 00396 */ 00397 virtual size_t low_water_mark (void); 00398 /** 00399 * Set the low watermark, which determines how many bytes must be in 00400 * the queue before supplier threads are allowed to enqueue 00401 * additional ACE_Message_Blocks. 00402 */ 00403 virtual void low_water_mark (size_t lwm); 00404 //@} 00405 00406 /** @name Activation and queue state methods 00407 * See C++NPv2 Section 6.2 and APG Section 12.3 for a fuller treatment of 00408 * queue states and transitions and how the transitions affect message 00409 * enqueueing and dequeueing operations. 00410 */ 00411 //@{ 00412 00413 /** 00414 * Deactivate the queue and wakeup all threads waiting on the queue 00415 * so they can continue. No messages are removed from the queue, 00416 * however. Any other operations called until the queue is 00417 * activated again will immediately return -1 with @c errno == 00418 * ESHUTDOWN. Returns WAS_INACTIVE if queue was inactive before the 00419 * call and WAS_ACTIVE if queue was active before the call. 00420 */ 00421 virtual int deactivate (void); 00422 00423 /** 00424 * Reactivate the queue so that threads can enqueue and dequeue 00425 * messages again. Returns the state of the queue before the call. 00426 */ 00427 virtual int activate (void); 00428 00429 /** 00430 * Pulse the queue to wake up any waiting threads. Changes the 00431 * queue state to PULSED; future enqueue/dequeue operations proceed 00432 * as in ACTIVATED state. 00433 * 00434 * @return The queue's state before this call. 00435 */ 00436 virtual int pulse (void); 00437 00438 /// Returns the current state of the queue, which can be one of 00439 /// ACTIVATED, DEACTIVATED, or PULSED. 00440 virtual int state (void); 00441 00442 /// Returns true if the state of the queue is <DEACTIVATED>, 00443 /// but false if the queue's is <ACTIVATED> or <PULSED>. 00444 virtual int deactivated (void); 00445 //@} 00446 00447 /** @name Notification strategy methods 00448 */ 00449 //@{ 00450 00451 /** 00452 * This hook is automatically invoked by <enqueue_head>, 00453 * <enqueue_tail>, and <enqueue_prio> when a new item is inserted 00454 * into the queue. Subclasses can override this method to perform 00455 * specific notification strategies (e.g., signaling events for a 00456 * <WFMO_Reactor>, notifying a <Reactor>, etc.). In a 00457 * multi-threaded application with concurrent consumers, there is no 00458 * guarantee that the queue will be still be non-empty by the time 00459 * the notification occurs. 00460 */ 00461 virtual int notify (void); 00462 00463 /// Get the notification strategy for the <Message_Queue> 00464 virtual ACE_Notification_Strategy *notification_strategy (void); 00465 00466 /// Set the notification strategy for the <Message_Queue> 00467 virtual void notification_strategy (ACE_Notification_Strategy *s); 00468 //@} 00469 00470 /// Returns a reference to the lock used by the ACE_Message_Queue. 00471 virtual ACE_SYNCH_MUTEX_T &lock (void); 00472 00473 /// Dump the state of an object. 00474 virtual void dump (void) const; 00475 00476 /// Declare the dynamic allocation hooks. 00477 ACE_ALLOC_HOOK_DECLARE; 00478 00479 protected: 00480 // = Routines that actually do the enqueueing and dequeueing. 00481 00482 // These routines assume that locks are held by the corresponding 00483 // public methods. Since they are virtual, you can change the 00484 // queueing mechanism by subclassing from ACE_Message_Queue. 00485 00486 /// Enqueue an <ACE_Message_Block *> in accordance with its priority. 00487 virtual int enqueue_i (ACE_Message_Block *new_item); 00488 00489 /// Enqueue an <ACE_Message_Block *> in accordance with its deadline time. 00490 virtual int enqueue_deadline_i (ACE_Message_Block *new_item); 00491 00492 /// Enqueue an <ACE_Message_Block *> at the end of the queue. 00493 virtual int enqueue_tail_i (ACE_Message_Block *new_item); 00494 00495 /// Enqueue an <ACE_Message_Block *> at the head of the queue. 00496 virtual int enqueue_head_i (ACE_Message_Block *new_item); 00497 00498 /// Dequeue and return the <ACE_Message_Block *> at the head of the 00499 /// queue. 00500 virtual int dequeue_head_i (ACE_Message_Block *&first_item); 00501 00502 /// Dequeue and return the <ACE_Message_Block *> with the lowest 00503 /// priority. 00504 virtual int dequeue_prio_i (ACE_Message_Block *&dequeued); 00505 00506 /// Dequeue and return the <ACE_Message_Block *> at the tail of the 00507 /// queue. 00508 virtual int dequeue_tail_i (ACE_Message_Block *&first_item); 00509 00510 /// Dequeue and return the <ACE_Message_Block *> with the lowest 00511 /// deadline time. 00512 virtual int dequeue_deadline_i (ACE_Message_Block *&first_item); 00513 00514 // = Check the boundary conditions (assumes locks are held). 00515 00516 /// True if queue is full, else false. 00517 virtual int is_full_i (void); 00518 00519 /// True if queue is empty, else false. 00520 virtual int is_empty_i (void); 00521 00522 // = Implementation of the public <activate> and <deactivate> methods. 00523 00524 // These methods assume locks are held. 00525 00526 /** 00527 * Notifies all waiting threads that the queue has been deactivated 00528 * so they can wakeup and continue other processing. 00529 * No messages are removed from the queue. 00530 * 00531 * @param pulse If 0, the queue's state is changed to DEACTIVATED 00532 * and any other operations called until the queue is 00533 * reactivated will immediately return -1 with 00534 * errno == ESHUTDOWN. 00535 * If not zero, only the waiting threads are notified and 00536 * the queue's state changes to PULSED. 00537 * 00538 * @return The state of the queue before the call. 00539 */ 00540 virtual int deactivate_i (int pulse = 0); 00541 00542 /// Activate the queue. 00543 virtual int activate_i (void); 00544 00545 // = Helper methods to factor out common #ifdef code. 00546 00547 /// Wait for the queue to become non-full. 00548 virtual int wait_not_full_cond (ACE_Guard<ACE_SYNCH_MUTEX_T> &mon, 00549 ACE_Time_Value *timeout); 00550 00551 /// Wait for the queue to become non-empty. 00552 virtual int wait_not_empty_cond (ACE_Guard<ACE_SYNCH_MUTEX_T> &mon, 00553 ACE_Time_Value *timeout); 00554 00555 /// Inform any threads waiting to enqueue that they can procede. 00556 virtual int signal_enqueue_waiters (void); 00557 00558 /// Inform any threads waiting to dequeue that they can procede. 00559 virtual int signal_dequeue_waiters (void); 00560 00561 /// Pointer to head of ACE_Message_Block list. 00562 ACE_Message_Block *head_; 00563 00564 /// Pointer to tail of ACE_Message_Block list. 00565 ACE_Message_Block *tail_; 00566 00567 /// Lowest number before unblocking occurs. 00568 size_t low_water_mark_; 00569 00570 /// Greatest number of bytes before blocking. 00571 size_t high_water_mark_; 00572 00573 /// Current number of bytes in the queue. 00574 size_t cur_bytes_; 00575 00576 /// Current length of messages in the queue. 00577 size_t cur_length_; 00578 00579 /// Current number of messages in the queue. 00580 size_t cur_count_; 00581 00582 /// The notification strategy used when a new message is enqueued. 00583 ACE_Notification_Strategy *notification_strategy_; 00584 00585 // = Synchronization primitives for controlling concurrent access. 00586 /// Protect queue from concurrent access. 00587 ACE_SYNCH_MUTEX_T lock_; 00588 00589 /// Used to make threads sleep until the queue is no longer empty. 00590 ACE_SYNCH_CONDITION_T not_empty_cond_; 00591 00592 /// Used to make threads sleep until the queue is no longer full. 00593 ACE_SYNCH_CONDITION_T not_full_cond_; 00594 00595 private: 00596 00597 // = Disallow these operations. 00598 ACE_UNIMPLEMENTED_FUNC (void operator= (const ACE_Message_Queue<ACE_SYNCH_USE> &)) 00599 ACE_UNIMPLEMENTED_FUNC (ACE_Message_Queue (const ACE_Message_Queue<ACE_SYNCH_USE> &)) 00600 }; 00601 00602 // This typedef is used to get around a compiler bug in g++/vxworks. 00603 typedef ACE_Message_Queue<ACE_SYNCH> ACE_DEFAULT_MESSAGE_QUEUE_TYPE; 00604 00605 00606 /** 00607 * @class ACE_Message_Queue_Iterator 00608 * 00609 * @brief Iterator for the ACE_Message_Queue. 00610 */ 00611 template <ACE_SYNCH_DECL> 00612 class ACE_Message_Queue_Iterator 00613 { 00614 public: 00615 // = Initialization method. 00616 ACE_Message_Queue_Iterator (ACE_Message_Queue <ACE_SYNCH_USE> &queue); 00617 00618 // = Iteration methods. 00619 /// Pass back the <entry> that hasn't been seen in the queue. 00620 /// Returns 0 when all items have been seen, else 1. 00621 int next (ACE_Message_Block *&entry); 00622 00623 /// Returns 1 when all items have been seen, else 0. 00624 int done (void) const; 00625 00626 /// Move forward by one element in the queue. Returns 0 when all the 00627 /// items in the set have been seen, else 1. 00628 int advance (void); 00629 00630 /// Dump the state of an object. 00631 void dump (void) const; 00632 00633 /// Declare the dynamic allocation hooks. 00634 ACE_ALLOC_HOOK_DECLARE; 00635 00636 private: 00637 /// Message_Queue we are iterating over. 00638 ACE_Message_Queue <ACE_SYNCH_USE> &queue_; 00639 00640 /// Keeps track of how far we've advanced... 00641 ACE_Message_Block *curr_; 00642 }; 00643 00644 /** 00645 * @class ACE_Message_Queue_Reverse_Iterator 00646 * 00647 * @brief Reverse Iterator for the ACE_Message_Queue. 00648 */ 00649 template <ACE_SYNCH_DECL> 00650 class ACE_Message_Queue_Reverse_Iterator 00651 { 00652 public: 00653 // = Initialization method. 00654 ACE_Message_Queue_Reverse_Iterator (ACE_Message_Queue <ACE_SYNCH_USE> &queue); 00655 00656 // = Iteration methods. 00657 /// Pass back the <entry> that hasn't been seen in the queue. 00658 /// Returns 0 when all items have been seen, else 1. 00659 int next (ACE_Message_Block *&entry); 00660 00661 /// Returns 1 when all items have been seen, else 0. 00662 int done (void) const; 00663 00664 /// Move forward by one element in the queue. Returns 0 when all the 00665 /// items in the set have been seen, else 1. 00666 int advance (void); 00667 00668 /// Dump the state of an object. 00669 void dump (void) const; 00670 00671 /// Declare the dynamic allocation hooks. 00672 ACE_ALLOC_HOOK_DECLARE; 00673 00674 private: 00675 /// Message_Queue we are iterating over. 00676 ACE_Message_Queue <ACE_SYNCH_USE> &queue_; 00677 00678 /// Keeps track of how far we've advanced... 00679 ACE_Message_Block *curr_; 00680 }; 00681 00682 /** 00683 * @class ACE_Dynamic_Message_Queue 00684 * 00685 * @brief A derived class which adapts the ACE_Message_Queue 00686 * class in order to maintain dynamic priorities for enqueued 00687 * <ACE_Message_Blocks> and manage the queue order according 00688 * to these dynamic priorities. 00689 * 00690 * The messages in the queue are managed so as to preserve 00691 * a logical ordering with minimal overhead per enqueue and 00692 * dequeue operation. For this reason, the actual order of 00693 * messages in the linked list of the queue may differ from 00694 * their priority order. As time passes, a message may change 00695 * from pending status to late status, and eventually to beyond 00696 * late status. To minimize reordering overhead under this 00697 * design force, three separate boundaries are maintained 00698 * within the linked list of messages. Messages are dequeued 00699 * preferentially from the head of the pending portion, then 00700 * the head of the late portion, and finally from the head 00701 * of the beyond late portion. In this way, only the boundaries 00702 * need to be maintained (which can be done efficiently, as 00703 * aging messages maintain the same linked list order as they 00704 * progress from one status to the next), with no reordering 00705 * of the messages themselves, while providing correct priority 00706 * ordered dequeueing semantics. 00707 * Head and tail enqueue methods inherited from ACE_Message_Queue 00708 * are made private to prevent out-of-order messages from confusing 00709 * management of the various portions of the queue. Messages in 00710 * the pending portion of the queue whose priority becomes late 00711 * (according to the specific dynamic strategy) advance into 00712 * the late portion of the queue. Messages in the late portion 00713 * of the queue whose priority becomes later than can be represented 00714 * advance to the beyond_late portion of the queue. These behaviors 00715 * support a limited schedule overrun, with pending messages prioritized 00716 * ahead of late messages, and late messages ahead of beyond late 00717 * messages. These behaviors can be modified in derived classes by 00718 * providing alternative definitions for the appropriate virtual methods. 00719 * When filled with messages, the queue's linked list should look like: 00720 * H T 00721 * | | 00722 * B - B - B - B - L - L - L - P - P - P - P - P 00723 * | | | | | | 00724 * BH BT LH LT PH PT 00725 * Where the symbols are as follows: 00726 * H = Head of the entire list 00727 * T = Tail of the entire list 00728 * B = Beyond late message 00729 * BH = Beyond late messages Head 00730 * BT = Beyond late messages Tail 00731 * L = Late message 00732 * LH = Late messages Head 00733 * LT = Late messages Tail 00734 * P = Pending message 00735 * PH = Pending messages Head 00736 * PT = Pending messages Tail 00737 * Caveat: the virtual methods enqueue_tail, enqueue_head, 00738 * and peek_dequeue_head have semantics for the static 00739 * message queues that cannot be guaranteed for dynamic 00740 * message queues. The peek_dequeue_head method just 00741 * calls the base class method, while the two enqueue 00742 * methods call the priority enqueue method. The 00743 * order of messages in the dynamic queue is a function 00744 * of message deadlines and how long they are in the 00745 * queues. You can manipulate these in some cases to 00746 * ensure the correct semantics, but that is not a 00747 * very stable or portable approach (discouraged). 00748 */ 00749 template <ACE_SYNCH_DECL> 00750 class ACE_Dynamic_Message_Queue : public ACE_Message_Queue<ACE_SYNCH_USE> 00751 { 00752 public: 00753 // = Initialization and termination methods. 00754 ACE_Dynamic_Message_Queue (ACE_Dynamic_Message_Strategy & message_strategy, 00755 size_t hwm = ACE_Message_Queue_Base::DEFAULT_HWM, 00756 size_t lwm = ACE_Message_Queue_Base::DEFAULT_LWM, 00757 ACE_Notification_Strategy * = 0); 00758 00759 /// Close down the message queue and release all resources. 00760 virtual ~ACE_Dynamic_Message_Queue (void); 00761 00762 /** 00763 * Detach all messages with status given in the passed flags from 00764 * the queue and return them by setting passed head and tail pointers 00765 * to the linked list they comprise. This method is intended primarily 00766 * as a means of periodically harvesting messages that have missed 00767 * their deadlines, but is available in its most general form. All 00768 * messages are returned in priority order, from head to tail, as of 00769 * the time this method was called. 00770 */ 00771 virtual int remove_messages (ACE_Message_Block *&list_head, 00772 ACE_Message_Block *&list_tail, 00773 u_int status_flags); 00774 00775 /** 00776 * Dequeue and return the <ACE_Message_Block *> at the head of the 00777 * queue. Returns -1 on failure, else the number of items still on 00778 * the queue. 00779 */ 00780 virtual int dequeue_head (ACE_Message_Block *&first_item, 00781 ACE_Time_Value *timeout = 0); 00782 00783 /// Dump the state of the queue. 00784 virtual void dump (void) const; 00785 00786 /** 00787 * Just call priority enqueue method: tail enqueue semantics for dynamic 00788 * message queues are unstable: the message may or may not be where 00789 * it was placed after the queue is refreshed prior to the next 00790 * enqueue or dequeue operation. 00791 */ 00792 virtual int enqueue_tail (ACE_Message_Block *new_item, 00793 ACE_Time_Value *timeout = 0); 00794 00795 /** 00796 * Just call priority enqueue method: head enqueue semantics for dynamic 00797 * message queues are unstable: the message may or may not be where 00798 * it was placed after the queue is refreshed prior to the next 00799 * enqueue or dequeue operation. 00800 */ 00801 virtual int enqueue_head (ACE_Message_Block *new_item, 00802 ACE_Time_Value *timeout = 0); 00803 00804 00805 /// Declare the dynamic allocation hooks. 00806 ACE_ALLOC_HOOK_DECLARE; 00807 00808 protected: 00809 00810 /** 00811 * Enqueue an <ACE_Message_Block *> in accordance with its priority. 00812 * priority may be *dynamic* or *static* or a combination or *both* 00813 * It calls the priority evaluation function passed into the Dynamic 00814 * Message Queue constructor to update the priorities of all 00815 * enqueued messages. 00816 */ 00817 virtual int enqueue_i (ACE_Message_Block *new_item); 00818 00819 /// Enqueue a message in priority order within a given priority status sublist 00820 virtual int sublist_enqueue_i (ACE_Message_Block *new_item, 00821 const ACE_Time_Value ¤t_time, 00822 ACE_Message_Block *&sublist_head, 00823 ACE_Message_Block *&sublist_tail, 00824 ACE_Dynamic_Message_Strategy::Priority_Status status); 00825 00826 /** 00827 * Dequeue and return the <ACE_Message_Block *> at the head of the 00828 * logical queue. Attempts first to dequeue from the pending 00829 * portion of the queue, or if that is empty from the late portion, 00830 * or if that is empty from the beyond late portion, or if that is 00831 * empty just sets the passed pointer to zero and returns -1. 00832 */ 00833 virtual int dequeue_head_i (ACE_Message_Block *&first_item); 00834 00835 /// Refresh the queue using the strategy 00836 /// specific priority status function. 00837 virtual int refresh_queue (const ACE_Time_Value & current_time); 00838 00839 /// Refresh the pending queue using the strategy 00840 /// specific priority status function. 00841 virtual int refresh_pending_queue (const ACE_Time_Value & current_time); 00842 00843 /// Refresh the late queue using the strategy 00844 /// specific priority status function. 00845 virtual int refresh_late_queue (const ACE_Time_Value & current_time); 00846 00847 /// Pointer to head of the pending messages 00848 ACE_Message_Block *pending_head_; 00849 00850 /// Pointer to tail of the pending messages 00851 ACE_Message_Block *pending_tail_; 00852 00853 /// Pointer to head of the late messages 00854 ACE_Message_Block *late_head_; 00855 00856 /// Pointer to tail of the late messages 00857 ACE_Message_Block *late_tail_; 00858 00859 /// Pointer to head of the beyond late messages 00860 ACE_Message_Block *beyond_late_head_; 00861 00862 /// Pointer to tail of the beyond late messages 00863 ACE_Message_Block *beyond_late_tail_; 00864 00865 /// Pointer to a dynamic priority evaluation function. 00866 ACE_Dynamic_Message_Strategy &message_strategy_; 00867 00868 private: 00869 // = Disallow public access to these operations. 00870 00871 ACE_UNIMPLEMENTED_FUNC (void operator= (const ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> &)) 00872 ACE_UNIMPLEMENTED_FUNC (ACE_Dynamic_Message_Queue (const ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> &)) 00873 00874 // provide definitions for these (just call base class method), 00875 // but make them private so they're not accessible outside the class 00876 00877 /// Private method to hide public base class method: just calls base class method 00878 virtual int peek_dequeue_head (ACE_Message_Block *&first_item, 00879 ACE_Time_Value *timeout = 0); 00880 00881 }; 00882 00883 /** 00884 * @class ACE_Message_Queue_Factory 00885 * 00886 * @brief ACE_Message_Queue_Factory is a static factory class template which 00887 * provides a separate factory method for each of the major kinds of 00888 * priority based message dispatching: static, earliest deadline first 00889 * (EDF), and minimum laxity first (MLF). 00890 * 00891 * The ACE_Dynamic_Message_Queue class assumes responsibility for 00892 * releasing the resources of the strategy with which it was 00893 * constructed: the user of a message queue constructed by 00894 * any of these factory methods is only responsible for 00895 * ensuring destruction of the message queue itself. 00896 */ 00897 template <ACE_SYNCH_DECL> 00898 class ACE_Message_Queue_Factory 00899 { 00900 public: 00901 /// Factory method for a statically prioritized ACE_Message_Queue 00902 static ACE_Message_Queue<ACE_SYNCH_USE> * 00903 create_static_message_queue (size_t hwm = ACE_Message_Queue_Base::DEFAULT_HWM, 00904 size_t lwm = ACE_Message_Queue_Base::DEFAULT_LWM, 00905 ACE_Notification_Strategy * = 0); 00906 00907 /// Factory method for a dynamically prioritized (by time to deadline) ACE_Dynamic_Message_Queue 00908 static ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> * 00909 create_deadline_message_queue (size_t hwm = ACE_Message_Queue_Base::DEFAULT_HWM, 00910 size_t lwm = ACE_Message_Queue_Base::DEFAULT_LWM, 00911 ACE_Notification_Strategy * = 0, 00912 u_long static_bit_field_mask = 0x3FFUL, // 2^(10) - 1 00913 u_long static_bit_field_shift = 10, // 10 low order bits 00914 u_long dynamic_priority_max = 0x3FFFFFUL, // 2^(22)-1 00915 u_long dynamic_priority_offset = 0x200000UL); // 2^(22-1) 00916 00917 /// Factory method for a dynamically prioritized (by laxity) ACE_Dynamic_Message_Queue 00918 static ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> * 00919 create_laxity_message_queue (size_t hwm = ACE_Message_Queue_Base::DEFAULT_HWM, 00920 size_t lwm = ACE_Message_Queue_Base::DEFAULT_LWM, 00921 ACE_Notification_Strategy * = 0, 00922 u_long static_bit_field_mask = 0x3FFUL, // 2^(10) - 1 00923 u_long static_bit_field_shift = 10, // 10 low order bits 00924 u_long dynamic_priority_max = 0x3FFFFFUL, // 2^(22)-1 00925 u_long dynamic_priority_offset = 0x200000UL); // 2^(22-1) 00926 00927 00928 #if defined (ACE_VXWORKS) 00929 00930 /// Factory method for a wrapped VxWorks message queue 00931 static ACE_Message_Queue_Vx * 00932 create_Vx_message_queue (size_t max_messages, size_t max_message_length, 00933 ACE_Notification_Strategy *ns = 0); 00934 00935 #endif /* defined (ACE_VXWORKS) */ 00936 00937 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO) 00938 00939 /// Factory method for a NT message queue. 00940 static ACE_Message_Queue_NT * 00941 create_NT_message_queue (size_t max_threads); 00942 00943 #endif /* ACE_HAS_WIN32_OVERLAPPED_IO */ 00944 }; 00945 00946 /** 00947 * @class ACE_Message_Queue_Ex 00948 * 00949 * @brief A threaded message queueing facility, modeled after the 00950 * queueing facilities in System V STREAMs. 00951 * 00952 * ACE_Message_Queue_Ex is a strongly-typed version of the 00953 * ACE_Message_Queue class. Rather than queueing in terms of ACE_Message_Block 00954 * objects, ACE_Message_Queue_Ex has a template argument to specify the 00955 * type of objects that are queued. 00956 * 00957 * The second template argument parameterizes the queue's synchronization. 00958 * The argument specifies a synchronization strategy. The two main 00959 * strategies available for ACE_SYNCH_DECL are: 00960 * -# ACE_MT_SYNCH: all operations are thread-safe 00961 * -# ACE_NULL_SYNCH: no synchronization and no locking overhead 00962 */ 00963 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> 00964 class ACE_Message_Queue_Ex 00965 { 00966 public: 00967 00968 enum 00969 { 00970 /// Default priority value. This is the lowest priority. 00971 DEFAULT_PRIORITY = 0 00972 }; 00973 00974 #if 0 00975 // @@ Iterators are not implemented yet... 00976 00977 friend class ACE_Message_Queue_Iterator<ACE_SYNCH_USE>; 00978 friend class ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>; 00979 00980 // = Traits 00981 typedef ACE_Message_Queue_Iterator<ACE_SYNCH_USE> 00982 ITERATOR; 00983 typedef ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE> 00984 REVERSE_ITERATOR; 00985 #endif /* 0 */ 00986 00987 /** 00988 * @name Initialization methods 00989 */ 00990 //@{ 00991 /** 00992 * Initialize an ACE_Message_Queue_Ex. 00993 * 00994 * @param high_water_mark High water mark. Determines how many bytes can be 00995 * stored in a queue before it's considered full. Supplier threads 00996 * must block until the queue is no longer full. 00997 * @param low_water_mark Low water mark. Determines how many bytes must be in 00998 * the queue before supplier threads are allowed to enqueue additional 00999 * data. By default, the @a hwm equals @a lwm, which means 01000 * that suppliers will be able to enqueue new messages as soon as 01001 * a consumer removes any message from the queue. Making the low 01002 * water mark smaller than the high water mark forces consumers to 01003 * drain more messages from the queue before suppliers can enqueue 01004 * new messages, which can minimize the "silly window syndrome." 01005 * @param ns Notification strategy. Pointer to an object conforming to the 01006 * ACE_Notification_Strategy interface. If set, the object's 01007 * notify(void) method will be called each time data is added to 01008 * this ACE_Message_Queue. @see ACE_Reactor_Notification_Strategy. 01009 */ 01010 ACE_Message_Queue_Ex (size_t high_water_mark = ACE_Message_Queue_Base::DEFAULT_HWM, 01011 size_t low_water_mark = ACE_Message_Queue_Base::DEFAULT_LWM, 01012 ACE_Notification_Strategy * ns = 0); 01013 virtual int open (size_t hwm = ACE_Message_Queue_Base::DEFAULT_HWM, 01014 size_t lwm = ACE_Message_Queue_Base::DEFAULT_LWM, 01015 ACE_Notification_Strategy * = 0); 01016 //@} 01017 01018 /// Releases all resources from the message queue and marks it deactivated. 01019 /// @sa flush(). 01020 /// 01021 /// @retval The number of messages released from the queue; -1 on error. 01022 virtual int close (void); 01023 01024 /// Releases all resources from the message queue and marks it deactivated. 01025 virtual ~ACE_Message_Queue_Ex (void); 01026 01027 /** 01028 * Releases all resources from the message queue but does not mark it 01029 * deactivated. This method holds the queue lock during this operation. 01030 * @sa close(). 01031 * 01032 * @return The number of messages flushed; -1 on error. 01033 */ 01034 virtual int flush (void); 01035 01036 /** 01037 * Release all resources from the message queue but do not mark it 01038 * as deactivated. 01039 * 01040 * @pre The caller must be holding the queue lock before calling this 01041 * method. 01042 * 01043 * @return The number of messages flushed. 01044 */ 01045 virtual int flush_i (void); 01046 01047 /** @name Enqueue and dequeue methods 01048 * 01049 * The enqueue and dequeue methods accept a timeout value passed as 01050 * an ACE_Time_Value *. In all cases, if the timeout pointer is 0, 01051 * the caller will block until action is possible. If the timeout pointer 01052 * is non-zero, the call will wait (if needed, subject to water mark 01053 * settings) until the absolute time specified in the referenced 01054 * ACE_Time_Value object is reached. If the time is reached before the 01055 * desired action is possible, the method will return -1 with errno set 01056 * to @c EWOULDBLOCK. Regardless of the timeout setting, however, 01057 * these methods will also fail and return -1 when the queue is closed, 01058 * deactivated, pulsed, or when a signal occurs. 01059 * 01060 * The time parameters are handled the same as in ACE_Message_Queue, so 01061 * you can see C++NPv2 Section 6.2 and APG Section 12.3 for a fuller 01062 * treatment of ACE_Message_Queue, enqueueing, dequeueing, and how these 01063 * operations are affected by queue state transitions. 01064 */ 01065 //@{ 01066 /** 01067 * Retrieve a pointer to the first item in the queue without removing it. 01068 * 01069 * @note Because the item whose pointer is returned is still on the queue, 01070 * another thread may dequeue that item at any time, 01071 * including before the calling thread examines the peeked-at item. 01072 * Be very careful with this method in multithreaded queueing 01073 * situations. 01074 * 01075 * @param first_item Reference to an ACE_MESSAGE_TYPE * that will 01076 * point to the first item on the queue. The item 01077 * remains on the queue until this or another thread 01078 * dequeues it. 01079 * @param timeout The absolute time the caller will wait until 01080 * for an item to be queued. 01081 * 01082 * @retval >0 The number of items on the queue. 01083 * @retval -1 On failure. errno holds the reason. Common errno values are: 01084 * - EWOULDBLOCK: the timeout elapsed 01085 * - ESHUTDOWN: the queue was deactivated or pulsed 01086 */ 01087 virtual int peek_dequeue_head (ACE_MESSAGE_TYPE *&first_item, 01088 ACE_Time_Value *timeout = 0); 01089 01090 /** 01091 * Enqueue an ACE_MESSAGE TYPE into the queue in accordance with 01092 * the specified priority (0 is lowest priority). FIFO 01093 * order is maintained when items of the same priority are 01094 * inserted consecutively. 01095 * 01096 * @param new_item Pointer to an item that will be added to the queue. 01097 * @param timeout The absolute time the caller will wait until 01098 * for the block to be queued. 01099 * @param priority The priority to use when enqueueing the item. 01100 * 01101 * @retval >0 The number of items on the queue after adding 01102 * the specified item. 01103 * @retval -1 On failure. errno holds the reason. Common errno values are: 01104 * - EWOULDBLOCK: the timeout elapsed 01105 * - ESHUTDOWN: the queue was deactivated or pulsed 01106 */ 01107 virtual int enqueue_prio (ACE_MESSAGE_TYPE *new_item, 01108 ACE_Time_Value *timeout = 0, 01109 unsigned long priority = DEFAULT_PRIORITY); 01110 01111 /** 01112 * This method acts just like enqueue_tail(). There's no deadline 01113 * time associated with items. 01114 */ 01115 virtual int enqueue_deadline (ACE_MESSAGE_TYPE *new_item, 01116 ACE_Time_Value *timeout = 0); 01117 01118 /** 01119 * @deprecated This is an alias for enqueue_prio(). It's only here for 01120 * backwards compatibility and will go away in a subsequent release. 01121 * Please use enqueue_prio() instead. 01122 */ 01123 virtual int enqueue (ACE_MESSAGE_TYPE *new_item, 01124 ACE_Time_Value *timeout = 0); 01125 01126 /** 01127 * Enqueue an item at the tail of the queue. 01128 * 01129 * @param new_item Pointer to an item that will be added to the queue. 01130 * @param timeout The absolute time the caller will wait until 01131 * for the item to be queued. 01132 * 01133 * @retval >0 The number of items on the queue after adding 01134 * the specified item. 01135 * @retval -1 On failure. errno holds the reason. Common errno values are: 01136 * - EWOULDBLOCK: the timeout elapsed 01137 * - ESHUTDOWN: the queue was deactivated or pulsed 01138 */ 01139 virtual int enqueue_tail (ACE_MESSAGE_TYPE *new_item, 01140 ACE_Time_Value *timeout = 0); 01141 01142 /** 01143 * Enqueue an item at the head of the queue. 01144 * 01145 * @param new_item Pointer to an item that will be added to the queue. 01146 * @param timeout The absolute time the caller will wait until 01147 * for the item to be queued. 01148 * 01149 * @retval >0 The number of items on the queue after adding 01150 * the specified item. 01151 * @retval -1 On failure. errno holds the reason. Common errno values are: 01152 * - EWOULDBLOCK: the timeout elapsed 01153 * - ESHUTDOWN: the queue was deactivated or pulsed 01154 */ 01155 virtual int enqueue_head (ACE_MESSAGE_TYPE *new_item, 01156 ACE_Time_Value *timeout = 0); 01157 01158 /// This method is an alias for the following <dequeue_head> method. 01159 virtual int dequeue (ACE_MESSAGE_TYPE *&first_item, 01160 ACE_Time_Value *timeout = 0); 01161 01162 /** 01163 * Dequeue the item at the head of the queue and return a pointer to it. 01164 * 01165 * @param first_item Reference to an ACE_MESSAGE_TYPE * that will 01166 * be set to the address of the dequeued item. 01167 * @param timeout The absolute time the caller will wait until 01168 * for an item to be dequeued. 01169 * 01170 * @retval >=0 The number of items remaining in the queue. 01171 * @retval -1 On failure. errno holds the reason. Common errno values are: 01172 * - EWOULDBLOCK: the timeout elapsed 01173 * - ESHUTDOWN: the queue was deactivated or pulsed 01174 */ 01175 virtual int dequeue_head (ACE_MESSAGE_TYPE *&first_item, 01176 ACE_Time_Value *timeout = 0); 01177 01178 /** 01179 * Dequeue the item that has the lowest priority (preserves 01180 * FIFO order for items with the same priority) and return a pointer 01181 * to it. 01182 * 01183 * @param dequeued Reference to an ACE_MESSAGE_TYPE * that will 01184 * be set to the address of the dequeued item. 01185 * @param timeout The absolute time the caller will wait until 01186 * for an item to be dequeued. 01187 * 01188 * @retval >=0 The number of items remaining in the queue. 01189 * @retval -1 On failure. errno holds the reason. Common errno values are: 01190 * - EWOULDBLOCK: the timeout elapsed 01191 * - ESHUTDOWN: the queue was deactivated or pulsed 01192 */ 01193 virtual int dequeue_prio (ACE_MESSAGE_TYPE *&dequeued, 01194 ACE_Time_Value *timeout = 0); 01195 01196 /** 01197 * Dequeue the item at the tail of the queue and return a pointer to it. 01198 * 01199 * @param dequeued Reference to an ACE_MESSAGE_TYPE * that will 01200 * be set to the address of the dequeued item. 01201 * @param timeout The absolute time the caller will wait until 01202 * for an item to be dequeued. 01203 * 01204 * @retval >=0 The number of items remaining in the queue. 01205 * @retval -1 On failure. errno holds the reason. Common errno values are: 01206 * - EWOULDBLOCK: the timeout elapsed 01207 * - ESHUTDOWN: the queue was deactivated or pulsed 01208 */ 01209 virtual int dequeue_tail (ACE_MESSAGE_TYPE *&dequeued, 01210 ACE_Time_Value *timeout = 0); 01211 01212 /** 01213 * Because there's deadline associated with enqueue_deadline(), this 01214 * method will behave just as dequeue_head(). 01215 */ 01216 virtual int dequeue_deadline (ACE_MESSAGE_TYPE *&dequeued, 01217 ACE_Time_Value *timeout = 0); 01218 //@} 01219 01220 /** @name Queue statistics methods 01221 */ 01222 //@{ 01223 01224 /// True if queue is full, else false. 01225 virtual int is_full (void); 01226 /// True if queue is empty, else false. 01227 virtual int is_empty (void); 01228 01229 /** 01230 * Number of total bytes on the queue, i.e., sum of the message 01231 * block sizes. 01232 */ 01233 virtual size_t message_bytes (void); 01234 /** 01235 * Number of total length on the queue, i.e., sum of the message 01236 * block lengths. 01237 */ 01238 virtual size_t message_length (void); 01239 /** 01240 * Number of total messages on the queue. 01241 */ 01242 virtual size_t message_count (void); 01243 01244 // = Manual changes to these stats (used when queued message blocks 01245 // change size or lengths). 01246 /** 01247 * New value of the number of total bytes on the queue, i.e., sum of 01248 * the message block sizes. 01249 */ 01250 virtual void message_bytes (size_t new_size); 01251 /** 01252 * New value of the number of total length on the queue, i.e., sum 01253 * of the message block lengths. 01254 */ 01255 virtual void message_length (size_t new_length); 01256 01257 //@} 01258 01259 /** @name Water mark (flow control) methods 01260 */ 01261 //@{ 01262 01263 /** 01264 * Get high watermark. 01265 */ 01266 virtual size_t high_water_mark (void); 01267 /** 01268 * Set the high watermark, which determines how many bytes can be 01269 * stored in a queue before it's considered "full." 01270 */ 01271 virtual void high_water_mark (size_t hwm); 01272 01273 /** 01274 * Get low watermark. 01275 */ 01276 virtual size_t low_water_mark (void); 01277 /** 01278 * Set the low watermark, which determines how many bytes must be in 01279 * the queue before supplier threads are allowed to enqueue 01280 * additional <ACE_MESSAGE_TYPE>s. 01281 */ 01282 virtual void low_water_mark (size_t lwm); 01283 //@} 01284 01285 /** @name Activation and queue state methods 01286 * See C++NPv2 Section 6.2 and APG Section 12.3 for a fuller treatment of 01287 * queue states and transitions and how the transitions affect message 01288 * enqueueing and dequeueing operations. 01289 */ 01290 //@{ 01291 01292 /** 01293 * Deactivate the queue and wakeup all threads waiting on the queue 01294 * so they can continue. No messages are removed from the queue, 01295 * however. Any other operations called until the queue is 01296 * activated again will immediately return -1 with @c errno == 01297 * ESHUTDOWN. Returns WAS_INACTIVE if queue was inactive before the 01298 * call and WAS_ACTIVE if queue was active before the call. 01299 */ 01300 virtual int deactivate (void); 01301 01302 /** 01303 * Reactivate the queue so that threads can enqueue and dequeue 01304 * messages again. Returns the state of the queue before the call. 01305 */ 01306 virtual int activate (void); 01307 01308 /** 01309 * Pulse the queue to wake up any waiting threads. Changes the 01310 * queue state to PULSED; future enqueue/dequeue operations proceed 01311 * as in ACTIVATED state. 01312 * 01313 * @retval The queue's state before this call. 01314 */ 01315 virtual int pulse (void); 01316 01317 /// Returns the current state of the queue, which can be one of 01318 /// ACTIVATED, DEACTIVATED, or PULSED. 01319 virtual int state (void); 01320 01321 /// Returns true if the state of the queue is DEACTIVATED, 01322 /// but false if the queue's state is ACTIVATED or PULSED. 01323 virtual int deactivated (void); 01324 //@} 01325 01326 /** @name Notification strategy methods 01327 */ 01328 //@{ 01329 01330 /** 01331 * This hook is automatically invoked by <enqueue_head>, 01332 * <enqueue_tail>, and <enqueue_prio> when a new item is inserted 01333 * into the queue. Subclasses can override this method to perform 01334 * specific notification strategies (e.g., signaling events for a 01335 * <WFMO_Reactor>, notifying a <Reactor>, etc.). In a 01336 * multi-threaded application with concurrent consumers, there is no 01337 * guarantee that the queue will be still be non-empty by the time 01338 * the notification occurs. 01339 */ 01340 virtual int notify (void); 01341 01342 /// Get the notification strategy for the <Message_Queue> 01343 virtual ACE_Notification_Strategy *notification_strategy (void); 01344 01345 /// Set the notification strategy for the <Message_Queue> 01346 virtual void notification_strategy (ACE_Notification_Strategy *s); 01347 //@} 01348 01349 /// Returns a reference to the lock used by the ACE_Message_Queue_Ex. 01350 virtual ACE_SYNCH_MUTEX_T &lock (void); 01351 01352 /// Dump the state of an object. 01353 virtual void dump (void) const; 01354 01355 /// Declare the dynamic allocation hooks. 01356 ACE_ALLOC_HOOK_DECLARE; 01357 01358 protected: 01359 /// Implement this via an ACE_Message_Queue. 01360 ACE_Message_Queue<ACE_SYNCH_USE> queue_; 01361 }; 01362 01363 /** 01364 * @class ACE_Message_Queue_Ex_N 01365 * 01366 * @brief A threaded message queueing facility, modeled after the 01367 * queueing facilities in System V STREAMs which can enqueue 01368 * multiple messages in one call. 01369 * 01370 * As ACE_Message_Queue_Ex, ACE_Message_Queue_Ex_N is a strongly-typed 01371 * version of the ACE_Message_Queue. If @c ACE_SYNCH_DECL is @c ACE_MT_SYNCH 01372 * then all operations are thread-safe. Otherwise, if it's @c ACE_NULL_SYNCH 01373 * then there's no locking overhead. 01374 * 01375 * The @c ACE_MESSAGE_TYPE messages that are sent to this 01376 * queue can be chained. Messages are expected to have a 01377 * @c next method that returns the next message in the chain; 01378 * ACE_Message_Queue_Ex_N uses this method to run through 01379 * all the incoming messages and enqueue them in one call. 01380 */ 01381 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> 01382 class ACE_Message_Queue_Ex_N : public ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE> 01383 { 01384 public: 01385 // = Initialization and termination methods. 01386 01387 /** 01388 * Initialize an ACE_Message_Queue_Ex_N. The @a high_water_mark 01389 * determines how many bytes can be stored in a queue before it's 01390 * considered "full." Supplier threads must block until the queue 01391 * is no longer full. The @a low_water_mark determines how many 01392 * bytes must be in the queue before supplier threads are allowed to 01393 * enqueue additional messages. By default, the @a high_water_mark 01394 * equals the @a low_water_mark, which means that suppliers will be 01395 * able to enqueue new messages as soon as a consumer removes any message 01396 * from the queue. Making the @a low_water_mark smaller than the 01397 * @a high_water_mark forces consumers to drain more messages from the 01398 * queue before suppliers can enqueue new messages, which can minimize 01399 * the "silly window syndrome." 01400 */ 01401 ACE_Message_Queue_Ex_N (size_t high_water_mark = ACE_Message_Queue_Base::DEFAULT_HWM, 01402 size_t low_water_mark = ACE_Message_Queue_Base::DEFAULT_LWM, 01403 ACE_Notification_Strategy * ns = 0); 01404 01405 /// Close down the message queue and release all resources. 01406 virtual ~ACE_Message_Queue_Ex_N (void); 01407 01408 /** 01409 * Enqueue one or more @c ACE_MESSAGE_TYPE objects at the head of the queue. 01410 * If the @a new_item @c next() pointer is non-zero, it is assumed to be the 01411 * start of a series of @c ACE_MESSAGE_TYPE objects connected via their 01412 * @c next() pointers. The series of blocks will be added to the queue in 01413 * the same order they are passed in as. 01414 * 01415 * @param new_item Pointer to an @c ACE_MESSAGE_TYPE that will be 01416 * added to the queue. If the block's @c next() pointer 01417 * is non-zero, all blocks chained from the @c next() 01418 * pointer are enqueued as well. 01419 * @param tv The absolute time the caller will wait until 01420 * for the block to be queued. 01421 * 01422 * @retval >0 The number of @c ACE_MESSAGE_TYPE objects on the queue after 01423 * adding the specified block(s). 01424 * @retval -1 On failure. errno holds the reason. Common errno values are: 01425 * - EWOULDBLOCK: the timeout elapsed 01426 * - ESHUTDOWN: the queue was deactivated or pulsed 01427 */ 01428 virtual int enqueue_head (ACE_MESSAGE_TYPE *new_item, ACE_Time_Value *tv = 0); 01429 01430 /** 01431 * Enqueue one or more @c ACE_MESSAGE_TYPE objects at the tail of the queue. 01432 * If the @a new_item @c next() pointer is non-zero, it is assumed to be the 01433 * start of a series of @c ACE_MESSAGE_TYPE objects connected via their 01434 * @c next() pointers. The series of blocks will be added to the queue in 01435 * the same order they are passed in as. 01436 * 01437 * @param new_item Pointer to an @c ACE_MESSAGE_TYPE that will be 01438 * added to the queue. If the block's @c next() pointer 01439 * is non-zero, all blocks chained from the @c next() 01440 * pointer are enqueued as well. 01441 * @param tv The absolute time the caller will wait until 01442 * for the block to be queued. 01443 * 01444 * @retval >0 The number of @c ACE_MESSAGE_TYPE objects on the queue after 01445 * adding the specified block(s). 01446 * @retval -1 On failure. errno holds the reason. Common errno values are: 01447 * - EWOULDBLOCK: the timeout elapsed 01448 * - ESHUTDOWN: the queue was deactivated or pulsed 01449 */ 01450 virtual int enqueue_tail (ACE_MESSAGE_TYPE *new_item, ACE_Time_Value *tv = 0); 01451 01452 /// Declare the dynamic allocation hooks. 01453 ACE_ALLOC_HOOK_DECLARE; 01454 01455 protected: 01456 /** 01457 * An helper method that wraps the incoming chain messages 01458 * with ACE_Message_Blocks. 01459 */ 01460 ACE_Message_Block *wrap_with_mbs_i (ACE_MESSAGE_TYPE *new_item); 01461 }; 01462 01463 ACE_END_VERSIONED_NAMESPACE_DECL 01464 01465 #if defined (ACE_TEMPLATES_REQUIRE_SOURCE) 01466 #include "ace/Message_Queue_T.cpp" 01467 #endif /* ACE_TEMPLATES_REQUIRE_SOURCE */ 01468 01469 #if defined (ACE_TEMPLATES_REQUIRE_PRAGMA) 01470 #pragma implementation ("Message_Queue_T.cpp") 01471 #endif /* ACE_TEMPLATES_REQUIRE_PRAGMA */ 01472 01473 #include /**/ "ace/post.h" 01474 #endif /* ACE_MESSAGE_QUEUE_T_H */