00001 // -*- C++ -*- 00002 00003 /** 00004 * @file ECG_Mcast_EH.h 00005 * 00006 * ECG_Mcast_EH.h,v 1.19 2006/03/15 07:52:21 jtc Exp 00007 * 00008 * @author Carlos O'Ryan <coryan@uci.edu> 00009 * @author Jaiganesh Balasubramanian <jai@doc.ece.uci.edu> 00010 * @author Marina Spivak <marina@atdesk.com> 00011 * @author Don Hinton <dhinton@ieee.org> 00012 * 00013 * http://doc.ece.uci.edu/~coryan/EC/index.html 00014 * 00015 */ 00016 #ifndef TAO_ECG_MCAST_EH_H 00017 #define TAO_ECG_MCAST_EH_H 00018 00019 #include /**/ "ace/pre.h" 00020 #include "ace/Event_Handler.h" 00021 00022 #if !defined (ACE_LACKS_PRAGMA_ONCE) 00023 # pragma once 00024 #endif /* ACE_LACKS_PRAGMA_ONCE */ 00025 00026 #include "ace/Unbounded_Set.h" 00027 #include "ace/Array_Base.h" 00028 #include "ace/SOCK_Dgram_Mcast.h" 00029 00030 #include "orbsvcs/RtecEventChannelAdminS.h" 00031 00032 #include /**/ "orbsvcs/Event/event_serv_export.h" 00033 #include "orbsvcs/Event/ECG_Adapters.h" 00034 #include "orbsvcs/Event/EC_Lifetime_Utils.h" 00035 #include "orbsvcs/Event/EC_Lifetime_Utils_T.h" 00036 00037 TAO_BEGIN_VERSIONED_NAMESPACE_DECL 00038 00039 /** 00040 * @class TAO_ECG_Mcast_EH 00041 * 00042 * @brief Event Handler for Mcast messages. 00043 * NOT THREAD-SAFE. 00044 * 00045 * This object acts as an Observer to Event Channel. It subscribes to 00046 * multicast groups that carry events matching the EC's subscriptions. 00047 * This object then receives callbacks from the Reactor when data is 00048 * available on the mcast sockets and alerts TAO_ECG_Dgram_Handler, 00049 * which reads the data, transforms it into event and pushes to the 00050 * Event Channel. 00051 */ 00052 class TAO_RTEvent_Serv_Export TAO_ECG_Mcast_EH : 00053 public ACE_Event_Handler, 00054 public TAO_ECG_Handler_Shutdown 00055 { 00056 public: 00057 00058 /// Initialization and termination methods. 00059 //@{ 00060 /** 00061 * Constructor. Messages received by this EH will be forwarded to 00062 * the \a recv. \a net_if can be used to specify NIC where multicast 00063 * messages are expected. \buf_sz would be used to alter the default 00064 * buffer size. 00065 * 00066 * See comments for receiver_ data member on why raw pointer is 00067 * used for the \a recv argument. 00068 */ 00069 TAO_ECG_Mcast_EH (TAO_ECG_Dgram_Handler *recv, 00070 const ACE_TCHAR *net_if = 0, 00071 CORBA::ULong buf_sz = 0); 00072 00073 /// Destructor. 00074 virtual ~TAO_ECG_Mcast_EH (void); 00075 00076 /** 00077 * Register for changes in the EC subscription list. 00078 * When the subscription list becomes non-empty we join the proper 00079 * multicast groups (using Dgram_Handler to translate between event 00080 * types and mcast groups) and the class registers itself with the 00081 * reactor. 00082 * 00083 * To insure proper resource clean up, if open () is successful, 00084 * the user MUST call shutdown () when handler is no longer needed 00085 * (and its reactor still exists). 00086 */ 00087 void open (RtecEventChannelAdmin::EventChannel_ptr ec 00088 ACE_ENV_ARG_DECL_WITH_DEFAULTS); 00089 00090 /// TAO_ECG_Handler_Shutdown method. 00091 /** 00092 * Remove ourselves from the event channel, unsubscribe from the 00093 * multicast groups, close the sockets and deregister from the 00094 * reactor. 00095 */ 00096 virtual int shutdown (void); 00097 //@} 00098 00099 /// Reactor callback. Notify receiver_ that a dgram corresponding 00100 /// to \a fd is ready for reading. 00101 virtual int handle_input (ACE_HANDLE fd); 00102 00103 private: 00104 00105 /** 00106 * @class Observer 00107 * 00108 * @brief Observes changes in the EC consumer subscriptions and notifies 00109 * TAO_ECG_Mcast_EH when there are changes. 00110 */ 00111 class Observer : 00112 public virtual POA_RtecEventChannelAdmin::Observer, 00113 public TAO_EC_Deactivated_Object 00114 { 00115 public: 00116 /// Constructor. Changes in the EC subscriptions will be reported 00117 /// to the \a eh. 00118 Observer (TAO_ECG_Mcast_EH* eh); 00119 00120 /// Shut down the observer: disconnect from EC and deactivate from 00121 /// POA. 00122 void shutdown (void); 00123 00124 /// Event Channel Observer methods 00125 //@{ 00126 virtual void update_consumer ( 00127 const RtecEventChannelAdmin::ConsumerQOS& sub 00128 ACE_ENV_ARG_DECL_WITH_DEFAULTS) 00129 ACE_THROW_SPEC ((CORBA::SystemException)); 00130 virtual void update_supplier ( 00131 const RtecEventChannelAdmin::SupplierQOS& pub 00132 ACE_ENV_ARG_DECL_WITH_DEFAULTS) 00133 ACE_THROW_SPEC ((CORBA::SystemException)); 00134 00135 private: 00136 /// Handler we notify of subscriptions changes. 00137 /* 00138 * Observer can keep a raw pointer to mcast handler, because the handler 00139 * guarantees to notify the observer (by calling shutdown ()) 00140 * before going away. 00141 */ 00142 TAO_ECG_Mcast_EH *eh_; 00143 }; 00144 00145 /// Make update_consumer () accessible to Observer. 00146 friend class Observer; 00147 00148 /// The Observer method. Subscribe/unsubscribe to multicast groups 00149 /// according to changes in consumer subscriptions. 00150 void update_consumer (const RtecEventChannelAdmin::ConsumerQOS& sub 00151 ACE_ENV_ARG_DECL) 00152 ACE_THROW_SPEC ((CORBA::SystemException)); 00153 00154 00155 typedef ACE_Unbounded_Set<ACE_INET_Addr> Address_Set; 00156 00157 /// Helpers for updating multicast subscriptions based on changes in 00158 /// consumer subscriptions. 00159 //@{ 00160 /// Compute the list of multicast addresses that we need to be 00161 /// subscribed to, in order to receive the events described in the 00162 /// ConsumerQOS parameter. 00163 /** 00164 * @param sub The list of event types that our event channel 00165 * consumers are interested in. 00166 * @param multicast_addresses This method populates this list with 00167 * multicast addresses that we need to be subscribed to in 00168 * order to receive event types specified in /a sub. 00169 * 00170 * @throw CORBA::SystemException This method needs to perform 00171 * several CORBA invocations, and it propagates any exceptions 00172 * back to the caller. 00173 */ 00174 void compute_required_subscriptions ( 00175 const RtecEventChannelAdmin::ConsumerQOS& sub, 00176 Address_Set& multicast_addresses 00177 ACE_ENV_ARG_DECL) 00178 ACE_THROW_SPEC ((CORBA::SystemException)); 00179 00180 /// Unsubscribe from any multicast addresses we are currently 00181 /// subscribed to that are not in the \a multicast_addresses list. 00182 /// Also remove from /a multicast_addresses any addresses to which we are 00183 /// already subscribed. 00184 /** 00185 * @param multicast_addresses List of multicast 00186 * addresses we need to be subscribed to in order receive all 00187 * event types in the current consumer subscriptions. 00188 */ 00189 int delete_unwanted_subscriptions ( 00190 Address_Set& multicast_addresses); 00191 00192 /// Subscribe to all multicast addresses in /a multicast_addresses - 00193 /// we are not subscribed to them yet, but need to be. 00194 /** 00195 * @param multicast_addresses List of multicast addresses to which 00196 * we need to subscribe to in order to be receiving all event 00197 * types in the current consumer subscriptions. 00198 */ 00199 void add_new_subscriptions ( 00200 Address_Set& multicast_addresses); 00201 //@} 00202 00203 /** 00204 * @class Observer_Disconnect_Command 00205 * 00206 * @brief Disconnects Observer from the Event Channel 00207 * 00208 * Utility class for use as a template argument to TAO_EC_Auto_Command. 00209 * TAO_EC_Auto_Command<Observer_Disconnect_Command> manages 00210 * observer connection to the Event Channel, automatically 00211 * disconnecting from ec in its destructor, if necessary. 00212 */ 00213 class TAO_RTEvent_Serv_Export Observer_Disconnect_Command 00214 { 00215 public: 00216 Observer_Disconnect_Command (void); 00217 Observer_Disconnect_Command (RtecEventChannelAdmin::Observer_Handle handle, 00218 RtecEventChannelAdmin::EventChannel_ptr ec); 00219 00220 Observer_Disconnect_Command (const Observer_Disconnect_Command &rhs); 00221 Observer_Disconnect_Command & operator= (const Observer_Disconnect_Command & rhs); 00222 00223 void execute (ACE_ENV_SINGLE_ARG_DECL); 00224 00225 private: 00226 00227 RtecEventChannelAdmin::Observer_Handle handle_; 00228 RtecEventChannelAdmin::EventChannel_var ec_; 00229 }; 00230 00231 private: 00232 /// The NIC used to subscribe for multicast traffic. 00233 ACE_TCHAR *net_if_; 00234 00235 typedef struct { 00236 ACE_INET_Addr mcast_addr; 00237 ACE_SOCK_Dgram_Mcast* dgram; 00238 } Subscription; 00239 typedef ACE_Array_Base<Subscription> Subscriptions; 00240 00241 /// List of multicast addresses we subscribe to and dgrams we use. 00242 /* 00243 * We use a dedicated socket for each multicast subscription. The 00244 * reason: we assume the underlying software, i.e., ACE, binds each 00245 * socket used to receive multicast to the multicast group (mcast addr 00246 * + port) to avoid receiving promiscuous traffic, in which case it is 00247 * not possible to subscribe to more than one mcast address on the same 00248 * socket. 00249 * 00250 * Performance. We use array to store subscriptions (mcast addr / dgram 00251 * pairs). If performance is not adequate, we should look into 00252 * using a hash map, keyed on file descriptors, instead. When there 00253 * are many subscriptions, handle_input() is likely to be more 00254 * efficient with a hash lookup than an array iteration for locating a 00255 * target dgram. Difference in costs of subscripton changes between 00256 * hash map and array would need to be looked at as well, although 00257 * it is probably highly dependent on the pattern of changes. 00258 */ 00259 Subscriptions subscriptions_; 00260 00261 /// We callback to this object when a message arrives. 00262 /* 00263 * We can keep a raw pointer to the receiver (even though it may 00264 * be a refcounted object) because receiver guarantees 00265 * to notify us (by calling shutdown ()) before going away. 00266 * 00267 * We have to use raw pointer instead of a refcounting mechanism 00268 * here to avoid a circular refcounting dependency between 00269 * receiver and handler. 00270 */ 00271 TAO_ECG_Dgram_Handler * receiver_; 00272 00273 /// SOCKbuf size 00274 CORBA::ULong recvbuf_size_; 00275 00276 /// Event Channel Observer. Detects changes in EC consumer subscriptions. 00277 /// ORDER DEPENDENCY: this member should be declared before 00278 /// <auto_observer_disconnect_>. 00279 TAO_EC_Servant_Var<Observer> observer_; 00280 00281 /// Manages connection of our observer to the Event Channel. 00282 /// ORDER DEPENDENCY: this member should be declared AFTER <observer_>. 00283 TAO_EC_Auto_Command<Observer_Disconnect_Command> auto_observer_disconnect_; 00284 }; 00285 00286 TAO_END_VERSIONED_NAMESPACE_DECL 00287 00288 #if defined(__ACE_INLINE__) 00289 #include "orbsvcs/Event/ECG_Mcast_EH.i" 00290 #endif /* __ACE_INLINE__ */ 00291 00292 #include /**/ "ace/post.h" 00293 00294 #endif /* TAO_ECG_Mcast_EH_H */