00001 // -*- C++ -*- 00002 00003 /** 00004 * @file EC_Gateway_IIOP.h 00005 * 00006 * $Id: EC_Gateway_IIOP.h 81419 2008-04-24 11:35:22Z johnnyw $ 00007 * 00008 * @author Carlos O'Ryan (coryan@cs.wustl.edu) 00009 * @author Johnny Willemsen (jwillemsen@remedy.nl) 00010 * 00011 * Based on previous work by Tim Harrison (harrison@cs.wustl.edu) and 00012 * other members of the DOC group. More details can be found in: 00013 * 00014 * http://doc.ece.uci.edu/~coryan/EC/index.html 00015 */ 00016 00017 #ifndef TAO_EC_GATEWAY_IIOP_H 00018 #define TAO_EC_GATEWAY_IIOP_H 00019 00020 #include /**/ "ace/pre.h" 00021 00022 #include /**/ "orbsvcs/Event/event_serv_export.h" 00023 #include "orbsvcs/Event/EC_Gateway.h" 00024 00025 #include "orbsvcs/RtecEventChannelAdminS.h" 00026 #include "orbsvcs/RtecEventCommS.h" 00027 #include "orbsvcs/Channel_Clients_T.h" 00028 00029 #include "ace/Map_Manager.h" 00030 #include "ace/Null_Mutex.h" 00031 00032 TAO_BEGIN_VERSIONED_NAMESPACE_DECL 00033 00034 class TAO_ECG_ConsumerEC_Control; 00035 class TAO_EC_Gateway_IIOP_Factory; 00036 00037 /** 00038 * @class TAO_EC_Gateway_IIOP 00039 * 00040 * @brief Event Channel Gateway using IIOP. 00041 * 00042 * This class mediates among two event channels, it connects as a consumer of 00043 * events with a remote event channel, and as a supplier of events with the 00044 * local EC. As a consumer it gives a QoS designed to only accept the events 00045 * in which *local* consumers are interested. Eventually the local EC should 00046 * create this object and compute its QoS in an automated manner; but this 00047 * requires some way to filter out the peers registered as consumers, 00048 * otherwise we will get loops in the QoS graph. 00049 * It uses exactly the same set of events in the publications list 00050 * when connected as a supplier. 00051 * 00052 * @note 00053 * An alternative implementation would be to register with the 00054 * remote EC as a supplier, and then filter on the remote EC, but 00055 * one of the objectives is to minimize network traffic. 00056 * On the other hand the events will be pushed to remote consumers, 00057 * event though they will be dropped upon receipt (due to the TTL 00058 * field); IMHO this is another suggestion that the EC needs to know 00059 * (somehow) which consumers are truly its peers in disguise. 00060 * 00061 * @todo: The class makes an extra copy of the events, we need to 00062 * investigate if closer collaboration with its collocated EC could 00063 * be used to remove that copy. 00064 */ 00065 class TAO_RTEvent_Serv_Export TAO_EC_Gateway_IIOP : public TAO_EC_Gateway 00066 { 00067 public: 00068 TAO_EC_Gateway_IIOP (void); 00069 virtual ~TAO_EC_Gateway_IIOP (void); 00070 00071 /** 00072 * To do its job this class requires to know the local and remote ECs it will 00073 * connect to. 00074 * @return 0 in case of success, -1 in case of failure 00075 */ 00076 int init (RtecEventChannelAdmin::EventChannel_ptr supplier_ec, 00077 RtecEventChannelAdmin::EventChannel_ptr consumer_ec); 00078 00079 /// The channel is disconnecting. 00080 void disconnect_push_supplier (void); 00081 00082 /// The channel is disconnecting. 00083 void disconnect_push_consumer (void); 00084 00085 /// This is the consumer side behavior, it pushes the events to the 00086 /// local event channel. 00087 void push (const RtecEventComm::EventSet &events); 00088 00089 /// Disconnect and shutdown the gateway 00090 int shutdown (void); 00091 00092 // The following methods are documented in the base class. 00093 virtual void close (void); 00094 virtual void update_consumer (const RtecEventChannelAdmin::ConsumerQOS& sub); 00095 virtual void update_supplier (const RtecEventChannelAdmin::SupplierQOS& pub); 00096 00097 // Let the gateway reconnect itself to the consumer ec given exisiting QoS 00098 void reconnect_consumer_ec(void); 00099 00100 /// Check whether the consumer event channel is non existent or not 00101 CORBA::Boolean consumer_ec_non_existent (CORBA::Boolean_out disconnected); 00102 00103 /** 00104 * Cleanup all consumer proxies we have without trying to tell the 00105 * consumer that we are going to disconnect. This can be used to cleanup 00106 * the consumer proxy administration in case we know that the consumers 00107 * are all unreachable. 00108 */ 00109 void cleanup_consumer_proxies (void); 00110 00111 /// Cleanup the connection to the consumer ec. Doesn't call anything on the 00112 /// ec again, just set the object to nil 00113 int cleanup_consumer_ec (void); 00114 00115 /// Cleanup the connection to the supplier ec. Doesn't call anything on the 00116 /// ec again, just set the object to nil 00117 int cleanup_supplier_ec (void); 00118 00119 /// Suspend the connection to the supplier ec 00120 void suspend_supplier_ec (void); 00121 00122 /// Resume the connection to the supplier ec 00123 void resume_supplier_ec (void); 00124 00125 private: 00126 void close_i (void); 00127 00128 /// Disconnect the supplier proxy 00129 void disconnect_supplier_proxy_i (void); 00130 00131 /// Disconnect all consumer proxies 00132 void disconnect_consumer_proxies_i (void); 00133 00134 /// Remove all consumer proxies without calling disconnect on them 00135 void cleanup_consumer_proxies_i (void); 00136 00137 void update_consumer_i (const RtecEventChannelAdmin::ConsumerQOS& sub); 00138 00139 /// Create all connections to consumer ec and to supplier ec. 00140 void open_i (const RtecEventChannelAdmin::ConsumerQOS& sub); 00141 00142 /// Helper method to see if consumer ec is connected 00143 CORBA::Boolean is_consumer_ec_connected_i (void) const; 00144 00145 /// Push the @a event to the @a consumer. 00146 void push_to_consumer (RtecEventChannelAdmin::ProxyPushConsumer_ptr consumer, 00147 const RtecEventComm::EventSet& event); 00148 00149 void cleanup_consumer_ec_i (void); 00150 00151 void cleanup_supplier_ec_i (void); 00152 00153 protected: 00154 /// Do the real work in init() 00155 int init_i (RtecEventChannelAdmin::EventChannel_ptr supplier_ec, 00156 RtecEventChannelAdmin::EventChannel_ptr consumer_ec); 00157 00158 protected: 00159 /// Lock to synchronize internal changes 00160 TAO_SYNCH_MUTEX lock_; 00161 00162 /// How many threads are running push() we cannot make changes until 00163 /// that reaches 0 00164 CORBA::ULong busy_count_; 00165 00166 /** 00167 * An update_consumer() message arrived *while* we were doing a 00168 * push() the modification is stored, if multiple update_consumer messages 00169 * arrive only the last one is executed. 00170 */ 00171 int update_posted_; 00172 RtecEventChannelAdmin::ConsumerQOS c_qos_; 00173 00174 /** 00175 * We have a cleanup outstanding and must wait doing cleanup until all pushes 00176 * are ready. 00177 */ 00178 int cleanup_posted_; 00179 00180 /** 00181 * Is the supplier ec suspended? 00182 */ 00183 int supplier_ec_suspended_; 00184 00185 /// The event channel acting as supplier for this gateway so we can reconnect 00186 /// when the list changes. 00187 RtecEventChannelAdmin::EventChannel_var supplier_ec_; 00188 00189 /// The event channel acting as consumer of this gateway 00190 RtecEventChannelAdmin::EventChannel_var consumer_ec_; 00191 00192 /// Our RT_Infos for the event channel that is the supplier. 00193 RtecBase::handle_t supplier_info_; 00194 /// Our RT_Infos for the event channel that is the consumer. 00195 RtecBase::handle_t consumer_info_; 00196 00197 /// Our consumer personality.... 00198 ACE_PushConsumer_Adapter<TAO_EC_Gateway_IIOP> consumer_; 00199 00200 /// If it is true then we must deactivate the consumer 00201 bool consumer_is_active_; 00202 00203 /// Our supplier personality.... 00204 ACE_PushSupplier_Adapter<TAO_EC_Gateway_IIOP> supplier_; 00205 00206 /// If it is true then we must deactivate the supplier 00207 bool supplier_is_active_; 00208 00209 // We use a different Consumer_Proxy 00210 typedef ACE_Map_Manager<RtecEventComm::EventSourceID,RtecEventChannelAdmin::ProxyPushConsumer_ptr,ACE_Null_Mutex> Consumer_Map; 00211 typedef ACE_Map_Iterator<RtecEventComm::EventSourceID,RtecEventChannelAdmin::ProxyPushConsumer_ptr,ACE_Null_Mutex> Consumer_Map_Iterator; 00212 00213 /// We talk to the EC (as a supplier) using either an per-supplier 00214 /// proxy or a generic proxy for the type only subscriptions. We push the 00215 /// events to these proxies 00216 Consumer_Map consumer_proxy_map_; 00217 RtecEventChannelAdmin::ProxyPushConsumer_var default_consumer_proxy_; 00218 00219 /// We talk to the EC (as a consumer) using this proxy. We receive the events 00220 /// from these proxy 00221 RtecEventChannelAdmin::ProxyPushSupplier_var supplier_proxy_; 00222 00223 /// The consumer ec control which controls the behaviour in case of a 00224 /// misbehaving consumer ec 00225 TAO_ECG_ConsumerEC_Control* ec_control_; 00226 00227 /// The Gateway IIOP Factory for all the settings 00228 TAO_EC_Gateway_IIOP_Factory* factory_; 00229 00230 /// If 1, we use the TTL flags, if 0, we just ignore TTL 00231 int use_ttl_; 00232 00233 /// The flag for using the consumer proxy map. With 1 the consumer proxy map 00234 /// is used, meaning that for each unique source id we use a different 00235 /// proxy push consumer, if 0, we only use one proxy push consumer (the 00236 /// default) for all source ids. 00237 int use_consumer_proxy_map_; 00238 00239 }; 00240 00241 TAO_END_VERSIONED_NAMESPACE_DECL 00242 00243 #include /**/ "ace/post.h" 00244 00245 #endif /* ACE_EC_GATEWAY_IIOP_H */