Buffering_Strategy.cpp

Go to the documentation of this file.
00001 // $Id: Buffering_Strategy.cpp 81070 2008-03-24 12:30:52Z elliott_c $
00002 
00003 #include "orbsvcs/Notify/Buffering_Strategy.h"
00004 
00005 ACE_RCSID (Notify, Buffering_Strategy, "$Id: Buffering_Strategy.cpp 81070 2008-03-24 12:30:52Z elliott_c $")
00006 
00007 
00008 #include "orbsvcs/Notify/Method_Request.h"
00009 #include "orbsvcs/Notify/Notify_Extensions.h"
00010 #include "orbsvcs/Notify/QoSProperties.h"
00011 #include "orbsvcs/Notify/Notify_Extensions.h"
00012 
00013 #include "orbsvcs/CosNotificationC.h"
00014 #include "orbsvcs/Time_Utilities.h"
00015 
00016 #include "tao/debug.h"
00017 
00018 #include "ace/Message_Queue.h"
00019 
00020 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00021 
00022 TAO_Notify_Buffering_Strategy::TAO_Notify_Buffering_Strategy (
00023   TAO_Notify_Message_Queue& msg_queue,
00024   const TAO_Notify_AdminProperties::Ptr& admin_properties)
00025 : msg_queue_ (msg_queue)
00026 , admin_properties_ (admin_properties)
00027 , global_queue_lock_ (admin_properties->global_queue_lock ())
00028 , global_queue_length_ (admin_properties->global_queue_length ())
00029 , max_queue_length_ (admin_properties->max_global_queue_length ())
00030 , order_policy_ (CosNotification::OrderPolicy, CosNotification::AnyOrder)
00031 , discard_policy_ (CosNotification::DiscardPolicy, CosNotification::AnyOrder)
00032 , max_events_per_consumer_ (CosNotification::MaxEventsPerConsumer)
00033 , blocking_policy_ (TAO_Notify_Extensions::BlockingPolicy)
00034 , global_not_full_ (admin_properties->global_queue_not_full())
00035 , local_not_full_ (global_queue_lock_)
00036 , local_not_empty_ (global_queue_lock_)
00037 , shutdown_ (false)
00038 , tracker_ (0)
00039 {
00040 }
00041 
00042 TAO_Notify_Buffering_Strategy::~TAO_Notify_Buffering_Strategy ()
00043 {
00044 }
00045 
00046 void
00047 TAO_Notify_Buffering_Strategy::update_qos_properties
00048   (const TAO_Notify_QoSProperties& qos_properties)
00049 {
00050   this->order_policy_.set (qos_properties);
00051   this->discard_policy_.set (qos_properties);
00052   this->max_events_per_consumer_.set(qos_properties);
00053   this->blocking_policy_.set (qos_properties);
00054 }
00055 
00056 void
00057 TAO_Notify_Buffering_Strategy::shutdown (void)
00058 {
00059   ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->global_queue_lock_);
00060 
00061   if (this->shutdown_)
00062   {
00063     return;
00064   }
00065 
00066   this->shutdown_ = true;
00067 
00068   this->local_not_empty_.broadcast ();
00069   this->global_not_full_.broadcast();
00070   this->local_not_full_.broadcast();
00071 }
00072 
00073 ACE_Time_Value
00074 TAO_Notify_Buffering_Strategy::oldest_event (void)
00075 {
00076   ACE_Time_Value tv (ACE_Time_Value::max_time);
00077   ACE_Message_Block* mb = 0;
00078 
00079   ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->global_queue_lock_, tv);
00080   TAO_Notify_Message_Queue::ITERATOR itr (this->msg_queue_);
00081   while(itr.next (mb))
00082     {
00083       TAO_Notify_Method_Request_Queueable* event =
00084         dynamic_cast<TAO_Notify_Method_Request_Queueable*> (mb);
00085       if (event != 0)
00086         {
00087           const ACE_Time_Value& etime = event->creation_time ();
00088           if (etime < tv)
00089             tv = etime;
00090         }
00091       itr.advance ();
00092     }
00093 
00094   return tv;
00095 }
00096 
00097 TAO_Notify_Buffering_Strategy::Tracker::~Tracker (void)
00098 {
00099 }
00100 
00101 int
00102 TAO_Notify_Buffering_Strategy::enqueue (TAO_Notify_Method_Request_Queueable* method_request)
00103 {
00104   ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->global_queue_lock_, -1);
00105 
00106   if (this->shutdown_)
00107     return -1;
00108 
00109   bool discarded_existing = false;
00110 
00111   bool local_overflow = this->max_events_per_consumer_.is_valid() &&
00112     static_cast <CORBA::Long> (this->msg_queue_.message_count ()) >= this->max_events_per_consumer_.value();
00113 
00114   bool global_overflow = this->max_queue_length_.value () != 0 &&
00115     this->global_queue_length_ >= this->max_queue_length_.value ();
00116 
00117   while (local_overflow || global_overflow)
00118     {
00119       if (blocking_policy_.is_valid())
00120         {
00121           ACE_Time_Value timeout;
00122           ORBSVCS_Time::TimeT_to_Time_Value(timeout, blocking_policy_.value());
00123           // Condition variables take an absolute time
00124           timeout += ACE_OS::gettimeofday();
00125           if (local_overflow)
00126             {
00127               local_not_full_.wait(&timeout);
00128             }
00129           else
00130             {
00131               global_not_full_.wait(&timeout);
00132             }
00133           if (errno != ETIME)
00134             {
00135               local_overflow =
00136                 this->max_events_per_consumer_.is_valid() &&
00137                 static_cast <CORBA::Long> (this->msg_queue_.message_count ()) >= this->max_events_per_consumer_.value();
00138               global_overflow =
00139                 this->max_queue_length_.value () != 0 &&
00140                 this->global_queue_length_ >= this->max_queue_length_.value ();
00141               continue;
00142             }
00143         }
00144 
00145       discarded_existing = this->discard(method_request);
00146       if (discarded_existing)
00147         {
00148           --this->global_queue_length_;
00149           local_not_full_.signal();
00150           global_not_full_.signal();
00151         }
00152       break;
00153     }
00154 
00155   if (! (local_overflow || global_overflow) || discarded_existing)
00156     {
00157       if (this->queue (method_request) == -1)
00158         {
00159           ACE_DEBUG((LM_DEBUG,
00160                      "Notify (%P|%t) - Panic! failed to enqueue event\n"));
00161           return -1;
00162         }
00163 
00164       ++this->global_queue_length_;
00165 
00166       local_not_empty_.signal ();
00167     }
00168 
00169   size_t count = this->msg_queue_.message_count ();
00170   if (this->tracker_ != 0)
00171     this->tracker_->update_queue_count (count);
00172 
00173   return ACE_Utils::truncate_cast<int> (count);
00174 }
00175 
00176 int
00177 TAO_Notify_Buffering_Strategy::dequeue (TAO_Notify_Method_Request_Queueable* &method_request, const ACE_Time_Value *abstime)
00178 {
00179   ACE_Message_Block *mb;
00180 
00181   ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->global_queue_lock_, -1);
00182 
00183   if ( this->shutdown_ )
00184     return -1;
00185 
00186   while (this->msg_queue_.message_count () == 0)
00187     {
00188       this->local_not_empty_.wait (abstime);
00189 
00190       if (this->shutdown_)
00191         return -1;
00192 
00193       if (errno == ETIME)
00194         return 0;
00195     }
00196 
00197   if (this->msg_queue_.dequeue (mb) == -1)
00198     return -1;
00199 
00200   if (this->tracker_ != 0)
00201     this->tracker_->update_queue_count (this->msg_queue_.message_count ());
00202 
00203   method_request = dynamic_cast<TAO_Notify_Method_Request_Queueable*>(mb);
00204 
00205   if (method_request == 0)
00206     return -1;
00207 
00208   --this->global_queue_length_;
00209   local_not_full_.signal();
00210   global_not_full_.signal();
00211 
00212   return 1;
00213 }
00214 
00215 void
00216 TAO_Notify_Buffering_Strategy::set_tracker (
00217                         TAO_Notify_Buffering_Strategy::Tracker* tracker)
00218 {
00219   this->tracker_ = tracker;
00220 }
00221 
00222 int
00223 TAO_Notify_Buffering_Strategy::queue (TAO_Notify_Method_Request_Queueable* method_request)
00224 {
00225   if ( this->shutdown_ )
00226     return -1;
00227 
00228   CORBA::Short order = this->order_policy_.value();
00229 
00230   if (! this->order_policy_.is_valid() ||
00231       order == CosNotification::AnyOrder ||
00232       order == CosNotification::FifoOrder)
00233     {
00234       if (TAO_debug_level > 0)
00235         ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - enqueue in fifo order\n"));
00236       return this->msg_queue_.enqueue_tail (method_request);
00237     }
00238 
00239   if (order == CosNotification::PriorityOrder)
00240     {
00241       if (TAO_debug_level > 0)
00242         ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - enqueue in priority order\n"));
00243       return this->msg_queue_.enqueue_prio (method_request);
00244     }
00245 
00246   if (order == CosNotification::DeadlineOrder)
00247     {
00248       if (TAO_debug_level > 0)
00249         ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - enqueue in deadline order\n"));
00250       return this->msg_queue_.enqueue_deadline (method_request);
00251     }
00252 
00253   if (TAO_debug_level > 0)
00254     ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - Invalid order policy\n"));
00255   return this->msg_queue_.enqueue_tail (method_request);
00256 }
00257 
00258 bool
00259 TAO_Notify_Buffering_Strategy::discard (TAO_Notify_Method_Request_Queueable* method_request)
00260 {
00261   if (this->shutdown_)
00262     {
00263       return false;
00264     }
00265 
00266   ACE_Message_Block* mb = 0;
00267   int result = -1;
00268 
00269   if (this->discard_policy_.is_valid() == 0 ||
00270       this->discard_policy_ == CosNotification::AnyOrder ||
00271       this->discard_policy_ == CosNotification::FifoOrder)
00272     {
00273       result = this->msg_queue_.dequeue_head (mb);
00274     }
00275   else if (this->discard_policy_ == CosNotification::LifoOrder)
00276     {
00277       // The most current message is NOT the newest one in the queue. It's
00278       // the one we're about to add to the queue.
00279       result = -1;
00280     }
00281   else if (this->discard_policy_ == CosNotification::DeadlineOrder)
00282     {
00283       result = this->msg_queue_.dequeue_deadline (mb);
00284     }
00285   else if (this->discard_policy_ == CosNotification::PriorityOrder)
00286     {
00287       result = this->msg_queue_.dequeue_prio (mb);
00288       if (mb->msg_priority() >= method_request->msg_priority())
00289         {
00290           this->msg_queue_.enqueue_prio (mb);
00291           result = -1;
00292         }
00293     }
00294   else
00295     {
00296       if (TAO_debug_level > 0)
00297         ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - Invalid discard policy\n"));
00298       result = this->msg_queue_.dequeue_head (mb);
00299     }
00300 
00301   if (result != -1)
00302     {
00303       ACE_Message_Block::release (mb);
00304       return true;
00305     }
00306 
00307   return false;
00308 }
00309 
00310 TAO_END_VERSIONED_NAMESPACE_DECL

Generated on Tue Feb 2 17:45:28 2010 for TAO_CosNotification by  doxygen 1.4.7