ACE_Notification_Queue Class Reference

Implements a user-space queue to send Reactor notifications.

#include <Notification_Queue.h>

Public Member Functions

 ACE_Notification_Queue ()
 ~ACE_Notification_Queue ()
int open ()
 Pre-allocate resources in the queue.
void reset ()
 Release all resources in the queue.
int purge_pending_notifications (ACE_Event_Handler *eh, ACE_Reactor_Mask mask)
 Remove all elements in the queue matching eh and mask.
int push_new_notification (ACE_Notification_Buffer const &buffer)
 Add a new notification to the queue.
int pop_next_notification (ACE_Notification_Buffer &current, bool &more_messages_queued, ACE_Notification_Buffer &next)
 Extract the next notification from the queue.

Private Types

typedef ACE_Intrusive_List< ACE_Notification_Queue_Node > Buffer_List

Private Member Functions

int allocate_more_buffers ()
 Allocate more memory for the queue.

Private Attributes

ACE_Unbounded_Queue< ACE_Notification_Queue_Node * > alloc_queue_
Buffer_List notify_queue_
 Keeps track of all pending notifications.
Buffer_List free_queue_
 Keeps track of all free buffers.
ACE_SYNCH_MUTEX notify_queue_lock_
 Synchronization for handling of queues.

Detailed Description

Implements a user-space queue to send Reactor notifications.

The ACE_Reactor uses a pipe to wake up the thread running the event loop from other threads. The capacity of this pipe is limited under some operating systems, and for some applications that limit is a problem. A user-space notification queue overcomes it: the queue keeps all the data in user space and uses as few resources on the pipe as possible.
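
The wake-up scheme described above can be illustrated with a small stand-alone model. This is a minimal sketch, not ACE code: ModelQueue, Notification, DemoResult and run_demo are hypothetical names, and the pipe is reduced to a counter of unread tokens. It assumes that the event loop writes a fresh token to the pipe whenever a pop reports that more messages are queued, which is one plausible way to use the more_messages_queued flag.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <mutex>

// Hypothetical stand-in for ACE_Notification_Buffer (not the real ACE type).
struct Notification { int id; };

// Simplified model of the push/pop contract, with no ACE dependencies.
class ModelQueue {
public:
  // Returns 1 if the queue was empty (caller should signal the pipe), else 0.
  int push(const Notification& n) {
    std::lock_guard<std::mutex> guard(lock_);
    const bool was_empty = pending_.empty();
    pending_.push_back(n);
    return was_empty ? 1 : 0;
  }

  // Returns 1 if a notification was popped, 0 if the queue was empty.
  // When more entries remain, `more` is set and `next` is a copy of the new head.
  int pop(Notification& current, bool& more, Notification& next) {
    more = false;
    std::lock_guard<std::mutex> guard(lock_);
    if (pending_.empty()) return 0;
    current = pending_.front();
    pending_.pop_front();
    if (!pending_.empty()) {
      more = true;
      next = pending_.front();
    }
    return 1;
  }

private:
  std::deque<Notification> pending_;
  std::mutex lock_;
};

struct DemoResult {
  std::size_t delivered;    // notifications handed to the consumer
  std::size_t max_in_pipe;  // peak number of unread pipe tokens
};

// Pushes `count` notifications, then drains them as an event loop might.
// Assumption: the consumer re-signals the pipe when `more` is true.
DemoResult run_demo(std::size_t count) {
  ModelQueue queue;
  std::size_t in_pipe = 0, max_in_pipe = 0, delivered = 0;

  for (std::size_t i = 0; i < count; ++i) {
    if (queue.push(Notification{static_cast<int>(i)}) == 1) ++in_pipe;
    if (in_pipe > max_in_pipe) max_in_pipe = in_pipe;
  }

  while (in_pipe > 0) {
    --in_pipe;  // the event loop wakes up and reads one token
    Notification current{}, next{};
    bool more = false;
    if (queue.pop(current, more, next) == 1) ++delivered;
    if (more) ++in_pipe;  // re-arm the pipe for the next entry
    if (in_pipe > max_in_pipe) max_in_pipe = in_pipe;
  }
  return {delivered, max_in_pipe};
}
```

In this model, run_demo(5) delivers all five notifications while the simulated pipe never holds more than one token, which is the property that keeps the pipe from filling up.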

This code was refactored from Select_Reactor_Base.

Definition at line 80 of file Notification_Queue.h.


Member Typedef Documentation

typedef ACE_Intrusive_List<ACE_Notification_Queue_Node> ACE_Notification_Queue::Buffer_List [private]

Definition at line 136 of file Notification_Queue.h.


Constructor & Destructor Documentation

ACE_Notification_Queue::ACE_Notification_Queue (  ) 

Definition at line 14 of file Notification_Queue.cpp.

  : ACE_Copy_Disabled()
  , alloc_queue_()
  , notify_queue_()
  , free_queue_()
{
}

ACE_Notification_Queue::~ACE_Notification_Queue (  ) 

Definition at line 23 of file Notification_Queue.cpp.

References reset().

{
  reset();
}


Member Function Documentation

int ACE_Notification_Queue::allocate_more_buffers (  )  [private]

Allocate more memory for the queue.

Definition at line 79 of file Notification_Queue.cpp.

References ACE_NEW_RETURN, ACE_REACTOR_NOTIFICATION_ARRAY_SIZE, ACE_TRACE, free_queue_, and ACE_Intrusive_List< T >::push_front().

Referenced by open(), and push_new_notification().

{
  ACE_TRACE ("ACE_Notification_Queue::allocate_more_buffers");

  ACE_Notification_Queue_Node *temp = 0;

  ACE_NEW_RETURN (temp,
                  ACE_Notification_Queue_Node[ACE_REACTOR_NOTIFICATION_ARRAY_SIZE],
                  -1);

  if (this->alloc_queue_.enqueue_head (temp) == -1)
    {
      delete [] temp;
      return -1;
    }

  for (size_t i = 0; i < ACE_REACTOR_NOTIFICATION_ARRAY_SIZE; ++i)
    {
      free_queue_.push_front(temp + i);
    }

  return 0;
}
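
The chunked allocation above can be sketched without ACE types. The following is a simplified model under stated assumptions: Node and NodePool are hypothetical names, the chunk size is a constructor argument rather than ACE_REACTOR_NOTIFICATION_ARRAY_SIZE, and std::unique_ptr stands in for the manual delete [].

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <new>
#include <utility>
#include <vector>

// Hypothetical node type with an intrusive link (not the real ACE node).
struct Node {
  int payload = 0;
  Node* next = nullptr;
};

// Hypothetical pool: grows by whole arrays, hands out single nodes.
class NodePool {
public:
  explicit NodePool(std::size_t chunk_size) : chunk_size_(chunk_size) {}

  // Allocate one more array and thread every element onto the free list.
  // Returns 0 on success, -1 on allocation failure.
  int allocate_more() {
    std::unique_ptr<Node[]> chunk(new (std::nothrow) Node[chunk_size_]);
    if (!chunk) return -1;
    Node* base = chunk.get();
    try {
      chunks_.push_back(std::move(chunk));  // remember the array for cleanup
    } catch (const std::bad_alloc&) {
      return -1;  // the array is still owned by `chunk` and is released
    }
    for (std::size_t i = 0; i < chunk_size_; ++i) release(base + i);
    return 0;
  }

  // Take a node from the free list, growing the pool when it is empty.
  Node* acquire() {
    if (free_head_ == nullptr && allocate_more() == -1) return nullptr;
    if (free_head_ == nullptr) return nullptr;  // only when chunk_size_ == 0
    Node* node = free_head_;
    free_head_ = node->next;
    node->next = nullptr;
    --free_count_;
    return node;
  }

  // Return a node to the free list; the memory itself is not freed.
  void release(Node* node) {
    assert(node != nullptr);
    node->next = free_head_;
    free_head_ = node;
    ++free_count_;
  }

  std::size_t free_count() const { return free_count_; }
  std::size_t chunk_count() const { return chunks_.size(); }

private:
  std::size_t chunk_size_;
  std::size_t free_count_ = 0;
  Node* free_head_ = nullptr;
  std::vector<std::unique_ptr<Node[]>> chunks_;
};
```

The design point is that one allocation serves many notifications: with a chunk size of 4, the first acquire() allocates once and the next three calls only pop nodes off the free list.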

int ACE_Notification_Queue::open (  ) 

Pre-allocate resources in the queue.

Definition at line 30 of file Notification_Queue.cpp.

References ACE_GUARD_RETURN, ACE_SYNCH_MUTEX, ACE_TRACE, and allocate_more_buffers().

{
  ACE_TRACE ("ACE_Notification_Queue::open");

  ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);

  if (!this->free_queue_.is_empty ())
    return 0;

  return allocate_more_buffers();
}

int ACE_Notification_Queue::pop_next_notification ( ACE_Notification_Buffer &  current,
bool &  more_messages_queued,
ACE_Notification_Buffer &  next 
)

Extract the next notification from the queue.

Returns:
-1 on failure, 1 if a message was popped, 0 otherwise

Definition at line 197 of file Notification_Queue.cpp.

References ACE_GUARD_RETURN, ACE_SYNCH_MUTEX, ACE_TRACE, free_queue_, ACE_Notification_Queue_Node::get(), ACE_Intrusive_List< T >::head(), ACE_Intrusive_List< T >::is_empty(), notify_queue_, ACE_Intrusive_List< T >::pop_front(), and ACE_Intrusive_List< T >::push_front().

{
  ACE_TRACE ("ACE_Notification_Queue::pop_next_notification");

  more_messages_queued = false;

  ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);

  if (notify_queue_.is_empty ())
    {
      return 0;
    }

  ACE_Notification_Queue_Node * node =
    notify_queue_.pop_front();

  current = node->get();
  free_queue_.push_front(node);

  if(!this->notify_queue_.is_empty())
    {
      more_messages_queued = true;
      next = notify_queue_.head()->get();
    }

  return 1;
}

int ACE_Notification_Queue::purge_pending_notifications ( ACE_Event_Handler *  eh,
ACE_Reactor_Mask  mask 
)

Remove all elements in the queue matching eh and mask.

See the ACE_Reactor documentation for a more detailed description of purging; this function is only a helper for it.

Definition at line 105 of file Notification_Queue.cpp.

References ACE_GUARD_RETURN, ACE_SYNCH_MUTEX, ACE_TRACE, ACE_Notification_Queue_Node::clear_mask(), ACE_Notification_Buffer::eh_, free_queue_, ACE_Notification_Queue_Node::get(), ACE_Intrusive_List< T >::head(), ACE_Notification_Queue_Node::mask_disables_all_notifications(), ACE_Notification_Queue_Node::matches_for_purging(), ACE_Intrusive_List_Node< T >::next(), notify_queue_, ACE_Intrusive_List< T >::push_front(), ACE_Event_Handler::remove_reference(), and ACE_Intrusive_List< T >::unsafe_remove().

{
  ACE_TRACE ("ACE_Notification_Queue::purge_pending_notifications");

  ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);

  if (this->notify_queue_.is_empty ())
    return 0;

  int number_purged = 0;
  ACE_Notification_Queue_Node * node = notify_queue_.head();
  while(node != 0)
    {
      if (!node->matches_for_purging(eh))
        {
          // Easy case, skip to the next node
          node = node->next();
          continue;
        }

      if (!node->mask_disables_all_notifications(mask))
        {
          // ... another easy case, skip this node too, but clear the
          // mask first ...
          node->clear_mask(mask);
          node = node->next();
          continue;
        }

      // ... this is the more complicated case, we want to remove the
      // node from the notify_queue_ list.  First save the next node
      // on the list:
      ACE_Notification_Queue_Node * next = node->next();

      // ... then remove it ...
      notify_queue_.unsafe_remove(node);
      ++number_purged;

      // ... release resources ...
      ACE_Event_Handler *event_handler = node->get().eh_;
      event_handler->remove_reference ();

      // ... now this is a free node ...
      free_queue_.push_front(node);

      // ... go to the next node, if there is one ...
      node = next;
    }

  return number_purged;
}
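
The three cases in the loop above (no match, partial mask match, full removal) can be modeled on a standard container. This is a sketch under assumptions that the listing does not confirm: Handler, Pending, Mask and purge are hypothetical names, a null eh is taken to match every entry, and "mask disables all notifications" is taken to mean that no mask bits remain after the requested bits are cleared.

```cpp
#include <cassert>
#include <cstdint>
#include <list>

using Mask = std::uint32_t;  // hypothetical stand-in for ACE_Reactor_Mask

// Hypothetical reference-counted handler.
struct Handler { int refs = 0; };

// Hypothetical pending entry: which handler, and for which events.
struct Pending {
  Handler* eh;
  Mask mask;
};

// Removes every entry for `eh` that `mask` fully disables and returns how
// many were removed. Entries that are only partly covered stay queued with
// the requested bits cleared.
int purge(std::list<Pending>& pending, Handler* eh, Mask mask) {
  int purged = 0;
  for (auto it = pending.begin(); it != pending.end();) {
    // Case 1: entry belongs to another handler, skip it.
    if (eh != nullptr && it->eh != eh) {
      ++it;
      continue;
    }
    // Case 2: some bits survive, so clear the requested ones and keep it.
    if ((it->mask & ~mask) != 0) {
      it->mask &= ~mask;
      ++it;
      continue;
    }
    // Case 3: nothing left to notify; drop the reference and the entry.
    if (it->eh != nullptr) {
      assert(it->eh->refs > 0);
      --it->eh->refs;
    }
    it = pending.erase(it);  // erase returns the following element
    ++purged;
  }
  return purged;
}
```

As in the listing, the successor is obtained before the current entry disappears; here std::list::erase returns it, whereas the ACE code saves node->next() by hand before calling unsafe_remove().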

int ACE_Notification_Queue::push_new_notification ( ACE_Notification_Buffer const &  buffer  ) 

Add a new notification to the queue.

Returns:
-1 on failure, 1 if a new message should be sent through the pipe and 0 otherwise.

Definition at line 159 of file Notification_Queue.cpp.

References ACE_ASSERT, ACE_GUARD_RETURN, ACE_SYNCH_MUTEX, ACE_TRACE, allocate_more_buffers(), free_queue_, ACE_Intrusive_List< T >::is_empty(), notify_queue_, ACE_Intrusive_List< T >::pop_front(), ACE_Intrusive_List< T >::push_back(), and ACE_Notification_Queue_Node::set().

{
  ACE_TRACE ("ACE_Notification_Queue::push_new_notification");

  bool notification_required = false;

  ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);

  // No pending notifications.
  if (this->notify_queue_.is_empty ())
    notification_required = true;

  if (free_queue_.is_empty())
    {
      if (allocate_more_buffers() == -1)
        {
          return -1;
        }
    }

  ACE_Notification_Queue_Node * node =
    free_queue_.pop_front();

  ACE_ASSERT (node != 0);
  node->set(buffer);

  notify_queue_.push_back(node);

  if (!notification_required)
    {
      return 0;
    }

  return 1;
}

void ACE_Notification_Queue::reset (  ) 

Release all resources in the queue.

Definition at line 44 of file Notification_Queue.cpp.

References ACE_TRACE, ACE_Unbounded_Queue_Iterator< T >::advance(), alloc_queue_, free_queue_, ACE_Intrusive_List< T >::head(), ACE_Unbounded_Queue_Iterator< T >::next(), notify_queue_, and ACE_Unbounded_Queue< T >::reset().

Referenced by ~ACE_Notification_Queue().

{
  ACE_TRACE ("ACE_Notification_Queue::reset");

  // Release all the event handlers still in the queue ...
  for (ACE_Notification_Queue_Node * node = notify_queue_.head();
       node != 0;
       node = node->next())
    {
      if (node->get().eh_ == 0)
        {
          continue;
        }
      (void) node->get().eh_->remove_reference();
    }

  // ... free up the dynamically allocated resources ...
  ACE_Notification_Queue_Node **b = 0;
  for (ACE_Unbounded_Queue_Iterator<ACE_Notification_Queue_Node *> alloc_iter (this->alloc_queue_);
       alloc_iter.next (b) != 0;
       alloc_iter.advance ())
    {
      delete [] *b;
      *b = 0;
    }

  // ... cleanup the list of allocated blocks ...
  this->alloc_queue_.reset ();

  // ... swap with empty lists to reset the contents ...
  Buffer_List().swap(notify_queue_);
  Buffer_List().swap(free_queue_);
}
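
The teardown order above (free the arrays first, then swap each list with an empty temporary) relies on the lists not owning their nodes. A minimal stand-alone sketch of that idea follows. Node, IntrusiveList and Pool are hypothetical names, and the list here is a much reduced model that is only assumed to behave like ACE_Intrusive_List in this respect.

```cpp
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

// Hypothetical node with an intrusive link.
struct Node { Node* next = nullptr; };

// Minimal non-owning list: it stores pointers and never frees nodes.
class IntrusiveList {
public:
  void push_front(Node* n) { n->next = head_; head_ = n; }
  bool is_empty() const { return head_ == nullptr; }
  void swap(IntrusiveList& other) { std::swap(head_, other.head_); }

private:
  Node* head_ = nullptr;
};

// Hypothetical owner of the node arrays plus a free list into them.
struct Pool {
  std::vector<Node*> chunks;  // arrays obtained from new[]
  IntrusiveList free_list;

  Pool() = default;
  Pool(const Pool&) = delete;             // copying would double-delete
  Pool& operator=(const Pool&) = delete;
  ~Pool() { reset(); }

  // Add one array of n nodes and put each node on the free list.
  void grow(std::size_t n) {
    chunks.reserve(chunks.size() + 1);  // so push_back below cannot throw
    Node* chunk = new Node[n];
    chunks.push_back(chunk);
    for (std::size_t i = 0; i < n; ++i) free_list.push_front(chunk + i);
  }

  // Release everything; safe to call more than once.
  void reset() {
    for (Node*& chunk : chunks) {
      delete[] chunk;
      chunk = nullptr;
    }
    chunks.clear();
    // free_list still points at freed nodes. Swapping with an empty
    // temporary drops those pointers without dereferencing any node.
    IntrusiveList().swap(free_list);
  }
};
```

Swapping with a temporary resets the list in one step and leaves the discarded state to the temporary's destructor, which in this model does nothing because the list owns no memory.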


Member Data Documentation

ACE_Unbounded_Queue<ACE_Notification_Queue_Node*> ACE_Notification_Queue::alloc_queue_ [private]

Keeps track of allocated arrays of type ACE_Notification_Queue_Node. The idea is to amortize allocation costs by allocating multiple nodes, each holding one ACE_Notification_Buffer, at a time.

Definition at line 134 of file Notification_Queue.h.

Referenced by allocate_more_buffers(), and reset().

Buffer_List ACE_Notification_Queue::free_queue_ [private]

Keeps track of all free buffers.

Definition at line 142 of file Notification_Queue.h.

Referenced by allocate_more_buffers(), pop_next_notification(), purge_pending_notifications(), push_new_notification(), and reset().

Buffer_List ACE_Notification_Queue::notify_queue_ [private]

Keeps track of all pending notifications.

Definition at line 139 of file Notification_Queue.h.

Referenced by pop_next_notification(), purge_pending_notifications(), push_new_notification(), and reset().

ACE_SYNCH_MUTEX ACE_Notification_Queue::notify_queue_lock_ [private]

Synchronization for handling of queues.

Definition at line 145 of file Notification_Queue.h.


The documentation for this class was generated from the following files:

Notification_Queue.h
Notification_Queue.cpp

Generated on Tue Feb 2 17:35:23 2010 for ACE by doxygen 1.4.7