#include <Notification_Queue.h>
Public Member Functions
ACE_Notification_Queue ()
~ACE_Notification_Queue ()
int open ()
    Pre-allocate resources in the queue.
void reset ()
    Release all resources in the queue.
int purge_pending_notifications (ACE_Event_Handler *eh, ACE_Reactor_Mask mask)
    Remove all elements in the queue matching eh and mask.
int push_new_notification (ACE_Notification_Buffer const &buffer)
    Add a new notification to the queue.
int pop_next_notification (ACE_Notification_Buffer &current, bool &more_messages_queued, ACE_Notification_Buffer &next)
    Extract the next notification from the queue.

Private Types
typedef ACE_Intrusive_List< ACE_Notification_Queue_Node > Buffer_List

Private Member Functions
int allocate_more_buffers ()
    Allocate more memory for the queue.

Private Attributes
ACE_Unbounded_Queue< ACE_Notification_Queue_Node * > alloc_queue_
Buffer_List notify_queue_
    Keeps track of all pending notifications.
Buffer_List free_queue_
    Keeps track of all free buffers.
ACE_SYNCH_MUTEX notify_queue_lock_
    Synchronization for handling of queues.

The ACE_Reactor uses a pipe to wake up the thread running the event loop from other threads. Under some operating systems this pipe is limited in size, which is a problem for some applications. A user-space notification queue overcomes that limit: all notification data stays in user space, and the queue uses as little of the pipe's capacity as possible.
This code was refactored from Select_Reactor_Base.
Definition at line 80 of file Notification_Queue.h.
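The idea in the description above can be illustrated with a small standalone model. This is only a sketch under stated assumptions, not the ACE implementation: `Notification`, `MiniNotifyQueue` and `pipe_writes_for_burst` are hypothetical names, and `std::deque` and `std::mutex` stand in for the ACE containers and lock. It shows why a burst of notifications needs only one wake-up on the pipe: the push reports whether the queue was empty beforehand, and only then would a byte have to be written.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <mutex>

// Hypothetical stand-in for ACE_Notification_Buffer; not the ACE type.
struct Notification {
    int handler_id;
    unsigned mask;
};

// Minimal model of a user-space notification queue (illustrative only).
class MiniNotifyQueue {
public:
    // Returns 1 if the queue was empty before this push, meaning the
    // caller would have to write a wake-up byte to the pipe; 0 otherwise.
    int push(const Notification& n) {
        std::lock_guard<std::mutex> guard(lock_);
        const bool was_empty = pending_.empty();
        pending_.push_back(n);
        return was_empty ? 1 : 0;
    }

    std::size_t size() {
        std::lock_guard<std::mutex> guard(lock_);
        return pending_.size();
    }

private:
    std::mutex lock_;
    std::deque<Notification> pending_;
};

// Counts how many pipe writes a burst of `count` pushes would need
// when nothing is consumed in between.
inline int pipe_writes_for_burst(int count) {
    MiniNotifyQueue queue;
    int pipe_writes = 0;
    for (int i = 0; i < count; ++i) {
        if (queue.push(Notification{i, 0u}) == 1) {
            ++pipe_writes;  // only here would a byte go into the pipe
        }
    }
    return pipe_writes;
}
```

In this model the payloads never travel through the pipe, so the pipe's size limit no longer bounds the number of pending notifications.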
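typedef ACE_Intrusive_List< ACE_Notification_Queue_Node > ACE_Notification_Queue::Buffer_List [private]
Definition at line 136 of file Notification_Queue.h. Referenced by reset().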
ACE_Notification_Queue::ACE_Notification_Queue ()
Definition at line 10 of file Notification_Queue.cpp.
      : ACE_Copy_Disabled()
      , alloc_queue_()
      , notify_queue_()
      , free_queue_()
    {
    }
ACE_Notification_Queue::~ACE_Notification_Queue ()
Definition at line 19 of file Notification_Queue.cpp. References reset().
    {
      reset();
    }
int ACE_Notification_Queue::allocate_more_buffers () [private]
Allocate more memory for the queue.
Definition at line 75 of file Notification_Queue.cpp. References ACE_NEW_RETURN, ACE_REACTOR_NOTIFICATION_ARRAY_SIZE, ACE_TRACE, alloc_queue_, ACE_Unbounded_Queue< ACE_Notification_Queue_Node * >::enqueue_head(), free_queue_, and ACE_Intrusive_List< T >::push_front(). Referenced by open(), and push_new_notification().
    {
      ACE_TRACE ("ACE_Notification_Queue::allocate_more_buffers");

      ACE_Notification_Queue_Node *temp = 0;

      ACE_NEW_RETURN (temp,
                      ACE_Notification_Queue_Node[ACE_REACTOR_NOTIFICATION_ARRAY_SIZE],
                      -1);

      if (this->alloc_queue_.enqueue_head (temp) == -1)
        {
          delete [] temp;
          return -1;
        }

      for (size_t i = 0; i < ACE_REACTOR_NOTIFICATION_ARRAY_SIZE; ++i)
        {
          free_queue_.push_front(temp + i);
        }

      return 0;
    }
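The block-allocation pattern in this listing (allocate an array of nodes, remember the array, thread every element onto a free list) can be sketched without ACE. The following is an assumption-laden illustration: `Node`, `NodePool` and `kBlockSize` are hypothetical, the block size of 4 is arbitrary (ACE uses ACE_REACTOR_NOTIFICATION_ARRAY_SIZE), and `std::unique_ptr` replaces the manual `delete []` bookkeeping.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <vector>

// Assumed block size for the sketch; not the ACE constant.
constexpr std::size_t kBlockSize = 4;

// Hypothetical node type; stands in for ACE_Notification_Queue_Node.
struct Node {
    int payload = 0;
    Node* next = nullptr;  // intrusive link used by the free list
};

class NodePool {
public:
    // Allocates one block of kBlockSize nodes and pushes each node
    // onto the front of the free list.
    void grow() {
        blocks_.push_back(std::make_unique<Node[]>(kBlockSize));
        Node* block = blocks_.back().get();
        for (std::size_t i = 0; i < kBlockSize; ++i) {
            release(&block[i]);
        }
    }

    // Takes a node from the free list, growing the pool if it is empty.
    Node* acquire() {
        if (free_head_ == nullptr) {
            grow();
        }
        Node* node = free_head_;
        free_head_ = node->next;
        node->next = nullptr;
        --free_count_;
        return node;
    }

    // Returns a node to the free list; no memory is freed here.
    void release(Node* node) {
        node->next = free_head_;
        free_head_ = node;
        ++free_count_;
    }

    std::size_t free_count() const { return free_count_; }
    std::size_t block_count() const { return blocks_.size(); }

private:
    std::vector<std::unique_ptr<Node[]>> blocks_;  // owns every block
    Node* free_head_ = nullptr;
    std::size_t free_count_ = 0;
};
```

Acquiring and releasing nodes in this sketch never calls the allocator except when the free list runs dry, which is the amortization the member documentation describes.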
int ACE_Notification_Queue::open ()
Pre-allocate resources in the queue.
Definition at line 26 of file Notification_Queue.cpp. References ACE_GUARD_RETURN, ACE_SYNCH_MUTEX, ACE_TRACE, allocate_more_buffers(), free_queue_, and ACE_Intrusive_List< T >::is_empty().
    {
      ACE_TRACE ("ACE_Notification_Queue::open");

      ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);

      if (!this->free_queue_.is_empty ())
        return 0;

      return allocate_more_buffers();
    }
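int ACE_Notification_Queue::pop_next_notification (ACE_Notification_Buffer &current, bool &more_messages_queued, ACE_Notification_Buffer &next)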
Extract the next notification from the queue.
Definition at line 193 of file Notification_Queue.cpp. References ACE_GUARD_RETURN, ACE_SYNCH_MUTEX, ACE_TRACE, free_queue_, ACE_Notification_Queue_Node::get(), ACE_Intrusive_List< T >::head(), ACE_Intrusive_List< T >::is_empty(), notify_queue_, ACE_Intrusive_List< T >::pop_front(), and ACE_Intrusive_List< T >::push_front().
    {
      ACE_TRACE ("ACE_Notification_Queue::pop_next_notification");

      more_messages_queued = false;

      ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);

      if (notify_queue_.is_empty ())
        {
          return 0;
        }

      ACE_Notification_Queue_Node * node =
        notify_queue_.pop_front();

      current = node->get();
      free_queue_.push_front(node);

      if(!this->notify_queue_.is_empty())
        {
          more_messages_queued = true;
          next = notify_queue_.head()->get();
        }

      return 1;
    }
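The return protocol of this listing (1 with `current` filled in, 0 when nothing is pending, plus a peek at the following entry) can be modelled on a plain `std::deque`. This is a sketch only: `Note`, `pop_next` and `drain` are hypothetical names, locking is omitted, and the idea that a caller uses `more_messages_queued` to schedule another wake-up is an assumption about the caller, not something this page states.

```cpp
#include <cassert>
#include <deque>
#include <vector>

// Hypothetical payload; stands in for ACE_Notification_Buffer.
struct Note {
    int id;
};

// Returns 1 and fills `current` when an entry was pending, 0 otherwise.
// When another entry remains, `more_queued` is set and `next` receives a
// copy of it; that entry stays in the queue.
inline int pop_next(std::deque<Note>& pending, Note& current,
                    bool& more_queued, Note& next) {
    more_queued = false;
    if (pending.empty()) {
        return 0;
    }
    current = pending.front();
    pending.pop_front();
    if (!pending.empty()) {
        more_queued = true;
        next = pending.front();  // peek only
    }
    return 1;
}

// Pops until the queue is empty. `renotify_count` counts how often the
// caller was told that more entries were waiting (assumed re-arm points).
inline std::vector<int> drain(std::deque<Note> pending, int& renotify_count) {
    std::vector<int> handled;
    renotify_count = 0;
    Note current{};
    Note next{};
    bool more = false;
    while (pop_next(pending, current, more, next) == 1) {
        handled.push_back(current.id);
        if (more) {
            ++renotify_count;
        }
    }
    return handled;
}
```

Note that the popped entry is copied out before its storage is recycled, which matches the order of `node->get()` and `free_queue_.push_front(node)` in the listing.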
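int ACE_Notification_Queue::purge_pending_notifications (ACE_Event_Handler *eh, ACE_Reactor_Mask mask)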
Remove all elements in the queue matching eh and mask.
See the ACE_Reactor documentation for a more detailed description; this is just a helper function.
Definition at line 101 of file Notification_Queue.cpp. References ACE_GUARD_RETURN, ACE_Reactor_Mask, ACE_SYNCH_MUTEX, ACE_TRACE, ACE_Notification_Queue_Node::clear_mask(), ACE_Notification_Buffer::eh_, free_queue_, ACE_Notification_Queue_Node::get(), ACE_Intrusive_List< T >::head(), ACE_Intrusive_List< T >::is_empty(), ACE_Notification_Queue_Node::mask_disables_all_notifications(), ACE_Notification_Queue_Node::matches_for_purging(), ACE_Intrusive_List_Node< ACE_Notification_Queue_Node >::next(), notify_queue_, ACE_Intrusive_List< T >::push_front(), ACE_Event_Handler::remove_reference(), and ACE_Intrusive_List< T >::unsafe_remove().
    {
      ACE_TRACE ("ACE_Notification_Queue::purge_pending_notifications");

      ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);

      if (this->notify_queue_.is_empty ())
        return 0;

      int number_purged = 0;
      ACE_Notification_Queue_Node * node = notify_queue_.head();
      while(node != 0)
        {
          if (!node->matches_for_purging(eh))
            {
              // Easy case, skip to the next node
              node = node->next();
              continue;
            }

          if (!node->mask_disables_all_notifications(mask))
            {
              // ... another easy case, skip this node too, but clear the
              // mask first ...
              node->clear_mask(mask);
              node = node->next();
              continue;
            }

          // ... this is the more complicated case, we want to remove the
          // node from the notify_queue_ list.  First save the next node
          // on the list:
          ACE_Notification_Queue_Node * next = node->next();

          // ... then remove it ...
          notify_queue_.unsafe_remove(node);
          ++number_purged;

          // ... release resources ...
          ACE_Event_Handler *event_handler = node->get().eh_;
          event_handler->remove_reference ();

          // ... now this is a free node ...
          free_queue_.push_front(node);

          // ... go to the next node, if there is one ...
          node = next;
        }

      return number_purged;
    }
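The three cases in this traversal (no match, partial mask cleared, full removal) can be sketched on a `std::list`. Everything here is hypothetical: `Pending` and `purge` are illustrative names, an integer id replaces the event handler pointer, and the sketch assumes an entry is removed exactly when clearing the mask leaves no bits set. The real predicates live in ACE_Notification_Queue_Node and are not shown on this page.

```cpp
#include <cassert>
#include <list>

// Hypothetical pending entry; the fields stand in for the handler
// pointer and the ACE_Reactor_Mask stored in a notification buffer.
struct Pending {
    int handler_id;
    unsigned mask;
};

// Clears `mask` from every entry that belongs to `handler_id` and removes
// an entry once no mask bits remain. Returns the number of entries removed.
inline int purge(std::list<Pending>& pending, int handler_id, unsigned mask) {
    int purged = 0;
    for (auto it = pending.begin(); it != pending.end();) {
        if (it->handler_id != handler_id) {
            ++it;  // easy case: different handler, skip
            continue;
        }
        it->mask &= ~mask;
        if (it->mask != 0) {
            ++it;  // some events are still wanted, keep the entry
            continue;
        }
        // erase() returns the iterator to the following element, which
        // plays the role of saving node->next() before unlinking.
        it = pending.erase(it);
        ++purged;
    }
    return purged;
}
```

Unlike the listing, this sketch destroys removed entries instead of recycling them onto a free list.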
int ACE_Notification_Queue::push_new_notification (ACE_Notification_Buffer const &buffer)
Add a new notification to the queue.
Definition at line 155 of file Notification_Queue.cpp. References ACE_ASSERT, ACE_GUARD_RETURN, ACE_SYNCH_MUTEX, ACE_TRACE, allocate_more_buffers(), free_queue_, ACE_Intrusive_List< T >::is_empty(), notify_queue_, ACE_Intrusive_List< T >::pop_front(), ACE_Intrusive_List< T >::push_back(), and ACE_Notification_Queue_Node::set().
    {
      ACE_TRACE ("ACE_Notification_Queue::push_new_notification");

      bool notification_required = false;

      ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);

      // No pending notifications.
      if (this->notify_queue_.is_empty ())
        notification_required = true;

      if (free_queue_.is_empty())
        {
          if (allocate_more_buffers() == -1)
            {
              return -1;
            }
        }

      ACE_Notification_Queue_Node * node =
        free_queue_.pop_front();

      ACE_ASSERT (node != 0);
      node->set(buffer);

      notify_queue_.push_back(node);

      if (!notification_required)
        {
          return 0;
        }

      return 1;
    }
void ACE_Notification_Queue::reset ()
Release all resources in the queue.
Definition at line 40 of file Notification_Queue.cpp. References ACE_TRACE, ACE_Unbounded_Queue_Iterator< T >::advance(), alloc_queue_, Buffer_List, ACE_Notification_Buffer::eh_, free_queue_, ACE_Notification_Queue_Node::get(), ACE_Intrusive_List< T >::head(), ACE_Unbounded_Queue_Iterator< T >::next(), ACE_Intrusive_List_Node< ACE_Notification_Queue_Node >::next(), notify_queue_, ACE_Event_Handler::remove_reference(), ACE_Unbounded_Queue< ACE_Notification_Queue_Node * >::reset(), and ACE_Intrusive_List< ACE_Notification_Queue_Node >::swap(). Referenced by ~ACE_Notification_Queue().
    {
      ACE_TRACE ("ACE_Notification_Queue::reset");

      // Release all the event handlers still in the queue ...
      for (ACE_Notification_Queue_Node * node = notify_queue_.head();
           node != 0;
           node = node->next())
        {
          if (node->get().eh_ == 0)
            {
              continue;
            }
          (void) node->get().eh_->remove_reference();
        }

      // ... free up the dynamically allocated resources ...
      ACE_Notification_Queue_Node **b = 0;
      for (ACE_Unbounded_Queue_Iterator<ACE_Notification_Queue_Node *> alloc_iter (this->alloc_queue_);
           alloc_iter.next (b) != 0;
           alloc_iter.advance ())
        {
          delete [] *b;
          *b = 0;
        }

      // ... cleanup the list of allocated blocks ...
      this->alloc_queue_.reset ();

      // ... swap with empty lists to reset the contents ...
      Buffer_List().swap(notify_queue_);
      Buffer_List().swap(free_queue_);
    }
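The last two statements of this listing use the swap-with-a-temporary idiom. A minimal generic sketch follows; `reset_by_swap` is a hypothetical helper name, and the sketch works with any container that offers a default constructor and a member `swap`, such as the standard containers.

```cpp
#include <cassert>
#include <vector>

// Replaces the contents of `target` with those of a freshly constructed,
// empty container. The temporary takes over the old contents and is
// destroyed at the end of the statement.
template <typename Container>
void reset_by_swap(Container& target) {
    Container().swap(target);
}
```

For example, `reset_by_swap(v)` on a `std::vector<int>` leaves `v` empty. In the listing the lists presumably do not own their nodes, so the swap only resets the list bookkeeping after the node arrays have already been deleted.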
ACE_Unbounded_Queue< ACE_Notification_Queue_Node * > ACE_Notification_Queue::alloc_queue_ [private]
Keeps track of the allocated arrays of ACE_Notification_Queue_Node, each of which holds one ACE_Notification_Buffer. The idea is to amortize allocation costs by allocating multiple nodes at a time.
Definition at line 134 of file Notification_Queue.h. Referenced by allocate_more_buffers(), and reset().
Buffer_List ACE_Notification_Queue::free_queue_ [private]
Keeps track of all free buffers.
Definition at line 142 of file Notification_Queue.h. Referenced by allocate_more_buffers(), open(), pop_next_notification(), purge_pending_notifications(), push_new_notification(), and reset().
Buffer_List ACE_Notification_Queue::notify_queue_ [private]
Keeps track of all pending notifications.
Definition at line 139 of file Notification_Queue.h. Referenced by pop_next_notification(), purge_pending_notifications(), push_new_notification(), and reset().
ACE_SYNCH_MUTEX ACE_Notification_Queue::notify_queue_lock_ [private]
Synchronization for handling of queues.
Definition at line 145 of file Notification_Queue.h.