00001 // -*- C++ -*- 00002 00003 /** 00004 * @file EC_Gateway_IIOP.h 00005 * 00006 * EC_Gateway_IIOP.h,v 1.23 2006/03/15 07:52:22 jtc Exp 00007 * 00008 * @author Carlos O'Ryan (coryan@cs.wustl.edu) 00009 * @author Johnny Willemsen (jwillemsen@remedy.nl) 00010 * 00011 * Based on previous work by Tim Harrison (harrison@cs.wustl.edu) and 00012 * other members of the DOC group. More details can be found in: 00013 * 00014 * http://doc.ece.uci.edu/~coryan/EC/index.html 00015 */ 00016 00017 #ifndef TAO_EC_GATEWAY_IIOP_H 00018 #define TAO_EC_GATEWAY_IIOP_H 00019 00020 #include /**/ "ace/pre.h" 00021 00022 #include /**/ "orbsvcs/Event/event_serv_export.h" 00023 #include "orbsvcs/Event/EC_Gateway.h" 00024 00025 #include "orbsvcs/RtecEventChannelAdminS.h" 00026 #include "orbsvcs/RtecEventCommS.h" 00027 #include "orbsvcs/Channel_Clients_T.h" 00028 00029 #include "ace/Map_Manager.h" 00030 #include "ace/Null_Mutex.h" 00031 00032 TAO_BEGIN_VERSIONED_NAMESPACE_DECL 00033 00034 class TAO_ECG_ConsumerEC_Control; 00035 class TAO_EC_Gateway_IIOP_Factory; 00036 00037 /** 00038 * @class TAO_EC_Gateway_IIOP 00039 * 00040 * @brief Event Channel Gateway using IIOP. 00041 * 00042 * This class mediates among two event channels, it connects as a consumer of 00043 * events with a remote event channel, and as a supplier of events with the 00044 * local EC. As a consumer it gives a QoS designed to only accept the events 00045 * in which *local* consumers are interested. Eventually the local EC should 00046 * create this object and compute its QoS in an automated manner; but this 00047 * requires some way to filter out the peers registered as consumers, 00048 * otherwise we will get loops in the QoS graph. 00049 * It uses exactly the same set of events in the publications list 00050 * when connected as a supplier. 00051 * 00052 * @note 00053 * An alternative implementation would be to register with the 00054 * remote EC as a supplier, and then filter on the remote EC, but 00055 * one of the objectives is to minimize network traffic. 00056 * On the other hand the events will be pushed to remote consumers, 00057 * event though they will be dropped upon receipt (due to the TTL 00058 * field); IMHO this is another suggestion that the EC needs to know 00059 * (somehow) which consumers are truly its peers in disguise. 00060 * 00061 * @todo: The class makes an extra copy of the events, we need to 00062 * investigate if closer collaboration with its collocated EC could 00063 * be used to remove that copy. 00064 */ 00065 class TAO_RTEvent_Serv_Export TAO_EC_Gateway_IIOP : public TAO_EC_Gateway 00066 { 00067 public: 00068 TAO_EC_Gateway_IIOP (void); 00069 virtual ~TAO_EC_Gateway_IIOP (void); 00070 00071 /** 00072 * To do its job this class requires to know the local and remote ECs it will 00073 * connect to. 00074 * @return 0 in case of success, -1 in case of failure 00075 */ 00076 int init (RtecEventChannelAdmin::EventChannel_ptr supplier_ec, 00077 RtecEventChannelAdmin::EventChannel_ptr consumer_ec 00078 ACE_ENV_ARG_DECL); 00079 00080 /// The channel is disconnecting. 00081 void disconnect_push_supplier (ACE_ENV_SINGLE_ARG_DECL_WITH_DEFAULTS); 00082 00083 /// The channel is disconnecting. 00084 void disconnect_push_consumer (ACE_ENV_SINGLE_ARG_DECL_WITH_DEFAULTS); 00085 00086 /// This is the consumer side behavior, it pushes the events to the 00087 /// local event channel. 00088 void push (const RtecEventComm::EventSet &events 00089 ACE_ENV_ARG_DECL_WITH_DEFAULTS); 00090 00091 /// Disconnect and shutdown the gateway 00092 int shutdown (ACE_ENV_SINGLE_ARG_DECL_WITH_DEFAULTS); 00093 00094 // The following methods are documented in the base class. 00095 virtual void close (ACE_ENV_SINGLE_ARG_DECL_WITH_DEFAULTS); 00096 virtual void update_consumer (const RtecEventChannelAdmin::ConsumerQOS& sub 00097 ACE_ENV_ARG_DECL_WITH_DEFAULTS) 00098 ACE_THROW_SPEC ((CORBA::SystemException)); 00099 virtual void update_supplier (const RtecEventChannelAdmin::SupplierQOS& pub 00100 ACE_ENV_ARG_DECL_WITH_DEFAULTS) 00101 ACE_THROW_SPEC ((CORBA::SystemException)); 00102 00103 // Let the gateway reconnect itself to the consumer ec given exisiting QoS 00104 void reconnect_consumer_ec(ACE_ENV_SINGLE_ARG_DECL); 00105 00106 /// Check whether the consumer event channel is non existent or not 00107 CORBA::Boolean consumer_ec_non_existent (CORBA::Boolean_out disconnected 00108 ACE_ENV_ARG_DECL); 00109 00110 /** 00111 * Cleanup all consumer proxies we have without trying to tell the 00112 * consumer that we are going to disconnect. This can be used to cleanup 00113 * the consumer proxy administration in case we know that the consumers 00114 * are all unreachable. 00115 */ 00116 void cleanup_consumer_proxies (ACE_ENV_SINGLE_ARG_DECL); 00117 00118 /// Cleanup the connection to the consumer ec. Doesn't call anything on the 00119 /// ec again, just set the object to nil 00120 int cleanup_consumer_ec (void); 00121 00122 /// Cleanup the connection to the supplier ec. Doesn't call anything on the 00123 /// ec again, just set the object to nil 00124 int cleanup_supplier_ec (void); 00125 00126 /// Suspend the connection to the supplier ec 00127 void suspend_supplier_ec (ACE_ENV_SINGLE_ARG_DECL); 00128 00129 /// Resume the connection to the supplier ec 00130 void resume_supplier_ec (ACE_ENV_SINGLE_ARG_DECL); 00131 00132 private: 00133 void close_i (ACE_ENV_SINGLE_ARG_DECL); 00134 00135 /// Disconnect the supplier proxy 00136 void disconnect_supplier_proxy_i (ACE_ENV_SINGLE_ARG_DECL); 00137 00138 /// Disconnect all consumer proxies 00139 void disconnect_consumer_proxies_i (ACE_ENV_SINGLE_ARG_DECL); 00140 00141 /// Remove all consumer proxies without calling disconnect on them 00142 void cleanup_consumer_proxies_i (ACE_ENV_SINGLE_ARG_DECL); 00143 00144 void update_consumer_i (const RtecEventChannelAdmin::ConsumerQOS& sub 00145 ACE_ENV_ARG_DECL); 00146 00147 /// Create all connections to consumer ec and to supplier ec. 00148 void open_i (const RtecEventChannelAdmin::ConsumerQOS& sub 00149 ACE_ENV_ARG_DECL); 00150 00151 /// Helper method to see if consumer ec is connected 00152 CORBA::Boolean is_consumer_ec_connected_i (void) const; 00153 00154 /// Push the @a event to the @a consumer. 00155 void push_to_consumer (RtecEventChannelAdmin::ProxyPushConsumer_ptr consumer, 00156 const RtecEventComm::EventSet& event ACE_ENV_ARG_DECL); 00157 00158 void cleanup_consumer_ec_i (void); 00159 00160 void cleanup_supplier_ec_i (void); 00161 00162 protected: 00163 /// Do the real work in init() 00164 int init_i (RtecEventChannelAdmin::EventChannel_ptr supplier_ec, 00165 RtecEventChannelAdmin::EventChannel_ptr consumer_ec 00166 ACE_ENV_ARG_DECL); 00167 00168 protected: 00169 /// Lock to synchronize internal changes 00170 TAO_SYNCH_MUTEX lock_; 00171 00172 /// How many threads are running push() we cannot make changes until 00173 /// that reaches 0 00174 CORBA::ULong busy_count_; 00175 00176 /** 00177 * An update_consumer() message arrived *while* we were doing a 00178 * push() the modification is stored, if multiple update_consumer messages 00179 * arrive only the last one is executed. 00180 */ 00181 int update_posted_; 00182 RtecEventChannelAdmin::ConsumerQOS c_qos_; 00183 00184 /** 00185 * We have a cleanup outstanding and must wait doing cleanup until all pushes 00186 * are ready. 00187 */ 00188 int cleanup_posted_; 00189 00190 /** 00191 * Is the supplier ec suspended? 00192 */ 00193 int supplier_ec_suspended_; 00194 00195 /// The event channel acting as supplier for this gateway so we can reconnect 00196 /// when the list changes. 00197 RtecEventChannelAdmin::EventChannel_var supplier_ec_; 00198 00199 /// The event channel acting as consumer of this gateway 00200 RtecEventChannelAdmin::EventChannel_var consumer_ec_; 00201 00202 /// Our RT_Infos for the event channel that is the supplier. 00203 RtecBase::handle_t supplier_info_; 00204 /// Our RT_Infos for the event channel that is the consumer. 00205 RtecBase::handle_t consumer_info_; 00206 00207 /// Our consumer personality.... 00208 ACE_PushConsumer_Adapter<TAO_EC_Gateway_IIOP> consumer_; 00209 00210 /// If it is not 0 then we must deactivate the consumer 00211 int consumer_is_active_; 00212 00213 /// Our supplier personality.... 00214 ACE_PushSupplier_Adapter<TAO_EC_Gateway_IIOP> supplier_; 00215 00216 /// If it is not 0 then we must deactivate the supplier 00217 int supplier_is_active_; 00218 00219 // We use a different Consumer_Proxy 00220 typedef ACE_Map_Manager<RtecEventComm::EventSourceID,RtecEventChannelAdmin::ProxyPushConsumer_ptr,ACE_Null_Mutex> Consumer_Map; 00221 typedef ACE_Map_Iterator<RtecEventComm::EventSourceID,RtecEventChannelAdmin::ProxyPushConsumer_ptr,ACE_Null_Mutex> Consumer_Map_Iterator; 00222 00223 /// We talk to the EC (as a supplier) using either an per-supplier 00224 /// proxy or a generic proxy for the type only subscriptions. We push the 00225 /// events to these proxies 00226 Consumer_Map consumer_proxy_map_; 00227 RtecEventChannelAdmin::ProxyPushConsumer_var default_consumer_proxy_; 00228 00229 /// We talk to the EC (as a consumer) using this proxy. We receive the events 00230 /// from these proxy 00231 RtecEventChannelAdmin::ProxyPushSupplier_var supplier_proxy_; 00232 00233 /// The consumer ec control which controls the behaviour in case of a 00234 /// misbehaving consumer ec 00235 TAO_ECG_ConsumerEC_Control* ec_control_; 00236 00237 /// The Gateway IIOP Factory for all the settings 00238 TAO_EC_Gateway_IIOP_Factory* factory_; 00239 00240 /// If 1, we use the TTL flags, if 0, we just ignore TTL 00241 int use_ttl_; 00242 00243 /// The flag for using the consumer proxy map. With 1 the consumer proxy map 00244 /// is used, meaning that for each unique source id we use a different 00245 /// proxy push consumer, if 0, we only use one proxy push consumer (the 00246 /// default) for all source ids. 00247 int use_consumer_proxy_map_; 00248 00249 }; 00250 00251 TAO_END_VERSIONED_NAMESPACE_DECL 00252 00253 #include /**/ "ace/post.h" 00254 00255 #endif /* ACE_EC_GATEWAY_IIOP_H */