00001
00002
00003 #include "orbsvcs/Notify/Buffering_Strategy.h"
00004
00005 ACE_RCSID (Notify, Buffering_Strategy, "$Id: Buffering_Strategy.cpp 79230 2007-08-06 18:18:07Z elliott_c $")
00006
00007
00008 #include "orbsvcs/Notify/Method_Request.h"
00009 #include "orbsvcs/Notify/Notify_Extensions.h"
00010 #include "orbsvcs/Notify/QoSProperties.h"
00011 #include "orbsvcs/Notify/Notify_Extensions.h"
00012
00013 #include "orbsvcs/CosNotificationC.h"
00014 #include "orbsvcs/Time_Utilities.h"
00015
00016 #include "tao/debug.h"
00017
00018 #include "ace/Message_Queue.h"
00019
00020 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00021
00022 TAO_Notify_Buffering_Strategy::TAO_Notify_Buffering_Strategy (
00023 TAO_Notify_Message_Queue& msg_queue,
00024 const TAO_Notify_AdminProperties::Ptr& admin_properties)
00025 : msg_queue_ (msg_queue)
00026 , admin_properties_ (admin_properties)
00027 , global_queue_lock_ (admin_properties->global_queue_lock ())
00028 , global_queue_length_ (admin_properties->global_queue_length ())
00029 , max_queue_length_ (admin_properties->max_global_queue_length ())
00030 , order_policy_ (CosNotification::OrderPolicy, CosNotification::AnyOrder)
00031 , discard_policy_ (CosNotification::DiscardPolicy, CosNotification::AnyOrder)
00032 , max_events_per_consumer_ (CosNotification::MaxEventsPerConsumer)
00033 , blocking_policy_ (TAO_Notify_Extensions::BlockingPolicy)
00034 , global_not_full_ (admin_properties->global_queue_not_full())
00035 , local_not_full_ (global_queue_lock_)
00036 , local_not_empty_ (global_queue_lock_)
00037 , shutdown_ (false)
00038 {
00039 }
00040
00041 TAO_Notify_Buffering_Strategy::~TAO_Notify_Buffering_Strategy ()
00042 {
00043 }
00044
00045 void
00046 TAO_Notify_Buffering_Strategy::update_qos_properties
00047 (const TAO_Notify_QoSProperties& qos_properties)
00048 {
00049 this->order_policy_.set (qos_properties);
00050 this->discard_policy_.set (qos_properties);
00051 this->max_events_per_consumer_.set(qos_properties);
00052 this->blocking_policy_.set (qos_properties);
00053 }
00054
00055 void
00056 TAO_Notify_Buffering_Strategy::shutdown (void)
00057 {
00058 ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->global_queue_lock_);
00059
00060 if (this->shutdown_)
00061 {
00062 return;
00063 }
00064
00065 this->shutdown_ = true;
00066
00067 this->local_not_empty_.broadcast ();
00068 this->global_not_full_.broadcast();
00069 this->local_not_full_.broadcast();
00070 }
00071
00072 ACE_Time_Value
00073 TAO_Notify_Buffering_Strategy::oldest_event (void)
00074 {
00075 ACE_Time_Value tv (ACE_Time_Value::max_time);
00076 ACE_Message_Block* mb = 0;
00077
00078 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->global_queue_lock_, tv);
00079 TAO_Notify_Message_Queue::ITERATOR itr (this->msg_queue_);
00080 while(itr.next (mb))
00081 {
00082 TAO_Notify_Method_Request_Queueable* event =
00083 dynamic_cast<TAO_Notify_Method_Request_Queueable*> (mb);
00084 if (event != 0)
00085 {
00086 const ACE_Time_Value& etime = event->creation_time ();
00087 if (etime < tv)
00088 tv = etime;
00089 }
00090 itr.advance ();
00091 }
00092
00093 return tv;
00094 }
00095
00096 int
00097 TAO_Notify_Buffering_Strategy::enqueue (TAO_Notify_Method_Request_Queueable* method_request)
00098 {
00099 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->global_queue_lock_, -1);
00100
00101 if (this->shutdown_)
00102 return -1;
00103
00104 bool discarded_existing = false;
00105
00106 bool local_overflow = this->max_events_per_consumer_.is_valid() &&
00107 static_cast <CORBA::Long> (this->msg_queue_.message_count ()) >= this->max_events_per_consumer_.value();
00108
00109 bool global_overflow = this->max_queue_length_.value () != 0 &&
00110 this->global_queue_length_ >= this->max_queue_length_.value ();
00111
00112 while (local_overflow || global_overflow)
00113 {
00114 if (blocking_policy_.is_valid())
00115 {
00116 ACE_Time_Value timeout;
00117 ORBSVCS_Time::TimeT_to_Time_Value(timeout, blocking_policy_.value());
00118
00119 timeout += ACE_OS::gettimeofday();
00120 if (local_overflow)
00121 {
00122 local_not_full_.wait(&timeout);
00123 }
00124 else
00125 {
00126 global_not_full_.wait(&timeout);
00127 }
00128 if (errno != ETIME)
00129 {
00130 local_overflow =
00131 this->max_events_per_consumer_.is_valid() &&
00132 static_cast <CORBA::Long> (this->msg_queue_.message_count ()) >= this->max_events_per_consumer_.value();
00133 global_overflow =
00134 this->max_queue_length_.value () != 0 &&
00135 this->global_queue_length_ >= this->max_queue_length_.value ();
00136 continue;
00137 }
00138 }
00139
00140 discarded_existing = this->discard(method_request);
00141 if (discarded_existing)
00142 {
00143 --this->global_queue_length_;
00144 local_not_full_.signal();
00145 global_not_full_.signal();
00146 }
00147 break;
00148 }
00149
00150 if (! (local_overflow || global_overflow) || discarded_existing)
00151 {
00152 if (this->queue (method_request) == -1)
00153 {
00154 ACE_DEBUG((LM_DEBUG,
00155 "Notify (%P|%t) - Panic! failed to enqueue event\n"));
00156 return -1;
00157 }
00158
00159 ++this->global_queue_length_;
00160
00161 local_not_empty_.signal ();
00162 }
00163 return ACE_Utils::truncate_cast<int> (this->msg_queue_.message_count ());
00164 }
00165
00166 int
00167 TAO_Notify_Buffering_Strategy::dequeue (TAO_Notify_Method_Request_Queueable* &method_request, const ACE_Time_Value *abstime)
00168 {
00169 ACE_Message_Block *mb;
00170
00171 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->global_queue_lock_, -1);
00172
00173 if ( this->shutdown_ )
00174 return -1;
00175
00176 while (this->msg_queue_.message_count () == 0)
00177 {
00178 this->local_not_empty_.wait (abstime);
00179
00180 if (this->shutdown_)
00181 return -1;
00182
00183 if (errno == ETIME)
00184 return 0;
00185 }
00186
00187 if (this->msg_queue_.dequeue (mb) == -1)
00188 return -1;
00189
00190 method_request = dynamic_cast<TAO_Notify_Method_Request_Queueable*>(mb);
00191
00192 if (method_request == 0)
00193 return -1;
00194
00195 --this->global_queue_length_;
00196 local_not_full_.signal();
00197 global_not_full_.signal();
00198
00199 return 1;
00200 }
00201
00202 int
00203 TAO_Notify_Buffering_Strategy::queue (TAO_Notify_Method_Request_Queueable* method_request)
00204 {
00205 if ( this->shutdown_ )
00206 return -1;
00207
00208 CORBA::Short order = this->order_policy_.value();
00209
00210 if (! this->order_policy_.is_valid() ||
00211 order == CosNotification::AnyOrder ||
00212 order == CosNotification::FifoOrder)
00213 {
00214 if (TAO_debug_level > 0)
00215 ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - enqueue in fifo order\n"));
00216 return this->msg_queue_.enqueue_tail (method_request);
00217 }
00218
00219 if (order == CosNotification::PriorityOrder)
00220 {
00221 if (TAO_debug_level > 0)
00222 ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - enqueue in priority order\n"));
00223 return this->msg_queue_.enqueue_prio (method_request);
00224 }
00225
00226 if (order == CosNotification::DeadlineOrder)
00227 {
00228 if (TAO_debug_level > 0)
00229 ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - enqueue in deadline order\n"));
00230 return this->msg_queue_.enqueue_deadline (method_request);
00231 }
00232
00233 if (TAO_debug_level > 0)
00234 ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - Invalid order policy\n"));
00235 return this->msg_queue_.enqueue_tail (method_request);
00236 }
00237
00238 bool
00239 TAO_Notify_Buffering_Strategy::discard (TAO_Notify_Method_Request_Queueable* method_request)
00240 {
00241 if (this->shutdown_)
00242 {
00243 return false;
00244 }
00245
00246 ACE_Message_Block* mb = 0;
00247 int result = -1;
00248
00249 if (this->discard_policy_.is_valid() == 0 ||
00250 this->discard_policy_ == CosNotification::AnyOrder ||
00251 this->discard_policy_ == CosNotification::FifoOrder)
00252 {
00253 result = this->msg_queue_.dequeue_head (mb);
00254 }
00255 else if (this->discard_policy_ == CosNotification::LifoOrder)
00256 {
00257
00258
00259 result = -1;
00260 }
00261 else if (this->discard_policy_ == CosNotification::DeadlineOrder)
00262 {
00263 result = this->msg_queue_.dequeue_deadline (mb);
00264 }
00265 else if (this->discard_policy_ == CosNotification::PriorityOrder)
00266 {
00267 result = this->msg_queue_.dequeue_prio (mb);
00268 if (mb->msg_priority() >= method_request->msg_priority())
00269 {
00270 this->msg_queue_.enqueue_prio (mb);
00271 result = -1;
00272 }
00273 }
00274 else
00275 {
00276 if (TAO_debug_level > 0)
00277 ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - Invalid discard policy\n"));
00278 result = this->msg_queue_.dequeue_head (mb);
00279 }
00280
00281 if (result != -1)
00282 {
00283 ACE_Message_Block::release (mb);
00284 return true;
00285 }
00286
00287 return false;
00288 }
00289
00290 TAO_END_VERSIONED_NAMESPACE_DECL