00001 /* -*- C++ -*- */ 00002 /** 00003 * @file Buffering_Strategy.h 00004 * 00005 * $Id: Buffering_Strategy.h 87253 2009-10-28 23:29:32Z dai_y $ 00006 * 00007 * @author Pradeep Gore <pradeep@oomworks.com> 00008 * 00009 * 00010 */ 00011 00012 #ifndef TAO_Notify_BUFFERING_STRATEGY_H 00013 #define TAO_Notify_BUFFERING_STRATEGY_H 00014 00015 #include /**/ "ace/pre.h" 00016 00017 #include "orbsvcs/Notify/notify_serv_export.h" 00018 00019 #if !defined (ACE_LACKS_PRAGMA_ONCE) 00020 # pragma once 00021 #endif /* ACE_LACKS_PRAGMA_ONCE */ 00022 00023 #include "ace/Null_Condition.h" 00024 #include "ace/Message_Queue.h" 00025 00026 #include "orbsvcs/TimeBaseC.h" 00027 00028 #include "orbsvcs/Notify/Property.h" 00029 #include "orbsvcs/Notify/Property_T.h" 00030 #include "orbsvcs/Notify/AdminProperties.h" 00031 00032 TAO_BEGIN_VERSIONED_NAMESPACE_DECL 00033 00034 class TAO_Notify_Method_Request_Queueable; 00035 class TAO_Notify_QoSProperties; 00036 00037 typedef ACE_Message_Queue<ACE_NULL_SYNCH> TAO_Notify_Message_Queue; 00038 00039 /** 00040 * @class TAO_Notify_Buffering_Strategy 00041 * 00042 * @brief Base Strategy to enqueue and dequeue items from a Message Queue. 00043 * 00044 */ 00045 class TAO_Notify_Serv_Export TAO_Notify_Buffering_Strategy 00046 { 00047 public: 00048 TAO_Notify_Buffering_Strategy ( 00049 TAO_Notify_Message_Queue& msg_queue, 00050 const TAO_Notify_AdminProperties::Ptr& admin_properties); 00051 00052 ~TAO_Notify_Buffering_Strategy (); 00053 00054 /// Update state with the following QoS Properties: 00055 /// Order Policy 00056 /// Discard Policy 00057 /// MaxEventsPerConsumer 00058 void update_qos_properties (const TAO_Notify_QoSProperties& qos_properties); 00059 00060 /// Enqueue according the enqueing strategy. 00061 /// Return -1 on error else the number of items in the queue. 00062 int enqueue (TAO_Notify_Method_Request_Queueable* method_request); 00063 00064 /// Dequeue batch. This method will block for @a abstime if non-zero or else blocks till an item is available. 00065 /// Return -1 on error or if nothing is available, else the number of items actually dequeued (1). 00066 int dequeue (TAO_Notify_Method_Request_Queueable* &method_request, 00067 const ACE_Time_Value *abstime); 00068 00069 /// Shutdown 00070 void shutdown (void); 00071 00072 /// Provide the time value of the oldest event in the queue. 00073 ACE_Time_Value oldest_event (void); 00074 00075 /// This interface allows tracking of the queue size 00076 class TAO_Notify_Serv_Export Tracker 00077 { 00078 public: 00079 Tracker (void); 00080 virtual ~Tracker (void); 00081 virtual void update_queue_count (size_t count) = 0; 00082 virtual void count_queue_overflow (bool local_overflow, bool global_overflow) = 0; 00083 void register_child (Tracker * child); 00084 void unregister_child (Tracker * child); 00085 protected: 00086 Tracker * child_; 00087 }; 00088 00089 /// Set the tracker object. This strategy does not own the tracker. 00090 void set_tracker (Tracker* tracker); 00091 00092 private: 00093 00094 /// Apply the Order Policy and queue. return -1 on error. 00095 int queue (TAO_Notify_Method_Request_Queueable* method_request); 00096 00097 /// Discard as per the Discard Policy. 00098 bool discard (TAO_Notify_Method_Request_Queueable* method_request); 00099 00100 ///= Data Members 00101 00102 /// The local Message Queue 00103 TAO_Notify_Message_Queue& msg_queue_; 00104 00105 /// Reference to the properties per event channel. 00106 TAO_Notify_AdminProperties::Ptr admin_properties_; 00107 00108 /// The shared global lock used by all the queues. 00109 TAO_SYNCH_MUTEX& global_queue_lock_; 00110 00111 /// The global queue length - queue length accross all the queues. 00112 CORBA::Long& global_queue_length_; 00113 00114 /// The maximum events that can be queued overall. 00115 const TAO_Notify_Property_Long& max_queue_length_; 00116 00117 /// Order of events in internal buffers. 00118 TAO_Notify_Property_Short order_policy_; 00119 00120 /// Policy to discard when buffers are full. 00121 TAO_Notify_Property_Short discard_policy_; 00122 00123 TAO_Notify_Property_Long max_events_per_consumer_; 00124 TAO_Notify_Property_Time blocking_policy_; 00125 00126 TAO_SYNCH_CONDITION& global_not_full_; 00127 TAO_SYNCH_CONDITION local_not_full_; 00128 00129 /// Condition that batch size reached. 00130 TAO_SYNCH_CONDITION local_not_empty_; 00131 00132 /// Flag to shutdown. 00133 bool shutdown_; 00134 00135 /// Optional queue tracker 00136 Tracker* tracker_; 00137 }; 00138 00139 TAO_END_VERSIONED_NAMESPACE_DECL 00140 00141 #include /**/ "ace/post.h" 00142 00143 #endif /* TAO_Notify_BUFFERING_STRATEGY_H */