#include <Buffering_Strategy.h>
Collaboration diagram for TAO_Notify_Buffering_Strategy:

Definition at line 45 of file Buffering_Strategy.h.
|
||||||||||||
|
Definition at line 22 of file Buffering_Strategy.cpp. References TAO_Notify_AdminProperties::Ptr, and TAO_Notify_Message_Queue.
00025 : msg_queue_ (msg_queue) 00026 , admin_properties_ (admin_properties) 00027 , global_queue_lock_ (admin_properties->global_queue_lock ()) 00028 , global_queue_length_ (admin_properties->global_queue_length ()) 00029 , max_queue_length_ (admin_properties->max_global_queue_length ()) 00030 , order_policy_ (CosNotification::OrderPolicy, CosNotification::AnyOrder) 00031 , discard_policy_ (CosNotification::DiscardPolicy, CosNotification::AnyOrder) 00032 , max_events_per_consumer_ (CosNotification::MaxEventsPerConsumer) 00033 , blocking_policy_ (TAO_Notify_Extensions::BlockingPolicy) 00034 , global_not_full_ (admin_properties->global_queue_not_full()) 00035 , local_not_full_ (global_queue_lock_) 00036 , local_not_empty_ (global_queue_lock_) 00037 , shutdown_ (false) 00038 { 00039 } |
|
|
Definition at line 41 of file Buffering_Strategy.cpp.
00042 {
00043 }
|
|
||||||||||||
|
Dequeue batch. This method will block for abstime if non-zero or else blocks till an item is available. Return -1 on error or if nothing is available, else the number of items actually dequeued (1). Definition at line 143 of file Buffering_Strategy.cpp. References ACE_GUARD_RETURN, ACE_SYNCH_MUTEX, ACE_Message_Queue<>::dequeue(), ETIME, global_not_full_, local_not_empty_, local_not_full_, ACE_Message_Queue<>::message_count(), and shutdown_.
00144 {
00145 ACE_Message_Block *mb;
00146
00147 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->global_queue_lock_, -1);
00148
00149 if ( this->shutdown_ )
00150 return -1;
00151
00152 while (this->msg_queue_.message_count () == 0)
00153 {
00154 this->local_not_empty_.wait (abstime);
00155
00156 if (this->shutdown_)
00157 return -1;
00158
00159 if (errno == ETIME)
00160 return 0;
00161 }
00162
00163 if (this->msg_queue_.dequeue (mb) == -1)
00164 return -1;
00165
00166 method_request = dynamic_cast<TAO_Notify_Method_Request_Queueable*>(mb);
00167
00168 if (method_request == 0)
00169 return -1;
00170
00171 --this->global_queue_length_;
00172 local_not_full_.signal();
00173 global_not_full_.signal();
00174
00175 return 1;
00176 }
|
|
|
Discard as per the Discard Policy.
Definition at line 215 of file Buffering_Strategy.cpp. References ACE_DEBUG, ACE_Message_Queue<>::dequeue_deadline(), ACE_Message_Queue<>::dequeue_head(), ACE_Message_Queue<>::dequeue_prio(), discard_policy_, ACE_Message_Queue<>::enqueue_prio(), TAO_Notify_PropertyBase_T< TYPE >::is_valid(), LM_DEBUG, ACE_Message_Block::msg_priority(), ACE_Message_Block::release(), shutdown_, and TAO_debug_level. Referenced by enqueue().
00216 {
00217 if (this->shutdown_)
00218 {
00219 return false;
00220 }
00221
00222 ACE_Message_Block* mb = 0;
00223 int result = -1;
00224
00225 if (this->discard_policy_.is_valid() == 0 ||
00226 this->discard_policy_ == CosNotification::AnyOrder ||
00227 this->discard_policy_ == CosNotification::FifoOrder)
00228 {
00229 result = this->msg_queue_.dequeue_head (mb);
00230 }
00231 else if (this->discard_policy_ == CosNotification::LifoOrder)
00232 {
00233 // The most current message is NOT the newest one in the queue. It's
00234 // the one we're about to add to the queue.
00235 result = -1;
00236 }
00237 else if (this->discard_policy_ == CosNotification::DeadlineOrder)
00238 {
00239 result = this->msg_queue_.dequeue_deadline (mb);
00240 }
00241 else if (this->discard_policy_ == CosNotification::PriorityOrder)
00242 {
00243 result = this->msg_queue_.dequeue_prio (mb);
00244 if (mb->msg_priority() >= method_request->msg_priority())
00245 {
00246 this->msg_queue_.enqueue_prio (mb);
00247 result = -1;
00248 }
00249 }
00250 else
00251 {
00252 if (TAO_debug_level > 0)
00253 ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - Invalid discard policy\n"));
00254 result = this->msg_queue_.dequeue_head (mb);
00255 }
00256
00257 if (result != -1)
00258 {
00259 ACE_Message_Block::release (mb);
00260 return true;
00261 }
00262
00263 return false;
00264 }
|
|
|
Enqueue according the enqueing strategy. Return -1 on error else the number of items in the queue. Definition at line 73 of file Buffering_Strategy.cpp. References ACE_DEBUG, ACE_GUARD_RETURN, ACE_SYNCH_MUTEX, blocking_policy_, discard(), ETIME, ACE_OS::gettimeofday(), global_not_full_, TAO_Notify_PropertyBase_T< TYPE >::is_valid(), LM_DEBUG, local_not_empty_, local_not_full_, max_events_per_consumer_, max_queue_length_, ACE_Message_Queue<>::message_count(), queue(), shutdown_, and TAO_Notify_PropertyBase_T< TYPE >::value().
00074 {
00075 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->global_queue_lock_, -1);
00076
00077 if (this->shutdown_)
00078 return -1;
00079
00080 bool discarded_existing = false;
00081
00082 bool local_overflow = this->max_events_per_consumer_.is_valid() &&
00083 static_cast <CORBA::Long> (this->msg_queue_.message_count ()) >= this->max_events_per_consumer_.value();
00084
00085 bool global_overflow = this->max_queue_length_.value () != 0 &&
00086 this->global_queue_length_ >= this->max_queue_length_.value ();
00087
00088 while (local_overflow || global_overflow)
00089 {
00090 if (blocking_policy_.is_valid())
00091 {
00092 ACE_Time_Value timeout;
00093 ORBSVCS_Time::TimeT_to_Time_Value(timeout, blocking_policy_.value());
00094 // Condition variables take an absolute time
00095 timeout += ACE_OS::gettimeofday();
00096 if (local_overflow)
00097 {
00098 local_not_full_.wait(&timeout);
00099 }
00100 else
00101 {
00102 global_not_full_.wait(&timeout);
00103 }
00104 if (errno != ETIME)
00105 {
00106 local_overflow =
00107 this->max_events_per_consumer_.is_valid() &&
00108 static_cast <CORBA::Long> (this->msg_queue_.message_count ()) >= this->max_events_per_consumer_.value();
00109 global_overflow =
00110 this->max_queue_length_.value () != 0 &&
00111 this->global_queue_length_ >= this->max_queue_length_.value ();
00112 continue;
00113 }
00114 }
00115
00116 discarded_existing = this->discard(method_request);
00117 if (discarded_existing)
00118 {
00119 --this->global_queue_length_;
00120 local_not_full_.signal();
00121 global_not_full_.signal();
00122 }
00123 break;
00124 }
00125
00126 if (! (local_overflow || global_overflow) || discarded_existing)
00127 {
00128 if (this->queue (method_request) == -1)
00129 {
00130 ACE_DEBUG((LM_DEBUG,
00131 "Notify (%P|%t) - Panic! failed to enqueue event\n"));
00132 return -1;
00133 }
00134
00135 ++this->global_queue_length_;
00136
00137 local_not_empty_.signal ();
00138 }
00139 return this->msg_queue_.message_count ();
00140 }
|
|
|
Apply the Order Policy and queue. return -1 on error.
Definition at line 179 of file Buffering_Strategy.cpp. References ACE_DEBUG, ACE_Message_Queue<>::enqueue_deadline(), ACE_Message_Queue<>::enqueue_prio(), ACE_Message_Queue<>::enqueue_tail(), TAO_Notify_PropertyBase_T< TYPE >::is_valid(), LM_DEBUG, order_policy_, shutdown_, TAO_debug_level, and TAO_Notify_PropertyBase_T< TYPE >::value(). Referenced by enqueue().
00180 {
00181 if ( this->shutdown_ )
00182 return -1;
00183
00184 CORBA::Short order = this->order_policy_.value();
00185
00186 if (! this->order_policy_.is_valid() ||
00187 order == CosNotification::AnyOrder ||
00188 order == CosNotification::FifoOrder)
00189 {
00190 if (TAO_debug_level > 0)
00191 ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - enqueue in fifo order\n"));
00192 return this->msg_queue_.enqueue_tail (method_request);
00193 }
00194
00195 if (order == CosNotification::PriorityOrder)
00196 {
00197 if (TAO_debug_level > 0)
00198 ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - enqueue in priority order\n"));
00199 return this->msg_queue_.enqueue_prio (method_request);
00200 }
00201
00202 if (order == CosNotification::DeadlineOrder)
00203 {
00204 if (TAO_debug_level > 0)
00205 ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - enqueue in deadline order\n"));
00206 return this->msg_queue_.enqueue_deadline (method_request);
00207 }
00208
00209 if (TAO_debug_level > 0)
00210 ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - Invalid order policy\n"));
00211 return this->msg_queue_.enqueue_tail (method_request);
00212 }
|
|
|
Shutdown.
Definition at line 56 of file Buffering_Strategy.cpp. References ACE_GUARD, ACE_SYNCH_MUTEX, global_not_full_, local_not_empty_, local_not_full_, and shutdown_.
00057 {
00058 ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->global_queue_lock_);
00059
00060 if (this->shutdown_)
00061 {
00062 return;
00063 }
00064
00065 this->shutdown_ = true;
00066
00067 this->local_not_empty_.broadcast ();
00068 this->global_not_full_.broadcast();
00069 this->local_not_full_.broadcast();
00070 }
|
|
|
Update state with the following QoS Properties: Order Policy Discard Policy MaxEventsPerConsumer Definition at line 47 of file Buffering_Strategy.cpp.
00048 {
00049 this->order_policy_.set (qos_properties);
00050 this->discard_policy_.set (qos_properties);
00051 this->max_events_per_consumer_.set(qos_properties);
00052 this->blocking_policy_.set (qos_properties);
00053 }
|
|
|
Reference to the properties per event channel.
Definition at line 86 of file Buffering_Strategy.h. |
|
|
Definition at line 104 of file Buffering_Strategy.h. Referenced by enqueue(). |
|
|
Policy to discard when buffers are full.
Definition at line 101 of file Buffering_Strategy.h. Referenced by discard(). |
|
|
Definition at line 107 of file Buffering_Strategy.h. Referenced by dequeue(), enqueue(), and shutdown(). |
|
|
The global queue length - queue length accross all the queues.
Definition at line 92 of file Buffering_Strategy.h. |
|
|
The shared global lock used by all the queues.
Definition at line 89 of file Buffering_Strategy.h. |
|
|
Condition that batch size reached.
Definition at line 111 of file Buffering_Strategy.h. Referenced by dequeue(), enqueue(), and shutdown(). |
|
|
Definition at line 108 of file Buffering_Strategy.h. Referenced by dequeue(), enqueue(), and shutdown(). |
|
|
Definition at line 103 of file Buffering_Strategy.h. Referenced by enqueue(). |
|
|
The maximum events that can be queued overall.
Definition at line 95 of file Buffering_Strategy.h. Referenced by enqueue(). |
|
|
= Data Members The local Message Queue
Definition at line 83 of file Buffering_Strategy.h. |
|
|
Order of events in internal buffers.
Definition at line 98 of file Buffering_Strategy.h. Referenced by queue(). |
|
|
Flag to shutdown.
Definition at line 114 of file Buffering_Strategy.h. Referenced by dequeue(), discard(), enqueue(), queue(), and shutdown(). |
1.3.6