#include <Buffering_Strategy.h>
Collaboration diagram for TAO_Notify_Buffering_Strategy:
Public Member Functions | |
TAO_Notify_Buffering_Strategy (TAO_Notify_Message_Queue &msg_queue, const TAO_Notify_AdminProperties::Ptr &admin_properties) | |
~TAO_Notify_Buffering_Strategy () | |
void | update_qos_properties (const TAO_Notify_QoSProperties &qos_properties) |
int | enqueue (TAO_Notify_Method_Request_Queueable *method_request) |
int | dequeue (TAO_Notify_Method_Request_Queueable *&method_request, const ACE_Time_Value *abstime) |
void | shutdown (void) |
Shutdown. | |
ACE_Time_Value | oldest_event (void) |
Provide the time value of the oldest event in the queue. | |
Private Member Functions | |
int | queue (TAO_Notify_Method_Request_Queueable *method_request) |
Apply the Order Policy and queue. Returns -1 on error. | |
bool | discard (TAO_Notify_Method_Request_Queueable *method_request) |
Discard as per the Discard Policy. | |
Private Attributes | |
TAO_Notify_Message_Queue & | msg_queue_ |
Data members. The local message queue. | |
TAO_Notify_AdminProperties::Ptr | admin_properties_ |
Reference to the properties per event channel. | |
ACE_SYNCH_MUTEX & | global_queue_lock_ |
The shared global lock used by all the queues. | |
CORBA::Long & | global_queue_length_ |
The global queue length - queue length across all the queues. | |
const TAO_Notify_Property_Long & | max_queue_length_ |
The maximum events that can be queued overall. | |
TAO_Notify_Property_Short | order_policy_ |
Order of events in internal buffers. | |
TAO_Notify_Property_Short | discard_policy_ |
Policy to discard when buffers are full. | |
TAO_Notify_Property_Long | max_events_per_consumer_ |
TAO_Notify_Property_Time | blocking_policy_ |
TAO_SYNCH_CONDITION & | global_not_full_ |
TAO_SYNCH_CONDITION | local_not_full_ |
TAO_SYNCH_CONDITION | local_not_empty_ |
Condition that the batch size has been reached. | |
bool | shutdown_ |
Flag to shut down. |
Definition at line 45 of file Buffering_Strategy.h.
|
Definition at line 22 of file Buffering_Strategy.cpp. References TAO_Notify_AdminProperties::Ptr, and TAO_Notify_Message_Queue.
00025 : msg_queue_ (msg_queue) 00026 , admin_properties_ (admin_properties) 00027 , global_queue_lock_ (admin_properties->global_queue_lock ()) 00028 , global_queue_length_ (admin_properties->global_queue_length ()) 00029 , max_queue_length_ (admin_properties->max_global_queue_length ()) 00030 , order_policy_ (CosNotification::OrderPolicy, CosNotification::AnyOrder) 00031 , discard_policy_ (CosNotification::DiscardPolicy, CosNotification::AnyOrder) 00032 , max_events_per_consumer_ (CosNotification::MaxEventsPerConsumer) 00033 , blocking_policy_ (TAO_Notify_Extensions::BlockingPolicy) 00034 , global_not_full_ (admin_properties->global_queue_not_full()) 00035 , local_not_full_ (global_queue_lock_) 00036 , local_not_empty_ (global_queue_lock_) 00037 , shutdown_ (false) 00038 { 00039 } |
|
Definition at line 41 of file Buffering_Strategy.cpp.
00042 { 00043 } |
|
Dequeue batch. This method blocks until abstime if it is non-zero, or otherwise until an item is available. Returns -1 on error or shutdown, 0 if the wait timed out with nothing available, else the number of items actually dequeued (1). Definition at line 167 of file Buffering_Strategy.cpp. References ACE_GUARD_RETURN, ACE_SYNCH_MUTEX, ACE_Message_Queue<>::dequeue(), ETIME, global_not_full_, local_not_empty_, local_not_full_, ACE_Message_Queue<>::message_count(), and shutdown_.
00168 { 00169 ACE_Message_Block *mb; 00170 00171 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->global_queue_lock_, -1); 00172 00173 if ( this->shutdown_ ) 00174 return -1; 00175 00176 while (this->msg_queue_.message_count () == 0) 00177 { 00178 this->local_not_empty_.wait (abstime); 00179 00180 if (this->shutdown_) 00181 return -1; 00182 00183 if (errno == ETIME) 00184 return 0; 00185 } 00186 00187 if (this->msg_queue_.dequeue (mb) == -1) 00188 return -1; 00189 00190 method_request = dynamic_cast<TAO_Notify_Method_Request_Queueable*>(mb); 00191 00192 if (method_request == 0) 00193 return -1; 00194 00195 --this->global_queue_length_; 00196 local_not_full_.signal(); 00197 global_not_full_.signal(); 00198 00199 return 1; 00200 } |
|
Discard as per the Discard Policy.
Definition at line 239 of file Buffering_Strategy.cpp. References ACE_DEBUG, ACE_Message_Queue<>::dequeue_deadline(), ACE_Message_Queue<>::dequeue_head(), ACE_Message_Queue<>::dequeue_prio(), discard_policy_, ACE_Message_Queue<>::enqueue_prio(), TAO_Notify_PropertyBase_T< TYPE >::is_valid(), LM_DEBUG, ACE_Message_Block::msg_priority(), ACE_Message_Block::release(), shutdown_, and TAO_debug_level. Referenced by enqueue().
00240 { 00241 if (this->shutdown_) 00242 { 00243 return false; 00244 } 00245 00246 ACE_Message_Block* mb = 0; 00247 int result = -1; 00248 00249 if (this->discard_policy_.is_valid() == 0 || 00250 this->discard_policy_ == CosNotification::AnyOrder || 00251 this->discard_policy_ == CosNotification::FifoOrder) 00252 { 00253 result = this->msg_queue_.dequeue_head (mb); 00254 } 00255 else if (this->discard_policy_ == CosNotification::LifoOrder) 00256 { 00257 // The most current message is NOT the newest one in the queue. It's 00258 // the one we're about to add to the queue. 00259 result = -1; 00260 } 00261 else if (this->discard_policy_ == CosNotification::DeadlineOrder) 00262 { 00263 result = this->msg_queue_.dequeue_deadline (mb); 00264 } 00265 else if (this->discard_policy_ == CosNotification::PriorityOrder) 00266 { 00267 result = this->msg_queue_.dequeue_prio (mb); 00268 if (mb->msg_priority() >= method_request->msg_priority()) 00269 { 00270 this->msg_queue_.enqueue_prio (mb); 00271 result = -1; 00272 } 00273 } 00274 else 00275 { 00276 if (TAO_debug_level > 0) 00277 ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - Invalid discard policy\n")); 00278 result = this->msg_queue_.dequeue_head (mb); 00279 } 00280 00281 if (result != -1) 00282 { 00283 ACE_Message_Block::release (mb); 00284 return true; 00285 } 00286 00287 return false; 00288 } |
|
Enqueue according to the enqueuing strategy. Returns -1 on error, else the number of items in the queue. Definition at line 97 of file Buffering_Strategy.cpp. References ACE_DEBUG, ACE_GUARD_RETURN, ACE_SYNCH_MUTEX, blocking_policy_, discard(), ETIME, ACE_OS::gettimeofday(), global_not_full_, TAO_Notify_PropertyBase_T< TYPE >::is_valid(), LM_DEBUG, local_not_empty_, local_not_full_, max_events_per_consumer_, max_queue_length_, ACE_Message_Queue<>::message_count(), queue(), shutdown_, and TAO_Notify_PropertyBase_T< TYPE >::value().
00098 { 00099 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->global_queue_lock_, -1); 00100 00101 if (this->shutdown_) 00102 return -1; 00103 00104 bool discarded_existing = false; 00105 00106 bool local_overflow = this->max_events_per_consumer_.is_valid() && 00107 static_cast <CORBA::Long> (this->msg_queue_.message_count ()) >= this->max_events_per_consumer_.value(); 00108 00109 bool global_overflow = this->max_queue_length_.value () != 0 && 00110 this->global_queue_length_ >= this->max_queue_length_.value (); 00111 00112 while (local_overflow || global_overflow) 00113 { 00114 if (blocking_policy_.is_valid()) 00115 { 00116 ACE_Time_Value timeout; 00117 ORBSVCS_Time::TimeT_to_Time_Value(timeout, blocking_policy_.value()); 00118 // Condition variables take an absolute time 00119 timeout += ACE_OS::gettimeofday(); 00120 if (local_overflow) 00121 { 00122 local_not_full_.wait(&timeout); 00123 } 00124 else 00125 { 00126 global_not_full_.wait(&timeout); 00127 } 00128 if (errno != ETIME) 00129 { 00130 local_overflow = 00131 this->max_events_per_consumer_.is_valid() && 00132 static_cast <CORBA::Long> (this->msg_queue_.message_count ()) >= this->max_events_per_consumer_.value(); 00133 global_overflow = 00134 this->max_queue_length_.value () != 0 && 00135 this->global_queue_length_ >= this->max_queue_length_.value (); 00136 continue; 00137 } 00138 } 00139 00140 discarded_existing = this->discard(method_request); 00141 if (discarded_existing) 00142 { 00143 --this->global_queue_length_; 00144 local_not_full_.signal(); 00145 global_not_full_.signal(); 00146 } 00147 break; 00148 } 00149 00150 if (! (local_overflow || global_overflow) || discarded_existing) 00151 { 00152 if (this->queue (method_request) == -1) 00153 { 00154 ACE_DEBUG((LM_DEBUG, 00155 "Notify (%P|%t) - Panic! 
failed to enqueue event\n")); 00156 return -1; 00157 } 00158 00159 ++this->global_queue_length_; 00160 00161 local_not_empty_.signal (); 00162 } 00163 return ACE_Utils::truncate_cast<int> (this->msg_queue_.message_count ()); 00164 } |
|
Provide the time value of the oldest event in the queue.
Definition at line 73 of file Buffering_Strategy.cpp. References ACE_GUARD_RETURN, ACE_SYNCH_MUTEX, and TAO_Notify_Method_Request_Queueable::creation_time().
00074 { 00075 ACE_Time_Value tv (ACE_Time_Value::max_time); 00076 ACE_Message_Block* mb = 0; 00077 00078 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->global_queue_lock_, tv); 00079 TAO_Notify_Message_Queue::ITERATOR itr (this->msg_queue_); 00080 while(itr.next (mb)) 00081 { 00082 TAO_Notify_Method_Request_Queueable* event = 00083 dynamic_cast<TAO_Notify_Method_Request_Queueable*> (mb); 00084 if (event != 0) 00085 { 00086 const ACE_Time_Value& etime = event->creation_time (); 00087 if (etime < tv) 00088 tv = etime; 00089 } 00090 itr.advance (); 00091 } 00092 00093 return tv; 00094 } |
|
Apply the Order Policy and queue. Returns -1 on error.
Definition at line 203 of file Buffering_Strategy.cpp. References ACE_DEBUG, ACE_Message_Queue<>::enqueue_deadline(), ACE_Message_Queue<>::enqueue_prio(), ACE_Message_Queue<>::enqueue_tail(), TAO_Notify_PropertyBase_T< TYPE >::is_valid(), LM_DEBUG, order_policy_, shutdown_, TAO_debug_level, and TAO_Notify_PropertyBase_T< TYPE >::value(). Referenced by enqueue().
00204 { 00205 if ( this->shutdown_ ) 00206 return -1; 00207 00208 CORBA::Short order = this->order_policy_.value(); 00209 00210 if (! this->order_policy_.is_valid() || 00211 order == CosNotification::AnyOrder || 00212 order == CosNotification::FifoOrder) 00213 { 00214 if (TAO_debug_level > 0) 00215 ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - enqueue in fifo order\n")); 00216 return this->msg_queue_.enqueue_tail (method_request); 00217 } 00218 00219 if (order == CosNotification::PriorityOrder) 00220 { 00221 if (TAO_debug_level > 0) 00222 ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - enqueue in priority order\n")); 00223 return this->msg_queue_.enqueue_prio (method_request); 00224 } 00225 00226 if (order == CosNotification::DeadlineOrder) 00227 { 00228 if (TAO_debug_level > 0) 00229 ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - enqueue in deadline order\n")); 00230 return this->msg_queue_.enqueue_deadline (method_request); 00231 } 00232 00233 if (TAO_debug_level > 0) 00234 ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - Invalid order policy\n")); 00235 return this->msg_queue_.enqueue_tail (method_request); 00236 } |
|
Shutdown.
Definition at line 56 of file Buffering_Strategy.cpp. References ACE_GUARD, ACE_SYNCH_MUTEX, global_not_full_, local_not_empty_, local_not_full_, and shutdown_.
00057 { 00058 ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->global_queue_lock_); 00059 00060 if (this->shutdown_) 00061 { 00062 return; 00063 } 00064 00065 this->shutdown_ = true; 00066 00067 this->local_not_empty_.broadcast (); 00068 this->global_not_full_.broadcast(); 00069 this->local_not_full_.broadcast(); 00070 } |
|
Update state with the following QoS Properties: Order Policy, Discard Policy, MaxEventsPerConsumer, and Blocking Policy. Definition at line 47 of file Buffering_Strategy.cpp.
00048 { 00049 this->order_policy_.set (qos_properties); 00050 this->discard_policy_.set (qos_properties); 00051 this->max_events_per_consumer_.set(qos_properties); 00052 this->blocking_policy_.set (qos_properties); 00053 } |
|
Reference to the properties per event channel.
Definition at line 89 of file Buffering_Strategy.h. |
|
Definition at line 107 of file Buffering_Strategy.h. Referenced by enqueue(). |
|
Policy to discard when buffers are full.
Definition at line 104 of file Buffering_Strategy.h. Referenced by discard(). |
|
Definition at line 110 of file Buffering_Strategy.h. Referenced by dequeue(), enqueue(), and shutdown(). |
|
The global queue length - queue length across all the queues.
Definition at line 95 of file Buffering_Strategy.h. |
|
The shared global lock used by all the queues.
Definition at line 92 of file Buffering_Strategy.h. |
|
Condition that the batch size has been reached.
Definition at line 114 of file Buffering_Strategy.h. Referenced by dequeue(), enqueue(), and shutdown(). |
|
Definition at line 111 of file Buffering_Strategy.h. Referenced by dequeue(), enqueue(), and shutdown(). |
|
Definition at line 106 of file Buffering_Strategy.h. Referenced by enqueue(). |
|
The maximum events that can be queued overall.
Definition at line 98 of file Buffering_Strategy.h. Referenced by enqueue(). |
|
Data members. The local message queue.
Definition at line 86 of file Buffering_Strategy.h. |
|
Order of events in internal buffers.
Definition at line 101 of file Buffering_Strategy.h. Referenced by queue(). |
|
Flag to shut down.
Definition at line 117 of file Buffering_Strategy.h. Referenced by dequeue(), discard(), enqueue(), queue(), and shutdown(). |