00001 /* -*- C++ -*- */ 00002 /** 00003 * @file Buffering_Strategy.h 00004 * 00005 * Buffering_Strategy.h,v 1.16 2006/03/14 06:14:34 jtc Exp 00006 * 00007 * @author Pradeep Gore <pradeep@oomworks.com> 00008 * 00009 * 00010 */ 00011 00012 #ifndef TAO_Notify_BUFFERING_STRATEGY_H 00013 #define TAO_Notify_BUFFERING_STRATEGY_H 00014 00015 #include /**/ "ace/pre.h" 00016 00017 #include "orbsvcs/Notify/notify_serv_export.h" 00018 00019 #if !defined (ACE_LACKS_PRAGMA_ONCE) 00020 # pragma once 00021 #endif /* ACE_LACKS_PRAGMA_ONCE */ 00022 00023 #include "ace/Null_Condition.h" 00024 #include "ace/Message_Queue.h" 00025 00026 #include "orbsvcs/TimeBaseC.h" 00027 00028 #include "orbsvcs/Notify/Property.h" 00029 #include "orbsvcs/Notify/Property_T.h" 00030 #include "orbsvcs/Notify/AdminProperties.h" 00031 00032 TAO_BEGIN_VERSIONED_NAMESPACE_DECL 00033 00034 class TAO_Notify_Method_Request_Queueable; 00035 class TAO_Notify_QoSProperties; 00036 00037 typedef ACE_Message_Queue<ACE_NULL_SYNCH> TAO_Notify_Message_Queue; 00038 00039 /** 00040 * @class TAO_Notify_Buffering_Strategy 00041 * 00042 * @brief Base Strategy to enqueue and dequeue items from a Message Queue. 00043 * 00044 */ 00045 class TAO_Notify_Serv_Export TAO_Notify_Buffering_Strategy 00046 { 00047 public: 00048 TAO_Notify_Buffering_Strategy ( 00049 TAO_Notify_Message_Queue& msg_queue, 00050 const TAO_Notify_AdminProperties::Ptr& admin_properties); 00051 00052 ~TAO_Notify_Buffering_Strategy (); 00053 00054 /// Update state with the following QoS Properties: 00055 /// Order Policy 00056 /// Discard Policy 00057 /// MaxEventsPerConsumer 00058 void update_qos_properties (const TAO_Notify_QoSProperties& qos_properties); 00059 00060 /// Enqueue according the enqueing strategy. 00061 /// Return -1 on error else the number of items in the queue. 00062 int enqueue (TAO_Notify_Method_Request_Queueable* method_request); 00063 00064 /// Dequeue batch. This method will block for @a abstime if non-zero or else blocks till an item is available. 00065 /// Return -1 on error or if nothing is available, else the number of items actually dequeued (1). 00066 int dequeue (TAO_Notify_Method_Request_Queueable* &method_request, 00067 const ACE_Time_Value *abstime); 00068 00069 /// Shutdown 00070 void shutdown (void); 00071 00072 private: 00073 00074 /// Apply the Order Policy and queue. return -1 on error. 00075 int queue (TAO_Notify_Method_Request_Queueable* method_request); 00076 00077 /// Discard as per the Discard Policy. 00078 bool discard (TAO_Notify_Method_Request_Queueable* method_request); 00079 00080 ///= Data Members 00081 00082 /// The local Message Queue 00083 TAO_Notify_Message_Queue& msg_queue_; 00084 00085 /// Reference to the properties per event channel. 00086 TAO_Notify_AdminProperties::Ptr admin_properties_; 00087 00088 /// The shared global lock used by all the queues. 00089 ACE_SYNCH_MUTEX& global_queue_lock_; 00090 00091 /// The global queue length - queue length accross all the queues. 00092 CORBA::Long& global_queue_length_; 00093 00094 /// The maximum events that can be queued overall. 00095 const TAO_Notify_Property_Long& max_queue_length_; 00096 00097 /// Order of events in internal buffers. 00098 TAO_Notify_Property_Short order_policy_; 00099 00100 /// Policy to discard when buffers are full. 00101 TAO_Notify_Property_Short discard_policy_; 00102 00103 TAO_Notify_Property_Long max_events_per_consumer_; 00104 TAO_Notify_Property_Time blocking_policy_; 00105 00106 00107 TAO_SYNCH_CONDITION& global_not_full_; 00108 TAO_SYNCH_CONDITION local_not_full_; 00109 00110 /// Condition that batch size reached. 00111 TAO_SYNCH_CONDITION local_not_empty_; 00112 00113 /// Flag to shutdown. 00114 bool shutdown_; 00115 }; 00116 00117 TAO_END_VERSIONED_NAMESPACE_DECL 00118 00119 #include /**/ "ace/post.h" 00120 00121 #endif /* TAO_Notify_BUFFERING_STRATEGY_H */