Consumer.h

Go to the documentation of this file.
00001 /* -*- C++ -*- */
00002 /**
00003  *  @file Consumer.h
00004  *
00005  *  Consumer.h,v 1.16 2006/03/14 06:14:34 jtc Exp
00006  *
00007  *  @author Pradeep Gore <pradeep@oomworks.com>
00008  *
00009  *
00010  */
00011 
00012 #ifndef TAO_NOTIFY_CONSUMER_H
00013 #define TAO_NOTIFY_CONSUMER_H
00014 
00015 #include /**/ "ace/pre.h"
00016 
00017 #include "orbsvcs/Notify/notify_serv_export.h"
00018 
00019 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00020 # pragma once
00021 #endif /* ACE_LACKS_PRAGMA_ONCE */
00022 
00023 #include "orbsvcs/CosNotifyCommC.h"
00024 #include "orbsvcs/CosNotificationC.h"
00025 
00026 #include "orbsvcs/Notify/Peer.h"
00027 #include "orbsvcs/Notify/Event.h"
00028 #include "orbsvcs/Notify/Timer.h"
00029 #include "ace/Event_Handler.h"
00030 
00031 
00032 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00033 
00034 class TAO_Notify_ProxySupplier;                  // forward declaration: used below only via pointer (ctor, proxy_supplier(), proxy_)
00035 class TAO_Notify_Proxy;                          // forward declaration: used below only as the pointer return type of proxy()
00036 class TAO_Notify_Method_Request_Event_Queueable; // forward declaration: pointer element type of Request_Queue
00037 class TAO_Notify_Method_Request_Event;           // forward declaration: pointer parameter of deliver()/dispatch_request()/enqueue_*()
00038 /**
00039  * @class TAO_Notify_Consumer
00040  *
00041  * @brief Abstract base class for wrapping consumer objects that connect to the EventChannel.
00042  *        Subclasses must implement the pure virtual push() overloads and reconnect_from_consumer().
00043  */
00044 class TAO_Notify_Serv_Export TAO_Notify_Consumer
00045   : public TAO_Notify_Peer
00046   , public ACE_Event_Handler    // to support timer
00047 {
00048 
00049 public:
00050   /// Status returned from dispatch attempts
00051   enum DispatchStatus {
00052     DISPATCH_SUCCESS,
00053     DISPATCH_RETRY,   // retry this message
00054     DISPATCH_DISCARD, // discard this message
00055     DISPATCH_FAIL};   // discard all messages and disconnect consumer
00056 
00057 public:
00058   /// Constructor; takes the ProxySupplier this consumer is associated with.
00059   TAO_Notify_Consumer (TAO_Notify_ProxySupplier* proxy);
00060 
00061   /// Destructor (virtual: this class is a polymorphic base).
00062   virtual ~TAO_Notify_Consumer ();
00063 
00064   /// Access Specific Proxy (typed as TAO_Notify_ProxySupplier).
00065   TAO_Notify_ProxySupplier* proxy_supplier (void);
00066 
00067   /// Access Base Proxy (typed as TAO_Notify_Proxy).
00068   virtual TAO_Notify_Proxy* proxy (void);
00069 
00070   /// Dispatch Event to consumer
00071   void deliver (TAO_Notify_Method_Request_Event * request ACE_ENV_ARG_DECL);
00072 
00073   /// Push @a event, given as a CORBA::Any, to this consumer. Pure virtual.
00074   virtual void push (const CORBA::Any& event ACE_ENV_ARG_DECL) = 0;
00075 
00076   /// Push @a event, given as a StructuredEvent, to this consumer. Pure virtual.
00077   virtual void push (const CosNotification::StructuredEvent& event ACE_ENV_ARG_DECL) = 0;
00078 
00079   /// Push a batch of events to this consumer. Pure virtual.
00080   virtual void push (const CosNotification::EventBatch& event ACE_ENV_ARG_DECL) = 0;
00081 
00082   /// Dispatch the batch of events to the attached consumer; the outcome is reported as a DispatchStatus.
00083   DispatchStatus dispatch_batch (const CosNotification::EventBatch& batch);
00084 
00085   /// Dispatch the pending events
00086   void dispatch_pending (ACE_ENV_SINGLE_ARG_DECL);
00087 
00088   /// Is the connection suspended?
00089   CORBA::Boolean is_suspended (void);
00090 
00091   /// Suspend Connection
00092   void suspend (ACE_ENV_SINGLE_ARG_DECL);
00093 
00094   /// Resume Connection
00095   void resume (ACE_ENV_SINGLE_ARG_DECL);
00096 
00097   /// Shutdown the consumer
00098   virtual void shutdown (ACE_ENV_SINGLE_ARG_DECL);
00099 
00100   /// On reconnect we need to move events from the old consumer
00101   /// to the new one. Pure virtual.
00102   virtual void reconnect_from_consumer (
00103     TAO_Notify_Consumer* old_consumer
00104     ACE_ENV_ARG_DECL) = 0;
00105 
00106   /// Override of TAO_Notify_Peer::qos_changed.
00107   virtual void qos_changed (const TAO_Notify_QoSProperties& qos_properties);
00108 
00109 protected:
00110   typedef ACE_Unbounded_Queue<TAO_Notify_Method_Request_Event_Queueable *> Request_Queue; // queue of pointers to queueable event requests
00111 
00112   DispatchStatus dispatch_request (TAO_Notify_Method_Request_Event * request); // NOTE(review): presumably the single-request counterpart of dispatch_batch -- confirm in Consumer.cpp
00113 
00114   /**
00115    * \brief Attempt to dispatch event from a queue.
00116    *
00117    * Called by dispatch_pending.  Deliver one or more events to the Consumer.
00118    * If delivery fails, events are left in the queue (or discarded, depending
00119    * on QoS parameters).
00120    * Undelivered, undiscarded requests are left at the front of the queue.
00121    * Overridden in sequence consumer to dispatch as an EventBatch.
00122    * \return false if delivery failed and the request(s) cannot be discarded.
00123    */
00124   virtual bool dispatch_from_queue (
00125     Request_Queue & requests,
00126     ACE_Guard <TAO_SYNCH_MUTEX> & ace_mon);
00127 
00128   void enqueue_request(TAO_Notify_Method_Request_Event * request ACE_ENV_ARG_DECL); // NOTE(review): presumably appends to the pending-events queue -- confirm in Consumer.cpp
00129 
00130   /// Add request to a queue if necessary.
00131   /// Overridden by sequence consumer to "always" put incoming events into the queue.
00132   /// @returns true if the request has been enqueued; false if the request should be handled now.
00133   virtual bool enqueue_if_necessary(
00134     TAO_Notify_Method_Request_Event * request
00135     ACE_ENV_ARG_DECL);
00136 
00137   // Dispatch updates (takes the added and removed event-type sequences).
00138   virtual void dispatch_updates_i (const CosNotification::EventTypeSeq& added,
00139                                    const CosNotification::EventTypeSeq& removed
00140                                    ACE_ENV_ARG_DECL);
00141 
00142   /// Get the shared Proxy Lock
00143   TAO_SYNCH_MUTEX* proxy_lock (void);
00144 
00145 protected:
00146   virtual int handle_timeout (const ACE_Time_Value& current_time,
00147                               const void* act = 0); // timer callback; presumably overrides ACE_Event_Handler::handle_timeout -- confirm against ace/Event_Handler.h
00148 
00149 
00150   /// Schedule timer; @a is_error defaults to false.
00151   void schedule_timer (bool is_error = false);
00152 
00153   /// Cancel timer
00154   void cancel_timer (void);
00155 
00156   ///= Protected Data Members
00157 protected:
00158   Request_Queue& pending_events(); // NOTE(review): presumably exposes the queue held in pending_events_ -- confirm in the implementation
00159 
00160   /// The Proxy that we associate with.
00161   TAO_Notify_ProxySupplier* proxy_;
00162 
00163   /// Suspended Flag.
00164   CORBA::Boolean is_suspended_;
00165 
00166   /// Interface that accepts offer_changes
00167   CosNotifyComm::NotifyPublish_var publish_;
00168 
00169   /// The Pacing Interval (held by const reference, not by value).
00170   const TAO_Notify_Property_Time & pacing_;
00171 
00172   /// Max. batch size.
00173   TAO_Notify_Property_Long max_batch_size_;
00174 
00175   /// Timer Id.
00176   long timer_id_;
00177 
00178   /// The Timer Manager that we use.
00179   TAO_Notify_Timer::Ptr timer_;
00180 
00181 private:
00182 
00183   /// Events pending to be delivered (queue held through an ACE_Auto_Ptr).
00184   ACE_Auto_Ptr< Request_Queue > pending_events_;
00185 };
00186 
00187 TAO_END_VERSIONED_NAMESPACE_DECL
00188 
00189 #if defined (__ACE_INLINE__)
00190 #include "orbsvcs/Notify/Consumer.inl"
00191 #endif /* __ACE_INLINE__ */
00192 
00193 #include /**/ "ace/post.h"
00194 
00195 #endif /* TAO_NOTIFY_CONSUMER_H */

Generated on Thu Nov 9 13:24:08 2006 for TAO_CosNotification by doxygen 1.3.6