00001 /* -*- C++ -*- */ 00002 00003 //============================================================================= 00004 /** 00005 * @file Message_Queue_T.h 00006 * 00007 * Message_Queue_T.h,v 4.61 2006/05/30 10:57:22 jwillemsen Exp 00008 * 00009 * @author Douglas C. Schmidt <schmidt@cs.wustl.edu> 00010 */ 00011 //============================================================================= 00012 00013 #ifndef ACE_MESSAGE_QUEUE_T_H 00014 #define ACE_MESSAGE_QUEUE_T_H 00015 #include /**/ "ace/pre.h" 00016 00017 #include "ace/Message_Queue.h" 00018 #include "ace/Synch_Traits.h" 00019 #include "ace/Guard_T.h" 00020 00021 #if !defined (ACE_LACKS_PRAGMA_ONCE) 00022 # pragma once 00023 #endif /* ACE_LACKS_PRAGMA_ONCE */ 00024 00025 ACE_BEGIN_VERSIONED_NAMESPACE_DECL 00026 00027 #if defined (ACE_VXWORKS) 00028 class ACE_Message_Queue_Vx; 00029 #endif /* defined (ACE_VXWORKS) */ 00030 00031 #if defined (ACE_WIN32) && (ACE_HAS_WINNT4 != 0) 00032 class ACE_Message_Queue_NT; 00033 #endif /* ACE_WIN32 && ACE_HAS_WINNT4 != 0 */ 00034 00035 /** 00036 * @class ACE_Message_Queue 00037 * 00038 * @brief A message queueing facility with parameterized synchronization 00039 * capability. ACE_Message_Queue is modeled after the queueing facilities 00040 * in System V STREAMs. 00041 * 00042 * ACE_Message_Queue is the primary queueing facility for 00043 * messages in the ACE framework. It's one template argument parameterizes 00044 * the queue's synchronization. The argument specifies a synchronization 00045 * strategy. The two main strategies available for ACE_SYNCH_DECL are: 00046 * -# ACE_MT_SYNCH: all operations are thread-safe 00047 * -# ACE_NULL_SYNCH: no synchronization and no locking overhead 00048 * 00049 * All data passing through ACE_Message_Queue is in the form of 00050 * ACE_Message_Block objects. @sa ACE_Message_Block. 00051 */ 00052 template <ACE_SYNCH_DECL> 00053 class ACE_Message_Queue : public ACE_Message_Queue_Base 00054 { 00055 public: 00056 friend class ACE_Message_Queue_Iterator<ACE_SYNCH_USE>; 00057 friend class ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>; 00058 00059 // = Traits 00060 typedef ACE_Message_Queue_Iterator<ACE_SYNCH_USE> 00061 ITERATOR; 00062 typedef ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE> 00063 REVERSE_ITERATOR; 00064 00065 // = Initialization and termination methods. 00066 //@{ 00067 /** 00068 * Initialize an ACE_Message_Queue. 00069 * 00070 * @param hwm High water mark. Determines how many bytes can be stored in a 00071 * queue before it's considered full. Supplier threads must block 00072 * until the queue is no longer full. 00073 * @param lwm Low water mark. Determines how many bytes must be in the queue 00074 * before supplier threads are allowed to enqueue additional 00075 * data. By default, the @a hwm equals @a lwm, which means 00076 * that suppliers will be able to enqueue new messages as soon as 00077 * a consumer removes any message from the queue. Making the low 00078 * water mark smaller than the high water mark forces consumers to 00079 * drain more messages from the queue before suppliers can enqueue 00080 * new messages, which can minimize the "silly window syndrome." 00081 * @param ns Notification strategy. Pointer to an object conforming to the 00082 * ACE_Notification_Strategy interface. If set, the object's 00083 * notify(void) method will be called each time data is added to 00084 * this ACE_Message_Queue. @see ACE_Reactor_Notification_Strategy. 00085 */ 00086 ACE_Message_Queue (size_t hwm = ACE_Message_Queue_Base::DEFAULT_HWM, 00087 size_t lwm = ACE_Message_Queue_Base::DEFAULT_LWM, 00088 ACE_Notification_Strategy *ns = 0); 00089 virtual int open (size_t hwm = ACE_Message_Queue_Base::DEFAULT_HWM, 00090 size_t lwm = ACE_Message_Queue_Base::DEFAULT_LWM, 00091 ACE_Notification_Strategy *ns = 0); 00092 //@} 00093 00094 /// Releases all resources from the message queue and marks it deactivated. 00095 /// @sa flush(). 00096 /// 00097 /// @retval The number of messages released from the queue; -1 on error. 00098 virtual int close (void); 00099 00100 /// Releases all resources from the message queue and marks it deactivated. 00101 virtual ~ACE_Message_Queue (void); 00102 00103 /// Releases all resources from the message queue but does not mark it 00104 /// deactivated. 00105 /// @sa close(). 00106 /** 00107 * This method holds the queue lock during this operation. 00108 * 00109 * @return The number of messages flushed; -1 on error. 00110 */ 00111 virtual int flush (void); 00112 00113 /// Release all resources from the message queue but do not mark it 00114 /// as deactivated. 00115 /** 00116 * @pre The caller must be holding the queue lock before calling this 00117 * method. 00118 * 00119 * @return The number of messages flushed. 00120 */ 00121 virtual int flush_i (void); 00122 00123 /** @name Enqueue and dequeue methods 00124 * 00125 * The enqueue and dequeue methods accept a timeout value passed as 00126 * an ACE_Time_Value *. In all cases, if the timeout pointer is 0, 00127 * the caller will block until action is possible. If the timeout pointer 00128 * is non-zero, the call will wait (if needed, subject to water mark 00129 * settings) until the absolute time specified in the referenced 00130 * ACE_Time_Value object is reached. If the time is reached before the 00131 * desired action is possible, the method will return -1 with errno set 00132 * to @c EWOULDBLOCK. Regardless of the timeout setting, however, 00133 * these methods will also fail and return -1 when the queue is closed, 00134 * deactivated, pulsed, or when a signal occurs. 00135 * 00136 * See C++NPv2 Section 6.2 and APG Section 12.3 for a fuller treatment of 00137 * ACE_Message_Queue, enqueueing, dequeueing, and how these operations are 00138 * affected by queue state transitions. 00139 */ 00140 //@{ 00141 /** 00142 * Retrieve a pointer to the first ACE_Message_Block in the queue 00143 * without removing it. 00144 * 00145 * @note Because the block whose pointer is returned is still on the queue, 00146 * another thread may dequeue the referenced block at any time, 00147 * including before the calling thread examines the peeked-at block. 00148 * Be very careful with this method in multithreaded queueing 00149 * situations. 00150 * 00151 * @param first_item Reference to an ACE_Message_Block * that will 00152 * point to the first block on the queue. The block 00153 * remains on the queue until this or another thread 00154 * dequeues it. 00155 * @param timeout The absolute time the caller will wait until 00156 * for a block to be queued. 00157 * 00158 * @retval >0 The number of ACE_Message_Blocks on the queue. 00159 * @retval -1 On failure. errno holds the reason. Common errno values are: 00160 * - EWOULDBLOCK: the timeout elapsed 00161 * - ESHUTDOWN: the queue was deactivated or pulsed 00162 */ 00163 virtual int peek_dequeue_head (ACE_Message_Block *&first_item, 00164 ACE_Time_Value *timeout = 0); 00165 00166 /** 00167 * Enqueue an ACE_Message_Block into the queue in accordance with 00168 * the ACE_Message_Block's priority (0 is lowest priority). FIFO 00169 * order is maintained when messages of the same priority are 00170 * inserted consecutively. 00171 * 00172 * @param new_item Pointer to an ACE_Message_Block that will be 00173 * added to the queue. The block's @c msg_priority() 00174 * method will be called to obtain the queueing priority. 00175 * @param timeout The absolute time the caller will wait until 00176 * for the block to be queued. 00177 * 00178 * @retval >0 The number of ACE_Message_Blocks on the queue after adding 00179 * the specified block. 00180 * @retval -1 On failure. errno holds the reason. Common errno values are: 00181 * - EWOULDBLOCK: the timeout elapsed 00182 * - ESHUTDOWN: the queue was deactivated or pulsed 00183 */ 00184 virtual int enqueue_prio (ACE_Message_Block *new_item, 00185 ACE_Time_Value *timeout = 0); 00186 00187 /** 00188 * Enqueue an ACE_Message_Block into the queue in accordance with the 00189 * block's deadline time. FIFO order is maintained when messages of 00190 * the same deadline time are inserted consecutively. 00191 * 00192 * @param new_item Pointer to an ACE_Message_Block that will be 00193 * added to the queue. The block's @c msg_deadline_time() 00194 * method will be called to obtain the relative queueing 00195 * position. 00196 * @param timeout The absolute time the caller will wait until 00197 * for the block to be queued. 00198 * 00199 * @retval >0 The number of ACE_Message_Blocks on the queue after adding 00200 * the specified block. 00201 * @retval -1 On failure. errno holds the reason. Common errno values are: 00202 * - EWOULDBLOCK: the timeout elapsed 00203 * - ESHUTDOWN: the queue was deactivated or pulsed 00204 */ 00205 virtual int enqueue_deadline (ACE_Message_Block *new_item, 00206 ACE_Time_Value *timeout = 0); 00207 00208 /** 00209 * @deprecated This is an alias for enqueue_prio(). It's only here for 00210 * backwards compatibility and will go away in a subsequent release. 00211 * Please use enqueue_prio() instead. 00212 */ 00213 virtual int enqueue (ACE_Message_Block *new_item, 00214 ACE_Time_Value *timeout = 0); 00215 00216 /** 00217 * Enqueue one or more ACE_Message_Block objects at the tail of the queue. 00218 * If the @a new_item @c next() pointer is non-zero, it is assumed to be the 00219 * start of a series of ACE_Message_Block objects connected via their 00220 * @c next() pointers. The series of blocks will be added to the queue in 00221 * the same order they are passed in as. 00222 * 00223 * @param new_item Pointer to an ACE_Message_Block that will be 00224 * added to the queue. If the block's @c next() pointer 00225 * is non-zero, all blocks chained from the @c next() 00226 * pointer are enqueued as well. 00227 * @param timeout The absolute time the caller will wait until 00228 * for the block to be queued. 00229 * 00230 * @retval >0 The number of ACE_Message_Blocks on the queue after adding 00231 * the specified block(s). 00232 * @retval -1 On failure. errno holds the reason. Common errno values are: 00233 * - EWOULDBLOCK: the timeout elapsed 00234 * - ESHUTDOWN: the queue was deactivated or pulsed 00235 */ 00236 virtual int enqueue_tail (ACE_Message_Block *new_item, 00237 ACE_Time_Value *timeout = 0); 00238 00239 /** 00240 * Enqueue one or more ACE_Message_Block objects at the head of the queue. 00241 * If the @a new_item @c next() pointer is non-zero, it is assumed to be the 00242 * start of a series of ACE_Message_Block objects connected via their 00243 * @c next() pointers. The series of blocks will be added to the queue in 00244 * the same order they are passed in as. 00245 * 00246 * @param new_item Pointer to an ACE_Message_Block that will be 00247 * added to the queue. If the block's @c next() pointer 00248 * is non-zero, all blocks chained from the @c next() 00249 * pointer are enqueued as well. 00250 * @param timeout The absolute time the caller will wait until 00251 * for the block to be queued. 00252 * 00253 * @retval >0 The number of ACE_Message_Blocks on the queue after adding 00254 * the specified block(s). 00255 * @retval -1 On failure. errno holds the reason. Common errno values are: 00256 * - EWOULDBLOCK: the timeout elapsed 00257 * - ESHUTDOWN: the queue was deactivated or pulsed 00258 */ 00259 virtual int enqueue_head (ACE_Message_Block *new_item, 00260 ACE_Time_Value *timeout = 0); 00261 00262 /// This method is an alias for the dequeue_head() method. 00263 virtual int dequeue (ACE_Message_Block *&first_item, 00264 ACE_Time_Value *timeout = 0); 00265 00266 /** 00267 * Dequeue the ACE_Message_Block at the head of the queue and return 00268 * a pointer to the dequeued block. 00269 * 00270 * @param first_item Reference to an ACE_Message_Block * that will 00271 * be set to the address of the dequeued block. 00272 * @param timeout The absolute time the caller will wait until 00273 * for a block to be dequeued. 00274 * 00275 * @retval >=0 The number of ACE_Message_Blocks remaining in the queue. 00276 * @retval -1 On failure. errno holds the reason. Common errno values are: 00277 * - EWOULDBLOCK: the timeout elapsed 00278 * - ESHUTDOWN: the queue was deactivated or pulsed 00279 */ 00280 virtual int dequeue_head (ACE_Message_Block *&first_item, 00281 ACE_Time_Value *timeout = 0); 00282 00283 /** 00284 * Dequeue the ACE_Message_Block that has the lowest priority (preserves 00285 * FIFO order for messages with the same priority) and return a pointer 00286 * to the dequeued block. 00287 * 00288 * @param first_item Reference to an ACE_Message_Block * that will 00289 * be set to the address of the dequeued block. 00290 * @param timeout The absolute time the caller will wait until 00291 * for a block to be dequeued. 00292 * 00293 * @retval >=0 The number of ACE_Message_Blocks remaining in the queue. 00294 * @retval -1 On failure. errno holds the reason. Common errno values are: 00295 * - EWOULDBLOCK: the timeout elapsed 00296 * - ESHUTDOWN: the queue was deactivated or pulsed 00297 */ 00298 virtual int dequeue_prio (ACE_Message_Block *&first_item, 00299 ACE_Time_Value *timeout = 0); 00300 00301 /** 00302 * Dequeue the ACE_Message_Block at the tail of the queue and return 00303 * a pointer to the dequeued block. 00304 * 00305 * @param dequeued Reference to an ACE_Message_Block * that will 00306 * be set to the address of the dequeued block. 00307 * @param timeout The absolute time the caller will wait until 00308 * for a block to be dequeued. 00309 * 00310 * @retval >=0 The number of ACE_Message_Blocks remaining in the queue. 00311 * @retval -1 On failure. errno holds the reason. Common errno values are: 00312 * - EWOULDBLOCK: the timeout elapsed 00313 * - ESHUTDOWN: the queue was deactivated or pulsed 00314 */ 00315 virtual int dequeue_tail (ACE_Message_Block *&dequeued, 00316 ACE_Time_Value *timeout = 0); 00317 00318 /** 00319 * Dequeue the ACE_Message_Block with the earliest deadline time and return 00320 * a pointer to the dequeued block. 00321 * 00322 * @param dequeued Reference to an ACE_Message_Block * that will 00323 * be set to the address of the dequeued block. 00324 * @param timeout The absolute time the caller will wait until 00325 * for a block to be dequeued. 00326 * 00327 * @retval >=0 The number of ACE_Message_Blocks remaining in the queue. 00328 * @retval -1 On failure. errno holds the reason. Common errno values are: 00329 * - EWOULDBLOCK: the timeout elapsed 00330 * - ESHUTDOWN: the queue was deactivated or pulsed 00331 */ 00332 virtual int dequeue_deadline (ACE_Message_Block *&dequeued, 00333 ACE_Time_Value *timeout = 0); 00334 //@} 00335 00336 // = Check if queue is full/empty. 00337 /// True if queue is full, else false. 00338 virtual int is_full (void); 00339 /// True if queue is empty, else false. 00340 virtual int is_empty (void); 00341 00342 /** @name Queue statistics methods 00343 */ 00344 //@{ 00345 00346 /** 00347 * Number of total bytes on the queue, i.e., sum of the message 00348 * block sizes. 00349 */ 00350 virtual size_t message_bytes (void); 00351 00352 /** 00353 * Number of total length on the queue, i.e., sum of the message 00354 * block lengths. 00355 */ 00356 virtual size_t message_length (void); 00357 00358 /** 00359 * Number of total messages on the queue. 00360 */ 00361 virtual size_t message_count (void); 00362 00363 // = Manual changes to these stats (used when queued message blocks 00364 // change size or lengths). 00365 /** 00366 * New value of the number of total bytes on the queue, i.e., sum of 00367 * the message block sizes. 00368 */ 00369 virtual void message_bytes (size_t new_size); 00370 /** 00371 * New value of the number of total length on the queue, i.e., sum 00372 * of the message block lengths. 00373 */ 00374 virtual void message_length (size_t new_length); 00375 00376 //@} 00377 00378 00379 /** @name Water mark (flow control) methods 00380 */ 00381 //@{ 00382 00383 /** 00384 * Get high watermark. 00385 */ 00386 virtual size_t high_water_mark (void); 00387 /** 00388 * Set the high watermark, which determines how many bytes can be 00389 * stored in a queue before it's considered "full." 00390 */ 00391 virtual void high_water_mark (size_t hwm); 00392 00393 /** 00394 * Get low watermark. 00395 */ 00396 virtual size_t low_water_mark (void); 00397 /** 00398 * Set the low watermark, which determines how many bytes must be in 00399 * the queue before supplier threads are allowed to enqueue 00400 * additional ACE_Message_Blocks. 00401 */ 00402 virtual void low_water_mark (size_t lwm); 00403 //@} 00404 00405 /** @name Activation and queue state methods 00406 * See C++NPv2 Section 6.2 and APG Section 12.3 for a fuller treatment of 00407 * queue states and transitions and how the transitions affect message 00408 * enqueueing and dequeueing operations. 00409 */ 00410 //@{ 00411 00412 /** 00413 * Deactivate the queue and wakeup all threads waiting on the queue 00414 * so they can continue. No messages are removed from the queue, 00415 * however. Any other operations called until the queue is 00416 * activated again will immediately return -1 with <errno> == 00417 * ESHUTDOWN. Returns WAS_INACTIVE if queue was inactive before the 00418 * call and WAS_ACTIVE if queue was active before the call. 00419 */ 00420 virtual int deactivate (void); 00421 00422 /** 00423 * Reactivate the queue so that threads can enqueue and dequeue 00424 * messages again. Returns the state of the queue before the call. 00425 */ 00426 virtual int activate (void); 00427 00428 /** 00429 * Pulse the queue to wake up any waiting threads. Changes the 00430 * queue state to PULSED; future enqueue/dequeue operations proceed 00431 * as in ACTIVATED state. 00432 * 00433 * @return The queue's state before this call. 00434 */ 00435 virtual int pulse (void); 00436 00437 /// Returns the current state of the queue, which can be one of 00438 /// ACTIVATED, DEACTIVATED, or PULSED. 00439 virtual int state (void); 00440 00441 /// Returns true if the state of the queue is <DEACTIVATED>, 00442 /// but false if the queue's is <ACTIVATED> or <PULSED>. 00443 virtual int deactivated (void); 00444 //@} 00445 00446 /** @name Notification strategy methods 00447 */ 00448 //@{ 00449 00450 /** 00451 * This hook is automatically invoked by <enqueue_head>, 00452 * <enqueue_tail>, and <enqueue_prio> when a new item is inserted 00453 * into the queue. Subclasses can override this method to perform 00454 * specific notification strategies (e.g., signaling events for a 00455 * <WFMO_Reactor>, notifying a <Reactor>, etc.). In a 00456 * multi-threaded application with concurrent consumers, there is no 00457 * guarantee that the queue will be still be non-empty by the time 00458 * the notification occurs. 00459 */ 00460 virtual int notify (void); 00461 00462 /// Get the notification strategy for the <Message_Queue> 00463 virtual ACE_Notification_Strategy *notification_strategy (void); 00464 00465 /// Set the notification strategy for the <Message_Queue> 00466 virtual void notification_strategy (ACE_Notification_Strategy *s); 00467 //@} 00468 00469 /// Returns a reference to the lock used by the ACE_Message_Queue. 00470 virtual ACE_SYNCH_MUTEX_T &lock (void); 00471 00472 /// Dump the state of an object. 00473 virtual void dump (void) const; 00474 00475 /// Declare the dynamic allocation hooks. 00476 ACE_ALLOC_HOOK_DECLARE; 00477 00478 protected: 00479 // = Routines that actually do the enqueueing and dequeueing. 00480 00481 // These routines assume that locks are held by the corresponding 00482 // public methods. Since they are virtual, you can change the 00483 // queueing mechanism by subclassing from ACE_Message_Queue. 00484 00485 /// Enqueue an <ACE_Message_Block *> in accordance with its priority. 00486 virtual int enqueue_i (ACE_Message_Block *new_item); 00487 00488 /// Enqueue an <ACE_Message_Block *> in accordance with its deadline time. 00489 virtual int enqueue_deadline_i (ACE_Message_Block *new_item); 00490 00491 /// Enqueue an <ACE_Message_Block *> at the end of the queue. 00492 virtual int enqueue_tail_i (ACE_Message_Block *new_item); 00493 00494 /// Enqueue an <ACE_Message_Block *> at the head of the queue. 00495 virtual int enqueue_head_i (ACE_Message_Block *new_item); 00496 00497 /// Dequeue and return the <ACE_Message_Block *> at the head of the 00498 /// queue. 00499 virtual int dequeue_head_i (ACE_Message_Block *&first_item); 00500 00501 /// Dequeue and return the <ACE_Message_Block *> with the lowest 00502 /// priority. 00503 virtual int dequeue_prio_i (ACE_Message_Block *&dequeued); 00504 00505 /// Dequeue and return the <ACE_Message_Block *> at the tail of the 00506 /// queue. 00507 virtual int dequeue_tail_i (ACE_Message_Block *&first_item); 00508 00509 /// Dequeue and return the <ACE_Message_Block *> with the lowest 00510 /// deadline time. 00511 virtual int dequeue_deadline_i (ACE_Message_Block *&first_item); 00512 00513 // = Check the boundary conditions (assumes locks are held). 00514 00515 /// True if queue is full, else false. 00516 virtual int is_full_i (void); 00517 00518 /// True if queue is empty, else false. 00519 virtual int is_empty_i (void); 00520 00521 // = Implementation of the public <activate> and <deactivate> methods. 00522 00523 // These methods assume locks are held. 00524 00525 /** 00526 * Notifies all waiting threads that the queue has been deactivated 00527 * so they can wakeup and continue other processing. 00528 * No messages are removed from the queue. 00529 * 00530 * @param pulse If 0, the queue's state is changed to DEACTIVATED 00531 * and any other operations called until the queue is 00532 * reactivated will immediately return -1 with 00533 * errno == ESHUTDOWN. 00534 * If not zero, only the waiting threads are notified and 00535 * the queue's state changes to PULSED. 00536 * 00537 * @return The state of the queue before the call. 00538 */ 00539 virtual int deactivate_i (int pulse = 0); 00540 00541 /// Activate the queue. 00542 virtual int activate_i (void); 00543 00544 // = Helper methods to factor out common #ifdef code. 00545 00546 /// Wait for the queue to become non-full. 00547 virtual int wait_not_full_cond (ACE_Guard<ACE_SYNCH_MUTEX_T> &mon, 00548 ACE_Time_Value *timeout); 00549 00550 /// Wait for the queue to become non-empty. 00551 virtual int wait_not_empty_cond (ACE_Guard<ACE_SYNCH_MUTEX_T> &mon, 00552 ACE_Time_Value *timeout); 00553 00554 /// Inform any threads waiting to enqueue that they can procede. 00555 virtual int signal_enqueue_waiters (void); 00556 00557 /// Inform any threads waiting to dequeue that they can procede. 00558 virtual int signal_dequeue_waiters (void); 00559 00560 /// Pointer to head of ACE_Message_Block list. 00561 ACE_Message_Block *head_; 00562 00563 /// Pointer to tail of ACE_Message_Block list. 00564 ACE_Message_Block *tail_; 00565 00566 /// Lowest number before unblocking occurs. 00567 size_t low_water_mark_; 00568 00569 /// Greatest number of bytes before blocking. 00570 size_t high_water_mark_; 00571 00572 /// Current number of bytes in the queue. 00573 size_t cur_bytes_; 00574 00575 /// Current length of messages in the queue. 00576 size_t cur_length_; 00577 00578 /// Current number of messages in the queue. 00579 size_t cur_count_; 00580 00581 /// The notification strategy used when a new message is enqueued. 00582 ACE_Notification_Strategy *notification_strategy_; 00583 00584 // = Synchronization primitives for controlling concurrent access. 00585 /// Protect queue from concurrent access. 00586 ACE_SYNCH_MUTEX_T lock_; 00587 00588 /// Used to make threads sleep until the queue is no longer empty. 00589 ACE_SYNCH_CONDITION_T not_empty_cond_; 00590 00591 /// Used to make threads sleep until the queue is no longer full. 00592 ACE_SYNCH_CONDITION_T not_full_cond_; 00593 00594 private: 00595 00596 // = Disallow these operations. 00597 ACE_UNIMPLEMENTED_FUNC (void operator= (const ACE_Message_Queue<ACE_SYNCH_USE> &)) 00598 ACE_UNIMPLEMENTED_FUNC (ACE_Message_Queue (const ACE_Message_Queue<ACE_SYNCH_USE> &)) 00599 }; 00600 00601 // This typedef is used to get around a compiler bug in g++/vxworks. 00602 typedef ACE_Message_Queue<ACE_SYNCH> ACE_DEFAULT_MESSAGE_QUEUE_TYPE; 00603 00604 00605 /** 00606 * @class ACE_Message_Queue_Iterator 00607 * 00608 * @brief Iterator for the ACE_Message_Queue. 00609 */ 00610 template <ACE_SYNCH_DECL> 00611 class ACE_Message_Queue_Iterator 00612 { 00613 public: 00614 // = Initialization method. 00615 ACE_Message_Queue_Iterator (ACE_Message_Queue <ACE_SYNCH_USE> &queue); 00616 00617 // = Iteration methods. 00618 /// Pass back the <entry> that hasn't been seen in the queue. 00619 /// Returns 0 when all items have been seen, else 1. 00620 int next (ACE_Message_Block *&entry); 00621 00622 /// Returns 1 when all items have been seen, else 0. 00623 int done (void) const; 00624 00625 /// Move forward by one element in the queue. Returns 0 when all the 00626 /// items in the set have been seen, else 1. 00627 int advance (void); 00628 00629 /// Dump the state of an object. 00630 void dump (void) const; 00631 00632 /// Declare the dynamic allocation hooks. 00633 ACE_ALLOC_HOOK_DECLARE; 00634 00635 private: 00636 /// Message_Queue we are iterating over. 00637 ACE_Message_Queue <ACE_SYNCH_USE> &queue_; 00638 00639 /// Keeps track of how far we've advanced... 00640 ACE_Message_Block *curr_; 00641 }; 00642 00643 /** 00644 * @class ACE_Message_Queue_Reverse_Iterator 00645 * 00646 * @brief Reverse Iterator for the ACE_Message_Queue. 00647 */ 00648 template <ACE_SYNCH_DECL> 00649 class ACE_Message_Queue_Reverse_Iterator 00650 { 00651 public: 00652 // = Initialization method. 00653 ACE_Message_Queue_Reverse_Iterator (ACE_Message_Queue <ACE_SYNCH_USE> &queue); 00654 00655 // = Iteration methods. 00656 /// Pass back the <entry> that hasn't been seen in the queue. 00657 /// Returns 0 when all items have been seen, else 1. 00658 int next (ACE_Message_Block *&entry); 00659 00660 /// Returns 1 when all items have been seen, else 0. 00661 int done (void) const; 00662 00663 /// Move forward by one element in the queue. Returns 0 when all the 00664 /// items in the set have been seen, else 1. 00665 int advance (void); 00666 00667 /// Dump the state of an object. 00668 void dump (void) const; 00669 00670 /// Declare the dynamic allocation hooks. 00671 ACE_ALLOC_HOOK_DECLARE; 00672 00673 private: 00674 /// Message_Queue we are iterating over. 00675 ACE_Message_Queue <ACE_SYNCH_USE> &queue_; 00676 00677 /// Keeps track of how far we've advanced... 00678 ACE_Message_Block *curr_; 00679 }; 00680 00681 /** 00682 * @class ACE_Dynamic_Message_Queue 00683 * 00684 * @brief A derived class which adapts the ACE_Message_Queue 00685 * class in order to maintain dynamic priorities for enqueued 00686 * <ACE_Message_Blocks> and manage the queue order according 00687 * to these dynamic priorities. 00688 * 00689 * The messages in the queue are managed so as to preserve 00690 * a logical ordering with minimal overhead per enqueue and 00691 * dequeue operation. For this reason, the actual order of 00692 * messages in the linked list of the queue may differ from 00693 * their priority order. As time passes, a message may change 00694 * from pending status to late status, and eventually to beyond 00695 * late status. To minimize reordering overhead under this 00696 * design force, three separate boundaries are maintained 00697 * within the linked list of messages. Messages are dequeued 00698 * preferentially from the head of the pending portion, then 00699 * the head of the late portion, and finally from the head 00700 * of the beyond late portion. In this way, only the boundaries 00701 * need to be maintained (which can be done efficiently, as 00702 * aging messages maintain the same linked list order as they 00703 * progress from one status to the next), with no reordering 00704 * of the messages themselves, while providing correct priority 00705 * ordered dequeueing semantics. 00706 * Head and tail enqueue methods inherited from ACE_Message_Queue 00707 * are made private to prevent out-of-order messages from confusing 00708 * management of the various portions of the queue. Messages in 00709 * the pending portion of the queue whose priority becomes late 00710 * (according to the specific dynamic strategy) advance into 00711 * the late portion of the queue. Messages in the late portion 00712 * of the queue whose priority becomes later than can be represented 00713 * advance to the beyond_late portion of the queue. These behaviors 00714 * support a limited schedule overrun, with pending messages prioritized 00715 * ahead of late messages, and late messages ahead of beyond late 00716 * messages. These behaviors can be modified in derived classes by 00717 * providing alternative definitions for the appropriate virtual methods. 00718 * When filled with messages, the queue's linked list should look like: 00719 * H T 00720 * | | 00721 * B - B - B - B - L - L - L - P - P - P - P - P 00722 * | | | | | | 00723 * BH BT LH LT PH PT 00724 * Where the symbols are as follows: 00725 * H = Head of the entire list 00726 * T = Tail of the entire list 00727 * B = Beyond late message 00728 * BH = Beyond late messages Head 00729 * BT = Beyond late messages Tail 00730 * L = Late message 00731 * LH = Late messages Head 00732 * LT = Late messages Tail 00733 * P = Pending message 00734 * PH = Pending messages Head 00735 * PT = Pending messages Tail 00736 * Caveat: the virtual methods enqueue_tail, enqueue_head, 00737 * and peek_dequeue_head have semantics for the static 00738 * message queues that cannot be guaranteed for dynamic 00739 * message queues. The peek_dequeue_head method just 00740 * calls the base class method, while the two enqueue 00741 * methods call the priority enqueue method. The 00742 * order of messages in the dynamic queue is a function 00743 * of message deadlines and how long they are in the 00744 * queues. You can manipulate these in some cases to 00745 * ensure the correct semantics, but that is not a 00746 * very stable or portable approach (discouraged). 00747 */ 00748 template <ACE_SYNCH_DECL> 00749 class ACE_Dynamic_Message_Queue : public ACE_Message_Queue<ACE_SYNCH_USE> 00750 { 00751 public: 00752 // = Initialization and termination methods. 00753 ACE_Dynamic_Message_Queue (ACE_Dynamic_Message_Strategy & message_strategy, 00754 size_t hwm = ACE_Message_Queue_Base::DEFAULT_HWM, 00755 size_t lwm = ACE_Message_Queue_Base::DEFAULT_LWM, 00756 ACE_Notification_Strategy * = 0); 00757 00758 /// Close down the message queue and release all resources. 00759 virtual ~ACE_Dynamic_Message_Queue (void); 00760 00761 /** 00762 * Detach all messages with status given in the passed flags from 00763 * the queue and return them by setting passed head and tail pointers 00764 * to the linked list they comprise. This method is intended primarily 00765 * as a means of periodically harvesting messages that have missed 00766 * their deadlines, but is available in its most general form. All 00767 * messages are returned in priority order, from head to tail, as of 00768 * the time this method was called. 00769 */ 00770 virtual int remove_messages (ACE_Message_Block *&list_head, 00771 ACE_Message_Block *&list_tail, 00772 u_int status_flags); 00773 00774 /** 00775 * Dequeue and return the <ACE_Message_Block *> at the head of the 00776 * queue. Returns -1 on failure, else the number of items still on 00777 * the queue. 00778 */ 00779 virtual int dequeue_head (ACE_Message_Block *&first_item, 00780 ACE_Time_Value *timeout = 0); 00781 00782 /// Dump the state of the queue. 00783 virtual void dump (void) const; 00784 00785 /** 00786 * Just call priority enqueue method: tail enqueue semantics for dynamic 00787 * message queues are unstable: the message may or may not be where 00788 * it was placed after the queue is refreshed prior to the next 00789 * enqueue or dequeue operation. 00790 */ 00791 virtual int enqueue_tail (ACE_Message_Block *new_item, 00792 ACE_Time_Value *timeout = 0); 00793 00794 /** 00795 * Just call priority enqueue method: head enqueue semantics for dynamic 00796 * message queues are unstable: the message may or may not be where 00797 * it was placed after the queue is refreshed prior to the next 00798 * enqueue or dequeue operation. 00799 */ 00800 virtual int enqueue_head (ACE_Message_Block *new_item, 00801 ACE_Time_Value *timeout = 0); 00802 00803 00804 /// Declare the dynamic allocation hooks. 00805 ACE_ALLOC_HOOK_DECLARE; 00806 00807 protected: 00808 00809 /** 00810 * Enqueue an <ACE_Message_Block *> in accordance with its priority. 00811 * priority may be *dynamic* or *static* or a combination or *both* 00812 * It calls the priority evaluation function passed into the Dynamic 00813 * Message Queue constructor to update the priorities of all 00814 * enqueued messages. 00815 */ 00816 virtual int enqueue_i (ACE_Message_Block *new_item); 00817 00818 /// Enqueue a message in priority order within a given priority status sublist 00819 virtual int sublist_enqueue_i (ACE_Message_Block *new_item, 00820 const ACE_Time_Value ¤t_time, 00821 ACE_Message_Block *&sublist_head, 00822 ACE_Message_Block *&sublist_tail, 00823 ACE_Dynamic_Message_Strategy::Priority_Status status); 00824 00825 /** 00826 * Dequeue and return the <ACE_Message_Block *> at the head of the 00827 * logical queue. Attempts first to dequeue from the pending 00828 * portion of the queue, or if that is empty from the late portion, 00829 * or if that is empty from the beyond late portion, or if that is 00830 * empty just sets the passed pointer to zero and returns -1. 00831 */ 00832 virtual int dequeue_head_i (ACE_Message_Block *&first_item); 00833 00834 /// Refresh the queue using the strategy 00835 /// specific priority status function. 00836 virtual int refresh_queue (const ACE_Time_Value & current_time); 00837 00838 /// Refresh the pending queue using the strategy 00839 /// specific priority status function. 00840 virtual int refresh_pending_queue (const ACE_Time_Value & current_time); 00841 00842 /// Refresh the late queue using the strategy 00843 /// specific priority status function. 00844 virtual int refresh_late_queue (const ACE_Time_Value & current_time); 00845 00846 /// Pointer to head of the pending messages 00847 ACE_Message_Block *pending_head_; 00848 00849 /// Pointer to tail of the pending messages 00850 ACE_Message_Block *pending_tail_; 00851 00852 /// Pointer to head of the late messages 00853 ACE_Message_Block *late_head_; 00854 00855 /// Pointer to tail of the late messages 00856 ACE_Message_Block *late_tail_; 00857 00858 /// Pointer to head of the beyond late messages 00859 ACE_Message_Block *beyond_late_head_; 00860 00861 /// Pointer to tail of the beyond late messages 00862 ACE_Message_Block *beyond_late_tail_; 00863 00864 /// Pointer to a dynamic priority evaluation function. 00865 ACE_Dynamic_Message_Strategy &message_strategy_; 00866 00867 private: 00868 // = Disallow public access to these operations. 00869 00870 ACE_UNIMPLEMENTED_FUNC (void operator= (const ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> &)) 00871 ACE_UNIMPLEMENTED_FUNC (ACE_Dynamic_Message_Queue (const ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> &)) 00872 00873 // provide definitions for these (just call base class method), 00874 // but make them private so they're not accessible outside the class 00875 00876 /// Private method to hide public base class method: just calls base class method 00877 virtual int peek_dequeue_head (ACE_Message_Block *&first_item, 00878 ACE_Time_Value *timeout = 0); 00879 00880 }; 00881 00882 /** 00883 * @class ACE_Message_Queue_Factory 00884 * 00885 * @brief ACE_Message_Queue_Factory is a static factory class template which 00886 * provides a separate factory method for each of the major kinds of 00887 * priority based message dispatching: static, earliest deadline first 00888 * (EDF), and minimum laxity first (MLF). 00889 * 00890 * The ACE_Dynamic_Message_Queue class assumes responsibility for 00891 * releasing the resources of the strategy with which it was 00892 * constructed: the user of a message queue constructed by 00893 * any of these factory methods is only responsible for 00894 * ensuring destruction of the message queue itself. 00895 */ 00896 template <ACE_SYNCH_DECL> 00897 class ACE_Message_Queue_Factory 00898 { 00899 public: 00900 /// Factory method for a statically prioritized ACE_Message_Queue 00901 static ACE_Message_Queue<ACE_SYNCH_USE> * 00902 create_static_message_queue (size_t hwm = ACE_Message_Queue_Base::DEFAULT_HWM, 00903 size_t lwm = ACE_Message_Queue_Base::DEFAULT_LWM, 00904 ACE_Notification_Strategy * = 0); 00905 00906 /// Factory method for a dynamically prioritized (by time to deadline) ACE_Dynamic_Message_Queue 00907 static ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> * 00908 create_deadline_message_queue (size_t hwm = ACE_Message_Queue_Base::DEFAULT_HWM, 00909 size_t lwm = ACE_Message_Queue_Base::DEFAULT_LWM, 00910 ACE_Notification_Strategy * = 0, 00911 u_long static_bit_field_mask = 0x3FFUL, // 2^(10) - 1 00912 u_long static_bit_field_shift = 10, // 10 low order bits 00913 u_long dynamic_priority_max = 0x3FFFFFUL, // 2^(22)-1 00914 u_long dynamic_priority_offset = 0x200000UL); // 2^(22-1) 00915 00916 /// Factory method for a dynamically prioritized (by laxity) ACE_Dynamic_Message_Queue 00917 static ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> * 00918 create_laxity_message_queue (size_t hwm = ACE_Message_Queue_Base::DEFAULT_HWM, 00919 size_t lwm = ACE_Message_Queue_Base::DEFAULT_LWM, 00920 ACE_Notification_Strategy * = 0, 00921 u_long static_bit_field_mask = 0x3FFUL, // 2^(10) - 1 00922 u_long static_bit_field_shift = 10, // 10 low order bits 00923 u_long dynamic_priority_max = 0x3FFFFFUL, // 2^(22)-1 00924 u_long dynamic_priority_offset = 0x200000UL); // 2^(22-1) 00925 00926 00927 #if defined (ACE_VXWORKS) 00928 00929 /// Factory method for a wrapped VxWorks message queue 00930 static ACE_Message_Queue_Vx * 00931 create_Vx_message_queue (size_t max_messages, size_t max_message_length, 00932 ACE_Notification_Strategy *ns = 0); 00933 00934 #endif /* defined (ACE_VXWORKS) */ 00935 00936 #if defined (ACE_WIN32) && (ACE_HAS_WINNT4 != 0) 00937 00938 /// Factory method for a NT message queue. 00939 static ACE_Message_Queue_NT * 00940 create_NT_message_queue (size_t max_threads); 00941 00942 #endif /* ACE_WIN32 && ACE_HAS_WINNT4 != 0 */ 00943 }; 00944 00945 /** 00946 * @class ACE_Message_Queue_Ex 00947 * 00948 * @brief A threaded message queueing facility, modeled after the 00949 * queueing facilities in System V STREAMs. 00950 * 00951 * An <ACE_Message_Queue_Ex> is a strongly-typed version of the 00952 * ACE_Message_Queue. If 00953 * <ACE_SYNCH_DECL> is <ACE_MT_SYNCH> then all operations are 00954 * thread-safe. Otherwise, if it's <ACE_NULL_SYNCH> then there's no 00955 * locking overhead. 00956 */ 00957 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> 00958 class ACE_Message_Queue_Ex 00959 { 00960 public: 00961 00962 // = Default priority value. 00963 enum 00964 { 00965 DEFAULT_PRIORITY = 0 00966 }; 00967 00968 #if 0 00969 // @@ Iterators are not implemented yet... 00970 00971 friend class ACE_Message_Queue_Iterator<ACE_SYNCH_USE>; 00972 friend class ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>; 00973 00974 // = Traits 00975 typedef ACE_Message_Queue_Iterator<ACE_SYNCH_USE> 00976 ITERATOR; 00977 typedef ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE> 00978 REVERSE_ITERATOR; 00979 #endif /* 0 */ 00980 00981 // = Initialization and termination methods. 00982 00983 /** 00984 * Initialize an ACE_Message_Queue. The <high_water_mark> 00985 * determines how many bytes can be stored in a queue before it's 00986 * considered "full." Supplier threads must block until the queue 00987 * is no longer full. The <low_water_mark> determines how many 00988 * bytes must be in the queue before supplier threads are allowed to 00989 * enqueue additional ACE_Message_Blocks. By default, the 00990 * <high_water_mark> equals the <low_water_mark>, which means that 00991 * suppliers will be able to enqueue new messages as soon as a 00992 * consumer removes any message from the queue. Making the 00993 * <low_water_mark> smaller than the <high_water_mark> forces 00994 * consumers to drain more messages from the queue before suppliers 00995 * can enqueue new messages, which can minimize the "silly window 00996 * syndrome." 00997 */ 00998 ACE_Message_Queue_Ex (size_t high_water_mark = ACE_Message_Queue_Base::DEFAULT_HWM, 00999 size_t low_water_mark = ACE_Message_Queue_Base::DEFAULT_LWM, 01000 ACE_Notification_Strategy * = 0); 01001 01002 /** 01003 * Initialize an ACE_Message_Queue. The <high_water_mark> 01004 * determines how many bytes can be stored in a queue before it's 01005 * considered "full." Supplier threads must block until the queue 01006 * is no longer full. The <low_water_mark> determines how many 01007 * bytes must be in the queue before supplier threads are allowed to 01008 * enqueue additional ACE_Message_Blocks. By default, the 01009 * <high_water_mark> equals the <low_water_mark>, which means that 01010 * suppliers will be able to enqueue new messages as soon as a 01011 * consumer removes any message from the queue. Making the 01012 * <low_water_mark> smaller than the <high_water_mark> forces 01013 * consumers to drain more messages from the queue before suppliers 01014 * can enqueue new messages, which can minimize the "silly window 01015 * syndrome." 01016 */ 01017 virtual int open (size_t hwm = ACE_Message_Queue_Base::DEFAULT_HWM, 01018 size_t lwm = ACE_Message_Queue_Base::DEFAULT_LWM, 01019 ACE_Notification_Strategy * = 0); 01020 01021 /// Close down the message queue and release all resources. 01022 virtual int close (void); 01023 01024 /// Close down the message queue and release all resources. 01025 virtual ~ACE_Message_Queue_Ex (void); 01026 01027 /// Release all resources from the message queue but do not mark it as deactivated. 01028 /// This method holds the queue lock during this operation. Returns the number of 01029 /// messages flushed. 01030 virtual int flush (void); 01031 01032 /// Release all resources from the message queue but do not mark it as 01033 /// deactivated. This method does not hold the queue lock during this 01034 /// operation, i.e., it assume the lock is held externally. 01035 /// Returns the number of messages flushed. 01036 virtual int flush_i (void); 01037 01038 // = Enqueue and dequeue methods. 01039 01040 // For the following enqueue and dequeue methods if <timeout> == 0, 01041 // the caller will block until action is possible, else will wait 01042 // until the absolute time specified in *<timeout> elapses). These 01043 // calls will return, however, when queue is closed, deactivated, 01044 // when a signal occurs, or if the time specified in timeout 01045 // elapses, (in which case errno = EWOULDBLOCK). 01046 01047 /** 01048 * Retrieve the first <ACE_MESSAGE_TYPE> without removing it. Note 01049 * that <timeout> uses <{absolute}> time rather than <{relative}> 01050 * time. If the <timeout> elapses without receiving a message -1 is 01051 * returned and <errno> is set to <EWOULDBLOCK>. If the queue is 01052 * deactivated -1 is returned and <errno> is set to <ESHUTDOWN>. 01053 * Otherwise, returns -1 on failure, else the number of items still 01054 * on the queue. 01055 */ 01056 virtual int peek_dequeue_head (ACE_MESSAGE_TYPE *&first_item, 01057 ACE_Time_Value *timeout = 0); 01058 01059 /** 01060 * Enqueue an <ACE_MESSAGE_TYPE *> into the <Message_Queue> in 01061 * accordance with its <msg_priority> (0 is lowest priority). FIFO 01062 * order is maintained when messages of the same priority are 01063 * inserted consecutively. Note that <timeout> uses <{absolute}> 01064 * time rather than <{relative}> time. If the <timeout> elapses 01065 * without receiving a message -1 is returned and <errno> is set to 01066 * <EWOULDBLOCK>. If the queue is deactivated -1 is returned and 01067 * <errno> is set to <ESHUTDOWN>. Otherwise, returns -1 on failure, 01068 * else the number of items still on the queue. 01069 */ 01070 virtual int enqueue_prio (ACE_MESSAGE_TYPE *new_item, 01071 ACE_Time_Value *timeout = 0); 01072 01073 /** 01074 * Enqueue an <ACE_MESSAGE_TYPE *> into the <Message_Queue> in 01075 * accordance with its <msg_deadline_time>. FIFO 01076 * order is maintained when messages of the same deadline time are 01077 * inserted consecutively. Note that <timeout> uses <{absolute}> 01078 * time rather than <{relative}> time. If the <timeout> elapses 01079 * without receiving a message -1 is returned and <errno> is set to 01080 * <EWOULDBLOCK>. If the queue is deactivated -1 is returned and 01081 * <errno> is set to <ESHUTDOWN>. Otherwise, returns -1 on failure, 01082 * else the number of items still on the queue. 01083 */ 01084 virtual int enqueue_deadline (ACE_MESSAGE_TYPE *new_item, 01085 ACE_Time_Value *timeout = 0); 01086 01087 /** 01088 * This is an alias for <enqueue_prio>. It's only here for 01089 * backwards compatibility and will go away in a subsequent release. 01090 * Please use <enqueue_prio> instead. Note that <timeout> uses 01091 * <{absolute}> time rather than <{relative}> time. 01092 */ 01093 virtual int enqueue (ACE_MESSAGE_TYPE *new_item, 01094 ACE_Time_Value *timeout = 0); 01095 01096 /** 01097 * Enqueue an <ACE_MESSAGE_TYPE *> at the end of the queue. Note 01098 * that <timeout> uses <{absolute}> time rather than <{relative}> 01099 * time. If the <timeout> elapses without receiving a message -1 is 01100 * returned and <errno> is set to <EWOULDBLOCK>. If the queue is 01101 * deactivated -1 is returned and <errno> is set to <ESHUTDOWN>. 01102 * Otherwise, returns -1 on failure, else the number of items still 01103 * on the queue. 01104 */ 01105 virtual int enqueue_tail (ACE_MESSAGE_TYPE *new_item, 01106 ACE_Time_Value *timeout = 0); 01107 01108 /** 01109 * Enqueue an <ACE_MESSAGE_TYPE *> at the head of the queue. Note 01110 * that <timeout> uses <{absolute}> time rather than <{relative}> 01111 * time. If the <timeout> elapses without receiving a message -1 is 01112 * returned and <errno> is set to <EWOULDBLOCK>. If the queue is 01113 * deactivated -1 is returned and <errno> is set to <ESHUTDOWN>. 01114 * Otherwise, returns -1 on failure, else the number of items still 01115 * on the queue. 01116 */ 01117 virtual int enqueue_head (ACE_MESSAGE_TYPE *new_item, 01118 ACE_Time_Value *timeout = 0); 01119 01120 /// This method is an alias for the following <dequeue_head> method. 01121 virtual int dequeue (ACE_MESSAGE_TYPE *&first_item, 01122 ACE_Time_Value *timeout = 0); 01123 // This method is an alias for the following <dequeue_head> method. 01124 01125 /** 01126 * Dequeue and return the <ACE_MESSAGE_TYPE *> at the head of the 01127 * queue. Note that <timeout> uses <{absolute}> time rather than 01128 * <{relative}> time. If the <timeout> elapses without receiving a 01129 * message -1 is returned and <errno> is set to <EWOULDBLOCK>. If 01130 * the queue is deactivated -1 is returned and <errno> is set to 01131 * <ESHUTDOWN>. Otherwise, returns -1 on failure, else the number 01132 * of items still on the queue. 01133 */ 01134 virtual int dequeue_head (ACE_MESSAGE_TYPE *&first_item, 01135 ACE_Time_Value *timeout = 0); 01136 01137 /** 01138 * Dequeue and return the <ACE_MESSAGE_TYPE *> that has the lowest 01139 * priority. Note that <timeout> uses <{absolute}> time rather than 01140 * <{relative}> time. If the <timeout> elapses without receiving a 01141 * message -1 is returned and <errno> is set to <EWOULDBLOCK>. If 01142 * the queue is deactivated -1 is returned and <errno> is set to 01143 * <ESHUTDOWN>. Otherwise, returns -1 on failure, else the number 01144 * of items still on the queue. 01145 */ 01146 virtual int dequeue_prio (ACE_MESSAGE_TYPE *&dequeued, 01147 ACE_Time_Value *timeout = 0); 01148 01149 /** 01150 * Dequeue and return the <ACE_MESSAGE_TYPE *> at the tail of the 01151 * queue. Note that <timeout> uses <{absolute}> time rather than 01152 * <{relative}> time. If the <timeout> elapses without receiving a 01153 * message -1 is returned and <errno> is set to <EWOULDBLOCK>. If 01154 * the queue is deactivated -1 is returned and <errno> is set to 01155 * <ESHUTDOWN>. Otherwise, returns -1 on failure, else the number 01156 * of items still on the queue. 01157 */ 01158 virtual int dequeue_tail (ACE_MESSAGE_TYPE *&dequeued, 01159 ACE_Time_Value *timeout = 0); 01160 01161 /** 01162 * Dequeue and return the <ACE_MESSAGE_TYPE *> with the lowest 01163 * deadline time. Note that <timeout> uses <{absolute}> time rather than 01164 * <{relative}> time. If the <timeout> elapses without receiving a 01165 * message -1 is returned and <errno> is set to <EWOULDBLOCK>. If 01166 * the queue is deactivated -1 is returned and <errno> is set to 01167 * <ESHUTDOWN>. Otherwise, returns -1 on failure, else the number 01168 * of items still on the queue. 01169 */ 01170 virtual int dequeue_deadline (ACE_MESSAGE_TYPE *&dequeued, 01171 ACE_Time_Value *timeout = 0); 01172 01173 // = Check if queue is full/empty. 01174 /// True if queue is full, else false. 01175 virtual int is_full (void); 01176 /// True if queue is empty, else false. 01177 virtual int is_empty (void); 01178 01179 01180 // = Queue statistic methods. 01181 /** 01182 * Number of total bytes on the queue, i.e., sum of the message 01183 * block sizes. 01184 */ 01185 virtual size_t message_bytes (void); 01186 /** 01187 * Number of total length on the queue, i.e., sum of the message 01188 * block lengths. 01189 */ 01190 virtual size_t message_length (void); 01191 /** 01192 * Number of total messages on the queue. 01193 */ 01194 virtual size_t message_count (void); 01195 01196 // = Manual changes to these stats (used when queued message blocks 01197 // change size or lengths). 01198 /** 01199 * New value of the number of total bytes on the queue, i.e., sum of 01200 * the message block sizes. 01201 */ 01202 virtual void message_bytes (size_t new_size); 01203 /** 01204 * New value of the number of total length on the queue, i.e., sum 01205 * of the message block lengths. 01206 */ 01207 virtual void message_length (size_t new_length); 01208 01209 // = Flow control methods. 01210 /** 01211 * Get high watermark. 01212 */ 01213 virtual size_t high_water_mark (void); 01214 /** 01215 * Set the high watermark, which determines how many bytes can be 01216 * stored in a queue before it's considered "full." 01217 */ 01218 virtual void high_water_mark (size_t hwm); 01219 01220 /** 01221 * Get low watermark. 01222 */ 01223 virtual size_t low_water_mark (void); 01224 /** 01225 * Set the low watermark, which determines how many bytes must be in 01226 * the queue before supplier threads are allowed to enqueue 01227 * additional <ACE_MESSAGE_TYPE>s. 01228 */ 01229 virtual void low_water_mark (size_t lwm); 01230 01231 // = Activation control methods. 01232 01233 /** 01234 * Deactivate the queue and wakeup all threads waiting on the queue 01235 * so they can continue. No messages are removed from the queue, 01236 * however. Any other operations called until the queue is 01237 * activated again will immediately return -1 with <errno> == 01238 * ESHUTDOWN. Returns WAS_INACTIVE if queue was inactive before the 01239 * call and WAS_ACTIVE if queue was active before the call. 01240 */ 01241 virtual int deactivate (void); 01242 01243 /** 01244 * Reactivate the queue so that threads can enqueue and dequeue 01245 * messages again. Returns the state of the queue before the call. 01246 */ 01247 virtual int activate (void); 01248 01249 /** 01250 * Pulse the queue to wake up any waiting threads. Changes the 01251 * queue state to PULSED; future enqueue/dequeue operations proceed 01252 * as in ACTIVATED state. 01253 * 01254 * @retval The queue's state before this call. 01255 */ 01256 virtual int pulse (void); 01257 01258 /// Returns the current state of the queue, which can be one of 01259 /// ACTIVATED, DEACTIVATED, or PULSED. 01260 virtual int state (void); 01261 01262 /// Returns true if the state of the queue is DEACTIVATED, 01263 /// but false if the queue's state is ACTIVATED or PULSED. 01264 virtual int deactivated (void); 01265 01266 // = Notification hook. 01267 01268 /** 01269 * This hook is automatically invoked by <enqueue_head>, 01270 * <enqueue_tail>, and <enqueue_prio> when a new item is inserted 01271 * into the queue. Subclasses can override this method to perform 01272 * specific notification strategies (e.g., signaling events for a 01273 * <WFMO_Reactor>, notifying a <Reactor>, etc.). In a 01274 * multi-threaded application with concurrent consumers, there is no 01275 * guarantee that the queue will be still be non-empty by the time 01276 * the notification occurs. 01277 */ 01278 virtual int notify (void); 01279 01280 /// Get the notification strategy for the <Message_Queue> 01281 virtual ACE_Notification_Strategy *notification_strategy (void); 01282 01283 /// Set the notification strategy for the <Message_Queue> 01284 virtual void notification_strategy (ACE_Notification_Strategy *s); 01285 01286 /// Returns a reference to the lock used by the <ACE_Message_Queue_Ex>. 01287 virtual ACE_SYNCH_MUTEX_T &lock (void); 01288 01289 /// Dump the state of an object. 01290 virtual void dump (void) const; 01291 01292 /// Declare the dynamic allocation hooks. 01293 ACE_ALLOC_HOOK_DECLARE; 01294 01295 protected: 01296 /// Implement this via an ACE_Message_Queue. 01297 ACE_Message_Queue<ACE_SYNCH_USE> queue_; 01298 }; 01299 01300 ACE_END_VERSIONED_NAMESPACE_DECL 01301 01302 #if defined (ACE_TEMPLATES_REQUIRE_SOURCE) 01303 #include "ace/Message_Queue_T.cpp" 01304 #endif /* ACE_TEMPLATES_REQUIRE_SOURCE */ 01305 01306 #if defined (ACE_TEMPLATES_REQUIRE_PRAGMA) 01307 #pragma implementation ("Message_Queue_T.cpp") 01308 #endif /* ACE_TEMPLATES_REQUIRE_PRAGMA */ 01309 01310 #include /**/ "ace/post.h" 01311 #endif /* ACE_MESSAGE_QUEUE_T_H */