00001 // -*- C++ -*- 00002 00003 /** 00004 * @file ECG_Mcast_EH.h 00005 * 00006 * $Id: ECG_Mcast_EH.h 77001 2007-02-12 07:54:49Z johnnyw $ 00007 * 00008 * @author Carlos O'Ryan <coryan@uci.edu> 00009 * @author Jaiganesh Balasubramanian <jai@doc.ece.uci.edu> 00010 * @author Marina Spivak <marina@atdesk.com> 00011 * @author Don Hinton <dhinton@ieee.org> 00012 * 00013 * http://doc.ece.uci.edu/~coryan/EC/index.html 00014 * 00015 */ 00016 #ifndef TAO_ECG_MCAST_EH_H 00017 #define TAO_ECG_MCAST_EH_H 00018 00019 #include /**/ "ace/pre.h" 00020 #include "ace/Event_Handler.h" 00021 00022 #if !defined (ACE_LACKS_PRAGMA_ONCE) 00023 # pragma once 00024 #endif /* ACE_LACKS_PRAGMA_ONCE */ 00025 00026 #include "ace/Unbounded_Set.h" 00027 #include "ace/Array_Base.h" 00028 #include "ace/SOCK_Dgram_Mcast.h" 00029 00030 #include "orbsvcs/RtecEventChannelAdminS.h" 00031 00032 #include /**/ "orbsvcs/Event/event_serv_export.h" 00033 #include "orbsvcs/Event/ECG_Adapters.h" 00034 #include "orbsvcs/Event/EC_Lifetime_Utils.h" 00035 #include "orbsvcs/Event/EC_Lifetime_Utils_T.h" 00036 00037 TAO_BEGIN_VERSIONED_NAMESPACE_DECL 00038 00039 /** 00040 * @class TAO_ECG_Mcast_EH 00041 * 00042 * @brief Event Handler for Mcast messages. 00043 * NOT THREAD-SAFE. 00044 * 00045 * This object acts as an Observer to Event Channel. It subscribes to 00046 * multicast groups that carry events matching the EC's subscriptions. 00047 * This object then receives callbacks from the Reactor when data is 00048 * available on the mcast sockets and alerts TAO_ECG_Dgram_Handler, 00049 * which reads the data, transforms it into event and pushes to the 00050 * Event Channel. 00051 */ 00052 class TAO_RTEvent_Serv_Export TAO_ECG_Mcast_EH : 00053 public ACE_Event_Handler, 00054 public TAO_ECG_Handler_Shutdown 00055 { 00056 public: 00057 00058 /// Initialization and termination methods. 00059 //@{ 00060 /** 00061 * Constructor. Messages received by this EH will be forwarded to 00062 * the \a recv. \a net_if can be used to specify NIC where multicast 00063 * messages are expected. \buf_sz would be used to alter the default 00064 * buffer size. 00065 * 00066 * See comments for receiver_ data member on why raw pointer is 00067 * used for the \a recv argument. 00068 */ 00069 TAO_ECG_Mcast_EH (TAO_ECG_Dgram_Handler *recv, 00070 const ACE_TCHAR *net_if = 0, 00071 CORBA::ULong buf_sz = 0); 00072 00073 /// Destructor. 00074 virtual ~TAO_ECG_Mcast_EH (void); 00075 00076 /** 00077 * Register for changes in the EC subscription list. 00078 * When the subscription list becomes non-empty we join the proper 00079 * multicast groups (using Dgram_Handler to translate between event 00080 * types and mcast groups) and the class registers itself with the 00081 * reactor. 00082 * 00083 * To insure proper resource clean up, if open () is successful, 00084 * the user MUST call shutdown () when handler is no longer needed 00085 * (and its reactor still exists). 00086 */ 00087 void open (RtecEventChannelAdmin::EventChannel_ptr ec); 00088 00089 /// TAO_ECG_Handler_Shutdown method. 00090 /** 00091 * Remove ourselves from the event channel, unsubscribe from the 00092 * multicast groups, close the sockets and deregister from the 00093 * reactor. 00094 */ 00095 virtual int shutdown (void); 00096 //@} 00097 00098 /// Reactor callback. Notify receiver_ that a dgram corresponding 00099 /// to \a fd is ready for reading. 00100 virtual int handle_input (ACE_HANDLE fd); 00101 00102 private: 00103 00104 /** 00105 * @class Observer 00106 * 00107 * @brief Observes changes in the EC consumer subscriptions and notifies 00108 * TAO_ECG_Mcast_EH when there are changes. 00109 */ 00110 class Observer : 00111 public virtual POA_RtecEventChannelAdmin::Observer, 00112 public TAO_EC_Deactivated_Object 00113 { 00114 public: 00115 /// Constructor. Changes in the EC subscriptions will be reported 00116 /// to the \a eh. 00117 Observer (TAO_ECG_Mcast_EH* eh); 00118 00119 /// Shut down the observer: disconnect from EC and deactivate from 00120 /// POA. 00121 void shutdown (void); 00122 00123 /// Event Channel Observer methods 00124 //@{ 00125 virtual void update_consumer ( 00126 const RtecEventChannelAdmin::ConsumerQOS& sub); 00127 virtual void update_supplier ( 00128 const RtecEventChannelAdmin::SupplierQOS& pub); 00129 00130 private: 00131 /// Handler we notify of subscriptions changes. 00132 /* 00133 * Observer can keep a raw pointer to mcast handler, because the handler 00134 * guarantees to notify the observer (by calling shutdown ()) 00135 * before going away. 00136 */ 00137 TAO_ECG_Mcast_EH *eh_; 00138 }; 00139 00140 /// Make update_consumer () accessible to Observer. 00141 friend class Observer; 00142 00143 /// The Observer method. Subscribe/unsubscribe to multicast groups 00144 /// according to changes in consumer subscriptions. 00145 void update_consumer (const RtecEventChannelAdmin::ConsumerQOS& sub); 00146 00147 00148 typedef ACE_Unbounded_Set<ACE_INET_Addr> Address_Set; 00149 00150 /// Helpers for updating multicast subscriptions based on changes in 00151 /// consumer subscriptions. 00152 //@{ 00153 /// Compute the list of multicast addresses that we need to be 00154 /// subscribed to, in order to receive the events described in the 00155 /// ConsumerQOS parameter. 00156 /** 00157 * @param sub The list of event types that our event channel 00158 * consumers are interested in. 00159 * @param multicast_addresses This method populates this list with 00160 * multicast addresses that we need to be subscribed to in 00161 * order to receive event types specified in /a sub. 00162 * 00163 * @throw CORBA::SystemException This method needs to perform 00164 * several CORBA invocations, and it propagates any exceptions 00165 * back to the caller. 00166 */ 00167 void compute_required_subscriptions ( 00168 const RtecEventChannelAdmin::ConsumerQOS& sub, 00169 Address_Set& multicast_addresses); 00170 00171 /// Unsubscribe from any multicast addresses we are currently 00172 /// subscribed to that are not in the \a multicast_addresses list. 00173 /// Also remove from /a multicast_addresses any addresses to which we are 00174 /// already subscribed. 00175 /** 00176 * @param multicast_addresses List of multicast 00177 * addresses we need to be subscribed to in order receive all 00178 * event types in the current consumer subscriptions. 00179 */ 00180 int delete_unwanted_subscriptions ( 00181 Address_Set& multicast_addresses); 00182 00183 /// Subscribe to all multicast addresses in /a multicast_addresses - 00184 /// we are not subscribed to them yet, but need to be. 00185 /** 00186 * @param multicast_addresses List of multicast addresses to which 00187 * we need to subscribe to in order to be receiving all event 00188 * types in the current consumer subscriptions. 00189 */ 00190 void add_new_subscriptions ( 00191 Address_Set& multicast_addresses); 00192 //@} 00193 00194 /** 00195 * @class Observer_Disconnect_Command 00196 * 00197 * @brief Disconnects Observer from the Event Channel 00198 * 00199 * Utility class for use as a template argument to TAO_EC_Auto_Command. 00200 * TAO_EC_Auto_Command<Observer_Disconnect_Command> manages 00201 * observer connection to the Event Channel, automatically 00202 * disconnecting from ec in its destructor, if necessary. 00203 */ 00204 class TAO_RTEvent_Serv_Export Observer_Disconnect_Command 00205 { 00206 public: 00207 Observer_Disconnect_Command (void); 00208 Observer_Disconnect_Command (RtecEventChannelAdmin::Observer_Handle handle, 00209 RtecEventChannelAdmin::EventChannel_ptr ec); 00210 00211 Observer_Disconnect_Command (const Observer_Disconnect_Command &rhs); 00212 Observer_Disconnect_Command & operator= (const Observer_Disconnect_Command & rhs); 00213 00214 void execute (void); 00215 00216 private: 00217 00218 RtecEventChannelAdmin::Observer_Handle handle_; 00219 RtecEventChannelAdmin::EventChannel_var ec_; 00220 }; 00221 00222 private: 00223 /// The NIC used to subscribe for multicast traffic. 00224 ACE_TCHAR *net_if_; 00225 00226 typedef struct { 00227 ACE_INET_Addr mcast_addr; 00228 ACE_SOCK_Dgram_Mcast* dgram; 00229 } Subscription; 00230 typedef ACE_Array_Base<Subscription> Subscriptions; 00231 00232 /// List of multicast addresses we subscribe to and dgrams we use. 00233 /* 00234 * We use a dedicated socket for each multicast subscription. The 00235 * reason: we assume the underlying software, i.e., ACE, binds each 00236 * socket used to receive multicast to the multicast group (mcast addr 00237 * + port) to avoid receiving promiscuous traffic, in which case it is 00238 * not possible to subscribe to more than one mcast address on the same 00239 * socket. 00240 * 00241 * Performance. We use array to store subscriptions (mcast addr / dgram 00242 * pairs). If performance is not adequate, we should look into 00243 * using a hash map, keyed on file descriptors, instead. When there 00244 * are many subscriptions, handle_input() is likely to be more 00245 * efficient with a hash lookup than an array iteration for locating a 00246 * target dgram. Difference in costs of subscripton changes between 00247 * hash map and array would need to be looked at as well, although 00248 * it is probably highly dependent on the pattern of changes. 00249 */ 00250 Subscriptions subscriptions_; 00251 00252 /// We callback to this object when a message arrives. 00253 /* 00254 * We can keep a raw pointer to the receiver (even though it may 00255 * be a refcounted object) because receiver guarantees 00256 * to notify us (by calling shutdown ()) before going away. 00257 * 00258 * We have to use raw pointer instead of a refcounting mechanism 00259 * here to avoid a circular refcounting dependency between 00260 * receiver and handler. 00261 */ 00262 TAO_ECG_Dgram_Handler * receiver_; 00263 00264 /// SOCKbuf size 00265 CORBA::ULong recvbuf_size_; 00266 00267 /// Event Channel Observer. Detects changes in EC consumer subscriptions. 00268 /// ORDER DEPENDENCY: this member should be declared before 00269 /// <auto_observer_disconnect_>. 00270 TAO_EC_Servant_Var<Observer> observer_; 00271 00272 /// Manages connection of our observer to the Event Channel. 00273 /// ORDER DEPENDENCY: this member should be declared AFTER <observer_>. 00274 TAO_EC_Auto_Command<Observer_Disconnect_Command> auto_observer_disconnect_; 00275 }; 00276 00277 TAO_END_VERSIONED_NAMESPACE_DECL 00278 00279 #if defined(__ACE_INLINE__) 00280 #include "orbsvcs/Event/ECG_Mcast_EH.inl" 00281 #endif /* __ACE_INLINE__ */ 00282 00283 #include /**/ "ace/post.h" 00284 00285 #endif /* TAO_ECG_Mcast_EH_H */