#include <Buffering_Strategy.h>
Collaboration diagram for TAO_Notify_Buffering_Strategy:
Definition at line 45 of file Buffering_Strategy.h.
|
Definition at line 22 of file Buffering_Strategy.cpp. References TAO_Notify_AdminProperties::Ptr, and TAO_Notify_Message_Queue.
00025 : msg_queue_ (msg_queue) 00026 , admin_properties_ (admin_properties) 00027 , global_queue_lock_ (admin_properties->global_queue_lock ()) 00028 , global_queue_length_ (admin_properties->global_queue_length ()) 00029 , max_queue_length_ (admin_properties->max_global_queue_length ()) 00030 , order_policy_ (CosNotification::OrderPolicy, CosNotification::AnyOrder) 00031 , discard_policy_ (CosNotification::DiscardPolicy, CosNotification::AnyOrder) 00032 , max_events_per_consumer_ (CosNotification::MaxEventsPerConsumer) 00033 , blocking_policy_ (TAO_Notify_Extensions::BlockingPolicy) 00034 , global_not_full_ (admin_properties->global_queue_not_full()) 00035 , local_not_full_ (global_queue_lock_) 00036 , local_not_empty_ (global_queue_lock_) 00037 , shutdown_ (false) 00038 { 00039 } |
|
Definition at line 41 of file Buffering_Strategy.cpp.
00042 { 00043 } |
|
Dequeue batch. This method will block for abstime if non-zero or else blocks till an item is available. Return -1 on error or if nothing is available, else the number of items actually dequeued (1). Definition at line 143 of file Buffering_Strategy.cpp. References ACE_GUARD_RETURN, ACE_SYNCH_MUTEX, ACE_Message_Queue<>::dequeue(), ETIME, global_not_full_, local_not_empty_, local_not_full_, ACE_Message_Queue<>::message_count(), and shutdown_.
00144 { 00145 ACE_Message_Block *mb; 00146 00147 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->global_queue_lock_, -1); 00148 00149 if ( this->shutdown_ ) 00150 return -1; 00151 00152 while (this->msg_queue_.message_count () == 0) 00153 { 00154 this->local_not_empty_.wait (abstime); 00155 00156 if (this->shutdown_) 00157 return -1; 00158 00159 if (errno == ETIME) 00160 return 0; 00161 } 00162 00163 if (this->msg_queue_.dequeue (mb) == -1) 00164 return -1; 00165 00166 method_request = dynamic_cast<TAO_Notify_Method_Request_Queueable*>(mb); 00167 00168 if (method_request == 0) 00169 return -1; 00170 00171 --this->global_queue_length_; 00172 local_not_full_.signal(); 00173 global_not_full_.signal(); 00174 00175 return 1; 00176 } |
|
Discard as per the Discard Policy.
Definition at line 215 of file Buffering_Strategy.cpp. References ACE_DEBUG, ACE_Message_Queue<>::dequeue_deadline(), ACE_Message_Queue<>::dequeue_head(), ACE_Message_Queue<>::dequeue_prio(), discard_policy_, ACE_Message_Queue<>::enqueue_prio(), TAO_Notify_PropertyBase_T< TYPE >::is_valid(), LM_DEBUG, ACE_Message_Block::msg_priority(), ACE_Message_Block::release(), shutdown_, and TAO_debug_level. Referenced by enqueue().
00216 { 00217 if (this->shutdown_) 00218 { 00219 return false; 00220 } 00221 00222 ACE_Message_Block* mb = 0; 00223 int result = -1; 00224 00225 if (this->discard_policy_.is_valid() == 0 || 00226 this->discard_policy_ == CosNotification::AnyOrder || 00227 this->discard_policy_ == CosNotification::FifoOrder) 00228 { 00229 result = this->msg_queue_.dequeue_head (mb); 00230 } 00231 else if (this->discard_policy_ == CosNotification::LifoOrder) 00232 { 00233 // The most current message is NOT the newest one in the queue. It's 00234 // the one we're about to add to the queue. 00235 result = -1; 00236 } 00237 else if (this->discard_policy_ == CosNotification::DeadlineOrder) 00238 { 00239 result = this->msg_queue_.dequeue_deadline (mb); 00240 } 00241 else if (this->discard_policy_ == CosNotification::PriorityOrder) 00242 { 00243 result = this->msg_queue_.dequeue_prio (mb); 00244 if (mb->msg_priority() >= method_request->msg_priority()) 00245 { 00246 this->msg_queue_.enqueue_prio (mb); 00247 result = -1; 00248 } 00249 } 00250 else 00251 { 00252 if (TAO_debug_level > 0) 00253 ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - Invalid discard policy\n")); 00254 result = this->msg_queue_.dequeue_head (mb); 00255 } 00256 00257 if (result != -1) 00258 { 00259 ACE_Message_Block::release (mb); 00260 return true; 00261 } 00262 00263 return false; 00264 } |
|
Enqueue according the enqueing strategy. Return -1 on error else the number of items in the queue. Definition at line 73 of file Buffering_Strategy.cpp. References ACE_DEBUG, ACE_GUARD_RETURN, ACE_SYNCH_MUTEX, blocking_policy_, discard(), ETIME, ACE_OS::gettimeofday(), global_not_full_, TAO_Notify_PropertyBase_T< TYPE >::is_valid(), LM_DEBUG, local_not_empty_, local_not_full_, max_events_per_consumer_, max_queue_length_, ACE_Message_Queue<>::message_count(), queue(), shutdown_, and TAO_Notify_PropertyBase_T< TYPE >::value().
00074 { 00075 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->global_queue_lock_, -1); 00076 00077 if (this->shutdown_) 00078 return -1; 00079 00080 bool discarded_existing = false; 00081 00082 bool local_overflow = this->max_events_per_consumer_.is_valid() && 00083 static_cast <CORBA::Long> (this->msg_queue_.message_count ()) >= this->max_events_per_consumer_.value(); 00084 00085 bool global_overflow = this->max_queue_length_.value () != 0 && 00086 this->global_queue_length_ >= this->max_queue_length_.value (); 00087 00088 while (local_overflow || global_overflow) 00089 { 00090 if (blocking_policy_.is_valid()) 00091 { 00092 ACE_Time_Value timeout; 00093 ORBSVCS_Time::TimeT_to_Time_Value(timeout, blocking_policy_.value()); 00094 // Condition variables take an absolute time 00095 timeout += ACE_OS::gettimeofday(); 00096 if (local_overflow) 00097 { 00098 local_not_full_.wait(&timeout); 00099 } 00100 else 00101 { 00102 global_not_full_.wait(&timeout); 00103 } 00104 if (errno != ETIME) 00105 { 00106 local_overflow = 00107 this->max_events_per_consumer_.is_valid() && 00108 static_cast <CORBA::Long> (this->msg_queue_.message_count ()) >= this->max_events_per_consumer_.value(); 00109 global_overflow = 00110 this->max_queue_length_.value () != 0 && 00111 this->global_queue_length_ >= this->max_queue_length_.value (); 00112 continue; 00113 } 00114 } 00115 00116 discarded_existing = this->discard(method_request); 00117 if (discarded_existing) 00118 { 00119 --this->global_queue_length_; 00120 local_not_full_.signal(); 00121 global_not_full_.signal(); 00122 } 00123 break; 00124 } 00125 00126 if (! (local_overflow || global_overflow) || discarded_existing) 00127 { 00128 if (this->queue (method_request) == -1) 00129 { 00130 ACE_DEBUG((LM_DEBUG, 00131 "Notify (%P|%t) - Panic! failed to enqueue event\n")); 00132 return -1; 00133 } 00134 00135 ++this->global_queue_length_; 00136 00137 local_not_empty_.signal (); 00138 } 00139 return this->msg_queue_.message_count (); 00140 } |
|
Apply the Order Policy and queue. return -1 on error.
Definition at line 179 of file Buffering_Strategy.cpp. References ACE_DEBUG, ACE_Message_Queue<>::enqueue_deadline(), ACE_Message_Queue<>::enqueue_prio(), ACE_Message_Queue<>::enqueue_tail(), TAO_Notify_PropertyBase_T< TYPE >::is_valid(), LM_DEBUG, order_policy_, shutdown_, TAO_debug_level, and TAO_Notify_PropertyBase_T< TYPE >::value(). Referenced by enqueue().
00180 { 00181 if ( this->shutdown_ ) 00182 return -1; 00183 00184 CORBA::Short order = this->order_policy_.value(); 00185 00186 if (! this->order_policy_.is_valid() || 00187 order == CosNotification::AnyOrder || 00188 order == CosNotification::FifoOrder) 00189 { 00190 if (TAO_debug_level > 0) 00191 ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - enqueue in fifo order\n")); 00192 return this->msg_queue_.enqueue_tail (method_request); 00193 } 00194 00195 if (order == CosNotification::PriorityOrder) 00196 { 00197 if (TAO_debug_level > 0) 00198 ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - enqueue in priority order\n")); 00199 return this->msg_queue_.enqueue_prio (method_request); 00200 } 00201 00202 if (order == CosNotification::DeadlineOrder) 00203 { 00204 if (TAO_debug_level > 0) 00205 ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - enqueue in deadline order\n")); 00206 return this->msg_queue_.enqueue_deadline (method_request); 00207 } 00208 00209 if (TAO_debug_level > 0) 00210 ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - Invalid order policy\n")); 00211 return this->msg_queue_.enqueue_tail (method_request); 00212 } |
|
Shutdown.
Definition at line 56 of file Buffering_Strategy.cpp. References ACE_GUARD, ACE_SYNCH_MUTEX, global_not_full_, local_not_empty_, local_not_full_, and shutdown_.
00057 { 00058 ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->global_queue_lock_); 00059 00060 if (this->shutdown_) 00061 { 00062 return; 00063 } 00064 00065 this->shutdown_ = true; 00066 00067 this->local_not_empty_.broadcast (); 00068 this->global_not_full_.broadcast(); 00069 this->local_not_full_.broadcast(); 00070 } |
|
Update state with the following QoS Properties: Order Policy Discard Policy MaxEventsPerConsumer Definition at line 47 of file Buffering_Strategy.cpp.
00048 { 00049 this->order_policy_.set (qos_properties); 00050 this->discard_policy_.set (qos_properties); 00051 this->max_events_per_consumer_.set(qos_properties); 00052 this->blocking_policy_.set (qos_properties); 00053 } |
|
Reference to the properties per event channel.
Definition at line 86 of file Buffering_Strategy.h. |
|
Definition at line 104 of file Buffering_Strategy.h. Referenced by enqueue(). |
|
Policy to discard when buffers are full.
Definition at line 101 of file Buffering_Strategy.h. Referenced by discard(). |
|
Definition at line 107 of file Buffering_Strategy.h. Referenced by dequeue(), enqueue(), and shutdown(). |
|
The global queue length - queue length accross all the queues.
Definition at line 92 of file Buffering_Strategy.h. |
|
The shared global lock used by all the queues.
Definition at line 89 of file Buffering_Strategy.h. |
|
Condition that batch size reached.
Definition at line 111 of file Buffering_Strategy.h. Referenced by dequeue(), enqueue(), and shutdown(). |
|
Definition at line 108 of file Buffering_Strategy.h. Referenced by dequeue(), enqueue(), and shutdown(). |
|
Definition at line 103 of file Buffering_Strategy.h. Referenced by enqueue(). |
|
The maximum events that can be queued overall.
Definition at line 95 of file Buffering_Strategy.h. Referenced by enqueue(). |
|
= Data Members The local Message Queue
Definition at line 83 of file Buffering_Strategy.h. |
|
Order of events in internal buffers.
Definition at line 98 of file Buffering_Strategy.h. Referenced by queue(). |
|
Flag to shutdown.
Definition at line 114 of file Buffering_Strategy.h. Referenced by dequeue(), discard(), enqueue(), queue(), and shutdown(). |