00001
00002
00003 #include "orbsvcs/Notify/Buffering_Strategy.h"
00004
00005 ACE_RCSID (Notify, Buffering_Strategy, "Buffering_Strategy.cpp,v 1.22 2006/03/14 06:14:34 jtc Exp")
00006
00007
00008 #include "orbsvcs/Notify/Method_Request.h"
00009 #include "orbsvcs/Notify/Notify_Extensions.h"
00010 #include "orbsvcs/Notify/QoSProperties.h"
00011 #include "orbsvcs/Notify/Notify_Extensions.h"
00012
00013 #include "orbsvcs/CosNotificationC.h"
00014 #include "orbsvcs/Time_Utilities.h"
00015
00016 #include "tao/debug.h"
00017
00018 #include "ace/Message_Queue.h"
00019
00020 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00021
00022 TAO_Notify_Buffering_Strategy::TAO_Notify_Buffering_Strategy (
00023 TAO_Notify_Message_Queue& msg_queue,
00024 const TAO_Notify_AdminProperties::Ptr& admin_properties)
00025 : msg_queue_ (msg_queue)
00026 , admin_properties_ (admin_properties)
00027 , global_queue_lock_ (admin_properties->global_queue_lock ())
00028 , global_queue_length_ (admin_properties->global_queue_length ())
00029 , max_queue_length_ (admin_properties->max_global_queue_length ())
00030 , order_policy_ (CosNotification::OrderPolicy, CosNotification::AnyOrder)
00031 , discard_policy_ (CosNotification::DiscardPolicy, CosNotification::AnyOrder)
00032 , max_events_per_consumer_ (CosNotification::MaxEventsPerConsumer)
00033 , blocking_policy_ (TAO_Notify_Extensions::BlockingPolicy)
00034 , global_not_full_ (admin_properties->global_queue_not_full())
00035 , local_not_full_ (global_queue_lock_)
00036 , local_not_empty_ (global_queue_lock_)
00037 , shutdown_ (false)
00038 {
00039 }
00040
00041 TAO_Notify_Buffering_Strategy::~TAO_Notify_Buffering_Strategy ()
00042 {
00043 }
00044
00045 void
00046 TAO_Notify_Buffering_Strategy::update_qos_properties
00047 (const TAO_Notify_QoSProperties& qos_properties)
00048 {
00049 this->order_policy_.set (qos_properties);
00050 this->discard_policy_.set (qos_properties);
00051 this->max_events_per_consumer_.set(qos_properties);
00052 this->blocking_policy_.set (qos_properties);
00053 }
00054
00055 void
00056 TAO_Notify_Buffering_Strategy::shutdown (void)
00057 {
00058 ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->global_queue_lock_);
00059
00060 if (this->shutdown_)
00061 {
00062 return;
00063 }
00064
00065 this->shutdown_ = true;
00066
00067 this->local_not_empty_.broadcast ();
00068 this->global_not_full_.broadcast();
00069 this->local_not_full_.broadcast();
00070 }
00071
00072 int
00073 TAO_Notify_Buffering_Strategy::enqueue (TAO_Notify_Method_Request_Queueable* method_request)
00074 {
00075 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->global_queue_lock_, -1);
00076
00077 if (this->shutdown_)
00078 return -1;
00079
00080 bool discarded_existing = false;
00081
00082 bool local_overflow = this->max_events_per_consumer_.is_valid() &&
00083 static_cast <CORBA::Long> (this->msg_queue_.message_count ()) >= this->max_events_per_consumer_.value();
00084
00085 bool global_overflow = this->max_queue_length_.value () != 0 &&
00086 this->global_queue_length_ >= this->max_queue_length_.value ();
00087
00088 while (local_overflow || global_overflow)
00089 {
00090 if (blocking_policy_.is_valid())
00091 {
00092 ACE_Time_Value timeout;
00093 ORBSVCS_Time::TimeT_to_Time_Value(timeout, blocking_policy_.value());
00094
00095 timeout += ACE_OS::gettimeofday();
00096 if (local_overflow)
00097 {
00098 local_not_full_.wait(&timeout);
00099 }
00100 else
00101 {
00102 global_not_full_.wait(&timeout);
00103 }
00104 if (errno != ETIME)
00105 {
00106 local_overflow =
00107 this->max_events_per_consumer_.is_valid() &&
00108 static_cast <CORBA::Long> (this->msg_queue_.message_count ()) >= this->max_events_per_consumer_.value();
00109 global_overflow =
00110 this->max_queue_length_.value () != 0 &&
00111 this->global_queue_length_ >= this->max_queue_length_.value ();
00112 continue;
00113 }
00114 }
00115
00116 discarded_existing = this->discard(method_request);
00117 if (discarded_existing)
00118 {
00119 --this->global_queue_length_;
00120 local_not_full_.signal();
00121 global_not_full_.signal();
00122 }
00123 break;
00124 }
00125
00126 if (! (local_overflow || global_overflow) || discarded_existing)
00127 {
00128 if (this->queue (method_request) == -1)
00129 {
00130 ACE_DEBUG((LM_DEBUG,
00131 "Notify (%P|%t) - Panic! failed to enqueue event\n"));
00132 return -1;
00133 }
00134
00135 ++this->global_queue_length_;
00136
00137 local_not_empty_.signal ();
00138 }
00139 return this->msg_queue_.message_count ();
00140 }
00141
00142 int
00143 TAO_Notify_Buffering_Strategy::dequeue (TAO_Notify_Method_Request_Queueable* &method_request, const ACE_Time_Value *abstime)
00144 {
00145 ACE_Message_Block *mb;
00146
00147 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->global_queue_lock_, -1);
00148
00149 if ( this->shutdown_ )
00150 return -1;
00151
00152 while (this->msg_queue_.message_count () == 0)
00153 {
00154 this->local_not_empty_.wait (abstime);
00155
00156 if (this->shutdown_)
00157 return -1;
00158
00159 if (errno == ETIME)
00160 return 0;
00161 }
00162
00163 if (this->msg_queue_.dequeue (mb) == -1)
00164 return -1;
00165
00166 method_request = dynamic_cast<TAO_Notify_Method_Request_Queueable*>(mb);
00167
00168 if (method_request == 0)
00169 return -1;
00170
00171 --this->global_queue_length_;
00172 local_not_full_.signal();
00173 global_not_full_.signal();
00174
00175 return 1;
00176 }
00177
00178 int
00179 TAO_Notify_Buffering_Strategy::queue (TAO_Notify_Method_Request_Queueable* method_request)
00180 {
00181 if ( this->shutdown_ )
00182 return -1;
00183
00184 CORBA::Short order = this->order_policy_.value();
00185
00186 if (! this->order_policy_.is_valid() ||
00187 order == CosNotification::AnyOrder ||
00188 order == CosNotification::FifoOrder)
00189 {
00190 if (TAO_debug_level > 0)
00191 ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - enqueue in fifo order\n"));
00192 return this->msg_queue_.enqueue_tail (method_request);
00193 }
00194
00195 if (order == CosNotification::PriorityOrder)
00196 {
00197 if (TAO_debug_level > 0)
00198 ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - enqueue in priority order\n"));
00199 return this->msg_queue_.enqueue_prio (method_request);
00200 }
00201
00202 if (order == CosNotification::DeadlineOrder)
00203 {
00204 if (TAO_debug_level > 0)
00205 ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - enqueue in deadline order\n"));
00206 return this->msg_queue_.enqueue_deadline (method_request);
00207 }
00208
00209 if (TAO_debug_level > 0)
00210 ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - Invalid order policy\n"));
00211 return this->msg_queue_.enqueue_tail (method_request);
00212 }
00213
00214 bool
00215 TAO_Notify_Buffering_Strategy::discard (TAO_Notify_Method_Request_Queueable* method_request)
00216 {
00217 if (this->shutdown_)
00218 {
00219 return false;
00220 }
00221
00222 ACE_Message_Block* mb = 0;
00223 int result = -1;
00224
00225 if (this->discard_policy_.is_valid() == 0 ||
00226 this->discard_policy_ == CosNotification::AnyOrder ||
00227 this->discard_policy_ == CosNotification::FifoOrder)
00228 {
00229 result = this->msg_queue_.dequeue_head (mb);
00230 }
00231 else if (this->discard_policy_ == CosNotification::LifoOrder)
00232 {
00233
00234
00235 result = -1;
00236 }
00237 else if (this->discard_policy_ == CosNotification::DeadlineOrder)
00238 {
00239 result = this->msg_queue_.dequeue_deadline (mb);
00240 }
00241 else if (this->discard_policy_ == CosNotification::PriorityOrder)
00242 {
00243 result = this->msg_queue_.dequeue_prio (mb);
00244 if (mb->msg_priority() >= method_request->msg_priority())
00245 {
00246 this->msg_queue_.enqueue_prio (mb);
00247 result = -1;
00248 }
00249 }
00250 else
00251 {
00252 if (TAO_debug_level > 0)
00253 ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - Invalid discard policy\n"));
00254 result = this->msg_queue_.dequeue_head (mb);
00255 }
00256
00257 if (result != -1)
00258 {
00259 ACE_Message_Block::release (mb);
00260 return true;
00261 }
00262
00263 return false;
00264 }
00265
00266 TAO_END_VERSIONED_NAMESPACE_DECL