00001 /* -*- C++ -*- */ 00002 /** 00003 * @file Buffering_Strategy.h 00004 * 00005 * $Id: Buffering_Strategy.h 81418 2008-04-24 11:11:22Z johnnyw $ 00006 * 00007 * @author Pradeep Gore <pradeep@oomworks.com> 00008 * 00009 * 00010 */ 00011 00012 #ifndef TAO_Notify_BUFFERING_STRATEGY_H 00013 #define TAO_Notify_BUFFERING_STRATEGY_H 00014 00015 #include /**/ "ace/pre.h" 00016 00017 #include "orbsvcs/Notify/notify_serv_export.h" 00018 00019 #if !defined (ACE_LACKS_PRAGMA_ONCE) 00020 # pragma once 00021 #endif /* ACE_LACKS_PRAGMA_ONCE */ 00022 00023 #include "ace/Null_Condition.h" 00024 #include "ace/Message_Queue.h" 00025 00026 #include "orbsvcs/TimeBaseC.h" 00027 00028 #include "orbsvcs/Notify/Property.h" 00029 #include "orbsvcs/Notify/Property_T.h" 00030 #include "orbsvcs/Notify/AdminProperties.h" 00031 00032 TAO_BEGIN_VERSIONED_NAMESPACE_DECL 00033 00034 class TAO_Notify_Method_Request_Queueable; 00035 class TAO_Notify_QoSProperties; 00036 00037 typedef ACE_Message_Queue<ACE_NULL_SYNCH> TAO_Notify_Message_Queue; 00038 00039 /** 00040 * @class TAO_Notify_Buffering_Strategy 00041 * 00042 * @brief Base Strategy to enqueue and dequeue items from a Message Queue. 00043 * 00044 */ 00045 class TAO_Notify_Serv_Export TAO_Notify_Buffering_Strategy 00046 { 00047 public: 00048 TAO_Notify_Buffering_Strategy ( 00049 TAO_Notify_Message_Queue& msg_queue, 00050 const TAO_Notify_AdminProperties::Ptr& admin_properties); 00051 00052 ~TAO_Notify_Buffering_Strategy (); 00053 00054 /// Update state with the following QoS Properties: 00055 /// Order Policy 00056 /// Discard Policy 00057 /// MaxEventsPerConsumer 00058 void update_qos_properties (const TAO_Notify_QoSProperties& qos_properties); 00059 00060 /// Enqueue according the enqueing strategy. 00061 /// Return -1 on error else the number of items in the queue. 00062 int enqueue (TAO_Notify_Method_Request_Queueable* method_request); 00063 00064 /// Dequeue batch. This method will block for @a abstime if non-zero or else blocks till an item is available. 00065 /// Return -1 on error or if nothing is available, else the number of items actually dequeued (1). 00066 int dequeue (TAO_Notify_Method_Request_Queueable* &method_request, 00067 const ACE_Time_Value *abstime); 00068 00069 /// Shutdown 00070 void shutdown (void); 00071 00072 /// Provide the time value of the oldest event in the queue. 00073 ACE_Time_Value oldest_event (void); 00074 00075 /// This interface allows tracking of the queue size 00076 class TAO_Notify_Serv_Export Tracker 00077 { 00078 public: 00079 virtual ~Tracker (void); 00080 virtual void update_queue_count (size_t count) = 0; 00081 }; 00082 00083 /// Set the tracker object. This strategy does not own the tracker. 00084 void set_tracker (Tracker* tracker); 00085 00086 private: 00087 00088 /// Apply the Order Policy and queue. return -1 on error. 00089 int queue (TAO_Notify_Method_Request_Queueable* method_request); 00090 00091 /// Discard as per the Discard Policy. 00092 bool discard (TAO_Notify_Method_Request_Queueable* method_request); 00093 00094 ///= Data Members 00095 00096 /// The local Message Queue 00097 TAO_Notify_Message_Queue& msg_queue_; 00098 00099 /// Reference to the properties per event channel. 00100 TAO_Notify_AdminProperties::Ptr admin_properties_; 00101 00102 /// The shared global lock used by all the queues. 00103 TAO_SYNCH_MUTEX& global_queue_lock_; 00104 00105 /// The global queue length - queue length accross all the queues. 00106 CORBA::Long& global_queue_length_; 00107 00108 /// The maximum events that can be queued overall. 00109 const TAO_Notify_Property_Long& max_queue_length_; 00110 00111 /// Order of events in internal buffers. 00112 TAO_Notify_Property_Short order_policy_; 00113 00114 /// Policy to discard when buffers are full. 00115 TAO_Notify_Property_Short discard_policy_; 00116 00117 TAO_Notify_Property_Long max_events_per_consumer_; 00118 TAO_Notify_Property_Time blocking_policy_; 00119 00120 TAO_SYNCH_CONDITION& global_not_full_; 00121 TAO_SYNCH_CONDITION local_not_full_; 00122 00123 /// Condition that batch size reached. 00124 TAO_SYNCH_CONDITION local_not_empty_; 00125 00126 /// Flag to shutdown. 00127 bool shutdown_; 00128 00129 /// Optional queue tracker 00130 Tracker* tracker_; 00131 }; 00132 00133 TAO_END_VERSIONED_NAMESPACE_DECL 00134 00135 #include /**/ "ace/post.h" 00136 00137 #endif /* TAO_Notify_BUFFERING_STRATEGY_H */