00001 /* -*- C++ -*- */ 00002 /** 00003 * @file Consumer.h 00004 * 00005 * $Id: Consumer.h 87253 2009-10-28 23:29:32Z dai_y $ 00006 * 00007 * @author Pradeep Gore <pradeep@oomworks.com> 00008 * 00009 * 00010 */ 00011 00012 #ifndef TAO_NOTIFY_CONSUMER_H 00013 #define TAO_NOTIFY_CONSUMER_H 00014 00015 #include /**/ "ace/pre.h" 00016 00017 #include "orbsvcs/Notify/notify_serv_export.h" 00018 00019 #if !defined (ACE_LACKS_PRAGMA_ONCE) 00020 # pragma once 00021 #endif /* ACE_LACKS_PRAGMA_ONCE */ 00022 00023 #include "orbsvcs/CosNotifyCommC.h" 00024 #include "orbsvcs/CosNotificationC.h" 00025 00026 #include "orbsvcs/Notify/Peer.h" 00027 #include "orbsvcs/Notify/Event.h" 00028 #include "orbsvcs/Notify/Timer.h" 00029 #include "ace/Event_Handler.h" 00030 #include "ace/Atomic_Op.h" 00031 00032 TAO_BEGIN_VERSIONED_NAMESPACE_DECL 00033 00034 class TAO_Notify_ProxySupplier; 00035 class TAO_Notify_Proxy; 00036 class TAO_Notify_Method_Request_Event_Queueable; 00037 class TAO_Notify_Method_Request_Event; 00038 /** 00039 * @class TAO_Notify_Consumer 00040 * 00041 * @brief Astract Base class for wrapping consumer objects that connect to the EventChannel 00042 * 00043 */ 00044 class TAO_Notify_Serv_Export TAO_Notify_Consumer 00045 : public TAO_Notify_Peer 00046 , public ACE_Event_Handler // to support timer 00047 { 00048 00049 public: 00050 /// Status returned from dispatch attempts 00051 enum DispatchStatus { 00052 DISPATCH_SUCCESS, 00053 DISPATCH_RETRY, // retry this message 00054 DISPATCH_DISCARD, // discard this message 00055 DISPATCH_FAIL, // discard all messages and disconnect consumer 00056 DISPATCH_FAIL_TIMEOUT // Same as DISPATCH_FAIL, but due to a timeout 00057 }; 00058 00059 public: 00060 00061 typedef TAO_Notify_Refcountable_Guard_T< TAO_Notify_Consumer > Ptr; 00062 00063 /// Constructor 00064 TAO_Notify_Consumer (TAO_Notify_ProxySupplier* proxy); 00065 00066 /// Destructor 00067 virtual ~TAO_Notify_Consumer (); 00068 00069 /// This method sigantures deliberately match the RefCounting methods required for ESF Proxy 00070 CORBA::ULong _incr_refcnt (void); 00071 CORBA::ULong _decr_refcnt (void); 00072 00073 /// Access Specific Proxy. 00074 TAO_Notify_ProxySupplier* proxy_supplier (void); 00075 00076 /// Access Base Proxy. 00077 virtual TAO_Notify_Proxy* proxy (void); 00078 00079 /// Dispatch Event to consumer 00080 void deliver (TAO_Notify_Method_Request_Event * request); 00081 00082 /// Push @a event to this consumer. 00083 virtual void push (const CORBA::Any& event) = 0; 00084 00085 /// Push @a event to this consumer. 00086 virtual void push (const CosNotification::StructuredEvent& event) = 0; 00087 00088 /// Push a batch of events to this consumer. 00089 virtual void push (const CosNotification::EventBatch& event) = 0; 00090 00091 /// Dispatch the batch of events to the attached consumer 00092 DispatchStatus dispatch_batch (const CosNotification::EventBatch& batch); 00093 00094 /// Dispatch the pending events 00095 void dispatch_pending (void); 00096 00097 /// Is the connection suspended? 00098 CORBA::Boolean is_suspended (void); 00099 00100 /// Suspend Connection 00101 void suspend (void); 00102 00103 /// Resume Connection 00104 void resume (void); 00105 00106 /// Shutdown the consumer 00107 virtual void shutdown (void); 00108 00109 /// On reconnect we need to move events from the old consumer 00110 /// to the new one 00111 virtual void reconnect_from_consumer (TAO_Notify_Consumer* old_consumer) = 0; 00112 00113 /// Override, Peer::qos_changed 00114 virtual void qos_changed (const TAO_Notify_QoSProperties& qos_properties); 00115 00116 /// Take the pending queue from the rhs, cancel it's timer and 00117 /// schedule our timer. The caller should have locked the proxy lock 00118 /// before calling this method. 00119 void assume_pending_events (TAO_Notify_Consumer& rhs); 00120 00121 /// Is the connected consumer still around? 00122 bool is_alive (bool allow_nil_consumer); 00123 00124 /// Estimate how many events are pending delivery for this consumer 00125 /// 00126 /// The estimate does not include events queued at the admin level which 00127 /// have not been passed to this consumer for delivery yet. 00128 size_t pending_count (void); 00129 00130 protected: 00131 00132 /// This method is called by the is_alive() method. It should provide 00133 /// the connected consumer or nil if there is none. 00134 virtual CORBA::Object_ptr get_consumer (void) = 0; 00135 00136 typedef ACE_Unbounded_Queue<TAO_Notify_Method_Request_Event_Queueable *> Request_Queue; 00137 00138 DispatchStatus dispatch_request (TAO_Notify_Method_Request_Event * request); 00139 00140 /** 00141 * \brief Attempt to dispatch event from a queue. 00142 * 00143 * Called by dispatch_pending. Deliver one or more events to the Consumer. 00144 * If delivery fails, events are left in the queue (or discarded depending 00145 * on QoS parameters.) 00146 * Undelivered, undiscarded requests are left at the front of the queue. 00147 * Overridden in sequence consumer to dispatch as an EventBatch. 00148 * \return false if delivery failed and the request(s) cannot be discarded. 00149 */ 00150 virtual bool dispatch_from_queue ( 00151 Request_Queue & requests, 00152 ACE_Guard <TAO_SYNCH_MUTEX> & ace_mon); 00153 00154 void enqueue_request(TAO_Notify_Method_Request_Event * request); 00155 00156 /// Add request to a queue if necessary. 00157 /// Overridden by sequence consumer to "always" put incoming events into the queue. 00158 /// @returns true the request has been enqueued; false the request should be handled now. 00159 virtual bool enqueue_if_necessary( 00160 TAO_Notify_Method_Request_Event * request); 00161 00162 // Dispatch updates 00163 virtual void dispatch_updates_i (const CosNotification::EventTypeSeq& added, 00164 const CosNotification::EventTypeSeq& removed); 00165 00166 /// Get the shared Proxy Lock 00167 TAO_SYNCH_MUTEX* proxy_lock (void); 00168 00169 protected: 00170 virtual int handle_timeout (const ACE_Time_Value& current_time, 00171 const void* act = 0); 00172 00173 00174 /// Schedule timer 00175 void schedule_timer (bool is_error = false); 00176 00177 /// Cancel timer 00178 void cancel_timer (void); 00179 00180 ///= Protected Data Members 00181 protected: 00182 Request_Queue& pending_events(); 00183 00184 /// The Proxy that we associate with. 00185 TAO_Notify_ProxySupplier* proxy_; 00186 00187 /// Suspended Flag. 00188 CORBA::Boolean is_suspended_; 00189 00190 /// Interface that accepts offer_changes 00191 CosNotifyComm::NotifyPublish_var publish_; 00192 bool have_not_yet_verified_publish_; 00193 00194 /// The Pacing Interval 00195 const TAO_Notify_Property_Time & pacing_; 00196 00197 /// Max. batch size. 00198 TAO_Notify_Property_Long max_batch_size_; 00199 00200 /// Timer Id. 00201 long timer_id_; 00202 00203 /// The Timer Manager that we use. 00204 TAO_Notify_Timer::Ptr timer_; 00205 00206 /// Last time either push an event or validate connection 00207 /// via _non_exist call. 00208 ACE_Atomic_Op<TAO_SYNCH_MUTEX, ACE_Time_Value> last_ping_; 00209 00210 private: 00211 00212 /// Events pending to be delivered. 00213 ACE_Auto_Ptr< Request_Queue > pending_events_; 00214 00215 CORBA::Object_var rtt_obj_; 00216 }; 00217 00218 TAO_END_VERSIONED_NAMESPACE_DECL 00219 00220 #if defined (__ACE_INLINE__) 00221 #include "orbsvcs/Notify/Consumer.inl" 00222 #endif /* __ACE_INLINE__ */ 00223 00224 #include /**/ "ace/post.h" 00225 00226 #endif /* TAO_NOTIFY_CONSUMER_H */