Buffering_Strategy.cpp

Go to the documentation of this file.
00001 // $Id: Buffering_Strategy.cpp 79230 2007-08-06 18:18:07Z elliott_c $
00002 
00003 #include "orbsvcs/Notify/Buffering_Strategy.h"
00004 
00005 ACE_RCSID (Notify, Buffering_Strategy, "$Id: Buffering_Strategy.cpp 79230 2007-08-06 18:18:07Z elliott_c $")
00006 
00007 
00008 #include "orbsvcs/Notify/Method_Request.h"
00009 #include "orbsvcs/Notify/Notify_Extensions.h"
00010 #include "orbsvcs/Notify/QoSProperties.h"
00011 #include "orbsvcs/Notify/Notify_Extensions.h"
00012 
00013 #include "orbsvcs/CosNotificationC.h"
00014 #include "orbsvcs/Time_Utilities.h"
00015 
00016 #include "tao/debug.h"
00017 
00018 #include "ace/Message_Queue.h"
00019 
00020 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00021 
00022 TAO_Notify_Buffering_Strategy::TAO_Notify_Buffering_Strategy (
00023   TAO_Notify_Message_Queue& msg_queue,
00024   const TAO_Notify_AdminProperties::Ptr& admin_properties)
00025 : msg_queue_ (msg_queue)
00026 , admin_properties_ (admin_properties)
00027 , global_queue_lock_ (admin_properties->global_queue_lock ())
00028 , global_queue_length_ (admin_properties->global_queue_length ())
00029 , max_queue_length_ (admin_properties->max_global_queue_length ())
00030 , order_policy_ (CosNotification::OrderPolicy, CosNotification::AnyOrder)
00031 , discard_policy_ (CosNotification::DiscardPolicy, CosNotification::AnyOrder)
00032 , max_events_per_consumer_ (CosNotification::MaxEventsPerConsumer)
00033 , blocking_policy_ (TAO_Notify_Extensions::BlockingPolicy)
00034 , global_not_full_ (admin_properties->global_queue_not_full())
00035 , local_not_full_ (global_queue_lock_)
00036 , local_not_empty_ (global_queue_lock_)
00037 , shutdown_ (false)
00038 {
00039 }
00040 
00041 TAO_Notify_Buffering_Strategy::~TAO_Notify_Buffering_Strategy ()
00042 {
00043 }
00044 
00045 void
00046 TAO_Notify_Buffering_Strategy::update_qos_properties
00047   (const TAO_Notify_QoSProperties& qos_properties)
00048 {
00049   this->order_policy_.set (qos_properties);
00050   this->discard_policy_.set (qos_properties);
00051   this->max_events_per_consumer_.set(qos_properties);
00052   this->blocking_policy_.set (qos_properties);
00053 }
00054 
00055 void
00056 TAO_Notify_Buffering_Strategy::shutdown (void)
00057 {
00058   ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->global_queue_lock_);
00059 
00060   if (this->shutdown_)
00061   {
00062     return;
00063   }
00064 
00065   this->shutdown_ = true;
00066 
00067   this->local_not_empty_.broadcast ();
00068   this->global_not_full_.broadcast();
00069   this->local_not_full_.broadcast();
00070 }
00071 
00072 ACE_Time_Value
00073 TAO_Notify_Buffering_Strategy::oldest_event (void)
00074 {
00075   ACE_Time_Value tv (ACE_Time_Value::max_time);
00076   ACE_Message_Block* mb = 0;
00077 
00078   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->global_queue_lock_, tv);
00079   TAO_Notify_Message_Queue::ITERATOR itr (this->msg_queue_);
00080   while(itr.next (mb))
00081     {
00082       TAO_Notify_Method_Request_Queueable* event =
00083         dynamic_cast<TAO_Notify_Method_Request_Queueable*> (mb);
00084       if (event != 0)
00085         {
00086           const ACE_Time_Value& etime = event->creation_time ();
00087           if (etime < tv)
00088             tv = etime;
00089         }
00090       itr.advance ();
00091     }
00092 
00093   return tv;
00094 }
00095 
00096 int
00097 TAO_Notify_Buffering_Strategy::enqueue (TAO_Notify_Method_Request_Queueable* method_request)
00098 {
00099   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->global_queue_lock_, -1);
00100 
00101   if (this->shutdown_)
00102     return -1;
00103 
00104   bool discarded_existing = false;
00105 
00106   bool local_overflow = this->max_events_per_consumer_.is_valid() &&
00107     static_cast <CORBA::Long> (this->msg_queue_.message_count ()) >= this->max_events_per_consumer_.value();
00108 
00109   bool global_overflow = this->max_queue_length_.value () != 0 &&
00110     this->global_queue_length_ >= this->max_queue_length_.value ();
00111 
00112   while (local_overflow || global_overflow)
00113     {
00114       if (blocking_policy_.is_valid())
00115         {
00116           ACE_Time_Value timeout;
00117           ORBSVCS_Time::TimeT_to_Time_Value(timeout, blocking_policy_.value());
00118           // Condition variables take an absolute time
00119           timeout += ACE_OS::gettimeofday();
00120           if (local_overflow)
00121             {
00122               local_not_full_.wait(&timeout);
00123             }
00124           else
00125             {
00126               global_not_full_.wait(&timeout);
00127             }
00128           if (errno != ETIME)
00129             {
00130               local_overflow =
00131                 this->max_events_per_consumer_.is_valid() &&
00132                 static_cast <CORBA::Long> (this->msg_queue_.message_count ()) >= this->max_events_per_consumer_.value();
00133               global_overflow =
00134                 this->max_queue_length_.value () != 0 &&
00135                 this->global_queue_length_ >= this->max_queue_length_.value ();
00136               continue;
00137             }
00138         }
00139 
00140       discarded_existing = this->discard(method_request);
00141       if (discarded_existing)
00142         {
00143           --this->global_queue_length_;
00144           local_not_full_.signal();
00145           global_not_full_.signal();
00146         }
00147       break;
00148     }
00149 
00150   if (! (local_overflow || global_overflow) || discarded_existing)
00151     {
00152       if (this->queue (method_request) == -1)
00153         {
00154           ACE_DEBUG((LM_DEBUG,
00155                      "Notify (%P|%t) - Panic! failed to enqueue event\n"));
00156           return -1;
00157         }
00158 
00159       ++this->global_queue_length_;
00160 
00161       local_not_empty_.signal ();
00162     }
00163   return ACE_Utils::truncate_cast<int> (this->msg_queue_.message_count ());
00164 }
00165 
00166 int
00167 TAO_Notify_Buffering_Strategy::dequeue (TAO_Notify_Method_Request_Queueable* &method_request, const ACE_Time_Value *abstime)
00168 {
00169   ACE_Message_Block *mb;
00170 
00171   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->global_queue_lock_, -1);
00172 
00173   if ( this->shutdown_ )
00174     return -1;
00175 
00176   while (this->msg_queue_.message_count () == 0)
00177     {
00178       this->local_not_empty_.wait (abstime);
00179 
00180       if (this->shutdown_)
00181         return -1;
00182 
00183       if (errno == ETIME)
00184         return 0;
00185     }
00186 
00187   if (this->msg_queue_.dequeue (mb) == -1)
00188     return -1;
00189 
00190   method_request = dynamic_cast<TAO_Notify_Method_Request_Queueable*>(mb);
00191 
00192   if (method_request == 0)
00193     return -1;
00194 
00195   --this->global_queue_length_;
00196   local_not_full_.signal();
00197   global_not_full_.signal();
00198 
00199   return 1;
00200 }
00201 
00202 int
00203 TAO_Notify_Buffering_Strategy::queue (TAO_Notify_Method_Request_Queueable* method_request)
00204 {
00205   if ( this->shutdown_ )
00206     return -1;
00207 
00208   CORBA::Short order = this->order_policy_.value();
00209 
00210   if (! this->order_policy_.is_valid() ||
00211       order == CosNotification::AnyOrder ||
00212       order == CosNotification::FifoOrder)
00213     {
00214       if (TAO_debug_level > 0)
00215         ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - enqueue in fifo order\n"));
00216       return this->msg_queue_.enqueue_tail (method_request);
00217     }
00218 
00219   if (order == CosNotification::PriorityOrder)
00220     {
00221       if (TAO_debug_level > 0)
00222         ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - enqueue in priority order\n"));
00223       return this->msg_queue_.enqueue_prio (method_request);
00224     }
00225 
00226   if (order == CosNotification::DeadlineOrder)
00227     {
00228       if (TAO_debug_level > 0)
00229         ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - enqueue in deadline order\n"));
00230       return this->msg_queue_.enqueue_deadline (method_request);
00231     }
00232 
00233   if (TAO_debug_level > 0)
00234     ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - Invalid order policy\n"));
00235   return this->msg_queue_.enqueue_tail (method_request);
00236 }
00237 
00238 bool
00239 TAO_Notify_Buffering_Strategy::discard (TAO_Notify_Method_Request_Queueable* method_request)
00240 {
00241   if (this->shutdown_)
00242     {
00243       return false;
00244     }
00245 
00246   ACE_Message_Block* mb = 0;
00247   int result = -1;
00248 
00249   if (this->discard_policy_.is_valid() == 0 ||
00250       this->discard_policy_ == CosNotification::AnyOrder ||
00251       this->discard_policy_ == CosNotification::FifoOrder)
00252     {
00253       result = this->msg_queue_.dequeue_head (mb);
00254     }
00255   else if (this->discard_policy_ == CosNotification::LifoOrder)
00256     {
00257       // The most current message is NOT the newest one in the queue. It's
00258       // the one we're about to add to the queue.
00259       result = -1;
00260     }
00261   else if (this->discard_policy_ == CosNotification::DeadlineOrder)
00262     {
00263       result = this->msg_queue_.dequeue_deadline (mb);
00264     }
00265   else if (this->discard_policy_ == CosNotification::PriorityOrder)
00266     {
00267       result = this->msg_queue_.dequeue_prio (mb);
00268       if (mb->msg_priority() >= method_request->msg_priority())
00269         {
00270           this->msg_queue_.enqueue_prio (mb);
00271           result = -1;
00272         }
00273     }
00274   else
00275     {
00276       if (TAO_debug_level > 0)
00277         ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - Invalid discard policy\n"));
00278       result = this->msg_queue_.dequeue_head (mb);
00279     }
00280 
00281   if (result != -1)
00282     {
00283       ACE_Message_Block::release (mb);
00284       return true;
00285     }
00286 
00287   return false;
00288 }
00289 
00290 TAO_END_VERSIONED_NAMESPACE_DECL

Generated on Sun Jan 27 15:39:52 2008 for TAO_CosNotification by doxygen 1.3.6