00001 // -*- C++ -*- 00002 00003 /** 00004 * @file ECG_UDP_Receiver.h 00005 * 00006 * ECG_UDP_Receiver.h,v 1.15 2006/03/15 07:52:21 jtc Exp 00007 * 00008 * @author Carlos O'Ryan (coryan@cs.wustl.edu) 00009 * @author Marina Spivak (marina@atdesk.com) 00010 * 00011 * Based on previous work by Tim Harrison (harrison@cs.wustl.edu) and 00012 * other members of the DOC group. More details can be found in: 00013 * 00014 * http://doc.ece.uci.edu/~coryan/EC/index.html 00015 * 00016 * Define helper classes to propagate events between ECs using 00017 * either UDP or multicast. 00018 * The architecture is a bit complicated and deserves some 00019 * explanation: sending the events over UDP (or mcast) is easy, a 00020 * Consumer (TAO_ECG_UDP_Sender) subscribes for a certain set of 00021 * events, its push() method marshalls the event set into a CDR 00022 * stream that is sent using an ACE_SOCK_Dgram. The subscription 00023 * set and IP address can be configured. 00024 * Another helper class (TAO_ECG_UDP_Receiver) acts as a supplier of 00025 * events; it receives a callback when an event is available on an 00026 * ACE_SOCK_Dgram, it demarshalls the event and pushes it to the 00027 * EC. Two ACE_Event_Handler classes are provided that can forward 00028 * the events to this Supplier: TAO_ECG_Mcast_EH can receive events 00029 * from a multicast group; TAO_ECG_UDP_EH can receive events from a 00030 * regular UDP socket. 00031 * 00032 * @todo The class makes an extra copy of the events, we need to 00033 * investigate if closer collaboration with its collocated EC could 00034 * be used to remove that copy. 00035 */ 00036 00037 #ifndef TAO_ECG_UDP_RECEIVER_H 00038 #define TAO_ECG_UDP_RECEIVER_H 00039 #include /**/ "ace/pre.h" 00040 00041 #include "orbsvcs/RtecUDPAdminS.h" 00042 00043 #if !defined (ACE_LACKS_PRAGMA_ONCE) 00044 # pragma once 00045 #endif /* ACE_LACKS_PRAGMA_ONCE */ 00046 00047 #include "orbsvcs/RtecEventChannelAdminS.h" 00048 #include /**/ "orbsvcs/Event/event_serv_export.h" 00049 00050 #include "orbsvcs/Event/ECG_Adapters.h" 00051 #include "orbsvcs/Event/EC_Lifetime_Utils.h" 00052 #include "orbsvcs/Event/EC_Lifetime_Utils_T.h" 00053 #include "orbsvcs/Event/ECG_CDR_Message_Receiver.h" 00054 00055 ACE_BEGIN_VERSIONED_NAMESPACE_DECL 00056 class ACE_Reactor; 00057 ACE_END_VERSIONED_NAMESPACE_DECL 00058 00059 TAO_BEGIN_VERSIONED_NAMESPACE_DECL 00060 00061 class TAO_ECG_UDP_Out_Endpoint; 00062 00063 /** 00064 * @class TAO_ECG_UDP_Receiver_Disconnect_Command 00065 * 00066 * @brief Disconnects supplier represented by @a proxy from the Event Channel. 00067 * 00068 * Utility class for use as a template argument to TAO_EC_Auto_Command. 00069 * TAO_EC_Auto_Command<TAO_ECG_UDP_Receiver_Disconnect_Command> manages 00070 * supplier connection to the Event Channel, automatically disconnecting from 00071 * @a proxy in its destructor, if necessary. 00072 */ 00073 class TAO_RTEvent_Serv_Export TAO_ECG_UDP_Receiver_Disconnect_Command 00074 { 00075 public: 00076 TAO_ECG_UDP_Receiver_Disconnect_Command (void); 00077 TAO_ECG_UDP_Receiver_Disconnect_Command ( 00078 RtecEventChannelAdmin::ProxyPushConsumer_ptr proxy); 00079 00080 TAO_ECG_UDP_Receiver_Disconnect_Command ( 00081 const TAO_ECG_UDP_Receiver_Disconnect_Command & rhs); 00082 00083 TAO_ECG_UDP_Receiver_Disconnect_Command & 00084 operator= (const TAO_ECG_UDP_Receiver_Disconnect_Command & rhs); 00085 00086 void execute (ACE_ENV_SINGLE_ARG_DECL); 00087 00088 private: 00089 00090 RtecEventChannelAdmin::ProxyPushConsumer_var proxy_; 00091 }; 00092 00093 /** 00094 * @class TAO_ECG_UDP_Receiver 00095 * 00096 * @brief Receive events from UDP or Multicast and push them to a 00097 * "local" EC. 00098 * NOT THREAD-SAFE. 00099 * 00100 * This class connects as a supplier to an EventChannel, and supplies 00101 * to it all events it receives via UDP or Multicast. 00102 */ 00103 class TAO_RTEvent_Serv_Export TAO_ECG_UDP_Receiver : 00104 public virtual POA_RtecEventComm::PushSupplier 00105 , public virtual TAO_EC_Deactivated_Object 00106 , public virtual TAO_ECG_Dgram_Handler 00107 { 00108 public: 00109 00110 /// Initialization and termination methods. 00111 //@{ 00112 00113 /// Create a new TAO_ECG_UDP_Receiver object. 00114 /// (Constructor access is restricted to insure that all 00115 /// TAO_ECG_UDP_Receiver objects are heap-allocated.) 00116 static TAO_EC_Servant_Var<TAO_ECG_UDP_Receiver> create (CORBA::Boolean perform_crc = 0); 00117 00118 ~TAO_ECG_UDP_Receiver (void); 00119 00120 /** 00121 * @param lcl_ec Event Channel to which we will act as a supplier of events. 00122 * @param ignore_from Endpoint used to remove events generated by 00123 * the same process. 00124 * @param addr_server Address server used to obtain mapping of event type 00125 * to multicast group. 00126 * To insure proper resource clean up, if init () is successful, 00127 * shutdown () must be called when the receiver is no longer needed. 00128 * This is done by disconnect_push_supplier() method. If 00129 * disconnect_push_supplier() will not be called, it is the 00130 * responsibility of the user. 00131 * Furthermore, if shutdown() is not explicitly called by 00132 * either disconnect_push_supplier () or the user, the receiver 00133 * will clean up the resources in its destructor, however, certain 00134 * entities involved in cleanup must still exist at that point, 00135 * e.g., POA. 00136 */ 00137 void init (RtecEventChannelAdmin::EventChannel_ptr lcl_ec, 00138 TAO_ECG_Refcounted_Endpoint ignore_from, 00139 RtecUDPAdmin::AddrServer_ptr addr_server 00140 ACE_ENV_ARG_DECL); 00141 00142 /// Connect or reconnect to the EC with the given publications. 00143 /** 00144 * @note If we are already connected to EC and a reconnection is 00145 * necessary, the EC must have reconnects enabled in order for this 00146 * method to succeed. 00147 */ 00148 void connect (const RtecEventChannelAdmin::SupplierQOS& pub 00149 ACE_ENV_ARG_DECL); 00150 00151 /// Set the handler we must notify when shutdown occurs. (This is 00152 /// the handler that alerts us when data is available on udp/mcast socket.) 00153 /// Shutdown notification gives the handler an opportunity to properly clean 00154 /// up resources. 00155 void set_handler_shutdown (TAO_ECG_Refcounted_Handler handler_shutdown_rptr); 00156 00157 /// Deactivate from POA and disconnect from EC, if necessary. Shut 00158 /// down all receiver components. 00159 /** 00160 * If this class is used with refcounting, calling this method may 00161 * result in decrementing of the reference count (due to 00162 * deactivation) and deletion of the object. 00163 */ 00164 void shutdown (ACE_ENV_SINGLE_ARG_DECL); 00165 //@} 00166 00167 /// Accessor. 00168 /// Call the RtecUDPAdmin::AddrServer::get_addr. Throws exception 00169 /// if nill Address Server was specified in init (). 00170 void get_addr (const RtecEventComm::EventHeader& header, 00171 RtecUDPAdmin::UDP_Addr_out addr 00172 ACE_ENV_ARG_DECL); 00173 00174 /// The PushSupplier idl method. 00175 /// Invokes shutdown (), which may result in the object being deleted, if 00176 /// refcounting is used to manage its lifetime. 00177 virtual void disconnect_push_supplier (ACE_ENV_SINGLE_ARG_DECL) 00178 ACE_THROW_SPEC ((CORBA::SystemException)); 00179 00180 /// TAO_ECG_Dgram_Handler method. 00181 /** 00182 * UDP/Multicast Event_Handlers call this method when data is 00183 * available at the socket - the <dgram> is ready for reading. 00184 * Data is read from the socket, and, if complete message is 00185 * received, the event is pushed to the local Event Channel. 00186 */ 00187 virtual int handle_input (ACE_SOCK_Dgram& dgram); 00188 00189 protected: 00190 00191 /// Constructor (protected). Clients can create new 00192 /// TAO_ECG_UDP_Receiver objects using the static create() method. 00193 TAO_ECG_UDP_Receiver (CORBA::Boolean perform_crc = 0); 00194 00195 private: 00196 00197 /// Helpers for the connect() method. 00198 //@{ 00199 // Establishes connection to the Event Channel for the first time. 00200 void new_connect (const RtecEventChannelAdmin::SupplierQOS& pub 00201 ACE_ENV_ARG_DECL); 00202 00203 // Updates existing connection to the Event Channel. 00204 void reconnect (const RtecEventChannelAdmin::SupplierQOS& pub 00205 ACE_ENV_ARG_DECL); 00206 //@} 00207 00208 /// Event Channel to which we act as a supplier. 00209 RtecEventChannelAdmin::EventChannel_var lcl_ec_; 00210 00211 /// The server used to map event types to multicast groups. 00212 RtecUDPAdmin::AddrServer_var addr_server_; 00213 00214 /// Proxy used to supply events to the Event Channel. 00215 RtecEventChannelAdmin::ProxyPushConsumer_var consumer_proxy_; 00216 00217 /// Helper for reading incoming UDP/Multicast messages. It assembles 00218 /// message fragments and provides access to a cdr stream once the 00219 /// complete message has been received. 00220 TAO_ECG_CDR_Message_Receiver cdr_receiver_; 00221 00222 /// Handler we must notify when shutdown occurs, so it has an 00223 /// opportunity to clean up resources. 00224 TAO_ECG_Refcounted_Handler handler_rptr_; 00225 00226 typedef TAO_EC_Auto_Command<TAO_ECG_UDP_Receiver_Disconnect_Command> 00227 ECG_Receiver_Auto_Proxy_Disconnect; 00228 /// Manages our connection to Consumer Proxy. 00229 ECG_Receiver_Auto_Proxy_Disconnect auto_proxy_disconnect_; 00230 }; 00231 00232 TAO_END_VERSIONED_NAMESPACE_DECL 00233 00234 #if defined(__ACE_INLINE__) 00235 #include "orbsvcs/Event/ECG_UDP_Receiver.i" 00236 #endif /* __ACE_INLINE__ */ 00237 00238 #include /**/ "ace/post.h" 00239 #endif /* TAO_ECG_UDP_RECEIVER_H */