Go to the documentation of this file.00001
00002
00003 #include "ace/Notification_Queue.h"
00004
00005 #if !defined (__ACE_INLINE__)
00006 #include "ace/Notification_Queue.inl"
00007 #endif
00008
00009 #include "ace/Guard_T.h"
00010
00011 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
00012
00013 ACE_Notification_Queue::
00014 ACE_Notification_Queue()
00015 : ACE_Copy_Disabled()
00016 , alloc_queue_()
00017 , notify_queue_()
00018 , free_queue_()
00019 {
00020 }
00021
00022 ACE_Notification_Queue::
00023 ~ACE_Notification_Queue()
00024 {
00025 reset();
00026 }
00027
00028 int
00029 ACE_Notification_Queue::open()
00030 {
00031 ACE_TRACE ("ACE_Notification_Queue::open");
00032
00033 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
00034
00035 if (!this->free_queue_.is_empty ())
00036 return 0;
00037
00038 return allocate_more_buffers();
00039 }
00040
00041 void
00042 ACE_Notification_Queue::reset()
00043 {
00044 ACE_TRACE ("ACE_Notification_Queue::reset");
00045
00046
00047 for (ACE_Notification_Queue_Node * node = notify_queue_.head();
00048 node != 0;
00049 node = node->next())
00050 {
00051 if (node->get().eh_ == 0)
00052 {
00053 continue;
00054 }
00055 (void) node->get().eh_->remove_reference();
00056 }
00057
00058
00059 ACE_Notification_Queue_Node **b = 0;
00060 for (ACE_Unbounded_Queue_Iterator<ACE_Notification_Queue_Node *> alloc_iter (this->alloc_queue_);
00061 alloc_iter.next (b) != 0;
00062 alloc_iter.advance ())
00063 {
00064 delete [] *b;
00065 *b = 0;
00066 }
00067
00068
00069 this->alloc_queue_.reset ();
00070
00071
00072 Buffer_List().swap(notify_queue_);
00073 Buffer_List().swap(free_queue_);
00074 }
00075
00076 int
00077 ACE_Notification_Queue::allocate_more_buffers()
00078 {
00079 ACE_TRACE ("ACE_Notification_Queue::allocate_more_buffers");
00080
00081 ACE_Notification_Queue_Node *temp = 0;
00082
00083 ACE_NEW_RETURN (temp,
00084 ACE_Notification_Queue_Node[ACE_REACTOR_NOTIFICATION_ARRAY_SIZE],
00085 -1);
00086
00087 if (this->alloc_queue_.enqueue_head (temp) == -1)
00088 {
00089 delete [] temp;
00090 return -1;
00091 }
00092
00093 for (size_t i = 0; i < ACE_REACTOR_NOTIFICATION_ARRAY_SIZE; ++i)
00094 {
00095 free_queue_.push_front(temp + i);
00096 }
00097
00098 return 0;
00099 }
00100
00101 int
00102 ACE_Notification_Queue::purge_pending_notifications(
00103 ACE_Event_Handler * eh,
00104 ACE_Reactor_Mask mask)
00105 {
00106 ACE_TRACE ("ACE_Notification_Queue::purge_pending_notifications");
00107
00108 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
00109
00110 if (this->notify_queue_.is_empty ())
00111 return 0;
00112
00113 int number_purged = 0;
00114 ACE_Notification_Queue_Node * node = notify_queue_.head();
00115 while(node != 0)
00116 {
00117 if (!node->matches_for_purging(eh))
00118 {
00119
00120 node = node->next();
00121 continue;
00122 }
00123
00124 if (!node->mask_disables_all_notifications(mask))
00125 {
00126
00127
00128 node->clear_mask(mask);
00129 node = node->next();
00130 continue;
00131 }
00132
00133
00134
00135
00136 ACE_Notification_Queue_Node * next = node->next();
00137
00138
00139 notify_queue_.unsafe_remove(node);
00140 ++number_purged;
00141
00142
00143 ACE_Event_Handler *event_handler = node->get().eh_;
00144 event_handler->remove_reference ();
00145
00146
00147 free_queue_.push_front(node);
00148
00149
00150 node = next;
00151 }
00152
00153 return number_purged;
00154 }
00155
00156 int
00157 ACE_Notification_Queue::push_new_notification(
00158 ACE_Notification_Buffer const & buffer)
00159 {
00160 ACE_TRACE ("ACE_Notification_Queue::push_new_notification");
00161
00162 bool notification_required = false;
00163
00164 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
00165
00166
00167 if (this->notify_queue_.is_empty ())
00168 notification_required = true;
00169
00170 if (free_queue_.is_empty())
00171 {
00172 if (allocate_more_buffers() == -1)
00173 {
00174 return -1;
00175 }
00176 }
00177
00178 ACE_Notification_Queue_Node * node =
00179 free_queue_.pop_front();
00180
00181 ACE_ASSERT (node != 0);
00182 node->set(buffer);
00183
00184 notify_queue_.push_back(node);
00185
00186 if (!notification_required)
00187 {
00188 return 0;
00189 }
00190
00191 return 1;
00192 }
00193
00194 int
00195 ACE_Notification_Queue::pop_next_notification(
00196 ACE_Notification_Buffer & current,
00197 bool & more_messages_queued,
00198 ACE_Notification_Buffer & next)
00199 {
00200 ACE_TRACE ("ACE_Notification_Queue::pop_next_notification");
00201
00202 more_messages_queued = false;
00203
00204 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
00205
00206 if (notify_queue_.is_empty ())
00207 {
00208 return 0;
00209 }
00210
00211 ACE_Notification_Queue_Node * node = notify_queue_.pop_front();
00212
00213 current = node->get();
00214 free_queue_.push_front(node);
00215
00216 if(!this->notify_queue_.is_empty())
00217 {
00218 more_messages_queued = true;
00219 next = notify_queue_.head()->get();
00220 }
00221
00222 return 1;
00223 }
00224
00225 ACE_END_VERSIONED_NAMESPACE_DECL