ACE_Notification_Queue Class Reference

Implements a user-space queue to send Reactor notifications.

#include <Notification_Queue.h>

Public Member Functions

 ACE_Notification_Queue ()
 ~ACE_Notification_Queue ()
int open ()
 Pre-allocate resources in the queue.

void reset ()
 Release all resources in the queue.

int purge_pending_notifications (ACE_Event_Handler *eh, ACE_Reactor_Mask mask)
 Remove all elements in the queue matching eh and mask.

int push_new_notification (ACE_Notification_Buffer const &buffer)
 Add a new notification to the queue.

int pop_next_notification (ACE_Notification_Buffer &current, bool &more_messages_queued, ACE_Notification_Buffer &next)
 Extract the next notification from the queue.


Private Types

typedef ACE_Intrusive_List< ACE_Notification_Queue_Node > Buffer_List

Private Member Functions

int allocate_more_buffers ()
 Allocate more memory for the queue.


Private Attributes

ACE_Unbounded_Queue< ACE_Notification_Queue_Node * > alloc_queue_
Buffer_List notify_queue_
 Keeps track of all pending notifications.

Buffer_List free_queue_
 Keeps track of all free buffers.

ACE_SYNCH_MUTEX notify_queue_lock_
 Synchronization for handling of queues.


Detailed Description

Implements a user-space queue to send Reactor notifications.

The ACE_Reactor uses a pipe to wake up the thread running the event loop from other threads. This pipe can be limited in size under some operating systems, and for some applications that limit presents a problem. A user-space notification queue is used to overcome this limitation: the queue tries to use as few resources on the pipe as possible, while keeping all the data in user space.
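
The idea can be illustrated with a minimal sketch. It is not the ACE implementation: SketchQueue, its int payload, and pipe_writes_for_burst are hypothetical names, std::deque replaces the intrusive lists, and there is no locking. It only shows why a burst of notifications needs a single wake-up byte on the pipe when the payloads stay in user space.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>

// Hypothetical stand-in for the queue; not the ACE API and not thread-safe.
class SketchQueue {
public:
  // Returns 1 when the queue was empty before the push, meaning the caller
  // should write a wake-up byte to the pipe; returns 0 otherwise.
  int push(int payload) {
    bool const was_empty = pending_.empty();
    pending_.push_back(payload);
    return was_empty ? 1 : 0;
  }

  // Returns true and fills payload if something was queued.
  bool pop(int &payload) {
    if (pending_.empty()) return false;
    payload = pending_.front();
    pending_.pop_front();
    return true;
  }

private:
  std::deque<int> pending_;  // all payloads stay in user space
};

// Counts the pipe writes that a burst of `count` pushes would request.
inline std::size_t pipe_writes_for_burst(std::size_t count) {
  SketchQueue queue;
  std::size_t writes = 0;
  for (std::size_t i = 0; i < count; ++i)
    writes += static_cast<std::size_t>(queue.push(static_cast<int>(i)));
  return writes;
}
```

In this model the number of pipe writes depends on how often the queue goes from empty to non-empty, not on how many notifications are queued.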

This code was refactored from Select_Reactor_Base.

Definition at line 80 of file Notification_Queue.h.


Member Typedef Documentation

typedef ACE_Intrusive_List<ACE_Notification_Queue_Node> ACE_Notification_Queue::Buffer_List [private]
 

Definition at line 136 of file Notification_Queue.h.

Referenced by reset().


Constructor & Destructor Documentation

ACE_Notification_Queue::ACE_Notification_Queue ()
 

Definition at line 10 of file Notification_Queue.cpp.

  : ACE_Copy_Disabled()
  , alloc_queue_()
  , notify_queue_()
  , free_queue_()
{
}

ACE_Notification_Queue::~ACE_Notification_Queue ()
 

Definition at line 19 of file Notification_Queue.cpp.

References reset().

{
  reset();
}


Member Function Documentation

int ACE_Notification_Queue::allocate_more_buffers () [private]
 

Allocate more memory for the queue.

Definition at line 75 of file Notification_Queue.cpp.

References ACE_NEW_RETURN, ACE_REACTOR_NOTIFICATION_ARRAY_SIZE, ACE_TRACE, alloc_queue_, ACE_Unbounded_Queue< ACE_Notification_Queue_Node * >::enqueue_head(), free_queue_, and ACE_Intrusive_List< T >::push_front().

Referenced by open(), and push_new_notification().

{
  ACE_TRACE ("ACE_Notification_Queue::allocate_more_buffers");

  ACE_Notification_Queue_Node *temp = 0;

  ACE_NEW_RETURN (temp,
                  ACE_Notification_Queue_Node[ACE_REACTOR_NOTIFICATION_ARRAY_SIZE],
                  -1);

  if (this->alloc_queue_.enqueue_head (temp) == -1)
    {
      delete [] temp;
      return -1;
    }

  for (size_t i = 0; i < ACE_REACTOR_NOTIFICATION_ARRAY_SIZE; ++i)
    {
      free_queue_.push_front(temp + i);
    }

  return 0;
}
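
The allocation strategy above (one array allocation feeds many free nodes) can be sketched without ACE as follows. SketchNode, SketchPool, and chunks_after are hypothetical names, std::unique_ptr and std::vector stand in for alloc_queue_, and the chunk size is a constructor argument rather than ACE_REACTOR_NOTIFICATION_ARRAY_SIZE. Treat it as an illustration of the pattern under those assumptions, not as the library's code.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <vector>

struct SketchNode {
  int payload = 0;
  SketchNode *next = nullptr;  // intrusive link used by the free list
};

// Hypothetical pool: allocates nodes in chunks and recycles them.
class SketchPool {
public:
  explicit SketchPool(std::size_t chunk_size) : chunk_size_(chunk_size) {
    assert(chunk_size_ > 0);  // assumption: a chunk holds at least one node
  }

  // Hands out a free node, allocating a whole new chunk only when none is left.
  SketchNode *acquire() {
    if (free_head_ == nullptr) grow();
    SketchNode *node = free_head_;
    free_head_ = node->next;
    node->next = nullptr;
    return node;
  }

  // Puts a node back on the free list; no memory is released here.
  void release(SketchNode *node) {
    node->next = free_head_;
    free_head_ = node;
  }

  std::size_t chunks_allocated() const { return chunks_.size(); }

private:
  void grow() {
    // One heap allocation covers chunk_size_ nodes.
    chunks_.push_back(std::make_unique<SketchNode[]>(chunk_size_));
    SketchNode *chunk = chunks_.back().get();
    for (std::size_t i = 0; i < chunk_size_; ++i) release(chunk + i);
  }

  std::size_t chunk_size_;
  SketchNode *free_head_ = nullptr;
  std::vector<std::unique_ptr<SketchNode[]>> chunks_;  // owns the arrays
};

// Reports how many chunks are allocated after `acquisitions` acquire() calls.
inline std::size_t chunks_after(std::size_t acquisitions, std::size_t chunk_size) {
  SketchPool pool(chunk_size);
  for (std::size_t i = 0; i < acquisitions; ++i) pool.acquire();
  return pool.chunks_allocated();
}
```

With a chunk size of 8, the first eight acquisitions share one allocation and the ninth triggers a second one, which is the amortization the member documentation describes.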

int ACE_Notification_Queue::open ()
 

Pre-allocate resources in the queue.

Definition at line 26 of file Notification_Queue.cpp.

References ACE_GUARD_RETURN, ACE_SYNCH_MUTEX, ACE_TRACE, allocate_more_buffers(), free_queue_, and ACE_Intrusive_List< T >::is_empty().

{
  ACE_TRACE ("ACE_Notification_Queue::open");

  ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);

  if (!this->free_queue_.is_empty ())
    return 0;

  return allocate_more_buffers();
}

int ACE_Notification_Queue::pop_next_notification (ACE_Notification_Buffer &current, bool &more_messages_queued, ACE_Notification_Buffer &next)
 

Extract the next notification from the queue.

Returns:
-1 on failure, 1 if a message was popped, 0 otherwise

Definition at line 193 of file Notification_Queue.cpp.

References ACE_GUARD_RETURN, ACE_SYNCH_MUTEX, ACE_TRACE, free_queue_, ACE_Notification_Queue_Node::get(), ACE_Intrusive_List< T >::head(), ACE_Intrusive_List< T >::is_empty(), notify_queue_, ACE_Intrusive_List< T >::pop_front(), and ACE_Intrusive_List< T >::push_front().

{
  ACE_TRACE ("ACE_Notification_Queue::pop_next_notification");

  more_messages_queued = false;

  ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);

  if (notify_queue_.is_empty ())
    {
      return 0;
    }

  ACE_Notification_Queue_Node * node =
    notify_queue_.pop_front();

  current = node->get();
  free_queue_.push_front(node);

  if(!this->notify_queue_.is_empty())
    {
      more_messages_queued = true;
      next = notify_queue_.head()->get();
    }

  return 1;
}
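
The out-parameter contract of this function (pop the head into current, and only peek at the following element through next) can be modelled in a few lines. The sketch below uses hypothetical names (pop_with_peek, drain), int payloads in place of ACE_Notification_Buffer, and a std::deque in place of the intrusive list; locking and the -1 failure path are left out.

```cpp
#include <cassert>
#include <deque>
#include <vector>

// Hypothetical model of the pop contract: returns 1 if an element was
// popped, 0 if the queue was empty.
inline int pop_with_peek(std::deque<int> &pending, int &current,
                         bool &more_queued, int &next) {
  more_queued = false;
  if (pending.empty()) return 0;

  current = pending.front();
  pending.pop_front();

  if (!pending.empty()) {
    more_queued = true;
    next = pending.front();  // peek only; the element stays queued
  }
  return 1;
}

// Drains a queue the way a caller might, collecting payloads in order.
inline std::vector<int> drain(std::deque<int> pending) {
  std::vector<int> seen;
  int current = 0;
  int next = 0;
  bool more = false;
  while (pop_with_peek(pending, current, more, next) == 1) {
    seen.push_back(current);
    if (!more) break;  // nothing left, so no further pop is needed
  }
  return seen;
}
```

A caller can use the more_queued flag to decide whether another wake-up is needed without taking the lock a second time; how the reactor actually uses it is not shown in this listing.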

int ACE_Notification_Queue::purge_pending_notifications (ACE_Event_Handler *eh, ACE_Reactor_Mask mask)
 

Remove all elements in the queue matching eh and mask.

See the ACE_Reactor documentation for a more detailed description; this is only a helper function.

Definition at line 101 of file Notification_Queue.cpp.

References ACE_GUARD_RETURN, ACE_Reactor_Mask, ACE_SYNCH_MUTEX, ACE_TRACE, ACE_Notification_Queue_Node::clear_mask(), ACE_Notification_Buffer::eh_, free_queue_, ACE_Notification_Queue_Node::get(), ACE_Intrusive_List< T >::head(), ACE_Intrusive_List< T >::is_empty(), ACE_Notification_Queue_Node::mask_disables_all_notifications(), ACE_Notification_Queue_Node::matches_for_purging(), ACE_Intrusive_List_Node< ACE_Notification_Queue_Node >::next(), notify_queue_, ACE_Intrusive_List< T >::push_front(), ACE_Event_Handler::remove_reference(), and ACE_Intrusive_List< T >::unsafe_remove().

{
  ACE_TRACE ("ACE_Notification_Queue::purge_pending_notifications");

  ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);

  if (this->notify_queue_.is_empty ())
    return 0;

  int number_purged = 0;
  ACE_Notification_Queue_Node * node = notify_queue_.head();
  while(node != 0)
    {
      if (!node->matches_for_purging(eh))
        {
          // Easy case, skip to the next node
          node = node->next();
          continue;
        }

      if (!node->mask_disables_all_notifications(mask))
        {
          // ... another easy case, skip this node too, but clear the
          // mask first ...
          node->clear_mask(mask);
          node = node->next();
          continue;
        }

      // ... this is the more complicated case, we want to remove the
      // node from the notify_queue_ list.  First save the next node
      // on the list:
      ACE_Notification_Queue_Node * next = node->next();

      // ... then remove it ...
      notify_queue_.unsafe_remove(node);
      ++number_purged;

      // ... release resources ...
      ACE_Event_Handler *event_handler = node->get().eh_;
      event_handler->remove_reference ();

      // ... now this is a free node ...
      free_queue_.push_front(node);

      // ... go to the next node, if there is one ...
      node = next;
    }

  return number_purged;
}
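
The three cases in the loop above (skip, clear the mask but keep the node, remove the node) can be sketched with a standard container. The names SketchEntry, purge, and purge_demo are hypothetical, an integer id stands in for the event handler pointer, and the sketch assumes that "disables all notifications" means no mask bits remain after the requested bits are cleared; the listing does not show how ACE_Notification_Queue_Node implements that test. Reference counting and the free list are omitted.

```cpp
#include <cassert>
#include <cstddef>
#include <list>

struct SketchEntry {
  int handler_id;  // stands in for the ACE_Event_Handler pointer
  unsigned mask;   // stands in for the ACE_Reactor_Mask bits
};

// Removes or trims entries for handler_id; returns the number removed.
inline int purge(std::list<SketchEntry> &pending, int handler_id,
                 unsigned mask) {
  int purged = 0;
  for (auto it = pending.begin(); it != pending.end();) {
    if (it->handler_id != handler_id) {
      ++it;  // easy case: a different handler
      continue;
    }
    it->mask &= ~mask;  // clear the requested bits
    if (it->mask != 0u) {
      ++it;  // other bits remain, so the entry stays queued
      continue;
    }
    // erase() returns the element after the removed one, which plays the
    // role of saving node->next() before unlinking the node.
    it = pending.erase(it);
    ++purged;
  }
  return purged;
}

// Small demonstration: two entries for handler 1, one for handler 2.
inline std::size_t purge_demo(int &purged, unsigned &first_mask) {
  std::list<SketchEntry> pending;
  pending.push_back(SketchEntry{1, 0x3u});
  pending.push_back(SketchEntry{2, 0x1u});
  pending.push_back(SketchEntry{1, 0x1u});
  purged = purge(pending, 1, 0x1u);
  first_mask = pending.front().mask;
  return pending.size();
}
```

In the demonstration, the first entry keeps bit 0x2 and stays queued, the second belongs to another handler, and only the third is removed.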

int ACE_Notification_Queue::push_new_notification (ACE_Notification_Buffer const &buffer)
 

Add a new notification to the queue.

Returns:
-1 on failure, 1 if a new message should be sent through the pipe and 0 otherwise.

Definition at line 155 of file Notification_Queue.cpp.

References ACE_ASSERT, ACE_GUARD_RETURN, ACE_SYNCH_MUTEX, ACE_TRACE, allocate_more_buffers(), free_queue_, ACE_Intrusive_List< T >::is_empty(), notify_queue_, ACE_Intrusive_List< T >::pop_front(), ACE_Intrusive_List< T >::push_back(), and ACE_Notification_Queue_Node::set().

{
  ACE_TRACE ("ACE_Notification_Queue::push_new_notification");

  bool notification_required = false;

  ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);

  // No pending notifications.
  if (this->notify_queue_.is_empty ())
    notification_required = true;

  if (free_queue_.is_empty())
    {
      if (allocate_more_buffers() == -1)
        {
          return -1;
        }
    }

  ACE_Notification_Queue_Node * node =
    free_queue_.pop_front();

  ACE_ASSERT (node != 0);
  node->set(buffer);

  notify_queue_.push_back(node);

  if (!notification_required)
    {
      return 0;
    }

  return 1;
}

void ACE_Notification_Queue::reset ()
 

Release all resources in the queue.

Definition at line 40 of file Notification_Queue.cpp.

References ACE_TRACE, ACE_Unbounded_Queue_Iterator< T >::advance(), alloc_queue_, Buffer_List, ACE_Notification_Buffer::eh_, free_queue_, ACE_Notification_Queue_Node::get(), ACE_Intrusive_List< T >::head(), ACE_Unbounded_Queue_Iterator< T >::next(), ACE_Intrusive_List_Node< ACE_Notification_Queue_Node >::next(), notify_queue_, ACE_Event_Handler::remove_reference(), ACE_Unbounded_Queue< ACE_Notification_Queue_Node * >::reset(), and ACE_Intrusive_List< ACE_Notification_Queue_Node >::swap().

Referenced by ~ACE_Notification_Queue().

{
  ACE_TRACE ("ACE_Notification_Queue::reset");

  // Release all the event handlers still in the queue ...
  for (ACE_Notification_Queue_Node * node = notify_queue_.head();
       node != 0;
       node = node->next())
    {
      if (node->get().eh_ == 0)
        {
          continue;
        }
      (void) node->get().eh_->remove_reference();
    }

  // ... free up the dynamically allocated resources ...
  ACE_Notification_Queue_Node **b = 0;
  for (ACE_Unbounded_Queue_Iterator<ACE_Notification_Queue_Node *> alloc_iter (this->alloc_queue_);
       alloc_iter.next (b) != 0;
       alloc_iter.advance ())
    {
      delete [] *b;
      *b = 0;
    }

  // ... cleanup the list of allocated blocks ...
  this->alloc_queue_.reset ();

  // ... swap with empty lists to reset the contents ...
  Buffer_List().swap(notify_queue_);
  Buffer_List().swap(free_queue_);
}
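
The last two statements use the swap-with-a-temporary idiom. A minimal sketch of why that suits an intrusive list follows; SketchNode, SketchList, and reset_by_swap_leaves_list_empty are hypothetical and far simpler than ACE_Intrusive_List. The assumption being illustrated is that the list does not own its nodes, so resetting it must not walk or touch nodes whose storage may already have been released.

```cpp
#include <cassert>
#include <utility>

struct SketchNode {
  SketchNode *next = nullptr;
};

// Minimal intrusive list: it links nodes that it does not own.
class SketchList {
public:
  bool is_empty() const { return head_ == nullptr; }

  void push_front(SketchNode *node) {
    node->next = head_;
    head_ = node;
  }

  // Exchanges only the head pointers; no node is read or written.
  void swap(SketchList &other) { std::swap(head_, other.head_); }

private:
  SketchNode *head_ = nullptr;
};

inline bool reset_by_swap_leaves_list_empty() {
  SketchNode a;
  SketchNode b;
  SketchList list;
  list.push_front(&a);
  list.push_front(&b);

  // The empty temporary takes over the old links and is then discarded,
  // leaving `list` empty without visiting any node.
  SketchList().swap(list);
  return list.is_empty();
}
```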


Member Data Documentation

ACE_Unbounded_Queue<ACE_Notification_Queue_Node*> ACE_Notification_Queue::alloc_queue_ [private]
 

Keeps track of allocated arrays of type ACE_Notification_Queue_Node. The idea is to amortize allocation costs by allocating multiple ACE_Notification_Queue_Node objects at a time.

Definition at line 134 of file Notification_Queue.h.

Referenced by allocate_more_buffers(), and reset().

Buffer_List ACE_Notification_Queue::free_queue_ [private]
 

Keeps track of all free buffers.

Definition at line 142 of file Notification_Queue.h.

Referenced by allocate_more_buffers(), open(), pop_next_notification(), purge_pending_notifications(), push_new_notification(), and reset().

Buffer_List ACE_Notification_Queue::notify_queue_ [private]
 

Keeps track of all pending notifications.

Definition at line 139 of file Notification_Queue.h.

Referenced by pop_next_notification(), purge_pending_notifications(), push_new_notification(), and reset().

ACE_SYNCH_MUTEX ACE_Notification_Queue::notify_queue_lock_ [private]
 

Synchronization for handling of queues.

Definition at line 145 of file Notification_Queue.h.


The documentation for this class was generated from the following files:
Notification_Queue.h
Notification_Queue.cpp

Generated on Sun Jan 27 12:56:04 2008 for ACE by doxygen 1.3.6