00001
00002
00003 #include "ace/Notification_Queue.h"
00004
00005 #if !defined (__ACE_INLINE__)
00006 #include "ace/Notification_Queue.inl"
00007 #endif
00008
00009 ACE_Notification_Queue::
00010 ACE_Notification_Queue()
00011 : ACE_Copy_Disabled()
00012 , alloc_queue_()
00013 , notify_queue_()
00014 , free_queue_()
00015 {
00016 }
00017
00018 ACE_Notification_Queue::
00019 ~ACE_Notification_Queue()
00020 {
00021 reset();
00022 }
00023
00024 int
00025 ACE_Notification_Queue::
00026 open()
00027 {
00028 ACE_TRACE ("ACE_Notification_Queue::open");
00029
00030 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
00031
00032 if (!this->free_queue_.is_empty ())
00033 return 0;
00034
00035 return allocate_more_buffers();
00036 }
00037
00038 void
00039 ACE_Notification_Queue::
00040 reset()
00041 {
00042 ACE_TRACE ("ACE_Notification_Queue::reset");
00043
00044
00045 for (ACE_Notification_Queue_Node * node = notify_queue_.head();
00046 node != 0;
00047 node = node->next())
00048 {
00049 if (node->get().eh_ == 0)
00050 {
00051 continue;
00052 }
00053 (void) node->get().eh_->remove_reference();
00054 }
00055
00056
00057 ACE_Notification_Queue_Node **b = 0;
00058 for (ACE_Unbounded_Queue_Iterator<ACE_Notification_Queue_Node *> alloc_iter (this->alloc_queue_);
00059 alloc_iter.next (b) != 0;
00060 alloc_iter.advance ())
00061 {
00062 delete [] *b;
00063 *b = 0;
00064 }
00065
00066
00067 this->alloc_queue_.reset ();
00068
00069
00070 Buffer_List().swap(notify_queue_);
00071 Buffer_List().swap(free_queue_);
00072 }
00073
00074 int ACE_Notification_Queue::
00075 allocate_more_buffers()
00076 {
00077 ACE_TRACE ("ACE_Notification_Queue::allocate_more_buffers");
00078
00079 ACE_Notification_Queue_Node *temp = 0;
00080
00081 ACE_NEW_RETURN (temp,
00082 ACE_Notification_Queue_Node[ACE_REACTOR_NOTIFICATION_ARRAY_SIZE],
00083 -1);
00084
00085 if (this->alloc_queue_.enqueue_head (temp) == -1)
00086 {
00087 delete [] temp;
00088 return -1;
00089 }
00090
00091 for (size_t i = 0; i < ACE_REACTOR_NOTIFICATION_ARRAY_SIZE; ++i)
00092 {
00093 free_queue_.push_front(temp + i);
00094 }
00095
00096 return 0;
00097 }
00098
00099 int
00100 ACE_Notification_Queue::
00101 purge_pending_notifications(ACE_Event_Handler * eh,
00102 ACE_Reactor_Mask mask)
00103 {
00104 ACE_TRACE ("ACE_Notification_Queue::purge_pending_notifications");
00105
00106 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
00107
00108 if (this->notify_queue_.is_empty ())
00109 return 0;
00110
00111 int number_purged = 0;
00112 ACE_Notification_Queue_Node * node = notify_queue_.head();
00113 while(node != 0)
00114 {
00115 if (!node->matches_for_purging(eh))
00116 {
00117
00118 node = node->next();
00119 continue;
00120 }
00121
00122 if (!node->mask_disables_all_notifications(mask))
00123 {
00124
00125
00126 node->clear_mask(mask);
00127 node = node->next();
00128 continue;
00129 }
00130
00131
00132
00133
00134 ACE_Notification_Queue_Node * next = node->next();
00135
00136
00137 notify_queue_.unsafe_remove(node);
00138 ++number_purged;
00139
00140
00141 ACE_Event_Handler *event_handler = node->get().eh_;
00142 event_handler->remove_reference ();
00143
00144
00145 free_queue_.push_front(node);
00146
00147
00148 node = next;
00149 }
00150
00151 return number_purged;
00152 }
00153
00154 int ACE_Notification_Queue::
00155 push_new_notification(
00156 ACE_Notification_Buffer const & buffer)
00157 {
00158 ACE_TRACE ("ACE_Notification_Queue::push_new_notification");
00159
00160 bool notification_required = false;
00161
00162 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
00163
00164
00165 if (this->notify_queue_.is_empty ())
00166 notification_required = true;
00167
00168 if (free_queue_.is_empty())
00169 {
00170 if (allocate_more_buffers() == -1)
00171 {
00172 return -1;
00173 }
00174 }
00175
00176 ACE_Notification_Queue_Node * node =
00177 free_queue_.pop_front();
00178
00179 ACE_ASSERT (node != 0);
00180 node->set(buffer);
00181
00182 notify_queue_.push_back(node);
00183
00184 if (!notification_required)
00185 {
00186 return 0;
00187 }
00188
00189 return 1;
00190 }
00191
00192 int
00193 ACE_Notification_Queue::pop_next_notification(
00194 ACE_Notification_Buffer & current,
00195 bool & more_messages_queued,
00196 ACE_Notification_Buffer & next)
00197 {
00198 ACE_TRACE ("ACE_Notification_Queue::pop_next_notification");
00199
00200 more_messages_queued = false;
00201
00202 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
00203
00204 if (notify_queue_.is_empty ())
00205 {
00206 return 0;
00207 }
00208
00209 ACE_Notification_Queue_Node * node =
00210 notify_queue_.pop_front();
00211
00212 current = node->get();
00213 free_queue_.push_front(node);
00214
00215 if(!this->notify_queue_.is_empty())
00216 {
00217 more_messages_queued = true;
00218 next = notify_queue_.head()->get();
00219 }
00220
00221 return 1;
00222 }