00001 /* -*- C++ -*- */ 00002 00003 //============================================================================= 00004 /** 00005 * @file Message_Queue_T.h 00006 * 00007 * $Id: Message_Queue_T.h 81691 2008-05-14 11:09:21Z johnnyw $ 00008 * 00009 * @author Douglas C. Schmidt <schmidt@cs.wustl.edu> 00010 */ 00011 //============================================================================= 00012 00013 #ifndef ACE_MESSAGE_QUEUE_T_H 00014 #define ACE_MESSAGE_QUEUE_T_H 00015 00016 #include /**/ "ace/pre.h" 00017 00018 #include "ace/Message_Queue.h" 00019 #include "ace/Dynamic_Message_Strategy.h" 00020 #include "ace/Synch_Traits.h" 00021 #include "ace/Guard_T.h" 00022 00023 #if !defined (ACE_LACKS_PRAGMA_ONCE) 00024 # pragma once 00025 #endif /* ACE_LACKS_PRAGMA_ONCE */ 00026 00027 ACE_BEGIN_VERSIONED_NAMESPACE_DECL 00028 00029 #if defined (ACE_VXWORKS) 00030 class ACE_Message_Queue_Vx; 00031 #endif /* defined (ACE_VXWORKS) */ 00032 00033 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO) 00034 class ACE_Message_Queue_NT; 00035 #endif /* ACE_HAS_WIN32_OVERLAPPED_IO*/ 00036 00037 #if defined (ACE_HAS_MONITOR_POINTS) && ACE_HAS_MONITOR_POINTS == 1 00038 namespace ACE 00039 { 00040 namespace Monitor_Control 00041 { 00042 class Size_Monitor; 00043 } 00044 } 00045 #endif /* ACE_HAS_MONITOR_POINTS==1 */ 00046 00047 /** 00048 * @class ACE_Message_Queue 00049 * 00050 * @brief A message queueing facility with parameterized synchronization 00051 * capability. ACE_Message_Queue is modeled after the queueing facilities 00052 * in System V STREAMs. 00053 * 00054 * ACE_Message_Queue is the primary queueing facility for 00055 * messages in the ACE framework. It's one template argument parameterizes 00056 * the queue's synchronization. The argument specifies a synchronization 00057 * strategy. The two main strategies available for ACE_SYNCH_DECL are: 00058 * -# ACE_MT_SYNCH: all operations are thread-safe 00059 * -# ACE_NULL_SYNCH: no synchronization and no locking overhead 00060 * 00061 * All data passing through ACE_Message_Queue is in the form of 00062 * ACE_Message_Block objects. @sa ACE_Message_Block. 00063 */ 00064 template <ACE_SYNCH_DECL> 00065 class ACE_Message_Queue : public ACE_Message_Queue_Base 00066 { 00067 public: 00068 friend class ACE_Message_Queue_Iterator<ACE_SYNCH_USE>; 00069 friend class ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>; 00070 00071 // = Traits 00072 typedef ACE_Message_Queue_Iterator<ACE_SYNCH_USE> 00073 ITERATOR; 00074 typedef ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE> 00075 REVERSE_ITERATOR; 00076 00077 /** 00078 * @name Initialization methods 00079 */ 00080 //@{ 00081 /** 00082 * Initialize an ACE_Message_Queue. 00083 * 00084 * @param hwm High water mark. Determines how many bytes can be stored in a 00085 * queue before it's considered full. Supplier threads must block 00086 * until the queue is no longer full. 00087 * @param lwm Low water mark. Determines how many bytes must be in the queue 00088 * before supplier threads are allowed to enqueue additional 00089 * data. By default, the @a hwm equals @a lwm, which means 00090 * that suppliers will be able to enqueue new messages as soon as 00091 * a consumer removes any message from the queue. Making the low 00092 * water mark smaller than the high water mark forces consumers to 00093 * drain more messages from the queue before suppliers can enqueue 00094 * new messages, which can minimize the "silly window syndrome." 00095 * @param ns Notification strategy. Pointer to an object conforming to the 00096 * ACE_Notification_Strategy interface. If set, the object's 00097 * notify(void) method will be called each time data is added to 00098 * this ACE_Message_Queue. @see ACE_Reactor_Notification_Strategy. 00099 */ 00100 ACE_Message_Queue (size_t hwm = ACE_Message_Queue_Base::DEFAULT_HWM, 00101 size_t lwm = ACE_Message_Queue_Base::DEFAULT_LWM, 00102 ACE_Notification_Strategy *ns = 0); 00103 virtual int open (size_t hwm = ACE_Message_Queue_Base::DEFAULT_HWM, 00104 size_t lwm = ACE_Message_Queue_Base::DEFAULT_LWM, 00105 ACE_Notification_Strategy *ns = 0); 00106 //@} 00107 00108 /// Releases all resources from the message queue and marks it deactivated. 00109 /// @sa flush(). 00110 /// 00111 /// @retval The number of messages released from the queue; -1 on error. 00112 virtual int close (void); 00113 00114 /// Releases all resources from the message queue and marks it deactivated. 00115 virtual ~ACE_Message_Queue (void); 00116 00117 /** 00118 * Releases all resources from the message queue but does not mark it 00119 * deactivated. This method holds the queue lock during this operation. 00120 * @sa close(). 00121 * 00122 * @return The number of messages flushed; -1 on error. 00123 */ 00124 virtual int flush (void); 00125 00126 /** 00127 * Release all resources from the message queue but do not mark it 00128 * as deactivated. 00129 * 00130 * @pre The caller must be holding the queue lock before calling this 00131 * method. 00132 * 00133 * @return The number of messages flushed. 00134 */ 00135 virtual int flush_i (void); 00136 00137 /** @name Enqueue and dequeue methods 00138 * 00139 * The enqueue and dequeue methods accept a timeout value passed as 00140 * an ACE_Time_Value *. In all cases, if the timeout pointer is 0, 00141 * the caller will block until action is possible. If the timeout pointer 00142 * is non-zero, the call will wait (if needed, subject to water mark 00143 * settings) until the absolute time specified in the referenced 00144 * ACE_Time_Value object is reached. If the time is reached before the 00145 * desired action is possible, the method will return -1 with errno set 00146 * to @c EWOULDBLOCK. Regardless of the timeout setting, however, 00147 * these methods will also fail and return -1 when the queue is closed, 00148 * deactivated, pulsed, or when a signal occurs. 00149 * 00150 * See C++NPv2 Section 6.2 and APG Section 12.3 for a fuller treatment of 00151 * ACE_Message_Queue, enqueueing, dequeueing, and how these operations are 00152 * affected by queue state transitions. 00153 */ 00154 //@{ 00155 /** 00156 * Retrieve a pointer to the first ACE_Message_Block in the queue 00157 * without removing it. 00158 * 00159 * @note Because the block whose pointer is returned is still on the queue, 00160 * another thread may dequeue the referenced block at any time, 00161 * including before the calling thread examines the peeked-at block. 00162 * Be very careful with this method in multithreaded queueing 00163 * situations. 00164 * 00165 * @param first_item Reference to an ACE_Message_Block * that will 00166 * point to the first block on the queue. The block 00167 * remains on the queue until this or another thread 00168 * dequeues it. 00169 * @param timeout The absolute time the caller will wait until 00170 * for a block to be queued. 00171 * 00172 * @retval >0 The number of ACE_Message_Blocks on the queue. 00173 * @retval -1 On failure. errno holds the reason. Common errno values are: 00174 * - EWOULDBLOCK: the timeout elapsed 00175 * - ESHUTDOWN: the queue was deactivated or pulsed 00176 */ 00177 virtual int peek_dequeue_head (ACE_Message_Block *&first_item, 00178 ACE_Time_Value *timeout = 0); 00179 00180 /** 00181 * Enqueue an ACE_Message_Block into the queue in accordance with 00182 * the ACE_Message_Block's priority (0 is lowest priority). FIFO 00183 * order is maintained when messages of the same priority are 00184 * inserted consecutively. 00185 * 00186 * @param new_item Pointer to an ACE_Message_Block that will be 00187 * added to the queue. The block's @c msg_priority() 00188 * method will be called to obtain the queueing priority. 00189 * @param timeout The absolute time the caller will wait until 00190 * for the block to be queued. 00191 * 00192 * @retval >0 The number of ACE_Message_Blocks on the queue after adding 00193 * the specified block. 00194 * @retval -1 On failure. errno holds the reason. Common errno values are: 00195 * - EWOULDBLOCK: the timeout elapsed 00196 * - ESHUTDOWN: the queue was deactivated or pulsed 00197 */ 00198 virtual int enqueue_prio (ACE_Message_Block *new_item, 00199 ACE_Time_Value *timeout = 0); 00200 00201 /** 00202 * Enqueue an ACE_Message_Block into the queue in accordance with the 00203 * block's deadline time. FIFO order is maintained when messages of 00204 * the same deadline time are inserted consecutively. 00205 * 00206 * @param new_item Pointer to an ACE_Message_Block that will be 00207 * added to the queue. The block's @c msg_deadline_time() 00208 * method will be called to obtain the relative queueing 00209 * position. 00210 * @param timeout The absolute time the caller will wait until 00211 * for the block to be queued. 00212 * 00213 * @retval >0 The number of ACE_Message_Blocks on the queue after adding 00214 * the specified block. 00215 * @retval -1 On failure. errno holds the reason. Common errno values are: 00216 * - EWOULDBLOCK: the timeout elapsed 00217 * - ESHUTDOWN: the queue was deactivated or pulsed 00218 */ 00219 virtual int enqueue_deadline (ACE_Message_Block *new_item, 00220 ACE_Time_Value *timeout = 0); 00221 00222 /** 00223 * @deprecated This is an alias for enqueue_prio(). It's only here for 00224 * backwards compatibility and will go away in a subsequent release. 00225 * Please use enqueue_prio() instead. 00226 */ 00227 virtual int enqueue (ACE_Message_Block *new_item, 00228 ACE_Time_Value *timeout = 0); 00229 00230 /** 00231 * Enqueue one or more ACE_Message_Block objects at the tail of the queue. 00232 * If the @a new_item @c next() pointer is non-zero, it is assumed to be the 00233 * start of a series of ACE_Message_Block objects connected via their 00234 * @c next() pointers. The series of blocks will be added to the queue in 00235 * the same order they are passed in as. 00236 * 00237 * @param new_item Pointer to an ACE_Message_Block that will be 00238 * added to the queue. If the block's @c next() pointer 00239 * is non-zero, all blocks chained from the @c next() 00240 * pointer are enqueued as well. 00241 * @param timeout The absolute time the caller will wait until 00242 * for the block to be queued. 00243 * 00244 * @retval >0 The number of ACE_Message_Blocks on the queue after adding 00245 * the specified block(s). 00246 * @retval -1 On failure. errno holds the reason. Common errno values are: 00247 * - EWOULDBLOCK: the timeout elapsed 00248 * - ESHUTDOWN: the queue was deactivated or pulsed 00249 */ 00250 virtual int enqueue_tail (ACE_Message_Block *new_item, 00251 ACE_Time_Value *timeout = 0); 00252 00253 /** 00254 * Enqueue one or more ACE_Message_Block objects at the head of the queue. 00255 * If the @a new_item @c next() pointer is non-zero, it is assumed to be the 00256 * start of a series of ACE_Message_Block objects connected via their 00257 * @c next() pointers. The series of blocks will be added to the queue in 00258 * the same order they are passed in as. 00259 * 00260 * @param new_item Pointer to an ACE_Message_Block that will be 00261 * added to the queue. If the block's @c next() pointer 00262 * is non-zero, all blocks chained from the @c next() 00263 * pointer are enqueued as well. 00264 * @param timeout The absolute time the caller will wait until 00265 * for the block to be queued. 00266 * 00267 * @retval >0 The number of ACE_Message_Blocks on the queue after adding 00268 * the specified block(s). 00269 * @retval -1 On failure. errno holds the reason. Common errno values are: 00270 * - EWOULDBLOCK: the timeout elapsed 00271 * - ESHUTDOWN: the queue was deactivated or pulsed 00272 */ 00273 virtual int enqueue_head (ACE_Message_Block *new_item, 00274 ACE_Time_Value *timeout = 0); 00275 00276 /// This method is an alias for the dequeue_head() method. 00277 virtual int dequeue (ACE_Message_Block *&first_item, 00278 ACE_Time_Value *timeout = 0); 00279 00280 /** 00281 * Dequeue the ACE_Message_Block at the head of the queue and return 00282 * a pointer to the dequeued block. 00283 * 00284 * @param first_item Reference to an ACE_Message_Block * that will 00285 * be set to the address of the dequeued block. 00286 * @param timeout The absolute time the caller will wait until 00287 * for a block to be dequeued. 00288 * 00289 * @retval >=0 The number of ACE_Message_Blocks remaining in the queue. 00290 * @retval -1 On failure. errno holds the reason. Common errno values are: 00291 * - EWOULDBLOCK: the timeout elapsed 00292 * - ESHUTDOWN: the queue was deactivated or pulsed 00293 */ 00294 virtual int dequeue_head (ACE_Message_Block *&first_item, 00295 ACE_Time_Value *timeout = 0); 00296 00297 /** 00298 * Dequeue the ACE_Message_Block that has the lowest priority (preserves 00299 * FIFO order for messages with the same priority) and return a pointer 00300 * to the dequeued block. 00301 * 00302 * @param first_item Reference to an ACE_Message_Block * that will 00303 * be set to the address of the dequeued block. 00304 * @param timeout The absolute time the caller will wait until 00305 * for a block to be dequeued. 00306 * 00307 * @retval >=0 The number of ACE_Message_Blocks remaining in the queue. 00308 * @retval -1 On failure. errno holds the reason. Common errno values are: 00309 * - EWOULDBLOCK: the timeout elapsed 00310 * - ESHUTDOWN: the queue was deactivated or pulsed 00311 */ 00312 virtual int dequeue_prio (ACE_Message_Block *&first_item, 00313 ACE_Time_Value *timeout = 0); 00314 00315 /** 00316 * Dequeue the ACE_Message_Block at the tail of the queue and return 00317 * a pointer to the dequeued block. 00318 * 00319 * @param dequeued Reference to an ACE_Message_Block * that will 00320 * be set to the address of the dequeued block. 00321 * @param timeout The absolute time the caller will wait until 00322 * for a block to be dequeued. 00323 * 00324 * @retval >=0 The number of ACE_Message_Blocks remaining in the queue. 00325 * @retval -1 On failure. errno holds the reason. Common errno values are: 00326 * - EWOULDBLOCK: the timeout elapsed 00327 * - ESHUTDOWN: the queue was deactivated or pulsed 00328 */ 00329 virtual int dequeue_tail (ACE_Message_Block *&dequeued, 00330 ACE_Time_Value *timeout = 0); 00331 00332 /** 00333 * Dequeue the ACE_Message_Block with the earliest deadline time and return 00334 * a pointer to the dequeued block. 00335 * 00336 * @param dequeued Reference to an ACE_Message_Block * that will 00337 * be set to the address of the dequeued block. 00338 * @param timeout The absolute time the caller will wait until 00339 * for a block to be dequeued. 00340 * 00341 * @retval >=0 The number of ACE_Message_Blocks remaining in the queue. 00342 * @retval -1 On failure. errno holds the reason. Common errno values are: 00343 * - EWOULDBLOCK: the timeout elapsed 00344 * - ESHUTDOWN: the queue was deactivated or pulsed 00345 */ 00346 virtual int dequeue_deadline (ACE_Message_Block *&dequeued, 00347 ACE_Time_Value *timeout = 0); 00348 //@} 00349 00350 /** @name Queue statistics methods 00351 */ 00352 //@{ 00353 00354 /// True if queue is full, else false. 00355 virtual bool is_full (void); 00356 /// True if queue is empty, else false. 00357 virtual bool is_empty (void); 00358 00359 /** 00360 * Number of total bytes on the queue, i.e., sum of the message 00361 * block sizes. 00362 */ 00363 virtual size_t message_bytes (void); 00364 00365 /** 00366 * Number of total length on the queue, i.e., sum of the message 00367 * block lengths. 00368 */ 00369 virtual size_t message_length (void); 00370 00371 /** 00372 * Number of total messages on the queue. 00373 */ 00374 virtual size_t message_count (void); 00375 00376 // = Manual changes to these stats (used when queued message blocks 00377 // change size or lengths). 00378 /** 00379 * New value of the number of total bytes on the queue, i.e., sum of 00380 * the message block sizes. 00381 */ 00382 virtual void message_bytes (size_t new_size); 00383 /** 00384 * New value of the number of total length on the queue, i.e., sum 00385 * of the message block lengths. 00386 */ 00387 virtual void message_length (size_t new_length); 00388 00389 //@} 00390 00391 00392 /** @name Water mark (flow control) methods 00393 */ 00394 //@{ 00395 00396 /** 00397 * Get high watermark. 00398 */ 00399 virtual size_t high_water_mark (void); 00400 /** 00401 * Set the high watermark, which determines how many bytes can be 00402 * stored in a queue before it's considered "full." 00403 */ 00404 virtual void high_water_mark (size_t hwm); 00405 00406 /** 00407 * Get low watermark. 00408 */ 00409 virtual size_t low_water_mark (void); 00410 /** 00411 * Set the low watermark, which determines how many bytes must be in 00412 * the queue before supplier threads are allowed to enqueue 00413 * additional ACE_Message_Blocks. 00414 */ 00415 virtual void low_water_mark (size_t lwm); 00416 //@} 00417 00418 /** @name Activation and queue state methods 00419 * See C++NPv2 Section 6.2 and APG Section 12.3 for a fuller treatment of 00420 * queue states and transitions and how the transitions affect message 00421 * enqueueing and dequeueing operations. 00422 */ 00423 //@{ 00424 00425 /** 00426 * Deactivate the queue and wakeup all threads waiting on the queue 00427 * so they can continue. No messages are removed from the queue, 00428 * however. Any other operations called until the queue is 00429 * activated again will immediately return -1 with @c errno == 00430 * ESHUTDOWN. Returns WAS_INACTIVE if queue was inactive before the 00431 * call and WAS_ACTIVE if queue was active before the call. 00432 */ 00433 virtual int deactivate (void); 00434 00435 /** 00436 * Reactivate the queue so that threads can enqueue and dequeue 00437 * messages again. Returns the state of the queue before the call. 00438 */ 00439 virtual int activate (void); 00440 00441 /** 00442 * Pulse the queue to wake up any waiting threads. Changes the 00443 * queue state to PULSED; future enqueue/dequeue operations proceed 00444 * as in ACTIVATED state. 00445 * 00446 * @return The queue's state before this call. 00447 */ 00448 virtual int pulse (void); 00449 00450 /// Returns the current state of the queue, which can be one of 00451 /// ACTIVATED, DEACTIVATED, or PULSED. 00452 virtual int state (void); 00453 00454 /// Returns true if the state of the queue is <DEACTIVATED>, 00455 /// but false if the queue's is <ACTIVATED> or <PULSED>. 00456 virtual int deactivated (void); 00457 //@} 00458 00459 /** @name Notification strategy methods 00460 */ 00461 //@{ 00462 00463 /** 00464 * This hook is automatically invoked by <enqueue_head>, 00465 * <enqueue_tail>, and <enqueue_prio> when a new item is inserted 00466 * into the queue. Subclasses can override this method to perform 00467 * specific notification strategies (e.g., signaling events for a 00468 * <WFMO_Reactor>, notifying a <Reactor>, etc.). In a 00469 * multi-threaded application with concurrent consumers, there is no 00470 * guarantee that the queue will be still be non-empty by the time 00471 * the notification occurs. 00472 */ 00473 virtual int notify (void); 00474 00475 /// Get the notification strategy for the <Message_Queue> 00476 virtual ACE_Notification_Strategy *notification_strategy (void); 00477 00478 /// Set the notification strategy for the <Message_Queue> 00479 virtual void notification_strategy (ACE_Notification_Strategy *s); 00480 //@} 00481 00482 /// Returns a reference to the lock used by the ACE_Message_Queue. 00483 virtual ACE_SYNCH_MUTEX_T &lock (void); 00484 00485 /// Dump the state of an object. 00486 virtual void dump (void) const; 00487 00488 /// Declare the dynamic allocation hooks. 00489 ACE_ALLOC_HOOK_DECLARE; 00490 00491 protected: 00492 // = Routines that actually do the enqueueing and dequeueing. 00493 00494 // These routines assume that locks are held by the corresponding 00495 // public methods. Since they are virtual, you can change the 00496 // queueing mechanism by subclassing from ACE_Message_Queue. 00497 00498 /// Enqueue an <ACE_Message_Block *> in accordance with its priority. 00499 virtual int enqueue_i (ACE_Message_Block *new_item); 00500 00501 /// Enqueue an <ACE_Message_Block *> in accordance with its deadline time. 00502 virtual int enqueue_deadline_i (ACE_Message_Block *new_item); 00503 00504 /// Enqueue an <ACE_Message_Block *> at the end of the queue. 00505 virtual int enqueue_tail_i (ACE_Message_Block *new_item); 00506 00507 /// Enqueue an <ACE_Message_Block *> at the head of the queue. 00508 virtual int enqueue_head_i (ACE_Message_Block *new_item); 00509 00510 /// Dequeue and return the <ACE_Message_Block *> at the head of the 00511 /// queue. 00512 virtual int dequeue_head_i (ACE_Message_Block *&first_item); 00513 00514 /// Dequeue and return the <ACE_Message_Block *> with the lowest 00515 /// priority. 00516 virtual int dequeue_prio_i (ACE_Message_Block *&dequeued); 00517 00518 /// Dequeue and return the <ACE_Message_Block *> at the tail of the 00519 /// queue. 00520 virtual int dequeue_tail_i (ACE_Message_Block *&first_item); 00521 00522 /// Dequeue and return the <ACE_Message_Block *> with the lowest 00523 /// deadline time. 00524 virtual int dequeue_deadline_i (ACE_Message_Block *&first_item); 00525 00526 // = Check the boundary conditions (assumes locks are held). 00527 00528 /// True if queue is full, else false. 00529 virtual bool is_full_i (void); 00530 00531 /// True if queue is empty, else false. 00532 virtual bool is_empty_i (void); 00533 00534 // = Implementation of the public <activate> and <deactivate> methods. 00535 00536 // These methods assume locks are held. 00537 00538 /** 00539 * Notifies all waiting threads that the queue has been deactivated 00540 * so they can wakeup and continue other processing. 00541 * No messages are removed from the queue. 00542 * 00543 * @param pulse If 0, the queue's state is changed to DEACTIVATED 00544 * and any other operations called until the queue is 00545 * reactivated will immediately return -1 with 00546 * errno == ESHUTDOWN. 00547 * If not zero, only the waiting threads are notified and 00548 * the queue's state changes to PULSED. 00549 * 00550 * @return The state of the queue before the call. 00551 */ 00552 virtual int deactivate_i (int pulse = 0); 00553 00554 /// Activate the queue. 00555 virtual int activate_i (void); 00556 00557 // = Helper methods to factor out common #ifdef code. 00558 00559 /// Wait for the queue to become non-full. 00560 virtual int wait_not_full_cond (ACE_Guard<ACE_SYNCH_MUTEX_T> &mon, 00561 ACE_Time_Value *timeout); 00562 00563 /// Wait for the queue to become non-empty. 00564 virtual int wait_not_empty_cond (ACE_Guard<ACE_SYNCH_MUTEX_T> &mon, 00565 ACE_Time_Value *timeout); 00566 00567 /// Inform any threads waiting to enqueue that they can procede. 00568 virtual int signal_enqueue_waiters (void); 00569 00570 /// Inform any threads waiting to dequeue that they can procede. 00571 virtual int signal_dequeue_waiters (void); 00572 00573 /// Pointer to head of ACE_Message_Block list. 00574 ACE_Message_Block *head_; 00575 00576 /// Pointer to tail of ACE_Message_Block list. 00577 ACE_Message_Block *tail_; 00578 00579 /// Lowest number before unblocking occurs. 00580 size_t low_water_mark_; 00581 00582 /// Greatest number of bytes before blocking. 00583 size_t high_water_mark_; 00584 00585 /// Current number of bytes in the queue. 00586 size_t cur_bytes_; 00587 00588 /// Current length of messages in the queue. 00589 size_t cur_length_; 00590 00591 /// Current number of messages in the queue. 00592 size_t cur_count_; 00593 00594 /// The notification strategy used when a new message is enqueued. 00595 ACE_Notification_Strategy *notification_strategy_; 00596 00597 // = Synchronization primitives for controlling concurrent access. 00598 /// Protect queue from concurrent access. 00599 ACE_SYNCH_MUTEX_T lock_; 00600 00601 /// Used to make threads sleep until the queue is no longer empty. 00602 ACE_SYNCH_CONDITION_T not_empty_cond_; 00603 00604 /// Used to make threads sleep until the queue is no longer full. 00605 ACE_SYNCH_CONDITION_T not_full_cond_; 00606 00607 /// Sends the size of the queue whenever it changes. 00608 #if defined (ACE_HAS_MONITOR_POINTS) && (ACE_HAS_MONITOR_POINTS == 1) 00609 ACE::Monitor_Control::Size_Monitor *monitor_; 00610 #endif 00611 00612 private: 00613 00614 // = Disallow these operations. 00615 ACE_UNIMPLEMENTED_FUNC (void operator= (const ACE_Message_Queue<ACE_SYNCH_USE> &)) 00616 ACE_UNIMPLEMENTED_FUNC (ACE_Message_Queue (const ACE_Message_Queue<ACE_SYNCH_USE> &)) 00617 }; 00618 00619 // This typedef is used to get around a compiler bug in g++/vxworks. 00620 typedef ACE_Message_Queue<ACE_SYNCH> ACE_DEFAULT_MESSAGE_QUEUE_TYPE; 00621 00622 00623 /** 00624 * @class ACE_Message_Queue_Iterator 00625 * 00626 * @brief Iterator for the ACE_Message_Queue. 00627 */ 00628 template <ACE_SYNCH_DECL> 00629 class ACE_Message_Queue_Iterator 00630 { 00631 public: 00632 // = Initialization method. 00633 ACE_Message_Queue_Iterator (ACE_Message_Queue <ACE_SYNCH_USE> &queue); 00634 00635 // = Iteration methods. 00636 /// Pass back the @a entry that hasn't been seen in the queue. 00637 /// Returns 0 when all items have been seen, else 1. 00638 int next (ACE_Message_Block *&entry); 00639 00640 /// Returns 1 when all items have been seen, else 0. 00641 int done (void) const; 00642 00643 /// Move forward by one element in the queue. Returns 0 when all the 00644 /// items in the set have been seen, else 1. 00645 int advance (void); 00646 00647 /// Dump the state of an object. 00648 void dump (void) const; 00649 00650 /// Declare the dynamic allocation hooks. 00651 ACE_ALLOC_HOOK_DECLARE; 00652 00653 private: 00654 /// Message_Queue we are iterating over. 00655 ACE_Message_Queue <ACE_SYNCH_USE> &queue_; 00656 00657 /// Keeps track of how far we've advanced... 00658 ACE_Message_Block *curr_; 00659 }; 00660 00661 /** 00662 * @class ACE_Message_Queue_Reverse_Iterator 00663 * 00664 * @brief Reverse Iterator for the ACE_Message_Queue. 00665 */ 00666 template <ACE_SYNCH_DECL> 00667 class ACE_Message_Queue_Reverse_Iterator 00668 { 00669 public: 00670 // = Initialization method. 00671 ACE_Message_Queue_Reverse_Iterator (ACE_Message_Queue <ACE_SYNCH_USE> &queue); 00672 00673 // = Iteration methods. 00674 /// Pass back the @a entry that hasn't been seen in the queue. 00675 /// Returns 0 when all items have been seen, else 1. 00676 int next (ACE_Message_Block *&entry); 00677 00678 /// Returns 1 when all items have been seen, else 0. 00679 int done (void) const; 00680 00681 /// Move forward by one element in the queue. Returns 0 when all the 00682 /// items in the set have been seen, else 1. 00683 int advance (void); 00684 00685 /// Dump the state of an object. 00686 void dump (void) const; 00687 00688 /// Declare the dynamic allocation hooks. 00689 ACE_ALLOC_HOOK_DECLARE; 00690 00691 private: 00692 /// Message_Queue we are iterating over. 00693 ACE_Message_Queue <ACE_SYNCH_USE> &queue_; 00694 00695 /// Keeps track of how far we've advanced... 00696 ACE_Message_Block *curr_; 00697 }; 00698 00699 /** 00700 * @class ACE_Dynamic_Message_Queue 00701 * 00702 * @brief A derived class which adapts the ACE_Message_Queue 00703 * class in order to maintain dynamic priorities for enqueued 00704 * <ACE_Message_Blocks> and manage the queue order according 00705 * to these dynamic priorities. 00706 * 00707 * The messages in the queue are managed so as to preserve 00708 * a logical ordering with minimal overhead per enqueue and 00709 * dequeue operation. For this reason, the actual order of 00710 * messages in the linked list of the queue may differ from 00711 * their priority order. As time passes, a message may change 00712 * from pending status to late status, and eventually to beyond 00713 * late status. To minimize reordering overhead under this 00714 * design force, three separate boundaries are maintained 00715 * within the linked list of messages. Messages are dequeued 00716 * preferentially from the head of the pending portion, then 00717 * the head of the late portion, and finally from the head 00718 * of the beyond late portion. In this way, only the boundaries 00719 * need to be maintained (which can be done efficiently, as 00720 * aging messages maintain the same linked list order as they 00721 * progress from one status to the next), with no reordering 00722 * of the messages themselves, while providing correct priority 00723 * ordered dequeueing semantics. 00724 * Head and tail enqueue methods inherited from ACE_Message_Queue 00725 * are made private to prevent out-of-order messages from confusing 00726 * management of the various portions of the queue. Messages in 00727 * the pending portion of the queue whose priority becomes late 00728 * (according to the specific dynamic strategy) advance into 00729 * the late portion of the queue. Messages in the late portion 00730 * of the queue whose priority becomes later than can be represented 00731 * advance to the beyond_late portion of the queue. These behaviors 00732 * support a limited schedule overrun, with pending messages prioritized 00733 * ahead of late messages, and late messages ahead of beyond late 00734 * messages. These behaviors can be modified in derived classes by 00735 * providing alternative definitions for the appropriate virtual methods. 00736 * When filled with messages, the queue's linked list should look like: 00737 * H T 00738 * | | 00739 * B - B - B - B - L - L - L - P - P - P - P - P 00740 * | | | | | | 00741 * BH BT LH LT PH PT 00742 * Where the symbols are as follows: 00743 * H = Head of the entire list 00744 * T = Tail of the entire list 00745 * B = Beyond late message 00746 * BH = Beyond late messages Head 00747 * BT = Beyond late messages Tail 00748 * L = Late message 00749 * LH = Late messages Head 00750 * LT = Late messages Tail 00751 * P = Pending message 00752 * PH = Pending messages Head 00753 * PT = Pending messages Tail 00754 * Caveat: the virtual methods enqueue_tail, enqueue_head, 00755 * and peek_dequeue_head have semantics for the static 00756 * message queues that cannot be guaranteed for dynamic 00757 * message queues. The peek_dequeue_head method just 00758 * calls the base class method, while the two enqueue 00759 * methods call the priority enqueue method. The 00760 * order of messages in the dynamic queue is a function 00761 * of message deadlines and how long they are in the 00762 * queues. You can manipulate these in some cases to 00763 * ensure the correct semantics, but that is not a 00764 * very stable or portable approach (discouraged). 00765 */ 00766 template <ACE_SYNCH_DECL> 00767 class ACE_Dynamic_Message_Queue : public ACE_Message_Queue<ACE_SYNCH_USE> 00768 { 00769 public: 00770 // = Initialization and termination methods. 00771 ACE_Dynamic_Message_Queue (ACE_Dynamic_Message_Strategy & message_strategy, 00772 size_t hwm = ACE_Message_Queue_Base::DEFAULT_HWM, 00773 size_t lwm = ACE_Message_Queue_Base::DEFAULT_LWM, 00774 ACE_Notification_Strategy * = 0); 00775 00776 /// Close down the message queue and release all resources. 00777 virtual ~ACE_Dynamic_Message_Queue (void); 00778 00779 /** 00780 * Detach all messages with status given in the passed flags from 00781 * the queue and return them by setting passed head and tail pointers 00782 * to the linked list they comprise. This method is intended primarily 00783 * as a means of periodically harvesting messages that have missed 00784 * their deadlines, but is available in its most general form. All 00785 * messages are returned in priority order, from head to tail, as of 00786 * the time this method was called. 00787 */ 00788 virtual int remove_messages (ACE_Message_Block *&list_head, 00789 ACE_Message_Block *&list_tail, 00790 u_int status_flags); 00791 00792 /** 00793 * Dequeue and return the <ACE_Message_Block *> at the head of the 00794 * queue. Returns -1 on failure, else the number of items still on 00795 * the queue. 00796 */ 00797 virtual int dequeue_head (ACE_Message_Block *&first_item, 00798 ACE_Time_Value *timeout = 0); 00799 00800 /// Dump the state of the queue. 00801 virtual void dump (void) const; 00802 00803 /** 00804 * Just call priority enqueue method: tail enqueue semantics for dynamic 00805 * message queues are unstable: the message may or may not be where 00806 * it was placed after the queue is refreshed prior to the next 00807 * enqueue or dequeue operation. 00808 */ 00809 virtual int enqueue_tail (ACE_Message_Block *new_item, 00810 ACE_Time_Value *timeout = 0); 00811 00812 /** 00813 * Just call priority enqueue method: head enqueue semantics for dynamic 00814 * message queues are unstable: the message may or may not be where 00815 * it was placed after the queue is refreshed prior to the next 00816 * enqueue or dequeue operation. 00817 */ 00818 virtual int enqueue_head (ACE_Message_Block *new_item, 00819 ACE_Time_Value *timeout = 0); 00820 00821 00822 /// Declare the dynamic allocation hooks. 00823 ACE_ALLOC_HOOK_DECLARE; 00824 00825 protected: 00826 00827 /** 00828 * Enqueue an <ACE_Message_Block *> in accordance with its priority. 00829 * priority may be *dynamic* or *static* or a combination or *both* 00830 * It calls the priority evaluation function passed into the Dynamic 00831 * Message Queue constructor to update the priorities of all 00832 * enqueued messages. 00833 */ 00834 virtual int enqueue_i (ACE_Message_Block *new_item); 00835 00836 /// Enqueue a message in priority order within a given priority status sublist 00837 virtual int sublist_enqueue_i (ACE_Message_Block *new_item, 00838 const ACE_Time_Value ¤t_time, 00839 ACE_Message_Block *&sublist_head, 00840 ACE_Message_Block *&sublist_tail, 00841 ACE_Dynamic_Message_Strategy::Priority_Status status); 00842 00843 /** 00844 * Dequeue and return the <ACE_Message_Block *> at the head of the 00845 * logical queue. Attempts first to dequeue from the pending 00846 * portion of the queue, or if that is empty from the late portion, 00847 * or if that is empty from the beyond late portion, or if that is 00848 * empty just sets the passed pointer to zero and returns -1. 00849 */ 00850 virtual int dequeue_head_i (ACE_Message_Block *&first_item); 00851 00852 /// Refresh the queue using the strategy 00853 /// specific priority status function. 00854 virtual int refresh_queue (const ACE_Time_Value & current_time); 00855 00856 /// Refresh the pending queue using the strategy 00857 /// specific priority status function. 00858 virtual int refresh_pending_queue (const ACE_Time_Value & current_time); 00859 00860 /// Refresh the late queue using the strategy 00861 /// specific priority status function. 00862 virtual int refresh_late_queue (const ACE_Time_Value & current_time); 00863 00864 /// Pointer to head of the pending messages 00865 ACE_Message_Block *pending_head_; 00866 00867 /// Pointer to tail of the pending messages 00868 ACE_Message_Block *pending_tail_; 00869 00870 /// Pointer to head of the late messages 00871 ACE_Message_Block *late_head_; 00872 00873 /// Pointer to tail of the late messages 00874 ACE_Message_Block *late_tail_; 00875 00876 /// Pointer to head of the beyond late messages 00877 ACE_Message_Block *beyond_late_head_; 00878 00879 /// Pointer to tail of the beyond late messages 00880 ACE_Message_Block *beyond_late_tail_; 00881 00882 /// Pointer to a dynamic priority evaluation function. 00883 ACE_Dynamic_Message_Strategy &message_strategy_; 00884 00885 private: 00886 // = Disallow public access to these operations. 00887 00888 ACE_UNIMPLEMENTED_FUNC (void operator= (const ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> &)) 00889 ACE_UNIMPLEMENTED_FUNC (ACE_Dynamic_Message_Queue (const ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> &)) 00890 00891 // provide definitions for these (just call base class method), 00892 // but make them private so they're not accessible outside the class 00893 00894 /// Private method to hide public base class method: just calls base class method 00895 virtual int peek_dequeue_head (ACE_Message_Block *&first_item, 00896 ACE_Time_Value *timeout = 0); 00897 00898 }; 00899 00900 /** 00901 * @class ACE_Message_Queue_Factory 00902 * 00903 * @brief ACE_Message_Queue_Factory is a static factory class template which 00904 * provides a separate factory method for each of the major kinds of 00905 * priority based message dispatching: static, earliest deadline first 00906 * (EDF), and minimum laxity first (MLF). 00907 * 00908 * The ACE_Dynamic_Message_Queue class assumes responsibility for 00909 * releasing the resources of the strategy with which it was 00910 * constructed: the user of a message queue constructed by 00911 * any of these factory methods is only responsible for 00912 * ensuring destruction of the message queue itself. 00913 */ 00914 template <ACE_SYNCH_DECL> 00915 class ACE_Message_Queue_Factory 00916 { 00917 public: 00918 /// Factory method for a statically prioritized ACE_Message_Queue 00919 static ACE_Message_Queue<ACE_SYNCH_USE> * 00920 create_static_message_queue (size_t hwm = ACE_Message_Queue_Base::DEFAULT_HWM, 00921 size_t lwm = ACE_Message_Queue_Base::DEFAULT_LWM, 00922 ACE_Notification_Strategy * = 0); 00923 00924 /// Factory method for a dynamically prioritized (by time to deadline) ACE_Dynamic_Message_Queue 00925 static ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> * 00926 create_deadline_message_queue (size_t hwm = ACE_Message_Queue_Base::DEFAULT_HWM, 00927 size_t lwm = ACE_Message_Queue_Base::DEFAULT_LWM, 00928 ACE_Notification_Strategy * = 0, 00929 u_long static_bit_field_mask = 0x3FFUL, // 2^(10) - 1 00930 u_long static_bit_field_shift = 10, // 10 low order bits 00931 u_long dynamic_priority_max = 0x3FFFFFUL, // 2^(22)-1 00932 u_long dynamic_priority_offset = 0x200000UL); // 2^(22-1) 00933 00934 /// Factory method for a dynamically prioritized (by laxity) ACE_Dynamic_Message_Queue 00935 static ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> * 00936 create_laxity_message_queue (size_t hwm = ACE_Message_Queue_Base::DEFAULT_HWM, 00937 size_t lwm = ACE_Message_Queue_Base::DEFAULT_LWM, 00938 ACE_Notification_Strategy * = 0, 00939 u_long static_bit_field_mask = 0x3FFUL, // 2^(10) - 1 00940 u_long static_bit_field_shift = 10, // 10 low order bits 00941 u_long dynamic_priority_max = 0x3FFFFFUL, // 2^(22)-1 00942 u_long dynamic_priority_offset = 0x200000UL); // 2^(22-1) 00943 00944 00945 #if defined (ACE_VXWORKS) 00946 00947 /// Factory method for a wrapped VxWorks message queue 00948 static ACE_Message_Queue_Vx * 00949 create_Vx_message_queue (size_t max_messages, size_t max_message_length, 00950 ACE_Notification_Strategy *ns = 0); 00951 00952 #endif /* defined (ACE_VXWORKS) */ 00953 00954 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO) 00955 00956 /// Factory method for a NT message queue. 00957 static ACE_Message_Queue_NT * 00958 create_NT_message_queue (size_t max_threads); 00959 00960 #endif /* ACE_HAS_WIN32_OVERLAPPED_IO */ 00961 }; 00962 00963 /** 00964 * @class ACE_Message_Queue_Ex 00965 * 00966 * @brief A threaded message queueing facility, modeled after the 00967 * queueing facilities in System V STREAMs. 00968 * 00969 * ACE_Message_Queue_Ex is a strongly-typed version of the 00970 * ACE_Message_Queue class. Rather than queueing in terms of ACE_Message_Block 00971 * objects, ACE_Message_Queue_Ex has a template argument to specify the 00972 * type of objects that are queued. 00973 * 00974 * The second template argument parameterizes the queue's synchronization. 00975 * The argument specifies a synchronization strategy. The two main 00976 * strategies available for ACE_SYNCH_DECL are: 00977 * -# ACE_MT_SYNCH: all operations are thread-safe 00978 * -# ACE_NULL_SYNCH: no synchronization and no locking overhead 00979 */ 00980 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> 00981 class ACE_Message_Queue_Ex 00982 { 00983 public: 00984 00985 enum 00986 { 00987 /// Default priority value. This is the lowest priority. 00988 DEFAULT_PRIORITY = 0 00989 }; 00990 00991 #if 0 00992 // @@ Iterators are not implemented yet... 00993 00994 friend class ACE_Message_Queue_Iterator<ACE_SYNCH_USE>; 00995 friend class ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>; 00996 00997 // = Traits 00998 typedef ACE_Message_Queue_Iterator<ACE_SYNCH_USE> 00999 ITERATOR; 01000 typedef ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE> 01001 REVERSE_ITERATOR; 01002 #endif /* 0 */ 01003 01004 /** 01005 * @name Initialization methods 01006 */ 01007 //@{ 01008 /** 01009 * Initialize an ACE_Message_Queue_Ex. 01010 * 01011 * @param high_water_mark High water mark. Determines how many bytes can be 01012 * stored in a queue before it's considered full. Supplier threads 01013 * must block until the queue is no longer full. 01014 * @param low_water_mark Low water mark. Determines how many bytes must be in 01015 * the queue before supplier threads are allowed to enqueue additional 01016 * data. By default, the @a hwm equals @a lwm, which means 01017 * that suppliers will be able to enqueue new messages as soon as 01018 * a consumer removes any message from the queue. Making the low 01019 * water mark smaller than the high water mark forces consumers to 01020 * drain more messages from the queue before suppliers can enqueue 01021 * new messages, which can minimize the "silly window syndrome." 01022 * @param ns Notification strategy. Pointer to an object conforming to the 01023 * ACE_Notification_Strategy interface. If set, the object's 01024 * notify(void) method will be called each time data is added to 01025 * this ACE_Message_Queue. @see ACE_Reactor_Notification_Strategy. 01026 */ 01027 ACE_Message_Queue_Ex (size_t high_water_mark = ACE_Message_Queue_Base::DEFAULT_HWM, 01028 size_t low_water_mark = ACE_Message_Queue_Base::DEFAULT_LWM, 01029 ACE_Notification_Strategy * ns = 0); 01030 virtual int open (size_t hwm = ACE_Message_Queue_Base::DEFAULT_HWM, 01031 size_t lwm = ACE_Message_Queue_Base::DEFAULT_LWM, 01032 ACE_Notification_Strategy * = 0); 01033 //@} 01034 01035 /// Releases all resources from the message queue and marks it deactivated. 01036 /// @sa flush(). 01037 /// 01038 /// @retval The number of messages released from the queue; -1 on error. 01039 virtual int close (void); 01040 01041 /// Releases all resources from the message queue and marks it deactivated. 01042 virtual ~ACE_Message_Queue_Ex (void); 01043 01044 /** 01045 * Releases all resources from the message queue but does not mark it 01046 * deactivated. This method holds the queue lock during this operation. 01047 * @sa close(). 01048 * 01049 * @return The number of messages flushed; -1 on error. 01050 */ 01051 virtual int flush (void); 01052 01053 /** 01054 * Release all resources from the message queue but do not mark it 01055 * as deactivated. 01056 * 01057 * @pre The caller must be holding the queue lock before calling this 01058 * method. 01059 * 01060 * @return The number of messages flushed. 01061 */ 01062 virtual int flush_i (void); 01063 01064 /** @name Enqueue and dequeue methods 01065 * 01066 * The enqueue and dequeue methods accept a timeout value passed as 01067 * an ACE_Time_Value *. In all cases, if the timeout pointer is 0, 01068 * the caller will block until action is possible. If the timeout pointer 01069 * is non-zero, the call will wait (if needed, subject to water mark 01070 * settings) until the absolute time specified in the referenced 01071 * ACE_Time_Value object is reached. If the time is reached before the 01072 * desired action is possible, the method will return -1 with errno set 01073 * to @c EWOULDBLOCK. Regardless of the timeout setting, however, 01074 * these methods will also fail and return -1 when the queue is closed, 01075 * deactivated, pulsed, or when a signal occurs. 01076 * 01077 * The time parameters are handled the same as in ACE_Message_Queue, so 01078 * you can see C++NPv2 Section 6.2 and APG Section 12.3 for a fuller 01079 * treatment of ACE_Message_Queue, enqueueing, dequeueing, and how these 01080 * operations are affected by queue state transitions. 01081 */ 01082 //@{ 01083 /** 01084 * Retrieve a pointer to the first item in the queue without removing it. 01085 * 01086 * @note Because the item whose pointer is returned is still on the queue, 01087 * another thread may dequeue that item at any time, 01088 * including before the calling thread examines the peeked-at item. 01089 * Be very careful with this method in multithreaded queueing 01090 * situations. 01091 * 01092 * @param first_item Reference to an ACE_MESSAGE_TYPE * that will 01093 * point to the first item on the queue. The item 01094 * remains on the queue until this or another thread 01095 * dequeues it. 01096 * @param timeout The absolute time the caller will wait until 01097 * for an item to be queued. 01098 * 01099 * @retval >0 The number of items on the queue. 01100 * @retval -1 On failure. errno holds the reason. Common errno values are: 01101 * - EWOULDBLOCK: the timeout elapsed 01102 * - ESHUTDOWN: the queue was deactivated or pulsed 01103 */ 01104 virtual int peek_dequeue_head (ACE_MESSAGE_TYPE *&first_item, 01105 ACE_Time_Value *timeout = 0); 01106 01107 /** 01108 * Enqueue an ACE_MESSAGE TYPE into the queue in accordance with 01109 * the specified priority (0 is lowest priority). FIFO 01110 * order is maintained when items of the same priority are 01111 * inserted consecutively. 01112 * 01113 * @param new_item Pointer to an item that will be added to the queue. 01114 * @param timeout The absolute time the caller will wait until 01115 * for the block to be queued. 01116 * @param priority The priority to use when enqueueing the item. 01117 * 01118 * @retval >0 The number of items on the queue after adding 01119 * the specified item. 01120 * @retval -1 On failure. errno holds the reason. Common errno values are: 01121 * - EWOULDBLOCK: the timeout elapsed 01122 * - ESHUTDOWN: the queue was deactivated or pulsed 01123 */ 01124 virtual int enqueue_prio (ACE_MESSAGE_TYPE *new_item, 01125 ACE_Time_Value *timeout = 0, 01126 unsigned long priority = DEFAULT_PRIORITY); 01127 01128 /** 01129 * This method acts just like enqueue_tail(). There's no deadline 01130 * time associated with items. 01131 */ 01132 virtual int enqueue_deadline (ACE_MESSAGE_TYPE *new_item, 01133 ACE_Time_Value *timeout = 0); 01134 01135 /** 01136 * @deprecated This is an alias for enqueue_prio(). It's only here for 01137 * backwards compatibility and will go away in a subsequent release. 01138 * Please use enqueue_prio() instead. 01139 */ 01140 virtual int enqueue (ACE_MESSAGE_TYPE *new_item, 01141 ACE_Time_Value *timeout = 0); 01142 01143 /** 01144 * Enqueue an item at the tail of the queue. 01145 * 01146 * @param new_item Pointer to an item that will be added to the queue. 01147 * @param timeout The absolute time the caller will wait until 01148 * for the item to be queued. 01149 * 01150 * @retval >0 The number of items on the queue after adding 01151 * the specified item. 01152 * @retval -1 On failure. errno holds the reason. Common errno values are: 01153 * - EWOULDBLOCK: the timeout elapsed 01154 * - ESHUTDOWN: the queue was deactivated or pulsed 01155 */ 01156 virtual int enqueue_tail (ACE_MESSAGE_TYPE *new_item, 01157 ACE_Time_Value *timeout = 0); 01158 01159 /** 01160 * Enqueue an item at the head of the queue. 01161 * 01162 * @param new_item Pointer to an item that will be added to the queue. 01163 * @param timeout The absolute time the caller will wait until 01164 * for the item to be queued. 01165 * 01166 * @retval >0 The number of items on the queue after adding 01167 * the specified item. 01168 * @retval -1 On failure. errno holds the reason. Common errno values are: 01169 * - EWOULDBLOCK: the timeout elapsed 01170 * - ESHUTDOWN: the queue was deactivated or pulsed 01171 */ 01172 virtual int enqueue_head (ACE_MESSAGE_TYPE *new_item, 01173 ACE_Time_Value *timeout = 0); 01174 01175 /// This method is an alias for the following <dequeue_head> method. 01176 virtual int dequeue (ACE_MESSAGE_TYPE *&first_item, 01177 ACE_Time_Value *timeout = 0); 01178 01179 /** 01180 * Dequeue the item at the head of the queue and return a pointer to it. 01181 * 01182 * @param first_item Reference to an ACE_MESSAGE_TYPE * that will 01183 * be set to the address of the dequeued item. 01184 * @param timeout The absolute time the caller will wait until 01185 * for an item to be dequeued. 01186 * 01187 * @retval >=0 The number of items remaining in the queue. 01188 * @retval -1 On failure. errno holds the reason. Common errno values are: 01189 * - EWOULDBLOCK: the timeout elapsed 01190 * - ESHUTDOWN: the queue was deactivated or pulsed 01191 */ 01192 virtual int dequeue_head (ACE_MESSAGE_TYPE *&first_item, 01193 ACE_Time_Value *timeout = 0); 01194 01195 /** 01196 * Dequeue the item that has the lowest priority (preserves 01197 * FIFO order for items with the same priority) and return a pointer 01198 * to it. 01199 * 01200 * @param dequeued Reference to an ACE_MESSAGE_TYPE * that will 01201 * be set to the address of the dequeued item. 01202 * @param timeout The absolute time the caller will wait until 01203 * for an item to be dequeued. 01204 * 01205 * @retval >=0 The number of items remaining in the queue. 01206 * @retval -1 On failure. errno holds the reason. Common errno values are: 01207 * - EWOULDBLOCK: the timeout elapsed 01208 * - ESHUTDOWN: the queue was deactivated or pulsed 01209 */ 01210 virtual int dequeue_prio (ACE_MESSAGE_TYPE *&dequeued, 01211 ACE_Time_Value *timeout = 0); 01212 01213 /** 01214 * Dequeue the item at the tail of the queue and return a pointer to it. 01215 * 01216 * @param dequeued Reference to an ACE_MESSAGE_TYPE * that will 01217 * be set to the address of the dequeued item. 01218 * @param timeout The absolute time the caller will wait until 01219 * for an item to be dequeued. 01220 * 01221 * @retval >=0 The number of items remaining in the queue. 01222 * @retval -1 On failure. errno holds the reason. Common errno values are: 01223 * - EWOULDBLOCK: the timeout elapsed 01224 * - ESHUTDOWN: the queue was deactivated or pulsed 01225 */ 01226 virtual int dequeue_tail (ACE_MESSAGE_TYPE *&dequeued, 01227 ACE_Time_Value *timeout = 0); 01228 01229 /** 01230 * Because there's deadline associated with enqueue_deadline(), this 01231 * method will behave just as dequeue_head(). 01232 */ 01233 virtual int dequeue_deadline (ACE_MESSAGE_TYPE *&dequeued, 01234 ACE_Time_Value *timeout = 0); 01235 //@} 01236 01237 /** @name Queue statistics methods 01238 */ 01239 //@{ 01240 01241 /// True if queue is full, else false. 01242 virtual bool is_full (void); 01243 01244 /// True if queue is empty, else false. 01245 virtual bool is_empty (void); 01246 01247 /** 01248 * Number of total bytes on the queue, i.e., sum of the message 01249 * block sizes. 01250 */ 01251 virtual size_t message_bytes (void); 01252 /** 01253 * Number of total length on the queue, i.e., sum of the message 01254 * block lengths. 01255 */ 01256 virtual size_t message_length (void); 01257 /** 01258 * Number of total messages on the queue. 01259 */ 01260 virtual size_t message_count (void); 01261 01262 // = Manual changes to these stats (used when queued message blocks 01263 // change size or lengths). 01264 /** 01265 * New value of the number of total bytes on the queue, i.e., sum of 01266 * the message block sizes. 01267 */ 01268 virtual void message_bytes (size_t new_size); 01269 /** 01270 * New value of the number of total length on the queue, i.e., sum 01271 * of the message block lengths. 01272 */ 01273 virtual void message_length (size_t new_length); 01274 01275 //@} 01276 01277 /** @name Water mark (flow control) methods 01278 */ 01279 //@{ 01280 01281 /** 01282 * Get high watermark. 01283 */ 01284 virtual size_t high_water_mark (void); 01285 /** 01286 * Set the high watermark, which determines how many bytes can be 01287 * stored in a queue before it's considered "full." 01288 */ 01289 virtual void high_water_mark (size_t hwm); 01290 01291 /** 01292 * Get low watermark. 01293 */ 01294 virtual size_t low_water_mark (void); 01295 /** 01296 * Set the low watermark, which determines how many bytes must be in 01297 * the queue before supplier threads are allowed to enqueue 01298 * additional <ACE_MESSAGE_TYPE>s. 01299 */ 01300 virtual void low_water_mark (size_t lwm); 01301 //@} 01302 01303 /** @name Activation and queue state methods 01304 * See C++NPv2 Section 6.2 and APG Section 12.3 for a fuller treatment of 01305 * queue states and transitions and how the transitions affect message 01306 * enqueueing and dequeueing operations. 01307 */ 01308 //@{ 01309 01310 /** 01311 * Deactivate the queue and wakeup all threads waiting on the queue 01312 * so they can continue. No messages are removed from the queue, 01313 * however. Any other operations called until the queue is 01314 * activated again will immediately return -1 with @c errno == 01315 * ESHUTDOWN. Returns WAS_INACTIVE if queue was inactive before the 01316 * call and WAS_ACTIVE if queue was active before the call. 01317 */ 01318 virtual int deactivate (void); 01319 01320 /** 01321 * Reactivate the queue so that threads can enqueue and dequeue 01322 * messages again. Returns the state of the queue before the call. 01323 */ 01324 virtual int activate (void); 01325 01326 /** 01327 * Pulse the queue to wake up any waiting threads. Changes the 01328 * queue state to PULSED; future enqueue/dequeue operations proceed 01329 * as in ACTIVATED state. 01330 * 01331 * @retval The queue's state before this call. 01332 */ 01333 virtual int pulse (void); 01334 01335 /// Returns the current state of the queue, which can be one of 01336 /// ACTIVATED, DEACTIVATED, or PULSED. 01337 virtual int state (void); 01338 01339 /// Returns true if the state of the queue is DEACTIVATED, 01340 /// but false if the queue's state is ACTIVATED or PULSED. 01341 virtual int deactivated (void); 01342 //@} 01343 01344 /** @name Notification strategy methods 01345 */ 01346 //@{ 01347 01348 /** 01349 * This hook is automatically invoked by <enqueue_head>, 01350 * <enqueue_tail>, and <enqueue_prio> when a new item is inserted 01351 * into the queue. Subclasses can override this method to perform 01352 * specific notification strategies (e.g., signaling events for a 01353 * <WFMO_Reactor>, notifying a <Reactor>, etc.). In a 01354 * multi-threaded application with concurrent consumers, there is no 01355 * guarantee that the queue will be still be non-empty by the time 01356 * the notification occurs. 01357 */ 01358 virtual int notify (void); 01359 01360 /// Get the notification strategy for the <Message_Queue> 01361 virtual ACE_Notification_Strategy *notification_strategy (void); 01362 01363 /// Set the notification strategy for the <Message_Queue> 01364 virtual void notification_strategy (ACE_Notification_Strategy *s); 01365 //@} 01366 01367 /// Returns a reference to the lock used by the ACE_Message_Queue_Ex. 01368 virtual ACE_SYNCH_MUTEX_T &lock (void); 01369 01370 /// Dump the state of an object. 01371 virtual void dump (void) const; 01372 01373 /// Declare the dynamic allocation hooks. 01374 ACE_ALLOC_HOOK_DECLARE; 01375 01376 protected: 01377 /// Implement this via an ACE_Message_Queue. 01378 ACE_Message_Queue<ACE_SYNCH_USE> queue_; 01379 }; 01380 01381 /** 01382 * @class ACE_Message_Queue_Ex_N 01383 * 01384 * @brief A threaded message queueing facility, modeled after the 01385 * queueing facilities in System V STREAMs which can enqueue 01386 * multiple messages in one call. 01387 * 01388 * As ACE_Message_Queue_Ex, ACE_Message_Queue_Ex_N is a strongly-typed 01389 * version of the ACE_Message_Queue. If @c ACE_SYNCH_DECL is @c ACE_MT_SYNCH 01390 * then all operations are thread-safe. Otherwise, if it's @c ACE_NULL_SYNCH 01391 * then there's no locking overhead. 01392 * 01393 * The @c ACE_MESSAGE_TYPE messages that are sent to this 01394 * queue can be chained. Messages are expected to have a 01395 * @c next method that returns the next message in the chain; 01396 * ACE_Message_Queue_Ex_N uses this method to run through 01397 * all the incoming messages and enqueue them in one call. 01398 */ 01399 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> 01400 class ACE_Message_Queue_Ex_N : public ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE> 01401 { 01402 public: 01403 // = Initialization and termination methods. 01404 01405 /** 01406 * Initialize an ACE_Message_Queue_Ex_N. The @a high_water_mark 01407 * determines how many bytes can be stored in a queue before it's 01408 * considered "full." Supplier threads must block until the queue 01409 * is no longer full. The @a low_water_mark determines how many 01410 * bytes must be in the queue before supplier threads are allowed to 01411 * enqueue additional messages. By default, the @a high_water_mark 01412 * equals the @a low_water_mark, which means that suppliers will be 01413 * able to enqueue new messages as soon as a consumer removes any message 01414 * from the queue. Making the @a low_water_mark smaller than the 01415 * @a high_water_mark forces consumers to drain more messages from the 01416 * queue before suppliers can enqueue new messages, which can minimize 01417 * the "silly window syndrome." 01418 */ 01419 ACE_Message_Queue_Ex_N (size_t high_water_mark = ACE_Message_Queue_Base::DEFAULT_HWM, 01420 size_t low_water_mark = ACE_Message_Queue_Base::DEFAULT_LWM, 01421 ACE_Notification_Strategy * ns = 0); 01422 01423 /// Close down the message queue and release all resources. 01424 virtual ~ACE_Message_Queue_Ex_N (void); 01425 01426 /** 01427 * Enqueue one or more @c ACE_MESSAGE_TYPE objects at the head of the queue. 01428 * If the @a new_item @c next() pointer is non-zero, it is assumed to be the 01429 * start of a series of @c ACE_MESSAGE_TYPE objects connected via their 01430 * @c next() pointers. The series of blocks will be added to the queue in 01431 * the same order they are passed in as. 01432 * 01433 * @param new_item Pointer to an @c ACE_MESSAGE_TYPE that will be 01434 * added to the queue. If the block's @c next() pointer 01435 * is non-zero, all blocks chained from the @c next() 01436 * pointer are enqueued as well. 01437 * @param tv The absolute time the caller will wait until 01438 * for the block to be queued. 01439 * 01440 * @retval >0 The number of @c ACE_MESSAGE_TYPE objects on the queue after 01441 * adding the specified block(s). 01442 * @retval -1 On failure. errno holds the reason. Common errno values are: 01443 * - EWOULDBLOCK: the timeout elapsed 01444 * - ESHUTDOWN: the queue was deactivated or pulsed 01445 */ 01446 virtual int enqueue_head (ACE_MESSAGE_TYPE *new_item, ACE_Time_Value *tv = 0); 01447 01448 /** 01449 * Enqueue one or more @c ACE_MESSAGE_TYPE objects at the tail of the queue. 01450 * If the @a new_item @c next() pointer is non-zero, it is assumed to be the 01451 * start of a series of @c ACE_MESSAGE_TYPE objects connected via their 01452 * @c next() pointers. The series of blocks will be added to the queue in 01453 * the same order they are passed in as. 01454 * 01455 * @param new_item Pointer to an @c ACE_MESSAGE_TYPE that will be 01456 * added to the queue. If the block's @c next() pointer 01457 * is non-zero, all blocks chained from the @c next() 01458 * pointer are enqueued as well. 01459 * @param tv The absolute time the caller will wait until 01460 * for the block to be queued. 01461 * 01462 * @retval >0 The number of @c ACE_MESSAGE_TYPE objects on the queue after 01463 * adding the specified block(s). 01464 * @retval -1 On failure. errno holds the reason. Common errno values are: 01465 * - EWOULDBLOCK: the timeout elapsed 01466 * - ESHUTDOWN: the queue was deactivated or pulsed 01467 */ 01468 virtual int enqueue_tail (ACE_MESSAGE_TYPE *new_item, ACE_Time_Value *tv = 0); 01469 01470 /// Declare the dynamic allocation hooks. 01471 ACE_ALLOC_HOOK_DECLARE; 01472 01473 protected: 01474 /** 01475 * An helper method that wraps the incoming chain messages 01476 * with ACE_Message_Blocks. 01477 */ 01478 ACE_Message_Block *wrap_with_mbs_i (ACE_MESSAGE_TYPE *new_item); 01479 }; 01480 01481 ACE_END_VERSIONED_NAMESPACE_DECL 01482 01483 #if defined (ACE_TEMPLATES_REQUIRE_SOURCE) 01484 #include "ace/Message_Queue_T.cpp" 01485 #endif /* ACE_TEMPLATES_REQUIRE_SOURCE */ 01486 01487 #if defined (ACE_TEMPLATES_REQUIRE_PRAGMA) 01488 #pragma implementation ("Message_Queue_T.cpp") 01489 #endif /* ACE_TEMPLATES_REQUIRE_PRAGMA */ 01490 01491 #include /**/ "ace/post.h" 01492 01493 #endif /* ACE_MESSAGE_QUEUE_T_H */