Go to the documentation of this file.00001
00002
00003 #include "orbsvcs/Notify/Buffering_Strategy.h"
00004
00005 ACE_RCSID (Notify, Buffering_Strategy, "$Id: Buffering_Strategy.cpp 87253 2009-10-28 23:29:32Z dai_y $")
00006
00007
00008 #include "orbsvcs/Notify/Method_Request.h"
00009 #include "orbsvcs/Notify/Notify_Extensions.h"
00010 #include "orbsvcs/Notify/QoSProperties.h"
00011 #include "orbsvcs/Notify/Notify_Extensions.h"
00012
00013 #include "orbsvcs/CosNotificationC.h"
00014 #include "orbsvcs/Time_Utilities.h"
00015
00016 #include "tao/debug.h"
00017
00018 #include "ace/Message_Queue.h"
00019
00020 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00021
00022 TAO_Notify_Buffering_Strategy::TAO_Notify_Buffering_Strategy (
00023 TAO_Notify_Message_Queue& msg_queue,
00024 const TAO_Notify_AdminProperties::Ptr& admin_properties)
00025 : msg_queue_ (msg_queue)
00026 , admin_properties_ (admin_properties)
00027 , global_queue_lock_ (admin_properties->global_queue_lock ())
00028 , global_queue_length_ (admin_properties->global_queue_length ())
00029 , max_queue_length_ (admin_properties->max_global_queue_length ())
00030 , order_policy_ (CosNotification::OrderPolicy, CosNotification::AnyOrder)
00031 , discard_policy_ (CosNotification::DiscardPolicy, CosNotification::AnyOrder)
00032 , max_events_per_consumer_ (CosNotification::MaxEventsPerConsumer)
00033 , blocking_policy_ (TAO_Notify_Extensions::BlockingPolicy)
00034 , global_not_full_ (admin_properties->global_queue_not_full())
00035 , local_not_full_ (global_queue_lock_)
00036 , local_not_empty_ (global_queue_lock_)
00037 , shutdown_ (false)
00038 , tracker_ (0)
00039 {
00040 }
00041
00042 TAO_Notify_Buffering_Strategy::~TAO_Notify_Buffering_Strategy ()
00043 {
00044 }
00045
00046 void
00047 TAO_Notify_Buffering_Strategy::update_qos_properties
00048 (const TAO_Notify_QoSProperties& qos_properties)
00049 {
00050 this->order_policy_.set (qos_properties);
00051 this->discard_policy_.set (qos_properties);
00052 this->max_events_per_consumer_.set(qos_properties);
00053 this->blocking_policy_.set (qos_properties);
00054 }
00055
00056 void
00057 TAO_Notify_Buffering_Strategy::shutdown (void)
00058 {
00059 ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->global_queue_lock_);
00060
00061 if (this->shutdown_)
00062 {
00063 return;
00064 }
00065
00066 this->shutdown_ = true;
00067
00068 this->local_not_empty_.broadcast ();
00069 this->global_not_full_.broadcast();
00070 this->local_not_full_.broadcast();
00071 }
00072
00073 ACE_Time_Value
00074 TAO_Notify_Buffering_Strategy::oldest_event (void)
00075 {
00076 ACE_Time_Value tv (ACE_Time_Value::max_time);
00077 ACE_Message_Block* mb = 0;
00078
00079 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->global_queue_lock_, tv);
00080 TAO_Notify_Message_Queue::ITERATOR itr (this->msg_queue_);
00081 while(itr.next (mb))
00082 {
00083 TAO_Notify_Method_Request_Queueable* event =
00084 dynamic_cast<TAO_Notify_Method_Request_Queueable*> (mb);
00085 if (event != 0)
00086 {
00087 const ACE_Time_Value& etime = event->creation_time ();
00088 if (etime < tv)
00089 tv = etime;
00090 }
00091 itr.advance ();
00092 }
00093
00094 return tv;
00095 }
00096
00097
00098 TAO_Notify_Buffering_Strategy::Tracker::Tracker (void)
00099 : child_ (0)
00100 {
00101 }
00102
00103
00104 TAO_Notify_Buffering_Strategy::Tracker::~Tracker (void)
00105 {
00106 }
00107
00108
00109 void
00110 TAO_Notify_Buffering_Strategy::Tracker::register_child (TAO_Notify_Buffering_Strategy::Tracker * child)
00111 {
00112 if (this->child_ == 0)
00113 {
00114 this->child_ = child;
00115 }
00116 else if (this->child_ != child)
00117 {
00118 this->child_->register_child (child);
00119 }
00120
00121 }
00122
00123 void
00124 TAO_Notify_Buffering_Strategy::Tracker::unregister_child (TAO_Notify_Buffering_Strategy::Tracker * child)
00125 {
00126 if (this->child_ == child)
00127 {
00128 this->child_ = this->child_->child_;
00129 }
00130 else if (this->child_ != 0)
00131 {
00132 this->child_->unregister_child (child);
00133 }
00134 }
00135
00136
00137 int
00138 TAO_Notify_Buffering_Strategy::enqueue (TAO_Notify_Method_Request_Queueable* method_request)
00139 {
00140 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->global_queue_lock_, -1);
00141
00142 if (this->shutdown_)
00143 return -1;
00144
00145 bool discarded_existing = false;
00146
00147 bool local_overflow = this->max_events_per_consumer_.is_valid() &&
00148 static_cast <CORBA::Long> (this->msg_queue_.message_count ()) >= this->max_events_per_consumer_.value();
00149
00150 bool global_overflow = this->max_queue_length_.value () != 0 &&
00151 this->global_queue_length_ >= this->max_queue_length_.value ();
00152
00153 while (local_overflow || global_overflow)
00154 {
00155 if (blocking_policy_.is_valid())
00156 {
00157 ACE_Time_Value timeout;
00158 ORBSVCS_Time::TimeT_to_Time_Value(timeout, blocking_policy_.value());
00159
00160 timeout += ACE_OS::gettimeofday();
00161 if (local_overflow)
00162 {
00163 local_not_full_.wait(&timeout);
00164 }
00165 else
00166 {
00167 global_not_full_.wait(&timeout);
00168 }
00169 if (errno != ETIME)
00170 {
00171 local_overflow =
00172 this->max_events_per_consumer_.is_valid() &&
00173 static_cast <CORBA::Long> (this->msg_queue_.message_count ()) >= this->max_events_per_consumer_.value();
00174 global_overflow =
00175 this->max_queue_length_.value () != 0 &&
00176 this->global_queue_length_ >= this->max_queue_length_.value ();
00177 continue;
00178 }
00179 }
00180 if (tracker_ != 0)
00181 {
00182 tracker_->count_queue_overflow (local_overflow, global_overflow);
00183 }
00184
00185 discarded_existing = this->discard(method_request);
00186 if (discarded_existing)
00187 {
00188 --this->global_queue_length_;
00189 local_not_full_.signal();
00190 global_not_full_.signal();
00191 }
00192 break;
00193 }
00194
00195 if (! (local_overflow || global_overflow) || discarded_existing)
00196 {
00197 if (this->queue (method_request) == -1)
00198 {
00199 ACE_DEBUG((LM_DEBUG,
00200 "Notify (%P|%t) - Panic! failed to enqueue event\n"));
00201 return -1;
00202 }
00203
00204 ++this->global_queue_length_;
00205
00206 local_not_empty_.signal ();
00207 }
00208 else
00209 {
00210 ACE_DEBUG((LM_DEBUG,
00211 "Notify (%P|%t) - Panic! did not attempt to enqueue event\n"));
00212 return -1;
00213 }
00214
00215 size_t count = this->msg_queue_.message_count ();
00216 if (this->tracker_ != 0)
00217 {
00218 this->tracker_->update_queue_count (count);
00219 }
00220
00221 return ACE_Utils::truncate_cast<int> (count);
00222 }
00223
00224 int
00225 TAO_Notify_Buffering_Strategy::dequeue (TAO_Notify_Method_Request_Queueable* &method_request, const ACE_Time_Value *abstime)
00226 {
00227 ACE_Message_Block *mb;
00228
00229 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->global_queue_lock_, -1);
00230
00231 if ( this->shutdown_ )
00232 return -1;
00233
00234 while (this->msg_queue_.message_count () == 0)
00235 {
00236 this->local_not_empty_.wait (abstime);
00237
00238 if (this->shutdown_)
00239 return -1;
00240
00241 if (errno == ETIME)
00242 return 0;
00243 }
00244
00245 if (this->msg_queue_.dequeue (mb) == -1)
00246 return -1;
00247
00248 if (this->tracker_ != 0)
00249 {
00250 this->tracker_->update_queue_count (this->msg_queue_.message_count ());
00251 }
00252
00253 method_request = dynamic_cast<TAO_Notify_Method_Request_Queueable*>(mb);
00254
00255 if (method_request == 0)
00256 return -1;
00257
00258 --this->global_queue_length_;
00259 local_not_full_.signal();
00260 global_not_full_.signal();
00261
00262 return 1;
00263 }
00264
00265 void
00266 TAO_Notify_Buffering_Strategy::set_tracker (
00267 TAO_Notify_Buffering_Strategy::Tracker* tracker)
00268 {
00269 if (this->tracker_ == 0)
00270 {
00271 this->tracker_ = tracker;
00272 }
00273 else if (this->tracker_ != tracker)
00274 {
00275 this->tracker_->register_child (tracker);
00276 }
00277 }
00278
00279 int
00280 TAO_Notify_Buffering_Strategy::queue (TAO_Notify_Method_Request_Queueable* method_request)
00281 {
00282 if ( this->shutdown_ )
00283 return -1;
00284
00285 CORBA::Short order = this->order_policy_.value();
00286
00287 if (! this->order_policy_.is_valid() ||
00288 order == CosNotification::AnyOrder ||
00289 order == CosNotification::FifoOrder)
00290 {
00291 if (TAO_debug_level > 0)
00292 ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - enqueue in fifo order\n"));
00293 return this->msg_queue_.enqueue_tail (method_request);
00294 }
00295
00296 if (order == CosNotification::PriorityOrder)
00297 {
00298 if (TAO_debug_level > 0)
00299 ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - enqueue in priority order\n"));
00300 return this->msg_queue_.enqueue_prio (method_request);
00301 }
00302
00303 if (order == CosNotification::DeadlineOrder)
00304 {
00305 if (TAO_debug_level > 0)
00306 ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - enqueue in deadline order\n"));
00307 return this->msg_queue_.enqueue_deadline (method_request);
00308 }
00309
00310 if (TAO_debug_level > 0)
00311 ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - Invalid order policy\n"));
00312 return this->msg_queue_.enqueue_tail (method_request);
00313 }
00314
00315 bool
00316 TAO_Notify_Buffering_Strategy::discard (TAO_Notify_Method_Request_Queueable* method_request)
00317 {
00318 if (this->shutdown_)
00319 {
00320 return false;
00321 }
00322
00323 ACE_Message_Block* mb = 0;
00324 int result = -1;
00325
00326 if (this->discard_policy_.is_valid() == 0 ||
00327 this->discard_policy_ == CosNotification::AnyOrder ||
00328 this->discard_policy_ == CosNotification::FifoOrder)
00329 {
00330 result = this->msg_queue_.dequeue_head (mb);
00331 }
00332 else if (this->discard_policy_ == CosNotification::LifoOrder)
00333 {
00334
00335
00336 result = -1;
00337 }
00338 else if (this->discard_policy_ == CosNotification::DeadlineOrder)
00339 {
00340 result = this->msg_queue_.dequeue_deadline (mb);
00341 }
00342 else if (this->discard_policy_ == CosNotification::PriorityOrder)
00343 {
00344 result = this->msg_queue_.dequeue_prio (mb);
00345 if (mb->msg_priority() >= method_request->msg_priority())
00346 {
00347 this->msg_queue_.enqueue_prio (mb);
00348 result = -1;
00349 }
00350 }
00351 else
00352 {
00353 if (TAO_debug_level > 0)
00354 ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - Invalid discard policy\n"));
00355 result = this->msg_queue_.dequeue_head (mb);
00356 }
00357
00358 if (result != -1)
00359 {
00360 ACE_Message_Block::release (mb);
00361 return true;
00362 }
00363
00364 return false;
00365 }
00366
00367
00368
00369 TAO_END_VERSIONED_NAMESPACE_DECL