00001
00002
00003 #include "ace/Notification_Queue.h"
00004
00005 #if !defined (__ACE_INLINE__)
00006 #include "ace/Notification_Queue.inl"
00007 #endif
00008
00009 #include "ace/Guard_T.h"
00010
00011 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
00012
00013 ACE_Notification_Queue::
00014 ACE_Notification_Queue()
00015 : ACE_Copy_Disabled()
00016 , alloc_queue_()
00017 , notify_queue_()
00018 , free_queue_()
00019 {
00020 }
00021
00022 ACE_Notification_Queue::
00023 ~ACE_Notification_Queue()
00024 {
00025 reset();
00026 }
00027
00028 int
00029 ACE_Notification_Queue::
00030 open()
00031 {
00032 ACE_TRACE ("ACE_Notification_Queue::open");
00033
00034 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
00035
00036 if (!this->free_queue_.is_empty ())
00037 return 0;
00038
00039 return allocate_more_buffers();
00040 }
00041
00042 void
00043 ACE_Notification_Queue::
00044 reset()
00045 {
00046 ACE_TRACE ("ACE_Notification_Queue::reset");
00047
00048
00049 for (ACE_Notification_Queue_Node * node = notify_queue_.head();
00050 node != 0;
00051 node = node->next())
00052 {
00053 if (node->get().eh_ == 0)
00054 {
00055 continue;
00056 }
00057 (void) node->get().eh_->remove_reference();
00058 }
00059
00060
00061 ACE_Notification_Queue_Node **b = 0;
00062 for (ACE_Unbounded_Queue_Iterator<ACE_Notification_Queue_Node *> alloc_iter (this->alloc_queue_);
00063 alloc_iter.next (b) != 0;
00064 alloc_iter.advance ())
00065 {
00066 delete [] *b;
00067 *b = 0;
00068 }
00069
00070
00071 this->alloc_queue_.reset ();
00072
00073
00074 Buffer_List().swap(notify_queue_);
00075 Buffer_List().swap(free_queue_);
00076 }
00077
00078 int ACE_Notification_Queue::
00079 allocate_more_buffers()
00080 {
00081 ACE_TRACE ("ACE_Notification_Queue::allocate_more_buffers");
00082
00083 ACE_Notification_Queue_Node *temp = 0;
00084
00085 ACE_NEW_RETURN (temp,
00086 ACE_Notification_Queue_Node[ACE_REACTOR_NOTIFICATION_ARRAY_SIZE],
00087 -1);
00088
00089 if (this->alloc_queue_.enqueue_head (temp) == -1)
00090 {
00091 delete [] temp;
00092 return -1;
00093 }
00094
00095 for (size_t i = 0; i < ACE_REACTOR_NOTIFICATION_ARRAY_SIZE; ++i)
00096 {
00097 free_queue_.push_front(temp + i);
00098 }
00099
00100 return 0;
00101 }
00102
00103 int
00104 ACE_Notification_Queue::
00105 purge_pending_notifications(ACE_Event_Handler * eh,
00106 ACE_Reactor_Mask mask)
00107 {
00108 ACE_TRACE ("ACE_Notification_Queue::purge_pending_notifications");
00109
00110 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
00111
00112 if (this->notify_queue_.is_empty ())
00113 return 0;
00114
00115 int number_purged = 0;
00116 ACE_Notification_Queue_Node * node = notify_queue_.head();
00117 while(node != 0)
00118 {
00119 if (!node->matches_for_purging(eh))
00120 {
00121
00122 node = node->next();
00123 continue;
00124 }
00125
00126 if (!node->mask_disables_all_notifications(mask))
00127 {
00128
00129
00130 node->clear_mask(mask);
00131 node = node->next();
00132 continue;
00133 }
00134
00135
00136
00137
00138 ACE_Notification_Queue_Node * next = node->next();
00139
00140
00141 notify_queue_.unsafe_remove(node);
00142 ++number_purged;
00143
00144
00145 ACE_Event_Handler *event_handler = node->get().eh_;
00146 event_handler->remove_reference ();
00147
00148
00149 free_queue_.push_front(node);
00150
00151
00152 node = next;
00153 }
00154
00155 return number_purged;
00156 }
00157
00158 int ACE_Notification_Queue::
00159 push_new_notification(
00160 ACE_Notification_Buffer const & buffer)
00161 {
00162 ACE_TRACE ("ACE_Notification_Queue::push_new_notification");
00163
00164 bool notification_required = false;
00165
00166 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
00167
00168
00169 if (this->notify_queue_.is_empty ())
00170 notification_required = true;
00171
00172 if (free_queue_.is_empty())
00173 {
00174 if (allocate_more_buffers() == -1)
00175 {
00176 return -1;
00177 }
00178 }
00179
00180 ACE_Notification_Queue_Node * node =
00181 free_queue_.pop_front();
00182
00183 ACE_ASSERT (node != 0);
00184 node->set(buffer);
00185
00186 notify_queue_.push_back(node);
00187
00188 if (!notification_required)
00189 {
00190 return 0;
00191 }
00192
00193 return 1;
00194 }
00195
00196 int
00197 ACE_Notification_Queue::pop_next_notification(
00198 ACE_Notification_Buffer & current,
00199 bool & more_messages_queued,
00200 ACE_Notification_Buffer & next)
00201 {
00202 ACE_TRACE ("ACE_Notification_Queue::pop_next_notification");
00203
00204 more_messages_queued = false;
00205
00206 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
00207
00208 if (notify_queue_.is_empty ())
00209 {
00210 return 0;
00211 }
00212
00213 ACE_Notification_Queue_Node * node =
00214 notify_queue_.pop_front();
00215
00216 current = node->get();
00217 free_queue_.push_front(node);
00218
00219 if(!this->notify_queue_.is_empty())
00220 {
00221 more_messages_queued = true;
00222 next = notify_queue_.head()->get();
00223 }
00224
00225 return 1;
00226 }
00227
00228 ACE_END_VERSIONED_NAMESPACE_DECL