Buffering_Strategy.cpp

Go to the documentation of this file.
00001 // Buffering_Strategy.cpp,v 1.22 2006/03/14 06:14:34 jtc Exp
00002 
00003 #include "orbsvcs/Notify/Buffering_Strategy.h"
00004 
00005 ACE_RCSID (Notify, Buffering_Strategy, "Buffering_Strategy.cpp,v 1.22 2006/03/14 06:14:34 jtc Exp")
00006 
00007 
00008 #include "orbsvcs/Notify/Method_Request.h"
00009 #include "orbsvcs/Notify/Notify_Extensions.h"
00010 #include "orbsvcs/Notify/QoSProperties.h"
00011 #include "orbsvcs/Notify/Notify_Extensions.h"
00012 
00013 #include "orbsvcs/CosNotificationC.h"
00014 #include "orbsvcs/Time_Utilities.h"
00015 
00016 #include "tao/debug.h"
00017 
00018 #include "ace/Message_Queue.h"
00019 
00020 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00021 
00022 TAO_Notify_Buffering_Strategy::TAO_Notify_Buffering_Strategy (
00023   TAO_Notify_Message_Queue& msg_queue,
00024   const TAO_Notify_AdminProperties::Ptr& admin_properties)
00025 : msg_queue_ (msg_queue)
00026 , admin_properties_ (admin_properties)
00027 , global_queue_lock_ (admin_properties->global_queue_lock ())
00028 , global_queue_length_ (admin_properties->global_queue_length ())
00029 , max_queue_length_ (admin_properties->max_global_queue_length ())
00030 , order_policy_ (CosNotification::OrderPolicy, CosNotification::AnyOrder)
00031 , discard_policy_ (CosNotification::DiscardPolicy, CosNotification::AnyOrder)
00032 , max_events_per_consumer_ (CosNotification::MaxEventsPerConsumer)
00033 , blocking_policy_ (TAO_Notify_Extensions::BlockingPolicy)
00034 , global_not_full_ (admin_properties->global_queue_not_full())
00035 , local_not_full_ (global_queue_lock_)
00036 , local_not_empty_ (global_queue_lock_)
00037 , shutdown_ (false)
00038 {
00039 }
00040 
00041 TAO_Notify_Buffering_Strategy::~TAO_Notify_Buffering_Strategy ()
00042 {
00043 }
00044 
00045 void
00046 TAO_Notify_Buffering_Strategy::update_qos_properties
00047   (const TAO_Notify_QoSProperties& qos_properties)
00048 {
00049   this->order_policy_.set (qos_properties);
00050   this->discard_policy_.set (qos_properties);
00051   this->max_events_per_consumer_.set(qos_properties);
00052   this->blocking_policy_.set (qos_properties);
00053 }
00054 
00055 void
00056 TAO_Notify_Buffering_Strategy::shutdown (void)
00057 {
00058   ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->global_queue_lock_);
00059 
00060   if (this->shutdown_)
00061   {
00062     return;
00063   }
00064 
00065   this->shutdown_ = true;
00066 
00067   this->local_not_empty_.broadcast ();
00068   this->global_not_full_.broadcast();
00069   this->local_not_full_.broadcast();
00070 }
00071 
00072 int
00073 TAO_Notify_Buffering_Strategy::enqueue (TAO_Notify_Method_Request_Queueable* method_request)
00074 {
00075   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->global_queue_lock_, -1);
00076 
00077   if (this->shutdown_)
00078     return -1;
00079 
00080   bool discarded_existing = false;
00081 
00082   bool local_overflow = this->max_events_per_consumer_.is_valid() &&
00083     static_cast <CORBA::Long> (this->msg_queue_.message_count ()) >= this->max_events_per_consumer_.value();
00084 
00085   bool global_overflow = this->max_queue_length_.value () != 0 &&
00086     this->global_queue_length_ >= this->max_queue_length_.value ();
00087 
00088   while (local_overflow || global_overflow)
00089     {
00090       if (blocking_policy_.is_valid())
00091         {
00092           ACE_Time_Value timeout;
00093           ORBSVCS_Time::TimeT_to_Time_Value(timeout, blocking_policy_.value());
00094           // Condition variables take an absolute time
00095           timeout += ACE_OS::gettimeofday();
00096           if (local_overflow)
00097             {
00098               local_not_full_.wait(&timeout);
00099             }
00100           else
00101             {
00102               global_not_full_.wait(&timeout);
00103             }
00104           if (errno != ETIME)
00105             {
00106               local_overflow =
00107                 this->max_events_per_consumer_.is_valid() &&
00108                 static_cast <CORBA::Long> (this->msg_queue_.message_count ()) >= this->max_events_per_consumer_.value();
00109               global_overflow =
00110                 this->max_queue_length_.value () != 0 &&
00111                 this->global_queue_length_ >= this->max_queue_length_.value ();
00112               continue;
00113             }
00114         }
00115 
00116       discarded_existing = this->discard(method_request);
00117       if (discarded_existing)
00118         {
00119           --this->global_queue_length_;
00120           local_not_full_.signal();
00121           global_not_full_.signal();
00122         }
00123       break;
00124     }
00125 
00126   if (! (local_overflow || global_overflow) || discarded_existing)
00127     {
00128       if (this->queue (method_request) == -1)
00129         {
00130           ACE_DEBUG((LM_DEBUG,
00131                      "Notify (%P|%t) - Panic! failed to enqueue event\n"));
00132           return -1;
00133         }
00134 
00135       ++this->global_queue_length_;
00136 
00137       local_not_empty_.signal ();
00138     }
00139   return this->msg_queue_.message_count ();
00140 }
00141 
00142 int
00143 TAO_Notify_Buffering_Strategy::dequeue (TAO_Notify_Method_Request_Queueable* &method_request, const ACE_Time_Value *abstime)
00144 {
00145   ACE_Message_Block *mb;
00146 
00147   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->global_queue_lock_, -1);
00148 
00149   if ( this->shutdown_ )
00150     return -1;
00151 
00152   while (this->msg_queue_.message_count () == 0)
00153     {
00154       this->local_not_empty_.wait (abstime);
00155 
00156       if (this->shutdown_)
00157         return -1;
00158 
00159       if (errno == ETIME)
00160         return 0;
00161     }
00162 
00163   if (this->msg_queue_.dequeue (mb) == -1)
00164     return -1;
00165 
00166   method_request = dynamic_cast<TAO_Notify_Method_Request_Queueable*>(mb);
00167 
00168   if (method_request == 0)
00169     return -1;
00170 
00171   --this->global_queue_length_;
00172   local_not_full_.signal();
00173   global_not_full_.signal();
00174 
00175   return 1;
00176 }
00177 
00178 int
00179 TAO_Notify_Buffering_Strategy::queue (TAO_Notify_Method_Request_Queueable* method_request)
00180 {
00181   if ( this->shutdown_ )
00182     return -1;
00183 
00184   CORBA::Short order = this->order_policy_.value();
00185 
00186   if (! this->order_policy_.is_valid() ||
00187       order == CosNotification::AnyOrder ||
00188       order == CosNotification::FifoOrder)
00189     {
00190       if (TAO_debug_level > 0)
00191         ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - enqueue in fifo order\n"));
00192       return this->msg_queue_.enqueue_tail (method_request);
00193     }
00194 
00195   if (order == CosNotification::PriorityOrder)
00196     {
00197       if (TAO_debug_level > 0)
00198         ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - enqueue in priority order\n"));
00199       return this->msg_queue_.enqueue_prio (method_request);
00200     }
00201 
00202   if (order == CosNotification::DeadlineOrder)
00203     {
00204       if (TAO_debug_level > 0)
00205         ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - enqueue in deadline order\n"));
00206       return this->msg_queue_.enqueue_deadline (method_request);
00207     }
00208 
00209   if (TAO_debug_level > 0)
00210     ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - Invalid order policy\n"));
00211   return this->msg_queue_.enqueue_tail (method_request);
00212 }
00213 
00214 bool
00215 TAO_Notify_Buffering_Strategy::discard (TAO_Notify_Method_Request_Queueable* method_request)
00216 {
00217   if (this->shutdown_)
00218     {
00219       return false;
00220     }
00221 
00222   ACE_Message_Block* mb = 0;
00223   int result = -1;
00224 
00225   if (this->discard_policy_.is_valid() == 0 ||
00226       this->discard_policy_ == CosNotification::AnyOrder ||
00227       this->discard_policy_ == CosNotification::FifoOrder)
00228     {
00229       result = this->msg_queue_.dequeue_head (mb);
00230     }
00231   else if (this->discard_policy_ == CosNotification::LifoOrder)
00232     {
00233       // The most current message is NOT the newest one in the queue. It's
00234       // the one we're about to add to the queue.
00235       result = -1;
00236     }
00237   else if (this->discard_policy_ == CosNotification::DeadlineOrder)
00238     {
00239       result = this->msg_queue_.dequeue_deadline (mb);
00240     }
00241   else if (this->discard_policy_ == CosNotification::PriorityOrder)
00242     {
00243       result = this->msg_queue_.dequeue_prio (mb);
00244       if (mb->msg_priority() >= method_request->msg_priority())
00245         {
00246           this->msg_queue_.enqueue_prio (mb);
00247           result = -1;
00248         }
00249     }
00250   else
00251     {
00252       if (TAO_debug_level > 0)
00253         ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - Invalid discard policy\n"));
00254       result = this->msg_queue_.dequeue_head (mb);
00255     }
00256 
00257   if (result != -1)
00258     {
00259       ACE_Message_Block::release (mb);
00260       return true;
00261     }
00262 
00263   return false;
00264 }
00265 
00266 TAO_END_VERSIONED_NAMESPACE_DECL

Generated on Thu Nov 9 13:24:08 2006 for TAO_CosNotification by doxygen 1.3.6