Consumer.h

Go to the documentation of this file.
00001 /* -*- C++ -*- */
00002 /**
00003  *  @file Consumer.h
00004  *
00005  *  $Id: Consumer.h 79324 2007-08-13 11:20:01Z elliott_c $
00006  *
00007  *  @author Pradeep Gore <pradeep@oomworks.com>
00008  *
00009  *
00010  */
00011 
00012 #ifndef TAO_NOTIFY_CONSUMER_H
00013 #define TAO_NOTIFY_CONSUMER_H
00014 
00015 #include /**/ "ace/pre.h"
00016 
00017 #include "orbsvcs/Notify/notify_serv_export.h"
00018 
00019 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00020 # pragma once
00021 #endif /* ACE_LACKS_PRAGMA_ONCE */
00022 
00023 #include "orbsvcs/CosNotifyCommC.h"
00024 #include "orbsvcs/CosNotificationC.h"
00025 
00026 #include "orbsvcs/Notify/Peer.h"
00027 #include "orbsvcs/Notify/Event.h"
00028 #include "orbsvcs/Notify/Timer.h"
00029 #include "ace/Event_Handler.h"
00030 
00031 
00032 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00033 
00034 class TAO_Notify_ProxySupplier;                   // constructor argument; type of proxy_ and of proxy_supplier()'s result
00035 class TAO_Notify_Proxy;                           // returned (by pointer) from proxy()
00036 class TAO_Notify_Method_Request_Event_Queueable;  // stored by pointer in Request_Queue
00037 class TAO_Notify_Method_Request_Event;            // request type taken by deliver(), dispatch_request() and the enqueue methods
00038 /**
00039  * @class TAO_Notify_Consumer
00040  * @brief Abstract base class for wrapping consumer objects that connect to the EventChannel
00041  *
00042  * Derives from TAO_Notify_Peer and ACE_Event_Handler; the push() overloads and reconnect_from_consumer() are pure virtual.
00043  */
00044 class TAO_Notify_Serv_Export TAO_Notify_Consumer
00045   : public TAO_Notify_Peer
00046   , public ACE_Event_Handler    // to support timer
00047 {
00048 
00049 public:
00050   /// Status returned from dispatch attempts (see dispatch_batch() and dispatch_request()).
00051   enum DispatchStatus {
00052     DISPATCH_SUCCESS,
00053     DISPATCH_RETRY,   // retry this message
00054     DISPATCH_DISCARD, // discard this message
00055     DISPATCH_FAIL};   // discard all messages and disconnect consumer
00056 
00057 public:
00058   /// Constructor. @a proxy is the ProxySupplier for this consumer (presumably kept in proxy_ -- confirm in the implementation).
00059   TAO_Notify_Consumer (TAO_Notify_ProxySupplier* proxy);
00060 
00061   /// Destructor
00062   virtual ~TAO_Notify_Consumer ();
00063 
00064   /// Access Specific Proxy (typed as TAO_Notify_ProxySupplier).
00065   TAO_Notify_ProxySupplier* proxy_supplier (void);
00066 
00067   /// Access Base Proxy (typed as TAO_Notify_Proxy).
00068   virtual TAO_Notify_Proxy* proxy (void);
00069 
00070   /// Dispatch the event carried by @a request to the consumer.
00071   void deliver (TAO_Notify_Method_Request_Event * request);
00072 
00073   /// Push @a event, given as a CORBA::Any, to this consumer.  Pure virtual.
00074   virtual void push (const CORBA::Any& event) = 0;
00075 
00076   /// Push @a event, given as a CosNotification::StructuredEvent, to this consumer.  Pure virtual.
00077   virtual void push (const CosNotification::StructuredEvent& event) = 0;
00078 
00079   /// Push a batch of events (@a event is a CosNotification::EventBatch) to this consumer.  Pure virtual.
00080   virtual void push (const CosNotification::EventBatch& event) = 0;
00081 
00082   /// Dispatch the batch of events to the attached consumer; the outcome is reported as a DispatchStatus.
00083   DispatchStatus dispatch_batch (const CosNotification::EventBatch& batch);
00084 
00085   /// Dispatch the pending events
00086   void dispatch_pending (void);
00087 
00088   /// Is the connection suspended?
00089   CORBA::Boolean is_suspended (void);
00090 
00091   /// Suspend Connection
00092   void suspend (void);
00093 
00094   /// Resume Connection
00095   void resume (void);
00096 
00097   /// Shutdown the consumer
00098   virtual void shutdown (void);
00099 
00100   /// On reconnect we need to move events from the old consumer
00101   /// (@a old_consumer) to the new one.  Pure virtual.
00102   virtual void reconnect_from_consumer (
00103     TAO_Notify_Consumer* old_consumer) = 0;
00104 
00105   /// Override, Peer::qos_changed
00106   virtual void qos_changed (const TAO_Notify_QoSProperties& qos_properties);
00107 
00108   /// Take the pending queue from the rhs, cancel its timer and
00109   /// schedule our timer.  The caller should have locked the proxy lock
00110   /// before calling this method.
00111   void assume_pending_events (TAO_Notify_Consumer& rhs);
00112 
00113 protected:
00114   typedef ACE_Unbounded_Queue<TAO_Notify_Method_Request_Event_Queueable *> Request_Queue; ///< Queue of pointers to queueable event requests.
00115   /// Dispatch a single @a request; the outcome is reported as a DispatchStatus.
00116   DispatchStatus dispatch_request (TAO_Notify_Method_Request_Event * request);
00117 
00118   /**
00119    * \brief Attempt to dispatch event from a queue.
00120    *
00121    * Called by dispatch_pending.  Deliver one or more events to the Consumer.
00122    * If delivery fails, events are left in the queue (or discarded depending
00123    * on QoS parameters.)
00124    * Undelivered, undiscarded requests are left at the front of the queue.
00125    * Overridden in sequence consumer to dispatch as an EventBatch.
00126    * \return false if delivery failed and the request(s) cannot be discarded.
00127    */
00128   virtual bool dispatch_from_queue (
00129     Request_Queue & requests,
00130     ACE_Guard <TAO_SYNCH_MUTEX> & ace_mon);
00131   /// Enqueue @a request.  NOTE(review): presumably onto the pending-events queue -- confirm in the implementation.
00132   void enqueue_request(TAO_Notify_Method_Request_Event * request);
00133 
00134   /// Add request to a queue if necessary.
00135   /// Overridden by sequence consumer to "always" put incoming events into the queue.
00136   /// @returns true if the request has been enqueued; false if the request should be handled now.
00137   virtual bool enqueue_if_necessary(
00138     TAO_Notify_Method_Request_Event * request);
00139 
00140   /// Dispatch updates: the event types @a added and @a removed.
00141   virtual void dispatch_updates_i (const CosNotification::EventTypeSeq& added,
00142                                    const CosNotification::EventTypeSeq& removed);
00143 
00144   /// Get the shared Proxy Lock
00145   TAO_SYNCH_MUTEX* proxy_lock (void);
00146 
00147 protected:  // timer callback (handle_timeout, from the ACE_Event_Handler base) and timer helpers
00148   virtual int handle_timeout (const ACE_Time_Value& current_time,
00149                               const void* act = 0);
00150 
00151 
00152   /// Schedule timer.  NOTE(review): the effect of @a is_error (default false) is defined by the implementation, not visible here.
00153   void schedule_timer (bool is_error = false);
00154 
00155   /// Cancel timer
00156   void cancel_timer (void);
00157 
00158   ///= Protected Data Members
00159 protected:
00160   Request_Queue& pending_events(); ///< Access a Request_Queue by reference (presumably the one held by pending_events_ -- confirm).
00161 
00162   /// The Proxy that we associate with.
00163   TAO_Notify_ProxySupplier* proxy_;
00164 
00165   /// Suspended Flag.
00166   CORBA::Boolean is_suspended_;
00167 
00168   /// Interface that accepts offer_changes
00169   CosNotifyComm::NotifyPublish_var publish_;
00170   bool have_not_yet_verified_publish_; ///< NOTE(review): presumably true until publish_ has been checked -- confirm in the implementation.
00171 
00172   /// The Pacing Interval.  Held by const reference, so the referenced property must outlive this consumer.
00173   const TAO_Notify_Property_Time & pacing_;
00174 
00175   /// Max. batch size.
00176   TAO_Notify_Property_Long max_batch_size_;
00177 
00178   /// Timer Id.
00179   long timer_id_;
00180 
00181   /// The Timer Manager that we use.
00182   TAO_Notify_Timer::Ptr timer_;
00183 
00184 private:
00185 
00186   /// Events pending to be delivered (the queue is held through an ACE_Auto_Ptr).
00187   ACE_Auto_Ptr< Request_Queue > pending_events_;
00188 };
00189 
00190 TAO_END_VERSIONED_NAMESPACE_DECL
00191 
00192 #if defined (__ACE_INLINE__)
00193 #include "orbsvcs/Notify/Consumer.inl"
00194 #endif /* __ACE_INLINE__ */
00195 
00196 #include /**/ "ace/post.h"
00197 
00198 #endif /* TAO_NOTIFY_CONSUMER_H */

Generated on Sun Jan 27 15:39:53 2008 for TAO_CosNotification by doxygen 1.3.6