Consumer.h

Go to the documentation of this file.
00001 /* -*- C++ -*- */
00002 /**
00003  *  @file Consumer.h
00004  *
00005  *  $Id: Consumer.h 81422 2008-04-24 12:33:29Z johnnyw $
00006  *
00007  *  @author Pradeep Gore <pradeep@oomworks.com>
00008  *
00009  *
00010  */
00011 
00012 #ifndef TAO_NOTIFY_CONSUMER_H
00013 #define TAO_NOTIFY_CONSUMER_H
00014 
00015 #include /**/ "ace/pre.h"
00016 
00017 #include "orbsvcs/Notify/notify_serv_export.h"
00018 
00019 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00020 # pragma once
00021 #endif /* ACE_LACKS_PRAGMA_ONCE */
00022 
00023 #include "orbsvcs/CosNotifyCommC.h"
00024 #include "orbsvcs/CosNotificationC.h"
00025 
00026 #include "orbsvcs/Notify/Peer.h"
00027 #include "orbsvcs/Notify/Event.h"
00028 #include "orbsvcs/Notify/Timer.h"
00029 #include "ace/Event_Handler.h"
00030 
00031 
00032 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00033 
00034 class TAO_Notify_ProxySupplier;
00035 class TAO_Notify_Proxy;
00036 class TAO_Notify_Method_Request_Event_Queueable;
00037 class TAO_Notify_Method_Request_Event;
00038 /**
00039  * @class TAO_Notify_Consumer
00040  *
00041  * @brief Astract Base class for wrapping consumer objects that connect to the EventChannel
00042  *
00043  */
00044 class TAO_Notify_Serv_Export TAO_Notify_Consumer
00045   : public TAO_Notify_Peer
00046   , public ACE_Event_Handler    // to support timer
00047 {
00048 
00049 public:
00050   /// Status returned from dispatch attempts
00051   enum DispatchStatus {
00052     DISPATCH_SUCCESS,
00053     DISPATCH_RETRY,   // retry this message
00054     DISPATCH_DISCARD, // discard this message
00055     DISPATCH_FAIL};   // discard all messages and disconnect consumer
00056 
00057 public:
00058   /// Constructor
00059   TAO_Notify_Consumer (TAO_Notify_ProxySupplier* proxy);
00060 
00061   /// Destructor
00062   virtual ~TAO_Notify_Consumer ();
00063 
00064   /// Access Specific Proxy.
00065   TAO_Notify_ProxySupplier* proxy_supplier (void);
00066 
00067   /// Access Base Proxy.
00068   virtual TAO_Notify_Proxy* proxy (void);
00069 
00070   /// Dispatch Event to consumer
00071   void deliver (TAO_Notify_Method_Request_Event * request);
00072 
00073   /// Push @a event to this consumer.
00074   virtual void push (const CORBA::Any& event) = 0;
00075 
00076   /// Push @a event to this consumer.
00077   virtual void push (const CosNotification::StructuredEvent& event) = 0;
00078 
00079   /// Push a batch of events to this consumer.
00080   virtual void push (const CosNotification::EventBatch& event) = 0;
00081 
00082   /// Dispatch the batch of events to the attached consumer
00083   DispatchStatus dispatch_batch (const CosNotification::EventBatch& batch);
00084 
00085   /// Dispatch the pending events
00086   void dispatch_pending (void);
00087 
00088   /// Is the connection suspended?
00089   CORBA::Boolean is_suspended (void);
00090 
00091   /// Suspend Connection
00092   void suspend (void);
00093 
00094   /// Resume Connection
00095   void resume (void);
00096 
00097   /// Shutdown the consumer
00098   virtual void shutdown (void);
00099 
00100   /// On reconnect we need to move events from the old consumer
00101   /// to the new one
00102   virtual void reconnect_from_consumer (TAO_Notify_Consumer* old_consumer) = 0;
00103 
00104   /// Override, Peer::qos_changed
00105   virtual void qos_changed (const TAO_Notify_QoSProperties& qos_properties);
00106 
00107   /// Take the pending queue from the rhs, cancel it's timer and
00108   /// schedule our timer.  The caller should have locked the proxy lock
00109   /// before calling this method.
00110   void assume_pending_events (TAO_Notify_Consumer& rhs);
00111 
00112 protected:
00113   typedef ACE_Unbounded_Queue<TAO_Notify_Method_Request_Event_Queueable *> Request_Queue;
00114 
00115   DispatchStatus dispatch_request (TAO_Notify_Method_Request_Event * request);
00116 
00117   /**
00118    * \brief Attempt to dispatch event from a queue.
00119    *
00120    * Called by dispatch_pending.  Deliver one or more events to the Consumer.
00121    * If delivery fails, events are left in the queue (or discarded depending
00122    * on QoS parameters.)
00123    * Undelivered, undiscarded requests are left at the front of the queue.
00124    * Overridden in sequence consumer to dispatch as an EventBatch.
00125    * \return false if delivery failed and the request(s) cannot be discarded.
00126    */
00127   virtual bool dispatch_from_queue (
00128     Request_Queue & requests,
00129     ACE_Guard <TAO_SYNCH_MUTEX> & ace_mon);
00130 
00131   void enqueue_request(TAO_Notify_Method_Request_Event * request);
00132 
00133   /// Add request to a queue if necessary.
00134   /// Overridden by sequence consumer to "always" put incoming events into the queue.
00135   /// @returns true the request has been enqueued; false the request should be handled now.
00136   virtual bool enqueue_if_necessary(
00137     TAO_Notify_Method_Request_Event * request);
00138 
00139   // Dispatch updates
00140   virtual void dispatch_updates_i (const CosNotification::EventTypeSeq& added,
00141                                    const CosNotification::EventTypeSeq& removed);
00142 
00143   /// Get the shared Proxy Lock
00144   TAO_SYNCH_MUTEX* proxy_lock (void);
00145 
00146 protected:
00147   virtual int handle_timeout (const ACE_Time_Value& current_time,
00148                               const void* act = 0);
00149 
00150 
00151   /// Schedule timer
00152   void schedule_timer (bool is_error = false);
00153 
00154   /// Cancel timer
00155   void cancel_timer (void);
00156 
00157   ///= Protected Data Members
00158 protected:
00159   Request_Queue& pending_events();
00160 
00161   /// The Proxy that we associate with.
00162   TAO_Notify_ProxySupplier* proxy_;
00163 
00164   /// Suspended Flag.
00165   CORBA::Boolean is_suspended_;
00166 
00167   /// Interface that accepts offer_changes
00168   CosNotifyComm::NotifyPublish_var publish_;
00169   bool have_not_yet_verified_publish_;
00170 
00171   /// The Pacing Interval
00172   const TAO_Notify_Property_Time & pacing_;
00173 
00174   /// Max. batch size.
00175   TAO_Notify_Property_Long max_batch_size_;
00176 
00177   /// Timer Id.
00178   long timer_id_;
00179 
00180   /// The Timer Manager that we use.
00181   TAO_Notify_Timer::Ptr timer_;
00182 
00183 private:
00184 
00185   /// Events pending to be delivered.
00186   ACE_Auto_Ptr< Request_Queue > pending_events_;
00187 };
00188 
00189 TAO_END_VERSIONED_NAMESPACE_DECL
00190 
00191 #if defined (__ACE_INLINE__)
00192 #include "orbsvcs/Notify/Consumer.inl"
00193 #endif /* __ACE_INLINE__ */
00194 
00195 #include /**/ "ace/post.h"
00196 
00197 #endif /* TAO_NOTIFY_CONSUMER_H */

Generated on Tue Feb 2 17:45:28 2010 for TAO_CosNotification by  doxygen 1.4.7