ECG_Mcast_EH.h

Go to the documentation of this file.
00001 // -*- C++ -*-
00002 
00003 /**
00004  * @file ECG_Mcast_EH.h
00005  *
00006  * ECG_Mcast_EH.h,v 1.19 2006/03/15 07:52:21 jtc Exp
00007  *
00008  * @author Carlos O'Ryan <coryan@uci.edu>
00009  * @author Jaiganesh Balasubramanian <jai@doc.ece.uci.edu>
00010  * @author Marina Spivak <marina@atdesk.com>
00011  * @author Don Hinton <dhinton@ieee.org>
00012  *
00013  * http://doc.ece.uci.edu/~coryan/EC/index.html
00014  *
00015  */
00016 #ifndef TAO_ECG_MCAST_EH_H
00017 #define TAO_ECG_MCAST_EH_H
00018 
00019 #include /**/ "ace/pre.h"
00020 #include "ace/Event_Handler.h"
00021 
00022 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00023 # pragma once
00024 #endif /* ACE_LACKS_PRAGMA_ONCE */
00025 
00026 #include "ace/Unbounded_Set.h"
00027 #include "ace/Array_Base.h"
00028 #include "ace/SOCK_Dgram_Mcast.h"
00029 
00030 #include "orbsvcs/RtecEventChannelAdminS.h"
00031 
00032 #include /**/ "orbsvcs/Event/event_serv_export.h"
00033 #include "orbsvcs/Event/ECG_Adapters.h"
00034 #include "orbsvcs/Event/EC_Lifetime_Utils.h"
00035 #include "orbsvcs/Event/EC_Lifetime_Utils_T.h"
00036 
00037 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00038 
00039 /**
00040  * @class TAO_ECG_Mcast_EH
00041  *
00042  * @brief Event Handler for Mcast messages.
00043  *        NOT THREAD-SAFE.
00044  *
00045  * This object acts as an Observer to the Event Channel.  It subscribes
00046  * to multicast groups that carry events matching the EC's subscriptions.
00047  * This object then receives callbacks from the Reactor when data is
00048  * available on the mcast sockets and alerts TAO_ECG_Dgram_Handler,
00049  * which reads the data, transforms it into an event and pushes it to
00050  * the Event Channel.
00051  */
00052 class TAO_RTEvent_Serv_Export TAO_ECG_Mcast_EH :
00053   public ACE_Event_Handler,
00054   public TAO_ECG_Handler_Shutdown
00055 {
00056 public:
00057 
00058   /// Initialization and termination methods.
00059   //@{
00060   /**
00061    * Constructor.  Messages received by this EH will be forwarded to
00062    * the \a recv.  \a net_if can be used to specify the NIC where
00063    * multicast messages are expected.  \a buf_sz can be used to alter
00064    * the default buffer size.
00065    *
00066    * See comments for receiver_ data member on why raw pointer is
00067    * used for the \a recv argument.
00068    */
00069   TAO_ECG_Mcast_EH (TAO_ECG_Dgram_Handler *recv,
00070                     const ACE_TCHAR *net_if = 0,
00071                     CORBA::ULong buf_sz = 0);
00072 
00073   /// Destructor.
00074   virtual ~TAO_ECG_Mcast_EH (void);
00075 
00076   /**
00077    * Register for changes in the EC subscription list.
00078    * When the subscription list becomes non-empty we join the proper
00079    * multicast groups (using Dgram_Handler to translate between event
00080    * types and mcast groups) and the class registers itself with the
00081    * reactor.
00082    *
00083    * To ensure proper resource clean up, if open () is successful,
00084    * the user MUST call shutdown () when the handler is no longer needed
00085    * (and its reactor still exists).
00086    */
00087   void open (RtecEventChannelAdmin::EventChannel_ptr ec
00088              ACE_ENV_ARG_DECL_WITH_DEFAULTS);
00089 
00090   /// TAO_ECG_Handler_Shutdown method.
00091   /**
00092    * Remove ourselves from the event channel, unsubscribe from the
00093    * multicast groups, close the sockets and deregister from the
00094    * reactor.
00095    */
00096   virtual int shutdown (void);
00097   //@}
00098 
00099   /// Reactor callback.  Notify receiver_ that a dgram corresponding
00100   /// to \a fd is ready for reading.
00101   virtual int handle_input (ACE_HANDLE fd);
00102 
00103 private:
00104 
00105   /**
00106    * @class Observer
00107    *
00108    * @brief Observes changes in the EC consumer subscriptions and notifies
00109    *        TAO_ECG_Mcast_EH when there are changes.
00110    */
00111   class Observer :
00112     public virtual POA_RtecEventChannelAdmin::Observer,
00113     public TAO_EC_Deactivated_Object
00114   {
00115   public:
00116     /// Constructor.  Changes in the EC subscriptions will be reported
00117     /// to the \a eh.
00118     Observer (TAO_ECG_Mcast_EH* eh);
00119 
00120     /// Shut down the observer: disconnect from EC and deactivate from
00121     /// POA.
00122     void shutdown (void);
00123 
00124     /// Event Channel Observer methods
00125     //@{
00126     virtual void update_consumer (
00127         const RtecEventChannelAdmin::ConsumerQOS& sub
00128         ACE_ENV_ARG_DECL_WITH_DEFAULTS)
00129       ACE_THROW_SPEC ((CORBA::SystemException));
00130     virtual void update_supplier (
00131         const RtecEventChannelAdmin::SupplierQOS& pub
00132         ACE_ENV_ARG_DECL_WITH_DEFAULTS)
00133       ACE_THROW_SPEC ((CORBA::SystemException));
00134 
00135   private:
00136     /// Handler we notify of subscription changes.
00137     /*
00138      * Observer can keep a raw pointer to mcast handler, because the handler
00139      * guarantees to notify the observer (by calling shutdown ())
00140      * before going away.
00141      */
00142     TAO_ECG_Mcast_EH *eh_;
00143   };
00144 
00145   /// Make update_consumer () accessible to Observer.
00146   friend class Observer;
00147 
00148   /// The Observer method.  Subscribe/unsubscribe to multicast groups
00149   /// according to changes in consumer subscriptions.
00150   void update_consumer (const RtecEventChannelAdmin::ConsumerQOS& sub
00151                         ACE_ENV_ARG_DECL)
00152       ACE_THROW_SPEC ((CORBA::SystemException));
00153 
00154 
00155   typedef ACE_Unbounded_Set<ACE_INET_Addr> Address_Set;
00156 
00157   /// Helpers for updating multicast subscriptions based on changes in
00158   /// consumer subscriptions.
00159   //@{
00160   /// Compute the list of multicast addresses that we need to be
00161   /// subscribed to, in order to receive the events described in the
00162   /// ConsumerQOS parameter.
00163   /**
00164    * @param sub The list of event types that our event channel
00165    *        consumers are interested in.
00166    * @param multicast_addresses This method populates this list with
00167    *        multicast addresses that we need to be subscribed to in
00168    *        order to receive event types specified in \a sub.
00169    *
00170    * @throw CORBA::SystemException This method needs to perform
00171    *        several CORBA invocations, and it propagates any exceptions
00172    *        back to the caller.
00173    */
00174   void compute_required_subscriptions (
00175         const RtecEventChannelAdmin::ConsumerQOS& sub,
00176               Address_Set& multicast_addresses
00177               ACE_ENV_ARG_DECL)
00178               ACE_THROW_SPEC ((CORBA::SystemException));
00179 
00180   /// Unsubscribe from any multicast addresses we are currently
00181   /// subscribed to that are not in the \a multicast_addresses list.
00182   /// Also remove from \a multicast_addresses any addresses to which we are
00183   /// already subscribed.
00184   /**
00185    * @param multicast_addresses List of multicast
00186    *        addresses we need to be subscribed to in order to receive all
00187    *        event types in the current consumer subscriptions.
00188    */
00189   int delete_unwanted_subscriptions (
00190               Address_Set& multicast_addresses);
00191 
00192   /// Subscribe to all multicast addresses in \a multicast_addresses -
00193   /// we are not subscribed to them yet, but need to be.
00194   /**
00195    * @param multicast_addresses List of multicast addresses to which
00196    *        we need to subscribe in order to receive all event
00197    *        types in the current consumer subscriptions.
00198    */
00199   void add_new_subscriptions (
00200               Address_Set& multicast_addresses);
00201   //@}
00202 
00203   /**
00204    * @class Observer_Disconnect_Command
00205    *
00206    * @brief Disconnects Observer from the Event Channel
00207    *
00208    * Utility class for use as a template argument to TAO_EC_Auto_Command.
00209    * TAO_EC_Auto_Command<Observer_Disconnect_Command> manages
00210    * observer connection to the Event Channel, automatically
00211    * disconnecting from ec in its destructor, if necessary.
00212    */
00213   class TAO_RTEvent_Serv_Export Observer_Disconnect_Command
00214   {
00215   public:
00216     Observer_Disconnect_Command (void);
00217     Observer_Disconnect_Command (RtecEventChannelAdmin::Observer_Handle handle,
00218                                  RtecEventChannelAdmin::EventChannel_ptr ec);
00219 
00220     Observer_Disconnect_Command (const Observer_Disconnect_Command &rhs);
00221     Observer_Disconnect_Command & operator= (const Observer_Disconnect_Command & rhs);
00222 
00223     void execute (ACE_ENV_SINGLE_ARG_DECL);
00224 
00225   private:
00226 
00227     RtecEventChannelAdmin::Observer_Handle handle_;
00228     RtecEventChannelAdmin::EventChannel_var ec_;
00229   };
00230 
00231 private:
00232   /// The NIC used to subscribe for multicast traffic.
00233   ACE_TCHAR *net_if_;
00234 
00235   typedef struct {
00236     ACE_INET_Addr mcast_addr;
00237     ACE_SOCK_Dgram_Mcast* dgram;
00238   } Subscription;
00239   typedef ACE_Array_Base<Subscription> Subscriptions;
00240 
00241   /// List of multicast addresses we subscribe to and dgrams we use.
00242   /*
00243    * We use a dedicated socket for each multicast subscription.  The
00244    * reason: we assume the underlying software, i.e., ACE, binds each
00245    * socket used to receive multicast to the multicast group (mcast addr
00246    * + port) to avoid receiving promiscuous traffic, in which case it is
00247    * not possible to subscribe to more than one mcast address on the same
00248    * socket.
00249    *
00250    * Performance.  We use an array to store subscriptions (mcast addr /
00251    * dgram pairs).  If performance is not adequate, we should look into
00252    * using a hash map, keyed on file descriptors, instead.  When there
00253    * are many subscriptions, handle_input() is likely to be more
00254    * efficient with a hash lookup than an array iteration for locating a
00255    * target dgram.  Difference in costs of subscription changes between
00256    * hash map and array would need to be looked at as well, although
00257    * it is probably highly dependent on the pattern of changes.
00258    */
00259   Subscriptions subscriptions_;
00260 
00261   /// We call back to this object when a message arrives.
00262   /*
00263    * We can keep a raw pointer to the receiver (even though it may
00264    * be a refcounted object) because receiver guarantees
00265    * to notify us (by calling shutdown ()) before going away.
00266    *
00267    * We have to use raw pointer instead of a refcounting mechanism
00268    * here to avoid a circular refcounting dependency between
00269    * receiver and handler.
00270    */
00271   TAO_ECG_Dgram_Handler * receiver_;
00272 
00273   /// Socket receive buffer size.
00274   CORBA::ULong recvbuf_size_;
00275 
00276   /// Event Channel Observer.  Detects changes in EC consumer subscriptions.
00277   /// ORDER DEPENDENCY: this member should be declared BEFORE
00278   /// <auto_observer_disconnect_>.
00279   TAO_EC_Servant_Var<Observer> observer_;
00280 
00281   /// Manages connection of our observer to the Event Channel.
00282   /// ORDER DEPENDENCY: this member should be declared AFTER <observer_>.
00283   TAO_EC_Auto_Command<Observer_Disconnect_Command> auto_observer_disconnect_;
00284 };
00285 
00286 TAO_END_VERSIONED_NAMESPACE_DECL
00287 
00288 #if defined(__ACE_INLINE__)
00289 #include "orbsvcs/Event/ECG_Mcast_EH.i"
00290 #endif /* __ACE_INLINE__ */
00291 
00292 #include /**/ "ace/post.h"
00293 
00294 #endif /* TAO_ECG_MCAST_EH_H */

Generated on Thu Nov 9 13:11:10 2006 for TAO_RTEvent by doxygen 1.3.6