00001 /* -*- C++ -*- */ 00002 /** 00003 * @file Consumer.h 00004 * 00005 * $Id: Consumer.h 79324 2007-08-13 11:20:01Z elliott_c $ 00006 * 00007 * @author Pradeep Gore <pradeep@oomworks.com> 00008 * 00009 * 00010 */ 00011 00012 #ifndef TAO_NOTIFY_CONSUMER_H 00013 #define TAO_NOTIFY_CONSUMER_H 00014 00015 #include /**/ "ace/pre.h" 00016 00017 #include "orbsvcs/Notify/notify_serv_export.h" 00018 00019 #if !defined (ACE_LACKS_PRAGMA_ONCE) 00020 # pragma once 00021 #endif /* ACE_LACKS_PRAGMA_ONCE */ 00022 00023 #include "orbsvcs/CosNotifyCommC.h" 00024 #include "orbsvcs/CosNotificationC.h" 00025 00026 #include "orbsvcs/Notify/Peer.h" 00027 #include "orbsvcs/Notify/Event.h" 00028 #include "orbsvcs/Notify/Timer.h" 00029 #include "ace/Event_Handler.h" 00030 00031 00032 TAO_BEGIN_VERSIONED_NAMESPACE_DECL 00033 00034 class TAO_Notify_ProxySupplier; 00035 class TAO_Notify_Proxy; 00036 class TAO_Notify_Method_Request_Event_Queueable; 00037 class TAO_Notify_Method_Request_Event; 00038 /** 00039 * @class TAO_Notify_Consumer 00040 * 00041 * @brief Astract Base class for wrapping consumer objects that connect to the EventChannel 00042 * 00043 */ 00044 class TAO_Notify_Serv_Export TAO_Notify_Consumer 00045 : public TAO_Notify_Peer 00046 , public ACE_Event_Handler // to support timer 00047 { 00048 00049 public: 00050 /// Status returned from dispatch attempts 00051 enum DispatchStatus { 00052 DISPATCH_SUCCESS, 00053 DISPATCH_RETRY, // retry this message 00054 DISPATCH_DISCARD, // discard this message 00055 DISPATCH_FAIL}; // discard all messages and disconnect consumer 00056 00057 public: 00058 /// Constuctor 00059 TAO_Notify_Consumer (TAO_Notify_ProxySupplier* proxy); 00060 00061 /// Destructor 00062 virtual ~TAO_Notify_Consumer (); 00063 00064 /// Access Specific Proxy. 00065 TAO_Notify_ProxySupplier* proxy_supplier (void); 00066 00067 /// Access Base Proxy. 00068 virtual TAO_Notify_Proxy* proxy (void); 00069 00070 /// Dispatch Event to consumer 00071 void deliver (TAO_Notify_Method_Request_Event * request); 00072 00073 /// Push <event> to this consumer. 00074 virtual void push (const CORBA::Any& event) = 0; 00075 00076 /// Push <event> to this consumer. 00077 virtual void push (const CosNotification::StructuredEvent& event) = 0; 00078 00079 /// Push a batch of events to this consumer. 00080 virtual void push (const CosNotification::EventBatch& event) = 0; 00081 00082 /// Dispatch the batch of events to the attached consumer 00083 DispatchStatus dispatch_batch (const CosNotification::EventBatch& batch); 00084 00085 /// Dispatch the pending events 00086 void dispatch_pending (void); 00087 00088 /// Is the connection suspended? 00089 CORBA::Boolean is_suspended (void); 00090 00091 /// Suspend Connection 00092 void suspend (void); 00093 00094 /// Resume Connection 00095 void resume (void); 00096 00097 /// Shutdown the consumer 00098 virtual void shutdown (void); 00099 00100 /// on reconnect we need to move events from the old consumer 00101 /// to the new one 00102 virtual void reconnect_from_consumer ( 00103 TAO_Notify_Consumer* old_consumer) = 0; 00104 00105 /// Override, Peer::qos_changed 00106 virtual void qos_changed (const TAO_Notify_QoSProperties& qos_properties); 00107 00108 /// Take the pending queue from the rhs, cancel it's timer and 00109 /// schedule our timer. The caller should have locked the proxy lock 00110 /// before calling this method. 00111 void assume_pending_events (TAO_Notify_Consumer& rhs); 00112 00113 protected: 00114 typedef ACE_Unbounded_Queue<TAO_Notify_Method_Request_Event_Queueable *> Request_Queue; 00115 00116 DispatchStatus dispatch_request (TAO_Notify_Method_Request_Event * request); 00117 00118 /** 00119 * \brief Attempt to dispatch event from a queue. 00120 * 00121 * Called by dispatch_pending. Deliver one or more events to the Consumer. 00122 * If delivery fails, events are left in the queue (or discarded depending 00123 * on QoS parameters.) 00124 * Undelivered, undiscarded requests are left at the front of the queue. 00125 * Overridden in sequence consumer to dispatch as an EventBatch. 00126 * \return false if delivery failed and the request(s) cannot be discarded. 00127 */ 00128 virtual bool dispatch_from_queue ( 00129 Request_Queue & requests, 00130 ACE_Guard <TAO_SYNCH_MUTEX> & ace_mon); 00131 00132 void enqueue_request(TAO_Notify_Method_Request_Event * request); 00133 00134 /// Add request to a queue if necessary. 00135 /// Overridden by sequence consumer to "always" put incoming events into the queue. 00136 /// @returns true the request has been enqueued; false the request should be handled now. 00137 virtual bool enqueue_if_necessary( 00138 TAO_Notify_Method_Request_Event * request); 00139 00140 // Dispatch updates 00141 virtual void dispatch_updates_i (const CosNotification::EventTypeSeq& added, 00142 const CosNotification::EventTypeSeq& removed); 00143 00144 /// Get the shared Proxy Lock 00145 TAO_SYNCH_MUTEX* proxy_lock (void); 00146 00147 protected: 00148 virtual int handle_timeout (const ACE_Time_Value& current_time, 00149 const void* act = 0); 00150 00151 00152 /// Schedule timer 00153 void schedule_timer (bool is_error = false); 00154 00155 /// Cancel timer 00156 void cancel_timer (void); 00157 00158 ///= Protected Data Members 00159 protected: 00160 Request_Queue& pending_events(); 00161 00162 /// The Proxy that we associate with. 00163 TAO_Notify_ProxySupplier* proxy_; 00164 00165 /// Suspended Flag. 00166 CORBA::Boolean is_suspended_; 00167 00168 /// Interface that accepts offer_changes 00169 CosNotifyComm::NotifyPublish_var publish_; 00170 bool have_not_yet_verified_publish_; 00171 00172 /// The Pacing Interval 00173 const TAO_Notify_Property_Time & pacing_; 00174 00175 /// Max. batch size. 00176 TAO_Notify_Property_Long max_batch_size_; 00177 00178 /// Timer Id. 00179 long timer_id_; 00180 00181 /// The Timer Manager that we use. 00182 TAO_Notify_Timer::Ptr timer_; 00183 00184 private: 00185 00186 /// Events pending to be delivered. 00187 ACE_Auto_Ptr< Request_Queue > pending_events_; 00188 }; 00189 00190 TAO_END_VERSIONED_NAMESPACE_DECL 00191 00192 #if defined (__ACE_INLINE__) 00193 #include "orbsvcs/Notify/Consumer.inl" 00194 #endif /* __ACE_INLINE__ */ 00195 00196 #include /**/ "ace/post.h" 00197 00198 #endif /* TAO_NOTIFY_CONSUMER_H */