00001 /* -*- C++ -*- */ 00002 /** 00003 * @file Consumer.h 00004 * 00005 * Consumer.h,v 1.16 2006/03/14 06:14:34 jtc Exp 00006 * 00007 * @author Pradeep Gore <pradeep@oomworks.com> 00008 * 00009 * 00010 */ 00011 00012 #ifndef TAO_NOTIFY_CONSUMER_H 00013 #define TAO_NOTIFY_CONSUMER_H 00014 00015 #include /**/ "ace/pre.h" 00016 00017 #include "orbsvcs/Notify/notify_serv_export.h" 00018 00019 #if !defined (ACE_LACKS_PRAGMA_ONCE) 00020 # pragma once 00021 #endif /* ACE_LACKS_PRAGMA_ONCE */ 00022 00023 #include "orbsvcs/CosNotifyCommC.h" 00024 #include "orbsvcs/CosNotificationC.h" 00025 00026 #include "orbsvcs/Notify/Peer.h" 00027 #include "orbsvcs/Notify/Event.h" 00028 #include "orbsvcs/Notify/Timer.h" 00029 #include "ace/Event_Handler.h" 00030 00031 00032 TAO_BEGIN_VERSIONED_NAMESPACE_DECL 00033 00034 class TAO_Notify_ProxySupplier; 00035 class TAO_Notify_Proxy; 00036 class TAO_Notify_Method_Request_Event_Queueable; 00037 class TAO_Notify_Method_Request_Event; 00038 /** 00039 * @class TAO_Notify_Consumer 00040 * 00041 * @brief Astract Base class for wrapping consumer objects that connect to the EventChannel 00042 * 00043 */ 00044 class TAO_Notify_Serv_Export TAO_Notify_Consumer 00045 : public TAO_Notify_Peer 00046 , public ACE_Event_Handler // to support timer 00047 { 00048 00049 public: 00050 /// Status returned from dispatch attempts 00051 enum DispatchStatus { 00052 DISPATCH_SUCCESS, 00053 DISPATCH_RETRY, // retry this message 00054 DISPATCH_DISCARD, // discard this message 00055 DISPATCH_FAIL}; // discard all messages and disconnect consumer 00056 00057 public: 00058 /// Constuctor 00059 TAO_Notify_Consumer (TAO_Notify_ProxySupplier* proxy); 00060 00061 /// Destructor 00062 virtual ~TAO_Notify_Consumer (); 00063 00064 /// Access Specific Proxy. 00065 TAO_Notify_ProxySupplier* proxy_supplier (void); 00066 00067 /// Access Base Proxy. 00068 virtual TAO_Notify_Proxy* proxy (void); 00069 00070 /// Dispatch Event to consumer 00071 void deliver (TAO_Notify_Method_Request_Event * request ACE_ENV_ARG_DECL); 00072 00073 /// Push <event> to this consumer. 00074 virtual void push (const CORBA::Any& event ACE_ENV_ARG_DECL) = 0; 00075 00076 /// Push <event> to this consumer. 00077 virtual void push (const CosNotification::StructuredEvent& event ACE_ENV_ARG_DECL) = 0; 00078 00079 /// Push a batch of events to this consumer. 00080 virtual void push (const CosNotification::EventBatch& event ACE_ENV_ARG_DECL) = 0; 00081 00082 /// Dispatch the batch of events to the attached consumer 00083 DispatchStatus dispatch_batch (const CosNotification::EventBatch& batch); 00084 00085 /// Dispatch the pending events 00086 void dispatch_pending (ACE_ENV_SINGLE_ARG_DECL); 00087 00088 /// Is the connection suspended? 00089 CORBA::Boolean is_suspended (void); 00090 00091 /// Suspend Connection 00092 void suspend (ACE_ENV_SINGLE_ARG_DECL); 00093 00094 /// Resume Connection 00095 void resume (ACE_ENV_SINGLE_ARG_DECL); 00096 00097 /// Shutdown the consumer 00098 virtual void shutdown (ACE_ENV_SINGLE_ARG_DECL); 00099 00100 /// on reconnect we need to move events from the old consumer 00101 /// to the new one 00102 virtual void reconnect_from_consumer ( 00103 TAO_Notify_Consumer* old_consumer 00104 ACE_ENV_ARG_DECL) = 0; 00105 00106 /// Override, Peer::qos_changed 00107 virtual void qos_changed (const TAO_Notify_QoSProperties& qos_properties); 00108 00109 protected: 00110 typedef ACE_Unbounded_Queue<TAO_Notify_Method_Request_Event_Queueable *> Request_Queue; 00111 00112 DispatchStatus dispatch_request (TAO_Notify_Method_Request_Event * request); 00113 00114 /** 00115 * \brief Attempt to dispatch event from a queue. 00116 * 00117 * Called by dispatch_pending. Deliver one or more events to the Consumer. 00118 * If delivery fails, events are left in the queue (or discarded depending 00119 * on QoS parameters.) 00120 * Undelivered, undiscarded requests are left at the front of the queue. 00121 * Overridden in sequence consumer to dispatch as an EventBatch. 00122 * \return false if delivery failed and the request(s) cannot be discarded. 00123 */ 00124 virtual bool dispatch_from_queue ( 00125 Request_Queue & requests, 00126 ACE_Guard <TAO_SYNCH_MUTEX> & ace_mon); 00127 00128 void enqueue_request(TAO_Notify_Method_Request_Event * request ACE_ENV_ARG_DECL); 00129 00130 /// Add request to a queue if necessary. 00131 /// Overridden by sequence consumer to "always" put incoming events into the queue. 00132 /// @returns true the request has been enqueued; false the request should be handled now. 00133 virtual bool enqueue_if_necessary( 00134 TAO_Notify_Method_Request_Event * request 00135 ACE_ENV_ARG_DECL); 00136 00137 // Dispatch updates 00138 virtual void dispatch_updates_i (const CosNotification::EventTypeSeq& added, 00139 const CosNotification::EventTypeSeq& removed 00140 ACE_ENV_ARG_DECL); 00141 00142 /// Get the shared Proxy Lock 00143 TAO_SYNCH_MUTEX* proxy_lock (void); 00144 00145 protected: 00146 virtual int handle_timeout (const ACE_Time_Value& current_time, 00147 const void* act = 0); 00148 00149 00150 /// Schedule timer 00151 void schedule_timer (bool is_error = false); 00152 00153 /// Cancel timer 00154 void cancel_timer (void); 00155 00156 ///= Protected Data Members 00157 protected: 00158 Request_Queue& pending_events(); 00159 00160 /// The Proxy that we associate with. 00161 TAO_Notify_ProxySupplier* proxy_; 00162 00163 /// Suspended Flag. 00164 CORBA::Boolean is_suspended_; 00165 00166 /// Interface that accepts offer_changes 00167 CosNotifyComm::NotifyPublish_var publish_; 00168 00169 /// The Pacing Interval 00170 const TAO_Notify_Property_Time & pacing_; 00171 00172 /// Max. batch size. 00173 TAO_Notify_Property_Long max_batch_size_; 00174 00175 /// Timer Id. 00176 long timer_id_; 00177 00178 /// The Timer Manager that we use. 00179 TAO_Notify_Timer::Ptr timer_; 00180 00181 private: 00182 00183 /// Events pending to be delivered. 00184 ACE_Auto_Ptr< Request_Queue > pending_events_; 00185 }; 00186 00187 TAO_END_VERSIONED_NAMESPACE_DECL 00188 00189 #if defined (__ACE_INLINE__) 00190 #include "orbsvcs/Notify/Consumer.inl" 00191 #endif /* __ACE_INLINE__ */ 00192 00193 #include /**/ "ace/post.h" 00194 00195 #endif /* TAO_NOTIFY_CONSUMER_H */