ECG_Mcast_EH.h

Go to the documentation of this file.
00001 // -*- C++ -*-
00002 
00003 /**
00004  * @file ECG_Mcast_EH.h
00005  *
00006  * $Id: ECG_Mcast_EH.h 77001 2007-02-12 07:54:49Z johnnyw $
00007  *
00008  * @author Carlos O'Ryan <coryan@uci.edu>
00009  * @author Jaiganesh Balasubramanian <jai@doc.ece.uci.edu>
00010  * @author Marina Spivak <marina@atdesk.com>
00011  * @author Don Hinton <dhinton@ieee.org>
00012  *
00013  * http://doc.ece.uci.edu/~coryan/EC/index.html
00014  *
00015  */
00016 #ifndef TAO_ECG_MCAST_EH_H
00017 #define TAO_ECG_MCAST_EH_H
00018 
00019 #include /**/ "ace/pre.h"
00020 #include "ace/Event_Handler.h"
00021 
00022 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00023 # pragma once
00024 #endif /* ACE_LACKS_PRAGMA_ONCE */
00025 
00026 #include "ace/Unbounded_Set.h"
00027 #include "ace/Array_Base.h"
00028 #include "ace/SOCK_Dgram_Mcast.h"
00029 
00030 #include "orbsvcs/RtecEventChannelAdminS.h"
00031 
00032 #include /**/ "orbsvcs/Event/event_serv_export.h"
00033 #include "orbsvcs/Event/ECG_Adapters.h"
00034 #include "orbsvcs/Event/EC_Lifetime_Utils.h"
00035 #include "orbsvcs/Event/EC_Lifetime_Utils_T.h"
00036 
00037 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00038 
00039 /**
00040  * @class TAO_ECG_Mcast_EH
00041  *
00042  * @brief Event Handler for Mcast messages.
00043  *        NOT THREAD-SAFE.
00044  *
00045  * This object acts as an Observer of the Event Channel.  It subscribes to
00046  * multicast groups that carry events matching the EC's subscriptions.
00047  * This object then receives callbacks from the Reactor when data is
00048  * available on the mcast sockets and alerts TAO_ECG_Dgram_Handler,
00049  * which reads the data, transforms it into an event and pushes it to the
00050  * Event Channel.
00051  */
00052 class TAO_RTEvent_Serv_Export TAO_ECG_Mcast_EH :
00053   public ACE_Event_Handler,
00054   public TAO_ECG_Handler_Shutdown
00055 {
00056 public:
00057 
00058   /// Initialization and termination methods.
00059   //@{
00060   /**
00061    * Constructor.  Messages received by this EH will be forwarded to
00062    * the \a recv.  \a net_if can be used to specify the NIC where multicast
00063    * messages are expected.  \a buf_sz can be used to alter the default
00064    * buffer size.
00065    *
00066    * See comments for receiver_ data member on why raw pointer is
00067    * used for the \a recv argument.
00068    */
00069   TAO_ECG_Mcast_EH (TAO_ECG_Dgram_Handler *recv,
00070                     const ACE_TCHAR *net_if = 0,
00071             CORBA::ULong buf_sz = 0);
00072 
00073   /// Destructor.
00074   virtual ~TAO_ECG_Mcast_EH (void);
00075 
00076   /**
00077    * Register for changes in the EC subscription list.
00078    * When the subscription list becomes non-empty we join the proper
00079    * multicast groups (using Dgram_Handler to translate between event
00080    * types and mcast groups) and the class registers itself with the
00081    * reactor.
00082    *
00083    * To ensure proper resource clean up, if open () is successful,
00084    * the user MUST call shutdown () when handler is no longer needed
00085    * (and its reactor still exists).
00086    */
00087   void open (RtecEventChannelAdmin::EventChannel_ptr ec);
00088 
00089   /// TAO_ECG_Handler_Shutdown method.
00090   /**
00091    * Remove ourselves from the event channel, unsubscribe from the
00092    * multicast groups, close the sockets and deregister from the
00093    * reactor.
00094    */
00095   virtual int shutdown (void);
00096   //@}
00097 
00098   /// Reactor callback.  Notify receiver_ that a dgram corresponding
00099   /// to \a fd is ready for reading.
00100   virtual int handle_input (ACE_HANDLE fd);
00101 
00102 private:
00103 
00104   /**
00105    * @class Observer
00106    *
00107    * @brief Observes changes in the EC consumer subscriptions and notifies
00108    *        TAO_ECG_Mcast_EH when there are changes.
00109    */
00110   class Observer :
00111     public virtual POA_RtecEventChannelAdmin::Observer,
00112     public TAO_EC_Deactivated_Object
00113   {
00114   public:
00115     /// Constructor.  Changes in the EC subscriptions will be reported
00116     /// to the \a eh.
00117     Observer (TAO_ECG_Mcast_EH* eh);
00118 
00119     /// Shut down the observer: disconnect from EC and deactivate from
00120     /// POA.
00121     void shutdown (void);
00122 
00123     /// Event Channel Observer methods
00124     //@{
00125     virtual void update_consumer (
00126         const RtecEventChannelAdmin::ConsumerQOS& sub);
00127     virtual void update_supplier (
00128         const RtecEventChannelAdmin::SupplierQOS& pub);
00129     //@}
00130   private:
00131     /// Handler we notify of subscription changes.
00132     /*
00133      * Observer can keep a raw pointer to mcast handler, because the handler
00134      * guarantees to notify the observer (by calling shutdown ())
00135      * before going away.
00136      */
00137     TAO_ECG_Mcast_EH *eh_;
00138   };
00139 
00140   /// Make update_consumer () accessible to Observer.
00141   friend class Observer;
00142 
00143   /// The Observer method.  Subscribe/unsubscribe to multicast groups
00144   /// according to changes in consumer subscriptions.
00145   void update_consumer (const RtecEventChannelAdmin::ConsumerQOS& sub);
00146 
00147 
00148   typedef ACE_Unbounded_Set<ACE_INET_Addr> Address_Set;
00149 
00150   /// Helpers for updating multicast subscriptions based on changes in
00151   /// consumer subscriptions.
00152   //@{
00153   /// Compute the list of multicast addresses that we need to be
00154   /// subscribed to, in order to receive the events described in the
00155   /// ConsumerQOS parameter.
00156   /**
00157    * @param sub The list of event types that our event channel
00158    *        consumers are interested in.
00159    * @param multicast_addresses This method populates this list with
00160    *        multicast addresses that we need to be subscribed to in
00161    *        order to receive event types specified in \a sub.
00162    *
00163    * @throw CORBA::SystemException This method needs to perform
00164    *        several CORBA invocations, and it propagates any exceptions
00165    *        back to the caller.
00166    */
00167   void compute_required_subscriptions (
00168         const RtecEventChannelAdmin::ConsumerQOS& sub,
00169               Address_Set& multicast_addresses);
00170 
00171   /// Unsubscribe from any multicast addresses we are currently
00172   /// subscribed to that are not in the \a multicast_addresses list.
00173   /// Also remove from \a multicast_addresses any addresses to which we are
00174   /// already subscribed.
00175   /**
00176    * @param multicast_addresses List of multicast
00177    *        addresses we need to be subscribed to in order to receive all
00178    *        event types in the current consumer subscriptions.
00179    */
00180   int delete_unwanted_subscriptions (
00181               Address_Set& multicast_addresses);
00182 
00183   /// Subscribe to all multicast addresses in \a multicast_addresses -
00184   /// we are not subscribed to them yet, but need to be.
00185   /**
00186    * @param multicast_addresses List of multicast addresses to which
00187    *        we need to subscribe in order to receive all event
00188    *        types in the current consumer subscriptions.
00189    */
00190   void add_new_subscriptions (
00191               Address_Set& multicast_addresses);
00192   //@}
00193 
00194   /**
00195    * @class Observer_Disconnect_Command
00196    *
00197    * @brief Disconnects Observer from the Event Channel
00198    *
00199    * Utility class for use as a template argument to TAO_EC_Auto_Command.
00200    * TAO_EC_Auto_Command<Observer_Disconnect_Command> manages
00201    * observer connection to the Event Channel, automatically
00202    * disconnecting from ec in its destructor, if necessary.
00203    */
00204   class TAO_RTEvent_Serv_Export Observer_Disconnect_Command
00205   {
00206   public:
00207     Observer_Disconnect_Command (void);
00208     Observer_Disconnect_Command (RtecEventChannelAdmin::Observer_Handle handle,
00209                                  RtecEventChannelAdmin::EventChannel_ptr ec);
00210 
00211     Observer_Disconnect_Command (const Observer_Disconnect_Command &rhs);
00212     Observer_Disconnect_Command & operator= (const Observer_Disconnect_Command & rhs);
00213 
00214     void execute (void);
00215 
00216   private:
00217 
00218     RtecEventChannelAdmin::Observer_Handle handle_;
00219     RtecEventChannelAdmin::EventChannel_var ec_;
00220   };
00221 
00222 private:
00223   /// The NIC used to subscribe for multicast traffic.
00224   ACE_TCHAR *net_if_;
00225 
00226   typedef struct {
00227     ACE_INET_Addr mcast_addr;
00228     ACE_SOCK_Dgram_Mcast* dgram;
00229   } Subscription;
00230   typedef ACE_Array_Base<Subscription> Subscriptions;
00231 
00232   /// List of multicast addresses we subscribe to and dgrams we use.
00233   /*
00234    * We use a dedicated socket for each multicast subscription.  The
00235    * reason: we assume the underlying software, i.e., ACE, binds each
00236    * socket used to receive multicast to the multicast group (mcast addr
00237    * + port) to avoid receiving promiscuous traffic, in which case it is
00238    * not possible to subscribe to more than one mcast address on the same
00239    * socket.
00240    *
00241    * Performance.  We use array to store subscriptions (mcast addr / dgram
00242    * pairs).  If performance is not adequate, we should look into
00243    * using a hash map, keyed on file descriptors, instead.  When there
00244    * are many subscriptions, handle_input() is likely to be more
00245    * efficient with a hash lookup than an array iteration for locating a
00246    * target dgram.  Difference in costs of subscription changes between
00247    * hash map and array would need to be looked at as well, although
00248    * it is probably highly dependent on the pattern of changes.
00249    */
00250   Subscriptions subscriptions_;
00251 
00252   /// We call back to this object when a message arrives.
00253   /*
00254    * We can keep a raw pointer to the receiver (even though it may
00255    * be a refcounted object) because receiver guarantees
00256    * to notify us (by calling shutdown ()) before going away.
00257    *
00258    * We have to use raw pointer instead of a refcounting mechanism
00259    * here to avoid a circular refcounting dependency between
00260    * receiver and handler.
00261    */
00262   TAO_ECG_Dgram_Handler * receiver_;
00263 
00264   /// SOCKbuf size
00265   CORBA::ULong recvbuf_size_;
00266 
00267   /// Event Channel Observer.  Detects changes in EC consumer subscriptions.
00268   /// ORDER DEPENDENCY: this member should be declared before
00269   /// <auto_observer_disconnect_>.
00270   TAO_EC_Servant_Var<Observer> observer_;
00271 
00272   /// Manages connection of our observer to the Event Channel.
00273   /// ORDER DEPENDENCY: this member should be declared AFTER <observer_>.
00274   TAO_EC_Auto_Command<Observer_Disconnect_Command> auto_observer_disconnect_;
00275 };
00276 
00277 TAO_END_VERSIONED_NAMESPACE_DECL
00278 
00279 #if defined(__ACE_INLINE__)
00280 #include "orbsvcs/Event/ECG_Mcast_EH.inl"
00281 #endif /* __ACE_INLINE__ */
00282 
00283 #include /**/ "ace/post.h"
00284 
00285 #endif /* TAO_ECG_MCAST_EH_H */

Generated on Tue Feb 2 17:44:06 2010 for TAO_RTEvent by  doxygen 1.4.7