00001 /* -*- C++ -*- */ 00002 /** 00003 * @file Consumer.h 00004 * 00005 * $Id: Consumer.h 81422 2008-04-24 12:33:29Z johnnyw $ 00006 * 00007 * @author Pradeep Gore <pradeep@oomworks.com> 00008 * 00009 * 00010 */ 00011 00012 #ifndef TAO_NOTIFY_CONSUMER_H 00013 #define TAO_NOTIFY_CONSUMER_H 00014 00015 #include /**/ "ace/pre.h" 00016 00017 #include "orbsvcs/Notify/notify_serv_export.h" 00018 00019 #if !defined (ACE_LACKS_PRAGMA_ONCE) 00020 # pragma once 00021 #endif /* ACE_LACKS_PRAGMA_ONCE */ 00022 00023 #include "orbsvcs/CosNotifyCommC.h" 00024 #include "orbsvcs/CosNotificationC.h" 00025 00026 #include "orbsvcs/Notify/Peer.h" 00027 #include "orbsvcs/Notify/Event.h" 00028 #include "orbsvcs/Notify/Timer.h" 00029 #include "ace/Event_Handler.h" 00030 00031 00032 TAO_BEGIN_VERSIONED_NAMESPACE_DECL 00033 00034 class TAO_Notify_ProxySupplier; 00035 class TAO_Notify_Proxy; 00036 class TAO_Notify_Method_Request_Event_Queueable; 00037 class TAO_Notify_Method_Request_Event; 00038 /** 00039 * @class TAO_Notify_Consumer 00040 * 00041 * @brief Astract Base class for wrapping consumer objects that connect to the EventChannel 00042 * 00043 */ 00044 class TAO_Notify_Serv_Export TAO_Notify_Consumer 00045 : public TAO_Notify_Peer 00046 , public ACE_Event_Handler // to support timer 00047 { 00048 00049 public: 00050 /// Status returned from dispatch attempts 00051 enum DispatchStatus { 00052 DISPATCH_SUCCESS, 00053 DISPATCH_RETRY, // retry this message 00054 DISPATCH_DISCARD, // discard this message 00055 DISPATCH_FAIL}; // discard all messages and disconnect consumer 00056 00057 public: 00058 /// Constructor 00059 TAO_Notify_Consumer (TAO_Notify_ProxySupplier* proxy); 00060 00061 /// Destructor 00062 virtual ~TAO_Notify_Consumer (); 00063 00064 /// Access Specific Proxy. 00065 TAO_Notify_ProxySupplier* proxy_supplier (void); 00066 00067 /// Access Base Proxy. 00068 virtual TAO_Notify_Proxy* proxy (void); 00069 00070 /// Dispatch Event to consumer 00071 void deliver (TAO_Notify_Method_Request_Event * request); 00072 00073 /// Push @a event to this consumer. 00074 virtual void push (const CORBA::Any& event) = 0; 00075 00076 /// Push @a event to this consumer. 00077 virtual void push (const CosNotification::StructuredEvent& event) = 0; 00078 00079 /// Push a batch of events to this consumer. 00080 virtual void push (const CosNotification::EventBatch& event) = 0; 00081 00082 /// Dispatch the batch of events to the attached consumer 00083 DispatchStatus dispatch_batch (const CosNotification::EventBatch& batch); 00084 00085 /// Dispatch the pending events 00086 void dispatch_pending (void); 00087 00088 /// Is the connection suspended? 00089 CORBA::Boolean is_suspended (void); 00090 00091 /// Suspend Connection 00092 void suspend (void); 00093 00094 /// Resume Connection 00095 void resume (void); 00096 00097 /// Shutdown the consumer 00098 virtual void shutdown (void); 00099 00100 /// On reconnect we need to move events from the old consumer 00101 /// to the new one 00102 virtual void reconnect_from_consumer (TAO_Notify_Consumer* old_consumer) = 0; 00103 00104 /// Override, Peer::qos_changed 00105 virtual void qos_changed (const TAO_Notify_QoSProperties& qos_properties); 00106 00107 /// Take the pending queue from the rhs, cancel it's timer and 00108 /// schedule our timer. The caller should have locked the proxy lock 00109 /// before calling this method. 00110 void assume_pending_events (TAO_Notify_Consumer& rhs); 00111 00112 protected: 00113 typedef ACE_Unbounded_Queue<TAO_Notify_Method_Request_Event_Queueable *> Request_Queue; 00114 00115 DispatchStatus dispatch_request (TAO_Notify_Method_Request_Event * request); 00116 00117 /** 00118 * \brief Attempt to dispatch event from a queue. 00119 * 00120 * Called by dispatch_pending. Deliver one or more events to the Consumer. 00121 * If delivery fails, events are left in the queue (or discarded depending 00122 * on QoS parameters.) 00123 * Undelivered, undiscarded requests are left at the front of the queue. 00124 * Overridden in sequence consumer to dispatch as an EventBatch. 00125 * \return false if delivery failed and the request(s) cannot be discarded. 00126 */ 00127 virtual bool dispatch_from_queue ( 00128 Request_Queue & requests, 00129 ACE_Guard <TAO_SYNCH_MUTEX> & ace_mon); 00130 00131 void enqueue_request(TAO_Notify_Method_Request_Event * request); 00132 00133 /// Add request to a queue if necessary. 00134 /// Overridden by sequence consumer to "always" put incoming events into the queue. 00135 /// @returns true the request has been enqueued; false the request should be handled now. 00136 virtual bool enqueue_if_necessary( 00137 TAO_Notify_Method_Request_Event * request); 00138 00139 // Dispatch updates 00140 virtual void dispatch_updates_i (const CosNotification::EventTypeSeq& added, 00141 const CosNotification::EventTypeSeq& removed); 00142 00143 /// Get the shared Proxy Lock 00144 TAO_SYNCH_MUTEX* proxy_lock (void); 00145 00146 protected: 00147 virtual int handle_timeout (const ACE_Time_Value& current_time, 00148 const void* act = 0); 00149 00150 00151 /// Schedule timer 00152 void schedule_timer (bool is_error = false); 00153 00154 /// Cancel timer 00155 void cancel_timer (void); 00156 00157 ///= Protected Data Members 00158 protected: 00159 Request_Queue& pending_events(); 00160 00161 /// The Proxy that we associate with. 00162 TAO_Notify_ProxySupplier* proxy_; 00163 00164 /// Suspended Flag. 00165 CORBA::Boolean is_suspended_; 00166 00167 /// Interface that accepts offer_changes 00168 CosNotifyComm::NotifyPublish_var publish_; 00169 bool have_not_yet_verified_publish_; 00170 00171 /// The Pacing Interval 00172 const TAO_Notify_Property_Time & pacing_; 00173 00174 /// Max. batch size. 00175 TAO_Notify_Property_Long max_batch_size_; 00176 00177 /// Timer Id. 00178 long timer_id_; 00179 00180 /// The Timer Manager that we use. 00181 TAO_Notify_Timer::Ptr timer_; 00182 00183 private: 00184 00185 /// Events pending to be delivered. 00186 ACE_Auto_Ptr< Request_Queue > pending_events_; 00187 }; 00188 00189 TAO_END_VERSIONED_NAMESPACE_DECL 00190 00191 #if defined (__ACE_INLINE__) 00192 #include "orbsvcs/Notify/Consumer.inl" 00193 #endif /* __ACE_INLINE__ */ 00194 00195 #include /**/ "ace/post.h" 00196 00197 #endif /* TAO_NOTIFY_CONSUMER_H */