#include <Buffering_Strategy.h>
Collaboration diagram for TAO_Notify_Buffering_Strategy:
Public Member Functions | |
TAO_Notify_Buffering_Strategy (TAO_Notify_Message_Queue &msg_queue, const TAO_Notify_AdminProperties::Ptr &admin_properties) | |
~TAO_Notify_Buffering_Strategy () | |
void | update_qos_properties (const TAO_Notify_QoSProperties &qos_properties) |
int | enqueue (TAO_Notify_Method_Request_Queueable *method_request) |
int | dequeue (TAO_Notify_Method_Request_Queueable *&method_request, const ACE_Time_Value *abstime) |
void | shutdown (void) |
Shutdown. | |
ACE_Time_Value | oldest_event (void) |
Provide the time value of the oldest event in the queue. | |
void | set_tracker (Tracker *tracker) |
Set the tracker object. This strategy does not own the tracker. | |
Private Member Functions | |
int | queue (TAO_Notify_Method_Request_Queueable *method_request) |
Apply the Order Policy and queue. return -1 on error. | |
bool | discard (TAO_Notify_Method_Request_Queueable *method_request) |
Discard as per the Discard Policy. | |
Private Attributes | |
TAO_Notify_Message_Queue & | msg_queue_ |
The local Message Queue. | |
TAO_Notify_AdminProperties::Ptr | admin_properties_ |
Reference to the properties per event channel. | |
TAO_SYNCH_MUTEX & | global_queue_lock_ |
The shared global lock used by all the queues. | |
CORBA::Long & | global_queue_length_ |
The global queue length - queue length across all the queues. | |
const TAO_Notify_Property_Long & | max_queue_length_ |
The maximum events that can be queued overall. | |
TAO_Notify_Property_Short | order_policy_ |
Order of events in internal buffers. | |
TAO_Notify_Property_Short | discard_policy_ |
Policy to discard when buffers are full. | |
TAO_Notify_Property_Long | max_events_per_consumer_ |
TAO_Notify_Property_Time | blocking_policy_ |
TAO_SYNCH_CONDITION & | global_not_full_ |
TAO_SYNCH_CONDITION | local_not_full_ |
TAO_SYNCH_CONDITION | local_not_empty_ |
Condition signalled when an event is added to the local queue; dequeue() waits on it while the queue is empty. | |
bool | shutdown_ |
Flag to shutdown. | |
Tracker * | tracker_ |
Optional queue tracker. | |
Classes | |
class | Tracker |
This interface allows tracking of the queue size. More... |
Definition at line 45 of file Buffering_Strategy.h.
TAO_BEGIN_VERSIONED_NAMESPACE_DECL TAO_Notify_Buffering_Strategy::TAO_Notify_Buffering_Strategy | ( | TAO_Notify_Message_Queue & | msg_queue, | |
const TAO_Notify_AdminProperties::Ptr & | admin_properties | |||
) |
Definition at line 22 of file Buffering_Strategy.cpp.
00025 : msg_queue_ (msg_queue) 00026 , admin_properties_ (admin_properties) 00027 , global_queue_lock_ (admin_properties->global_queue_lock ()) 00028 , global_queue_length_ (admin_properties->global_queue_length ()) 00029 , max_queue_length_ (admin_properties->max_global_queue_length ()) 00030 , order_policy_ (CosNotification::OrderPolicy, CosNotification::AnyOrder) 00031 , discard_policy_ (CosNotification::DiscardPolicy, CosNotification::AnyOrder) 00032 , max_events_per_consumer_ (CosNotification::MaxEventsPerConsumer) 00033 , blocking_policy_ (TAO_Notify_Extensions::BlockingPolicy) 00034 , global_not_full_ (admin_properties->global_queue_not_full()) 00035 , local_not_full_ (global_queue_lock_) 00036 , local_not_empty_ (global_queue_lock_) 00037 , shutdown_ (false) 00038 , tracker_ (0) 00039 { 00040 }
TAO_Notify_Buffering_Strategy::~TAO_Notify_Buffering_Strategy | ( | ) |
int TAO_Notify_Buffering_Strategy::dequeue | ( | TAO_Notify_Method_Request_Queueable *& | method_request, | |
const ACE_Time_Value * | abstime | |||
) |
Dequeue a single request. This method blocks until abstime if it is non-zero, otherwise it blocks until an item is available. Returns -1 on error or shutdown, 0 if the wait timed out with nothing available, else the number of items actually dequeued (1).
Definition at line 177 of file Buffering_Strategy.cpp.
References ACE_GUARD_RETURN, global_not_full_, global_queue_length_, local_not_full_, TAO_SYNCH_MUTEX, tracker_, and TAO_Notify_Buffering_Strategy::Tracker::update_queue_count().
00178 { 00179 ACE_Message_Block *mb; 00180 00181 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->global_queue_lock_, -1); 00182 00183 if ( this->shutdown_ ) 00184 return -1; 00185 00186 while (this->msg_queue_.message_count () == 0) 00187 { 00188 this->local_not_empty_.wait (abstime); 00189 00190 if (this->shutdown_) 00191 return -1; 00192 00193 if (errno == ETIME) 00194 return 0; 00195 } 00196 00197 if (this->msg_queue_.dequeue (mb) == -1) 00198 return -1; 00199 00200 if (this->tracker_ != 0) 00201 this->tracker_->update_queue_count (this->msg_queue_.message_count ()); 00202 00203 method_request = dynamic_cast<TAO_Notify_Method_Request_Queueable*>(mb); 00204 00205 if (method_request == 0) 00206 return -1; 00207 00208 --this->global_queue_length_; 00209 local_not_full_.signal(); 00210 global_not_full_.signal(); 00211 00212 return 1; 00213 }
bool TAO_Notify_Buffering_Strategy::discard | ( | TAO_Notify_Method_Request_Queueable * | method_request | ) | [private] |
Discard as per the Discard Policy.
Definition at line 259 of file Buffering_Strategy.cpp.
References ACE_DEBUG, CosNotification::AnyOrder, CosNotification::DeadlineOrder, ACE_Message_Queue<>::dequeue_deadline(), ACE_Message_Queue<>::dequeue_head(), ACE_Message_Queue<>::dequeue_prio(), discard_policy_, CosNotification::FifoOrder, CosNotification::LifoOrder, LM_DEBUG, ACE_Message_Block::msg_priority(), msg_queue_, CosNotification::PriorityOrder, ACE_Message_Block::release(), and TAO_debug_level.
Referenced by enqueue().
00260 { 00261 if (this->shutdown_) 00262 { 00263 return false; 00264 } 00265 00266 ACE_Message_Block* mb = 0; 00267 int result = -1; 00268 00269 if (this->discard_policy_.is_valid() == 0 || 00270 this->discard_policy_ == CosNotification::AnyOrder || 00271 this->discard_policy_ == CosNotification::FifoOrder) 00272 { 00273 result = this->msg_queue_.dequeue_head (mb); 00274 } 00275 else if (this->discard_policy_ == CosNotification::LifoOrder) 00276 { 00277 // The most current message is NOT the newest one in the queue. It's 00278 // the one we're about to add to the queue. 00279 result = -1; 00280 } 00281 else if (this->discard_policy_ == CosNotification::DeadlineOrder) 00282 { 00283 result = this->msg_queue_.dequeue_deadline (mb); 00284 } 00285 else if (this->discard_policy_ == CosNotification::PriorityOrder) 00286 { 00287 result = this->msg_queue_.dequeue_prio (mb); 00288 if (mb->msg_priority() >= method_request->msg_priority()) 00289 { 00290 this->msg_queue_.enqueue_prio (mb); 00291 result = -1; 00292 } 00293 } 00294 else 00295 { 00296 if (TAO_debug_level > 0) 00297 ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - Invalid discard policy\n")); 00298 result = this->msg_queue_.dequeue_head (mb); 00299 } 00300 00301 if (result != -1) 00302 { 00303 ACE_Message_Block::release (mb); 00304 return true; 00305 } 00306 00307 return false; 00308 }
int TAO_Notify_Buffering_Strategy::enqueue | ( | TAO_Notify_Method_Request_Queueable * | method_request | ) |
Enqueue according to the enqueuing strategy. Returns -1 on error, else the number of items in the queue.
Definition at line 102 of file Buffering_Strategy.cpp.
References ACE_DEBUG, ACE_GUARD_RETURN, blocking_policy_, discard(), ACE_OS::gettimeofday(), global_not_full_, global_queue_length_, TAO_Notify_PropertyBase_T< TYPE >::is_valid(), LM_DEBUG, local_not_empty_, local_not_full_, max_events_per_consumer_, max_queue_length_, ACE_Message_Queue<>::message_count(), msg_queue_, TAO_SYNCH_MUTEX, tracker_, TAO_Notify_Buffering_Strategy::Tracker::update_queue_count(), and TAO_Notify_PropertyBase_T< TYPE >::value().
00103 { 00104 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->global_queue_lock_, -1); 00105 00106 if (this->shutdown_) 00107 return -1; 00108 00109 bool discarded_existing = false; 00110 00111 bool local_overflow = this->max_events_per_consumer_.is_valid() && 00112 static_cast <CORBA::Long> (this->msg_queue_.message_count ()) >= this->max_events_per_consumer_.value(); 00113 00114 bool global_overflow = this->max_queue_length_.value () != 0 && 00115 this->global_queue_length_ >= this->max_queue_length_.value (); 00116 00117 while (local_overflow || global_overflow) 00118 { 00119 if (blocking_policy_.is_valid()) 00120 { 00121 ACE_Time_Value timeout; 00122 ORBSVCS_Time::TimeT_to_Time_Value(timeout, blocking_policy_.value()); 00123 // Condition variables take an absolute time 00124 timeout += ACE_OS::gettimeofday(); 00125 if (local_overflow) 00126 { 00127 local_not_full_.wait(&timeout); 00128 } 00129 else 00130 { 00131 global_not_full_.wait(&timeout); 00132 } 00133 if (errno != ETIME) 00134 { 00135 local_overflow = 00136 this->max_events_per_consumer_.is_valid() && 00137 static_cast <CORBA::Long> (this->msg_queue_.message_count ()) >= this->max_events_per_consumer_.value(); 00138 global_overflow = 00139 this->max_queue_length_.value () != 0 && 00140 this->global_queue_length_ >= this->max_queue_length_.value (); 00141 continue; 00142 } 00143 } 00144 00145 discarded_existing = this->discard(method_request); 00146 if (discarded_existing) 00147 { 00148 --this->global_queue_length_; 00149 local_not_full_.signal(); 00150 global_not_full_.signal(); 00151 } 00152 break; 00153 } 00154 00155 if (! (local_overflow || global_overflow) || discarded_existing) 00156 { 00157 if (this->queue (method_request) == -1) 00158 { 00159 ACE_DEBUG((LM_DEBUG, 00160 "Notify (%P|%t) - Panic! 
failed to enqueue event\n")); 00161 return -1; 00162 } 00163 00164 ++this->global_queue_length_; 00165 00166 local_not_empty_.signal (); 00167 } 00168 00169 size_t count = this->msg_queue_.message_count (); 00170 if (this->tracker_ != 0) 00171 this->tracker_->update_queue_count (count); 00172 00173 return ACE_Utils::truncate_cast<int> (count); 00174 }
ACE_Time_Value TAO_Notify_Buffering_Strategy::oldest_event | ( | void | ) |
Provide the time value of the oldest event in the queue.
Definition at line 74 of file Buffering_Strategy.cpp.
References ACE_GUARD_RETURN, ACE_Time_Value::max_time, and TAO_SYNCH_MUTEX.
00075 { 00076 ACE_Time_Value tv (ACE_Time_Value::max_time); 00077 ACE_Message_Block* mb = 0; 00078 00079 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->global_queue_lock_, tv); 00080 TAO_Notify_Message_Queue::ITERATOR itr (this->msg_queue_); 00081 while(itr.next (mb)) 00082 { 00083 TAO_Notify_Method_Request_Queueable* event = 00084 dynamic_cast<TAO_Notify_Method_Request_Queueable*> (mb); 00085 if (event != 0) 00086 { 00087 const ACE_Time_Value& etime = event->creation_time (); 00088 if (etime < tv) 00089 tv = etime; 00090 } 00091 itr.advance (); 00092 } 00093 00094 return tv; 00095 }
int TAO_Notify_Buffering_Strategy::queue | ( | TAO_Notify_Method_Request_Queueable * | method_request | ) | [private] |
Apply the Order Policy and queue. return -1 on error.
Definition at line 223 of file Buffering_Strategy.cpp.
References ACE_DEBUG, CosNotification::AnyOrder, CosNotification::DeadlineOrder, ACE_Message_Queue<>::enqueue_deadline(), ACE_Message_Queue<>::enqueue_prio(), ACE_Message_Queue<>::enqueue_tail(), CosNotification::FifoOrder, LM_DEBUG, msg_queue_, order_policy_, CosNotification::PriorityOrder, TAO_debug_level, and TAO_Notify_PropertyBase_T< TYPE >::value().
00224 { 00225 if ( this->shutdown_ ) 00226 return -1; 00227 00228 CORBA::Short order = this->order_policy_.value(); 00229 00230 if (! this->order_policy_.is_valid() || 00231 order == CosNotification::AnyOrder || 00232 order == CosNotification::FifoOrder) 00233 { 00234 if (TAO_debug_level > 0) 00235 ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - enqueue in fifo order\n")); 00236 return this->msg_queue_.enqueue_tail (method_request); 00237 } 00238 00239 if (order == CosNotification::PriorityOrder) 00240 { 00241 if (TAO_debug_level > 0) 00242 ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - enqueue in priority order\n")); 00243 return this->msg_queue_.enqueue_prio (method_request); 00244 } 00245 00246 if (order == CosNotification::DeadlineOrder) 00247 { 00248 if (TAO_debug_level > 0) 00249 ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - enqueue in deadline order\n")); 00250 return this->msg_queue_.enqueue_deadline (method_request); 00251 } 00252 00253 if (TAO_debug_level > 0) 00254 ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - Invalid order policy\n")); 00255 return this->msg_queue_.enqueue_tail (method_request); 00256 }
void TAO_Notify_Buffering_Strategy::set_tracker | ( | Tracker * | tracker | ) |
Set the tracker object. This strategy does not own the tracker.
Definition at line 216 of file Buffering_Strategy.cpp.
References tracker_.
00218 { 00219 this->tracker_ = tracker; 00220 }
void TAO_Notify_Buffering_Strategy::shutdown | ( | void | ) |
Shutdown.
Definition at line 57 of file Buffering_Strategy.cpp.
References ACE_GUARD, global_not_full_, local_not_empty_, local_not_full_, shutdown_, and TAO_SYNCH_MUTEX.
00058 { 00059 ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->global_queue_lock_); 00060 00061 if (this->shutdown_) 00062 { 00063 return; 00064 } 00065 00066 this->shutdown_ = true; 00067 00068 this->local_not_empty_.broadcast (); 00069 this->global_not_full_.broadcast(); 00070 this->local_not_full_.broadcast(); 00071 }
void TAO_Notify_Buffering_Strategy::update_qos_properties | ( | const TAO_Notify_QoSProperties & | qos_properties | ) |
Update state with the following QoS Properties: Order Policy, Discard Policy, MaxEventsPerConsumer and Blocking Policy.
Definition at line 48 of file Buffering_Strategy.cpp.
00049 { 00050 this->order_policy_.set (qos_properties); 00051 this->discard_policy_.set (qos_properties); 00052 this->max_events_per_consumer_.set(qos_properties); 00053 this->blocking_policy_.set (qos_properties); 00054 }
Policy to discard when buffers are full.
Definition at line 115 of file Buffering_Strategy.h.
Referenced by discard().
Definition at line 120 of file Buffering_Strategy.h.
Referenced by dequeue(), enqueue(), and shutdown().
The global queue length - queue length across all the queues.
Definition at line 106 of file Buffering_Strategy.h.
Condition signalled when an event is added to the local queue; dequeue() waits on it while the queue is empty.
Definition at line 124 of file Buffering_Strategy.h.
Referenced by enqueue(), and shutdown().
Definition at line 121 of file Buffering_Strategy.h.
Referenced by dequeue(), enqueue(), and shutdown().
const TAO_Notify_Property_Long& TAO_Notify_Buffering_Strategy::max_queue_length_ [private] |
The maximum events that can be queued overall.
Definition at line 109 of file Buffering_Strategy.h.
Referenced by enqueue().
Order of events in internal buffers.
Definition at line 112 of file Buffering_Strategy.h.
Referenced by queue().
bool TAO_Notify_Buffering_Strategy::shutdown_ [private] |
Tracker* TAO_Notify_Buffering_Strategy::tracker_ [private] |
Optional queue tracker.
Definition at line 130 of file Buffering_Strategy.h.
Referenced by dequeue(), enqueue(), and set_tracker().