00001
00002
00003 #include "orbsvcs/Notify/Buffering_Strategy.h"
00004
00005 ACE_RCSID (Notify, Buffering_Strategy, "$Id: Buffering_Strategy.cpp 81070 2008-03-24 12:30:52Z elliott_c $")
00006
00007
00008 #include "orbsvcs/Notify/Method_Request.h"
00009 #include "orbsvcs/Notify/Notify_Extensions.h"
00010 #include "orbsvcs/Notify/QoSProperties.h"
00011 #include "orbsvcs/Notify/Notify_Extensions.h"
00012
00013 #include "orbsvcs/CosNotificationC.h"
00014 #include "orbsvcs/Time_Utilities.h"
00015
00016 #include "tao/debug.h"
00017
00018 #include "ace/Message_Queue.h"
00019
00020 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00021
00022 TAO_Notify_Buffering_Strategy::TAO_Notify_Buffering_Strategy (
00023 TAO_Notify_Message_Queue& msg_queue,
00024 const TAO_Notify_AdminProperties::Ptr& admin_properties)
00025 : msg_queue_ (msg_queue)
00026 , admin_properties_ (admin_properties)
00027 , global_queue_lock_ (admin_properties->global_queue_lock ())
00028 , global_queue_length_ (admin_properties->global_queue_length ())
00029 , max_queue_length_ (admin_properties->max_global_queue_length ())
00030 , order_policy_ (CosNotification::OrderPolicy, CosNotification::AnyOrder)
00031 , discard_policy_ (CosNotification::DiscardPolicy, CosNotification::AnyOrder)
00032 , max_events_per_consumer_ (CosNotification::MaxEventsPerConsumer)
00033 , blocking_policy_ (TAO_Notify_Extensions::BlockingPolicy)
00034 , global_not_full_ (admin_properties->global_queue_not_full())
00035 , local_not_full_ (global_queue_lock_)
00036 , local_not_empty_ (global_queue_lock_)
00037 , shutdown_ (false)
00038 , tracker_ (0)
00039 {
00040 }
00041
00042 TAO_Notify_Buffering_Strategy::~TAO_Notify_Buffering_Strategy ()
00043 {
00044 }
00045
00046 void
00047 TAO_Notify_Buffering_Strategy::update_qos_properties
00048 (const TAO_Notify_QoSProperties& qos_properties)
00049 {
00050 this->order_policy_.set (qos_properties);
00051 this->discard_policy_.set (qos_properties);
00052 this->max_events_per_consumer_.set(qos_properties);
00053 this->blocking_policy_.set (qos_properties);
00054 }
00055
00056 void
00057 TAO_Notify_Buffering_Strategy::shutdown (void)
00058 {
00059 ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->global_queue_lock_);
00060
00061 if (this->shutdown_)
00062 {
00063 return;
00064 }
00065
00066 this->shutdown_ = true;
00067
00068 this->local_not_empty_.broadcast ();
00069 this->global_not_full_.broadcast();
00070 this->local_not_full_.broadcast();
00071 }
00072
00073 ACE_Time_Value
00074 TAO_Notify_Buffering_Strategy::oldest_event (void)
00075 {
00076 ACE_Time_Value tv (ACE_Time_Value::max_time);
00077 ACE_Message_Block* mb = 0;
00078
00079 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->global_queue_lock_, tv);
00080 TAO_Notify_Message_Queue::ITERATOR itr (this->msg_queue_);
00081 while(itr.next (mb))
00082 {
00083 TAO_Notify_Method_Request_Queueable* event =
00084 dynamic_cast<TAO_Notify_Method_Request_Queueable*> (mb);
00085 if (event != 0)
00086 {
00087 const ACE_Time_Value& etime = event->creation_time ();
00088 if (etime < tv)
00089 tv = etime;
00090 }
00091 itr.advance ();
00092 }
00093
00094 return tv;
00095 }
00096
00097 TAO_Notify_Buffering_Strategy::Tracker::~Tracker (void)
00098 {
00099 }
00100
00101 int
00102 TAO_Notify_Buffering_Strategy::enqueue (TAO_Notify_Method_Request_Queueable* method_request)
00103 {
00104 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->global_queue_lock_, -1);
00105
00106 if (this->shutdown_)
00107 return -1;
00108
00109 bool discarded_existing = false;
00110
00111 bool local_overflow = this->max_events_per_consumer_.is_valid() &&
00112 static_cast <CORBA::Long> (this->msg_queue_.message_count ()) >= this->max_events_per_consumer_.value();
00113
00114 bool global_overflow = this->max_queue_length_.value () != 0 &&
00115 this->global_queue_length_ >= this->max_queue_length_.value ();
00116
00117 while (local_overflow || global_overflow)
00118 {
00119 if (blocking_policy_.is_valid())
00120 {
00121 ACE_Time_Value timeout;
00122 ORBSVCS_Time::TimeT_to_Time_Value(timeout, blocking_policy_.value());
00123
00124 timeout += ACE_OS::gettimeofday();
00125 if (local_overflow)
00126 {
00127 local_not_full_.wait(&timeout);
00128 }
00129 else
00130 {
00131 global_not_full_.wait(&timeout);
00132 }
00133 if (errno != ETIME)
00134 {
00135 local_overflow =
00136 this->max_events_per_consumer_.is_valid() &&
00137 static_cast <CORBA::Long> (this->msg_queue_.message_count ()) >= this->max_events_per_consumer_.value();
00138 global_overflow =
00139 this->max_queue_length_.value () != 0 &&
00140 this->global_queue_length_ >= this->max_queue_length_.value ();
00141 continue;
00142 }
00143 }
00144
00145 discarded_existing = this->discard(method_request);
00146 if (discarded_existing)
00147 {
00148 --this->global_queue_length_;
00149 local_not_full_.signal();
00150 global_not_full_.signal();
00151 }
00152 break;
00153 }
00154
00155 if (! (local_overflow || global_overflow) || discarded_existing)
00156 {
00157 if (this->queue (method_request) == -1)
00158 {
00159 ACE_DEBUG((LM_DEBUG,
00160 "Notify (%P|%t) - Panic! failed to enqueue event\n"));
00161 return -1;
00162 }
00163
00164 ++this->global_queue_length_;
00165
00166 local_not_empty_.signal ();
00167 }
00168
00169 size_t count = this->msg_queue_.message_count ();
00170 if (this->tracker_ != 0)
00171 this->tracker_->update_queue_count (count);
00172
00173 return ACE_Utils::truncate_cast<int> (count);
00174 }
00175
00176 int
00177 TAO_Notify_Buffering_Strategy::dequeue (TAO_Notify_Method_Request_Queueable* &method_request, const ACE_Time_Value *abstime)
00178 {
00179 ACE_Message_Block *mb;
00180
00181 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->global_queue_lock_, -1);
00182
00183 if ( this->shutdown_ )
00184 return -1;
00185
00186 while (this->msg_queue_.message_count () == 0)
00187 {
00188 this->local_not_empty_.wait (abstime);
00189
00190 if (this->shutdown_)
00191 return -1;
00192
00193 if (errno == ETIME)
00194 return 0;
00195 }
00196
00197 if (this->msg_queue_.dequeue (mb) == -1)
00198 return -1;
00199
00200 if (this->tracker_ != 0)
00201 this->tracker_->update_queue_count (this->msg_queue_.message_count ());
00202
00203 method_request = dynamic_cast<TAO_Notify_Method_Request_Queueable*>(mb);
00204
00205 if (method_request == 0)
00206 return -1;
00207
00208 --this->global_queue_length_;
00209 local_not_full_.signal();
00210 global_not_full_.signal();
00211
00212 return 1;
00213 }
00214
00215 void
00216 TAO_Notify_Buffering_Strategy::set_tracker (
00217 TAO_Notify_Buffering_Strategy::Tracker* tracker)
00218 {
00219 this->tracker_ = tracker;
00220 }
00221
00222 int
00223 TAO_Notify_Buffering_Strategy::queue (TAO_Notify_Method_Request_Queueable* method_request)
00224 {
00225 if ( this->shutdown_ )
00226 return -1;
00227
00228 CORBA::Short order = this->order_policy_.value();
00229
00230 if (! this->order_policy_.is_valid() ||
00231 order == CosNotification::AnyOrder ||
00232 order == CosNotification::FifoOrder)
00233 {
00234 if (TAO_debug_level > 0)
00235 ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - enqueue in fifo order\n"));
00236 return this->msg_queue_.enqueue_tail (method_request);
00237 }
00238
00239 if (order == CosNotification::PriorityOrder)
00240 {
00241 if (TAO_debug_level > 0)
00242 ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - enqueue in priority order\n"));
00243 return this->msg_queue_.enqueue_prio (method_request);
00244 }
00245
00246 if (order == CosNotification::DeadlineOrder)
00247 {
00248 if (TAO_debug_level > 0)
00249 ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - enqueue in deadline order\n"));
00250 return this->msg_queue_.enqueue_deadline (method_request);
00251 }
00252
00253 if (TAO_debug_level > 0)
00254 ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - Invalid order policy\n"));
00255 return this->msg_queue_.enqueue_tail (method_request);
00256 }
00257
00258 bool
00259 TAO_Notify_Buffering_Strategy::discard (TAO_Notify_Method_Request_Queueable* method_request)
00260 {
00261 if (this->shutdown_)
00262 {
00263 return false;
00264 }
00265
00266 ACE_Message_Block* mb = 0;
00267 int result = -1;
00268
00269 if (this->discard_policy_.is_valid() == 0 ||
00270 this->discard_policy_ == CosNotification::AnyOrder ||
00271 this->discard_policy_ == CosNotification::FifoOrder)
00272 {
00273 result = this->msg_queue_.dequeue_head (mb);
00274 }
00275 else if (this->discard_policy_ == CosNotification::LifoOrder)
00276 {
00277
00278
00279 result = -1;
00280 }
00281 else if (this->discard_policy_ == CosNotification::DeadlineOrder)
00282 {
00283 result = this->msg_queue_.dequeue_deadline (mb);
00284 }
00285 else if (this->discard_policy_ == CosNotification::PriorityOrder)
00286 {
00287 result = this->msg_queue_.dequeue_prio (mb);
00288 if (mb->msg_priority() >= method_request->msg_priority())
00289 {
00290 this->msg_queue_.enqueue_prio (mb);
00291 result = -1;
00292 }
00293 }
00294 else
00295 {
00296 if (TAO_debug_level > 0)
00297 ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - Invalid discard policy\n"));
00298 result = this->msg_queue_.dequeue_head (mb);
00299 }
00300
00301 if (result != -1)
00302 {
00303 ACE_Message_Block::release (mb);
00304 return true;
00305 }
00306
00307 return false;
00308 }
00309
00310 TAO_END_VERSIONED_NAMESPACE_DECL