EC_Gateway_IIOP.h

Go to the documentation of this file.
00001 // -*- C++ -*-
00002 
00003 /**
00004  *  @file   EC_Gateway_IIOP.h
00005  *
00006  *  EC_Gateway_IIOP.h,v 1.23 2006/03/15 07:52:22 jtc Exp
00007  *
00008  *  @author Carlos O'Ryan (coryan@cs.wustl.edu)
00009  *  @author Johnny Willemsen  (jwillemsen@remedy.nl)
00010  *
00011  * Based on previous work by Tim Harrison (harrison@cs.wustl.edu) and
00012  * other members of the DOC group. More details can be found in:
00013  *
00014  * http://doc.ece.uci.edu/~coryan/EC/index.html
00015  */
00016 
00017 #ifndef TAO_EC_GATEWAY_IIOP_H
00018 #define TAO_EC_GATEWAY_IIOP_H
00019 
00020 #include /**/ "ace/pre.h"
00021 
00022 #include /**/ "orbsvcs/Event/event_serv_export.h"
00023 #include "orbsvcs/Event/EC_Gateway.h"
00024 
00025 #include "orbsvcs/RtecEventChannelAdminS.h"
00026 #include "orbsvcs/RtecEventCommS.h"
00027 #include "orbsvcs/Channel_Clients_T.h"
00028 
00029 #include "ace/Map_Manager.h"
00030 #include "ace/Null_Mutex.h"
00031 
00032 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00033 
00034 class TAO_ECG_ConsumerEC_Control;
00035 class TAO_EC_Gateway_IIOP_Factory;
00036 
00037 /**
00038  * @class TAO_EC_Gateway_IIOP
00039  *
00040  * @brief Event Channel Gateway using IIOP.
00041  *
00042  * This class mediates among two event channels, it connects as a consumer of
00043  * events with a remote event channel, and as a supplier of events with the
00044  * local EC. As a consumer it gives a QoS designed to only accept the events
00045  * in which *local* consumers are interested. Eventually the local EC should
00046  * create this object and compute its QoS in an automated manner; but this
00047  * requires some way to filter out the peers registered as consumers,
00048  * otherwise we will get loops in the QoS graph.
00049  * It uses exactly the same set of events in the publications list
00050  * when connected as a supplier.
00051  *
00052  * @note
00053  * An alternative implementation would be to register with the
00054  * remote EC as a supplier, and then filter on the remote EC, but
00055  * one of the objectives is to minimize network traffic.
00056  * On the other hand the events will be pushed to remote consumers,
00057  * event though they will be dropped upon receipt (due to the TTL
00058  * field); IMHO this is another suggestion that the EC needs to know
00059  * (somehow) which consumers are truly its peers in disguise.
00060  *
00061  * @todo: The class makes an extra copy of the events, we need to
00062  * investigate if closer collaboration with its collocated EC could
00063  * be used to remove that copy.
00064  */
00065 class TAO_RTEvent_Serv_Export TAO_EC_Gateway_IIOP : public TAO_EC_Gateway
00066 {
00067 public:
00068   TAO_EC_Gateway_IIOP (void);
00069   virtual ~TAO_EC_Gateway_IIOP (void);
00070 
00071   /**
00072    * To do its job this class requires to know the local and remote ECs it will
00073    * connect to.
00074    * @return 0 in case of success, -1 in case of failure
00075    */
00076   int init (RtecEventChannelAdmin::EventChannel_ptr supplier_ec,
00077             RtecEventChannelAdmin::EventChannel_ptr consumer_ec
00078             ACE_ENV_ARG_DECL);
00079 
00080   /// The channel is disconnecting.
00081   void disconnect_push_supplier (ACE_ENV_SINGLE_ARG_DECL_WITH_DEFAULTS);
00082 
00083   /// The channel is disconnecting.
00084   void disconnect_push_consumer (ACE_ENV_SINGLE_ARG_DECL_WITH_DEFAULTS);
00085 
00086   /// This is the consumer side behavior, it pushes the events to the
00087   /// local event channel.
00088   void push (const RtecEventComm::EventSet &events
00089              ACE_ENV_ARG_DECL_WITH_DEFAULTS);
00090 
00091   /// Disconnect and shutdown the gateway
00092   int shutdown (ACE_ENV_SINGLE_ARG_DECL_WITH_DEFAULTS);
00093 
00094   // The following methods are documented in the base class.
00095   virtual void close (ACE_ENV_SINGLE_ARG_DECL_WITH_DEFAULTS);
00096   virtual void update_consumer (const RtecEventChannelAdmin::ConsumerQOS& sub
00097                                 ACE_ENV_ARG_DECL_WITH_DEFAULTS)
00098       ACE_THROW_SPEC ((CORBA::SystemException));
00099   virtual void update_supplier (const RtecEventChannelAdmin::SupplierQOS& pub
00100                                 ACE_ENV_ARG_DECL_WITH_DEFAULTS)
00101       ACE_THROW_SPEC ((CORBA::SystemException));
00102 
00103   // Let the gateway reconnect itself to the consumer ec given exisiting QoS
00104   void reconnect_consumer_ec(ACE_ENV_SINGLE_ARG_DECL);
00105 
00106   /// Check whether the consumer event channel is non existent or not
00107   CORBA::Boolean consumer_ec_non_existent (CORBA::Boolean_out disconnected
00108                                            ACE_ENV_ARG_DECL);
00109 
00110   /**
00111    * Cleanup all consumer proxies we have without trying to tell the
00112    * consumer that we are going to disconnect. This can be used to cleanup
00113    * the consumer proxy administration in case we know that the consumers
00114    * are all unreachable.
00115    */
00116   void cleanup_consumer_proxies (ACE_ENV_SINGLE_ARG_DECL);
00117 
00118   /// Cleanup the connection to the consumer ec. Doesn't call anything on the
00119   /// ec again, just set the object to nil
00120   int cleanup_consumer_ec (void);
00121 
00122   /// Cleanup the connection to the supplier ec. Doesn't call anything on the
00123   /// ec again, just set the object to nil
00124   int cleanup_supplier_ec (void);
00125 
00126   /// Suspend the connection to the supplier ec
00127   void suspend_supplier_ec (ACE_ENV_SINGLE_ARG_DECL);
00128 
00129   /// Resume the connection to the supplier ec
00130   void resume_supplier_ec (ACE_ENV_SINGLE_ARG_DECL);
00131 
00132 private:
00133   void close_i (ACE_ENV_SINGLE_ARG_DECL);
00134 
00135   /// Disconnect the supplier proxy
00136   void disconnect_supplier_proxy_i (ACE_ENV_SINGLE_ARG_DECL);
00137 
00138   /// Disconnect all consumer proxies
00139   void disconnect_consumer_proxies_i (ACE_ENV_SINGLE_ARG_DECL);
00140 
00141   /// Remove all consumer proxies without calling disconnect on them
00142   void cleanup_consumer_proxies_i (ACE_ENV_SINGLE_ARG_DECL);
00143 
00144   void update_consumer_i (const RtecEventChannelAdmin::ConsumerQOS& sub
00145                           ACE_ENV_ARG_DECL);
00146 
00147   /// Create all connections to consumer ec and to supplier ec.
00148   void open_i (const RtecEventChannelAdmin::ConsumerQOS& sub
00149                ACE_ENV_ARG_DECL);
00150 
00151   /// Helper method to see if consumer ec is connected
00152   CORBA::Boolean is_consumer_ec_connected_i (void) const;
00153 
00154   /// Push the @a event to the @a consumer.
00155   void push_to_consumer (RtecEventChannelAdmin::ProxyPushConsumer_ptr consumer,
00156                          const RtecEventComm::EventSet& event ACE_ENV_ARG_DECL);
00157 
00158   void cleanup_consumer_ec_i (void);
00159 
00160   void cleanup_supplier_ec_i (void);
00161 
00162 protected:
00163   /// Do the real work in init()
00164   int init_i (RtecEventChannelAdmin::EventChannel_ptr supplier_ec,
00165               RtecEventChannelAdmin::EventChannel_ptr consumer_ec
00166               ACE_ENV_ARG_DECL);
00167 
00168 protected:
00169   /// Lock to synchronize internal changes
00170   TAO_SYNCH_MUTEX lock_;
00171 
00172   /// How many threads are running push() we cannot make changes until
00173   /// that reaches 0
00174   CORBA::ULong busy_count_;
00175 
00176   /**
00177    * An update_consumer() message arrived *while* we were doing a
00178    * push() the modification is stored, if multiple update_consumer messages
00179    * arrive only the last one is executed.
00180    */
00181   int update_posted_;
00182   RtecEventChannelAdmin::ConsumerQOS c_qos_;
00183 
00184   /**
00185    * We have a cleanup outstanding and must wait doing cleanup until all pushes
00186    * are ready.
00187    */
00188   int cleanup_posted_;
00189 
00190   /**
00191    * Is the supplier ec suspended?
00192    */
00193   int supplier_ec_suspended_;
00194 
00195   /// The event channel acting as supplier for this gateway so we can reconnect
00196   /// when the list changes.
00197   RtecEventChannelAdmin::EventChannel_var supplier_ec_;
00198 
00199   /// The event channel acting as consumer of this gateway
00200   RtecEventChannelAdmin::EventChannel_var consumer_ec_;
00201 
00202   /// Our RT_Infos for the event channel that is the supplier.
00203   RtecBase::handle_t supplier_info_;
00204   /// Our RT_Infos for the event channel that is the consumer.
00205   RtecBase::handle_t consumer_info_;
00206 
00207   /// Our consumer personality....
00208   ACE_PushConsumer_Adapter<TAO_EC_Gateway_IIOP> consumer_;
00209 
00210   /// If it is not 0 then we must deactivate the consumer
00211   int consumer_is_active_;
00212 
00213   /// Our supplier personality....
00214   ACE_PushSupplier_Adapter<TAO_EC_Gateway_IIOP> supplier_;
00215 
00216   /// If it is not 0 then we must deactivate the supplier
00217   int supplier_is_active_;
00218 
00219   // We use a different Consumer_Proxy
00220   typedef ACE_Map_Manager<RtecEventComm::EventSourceID,RtecEventChannelAdmin::ProxyPushConsumer_ptr,ACE_Null_Mutex> Consumer_Map;
00221   typedef ACE_Map_Iterator<RtecEventComm::EventSourceID,RtecEventChannelAdmin::ProxyPushConsumer_ptr,ACE_Null_Mutex> Consumer_Map_Iterator;
00222 
00223   /// We talk to the EC (as a supplier) using either an per-supplier
00224   /// proxy or a generic proxy for the type only subscriptions. We push the
00225   /// events to these proxies
00226   Consumer_Map consumer_proxy_map_;
00227   RtecEventChannelAdmin::ProxyPushConsumer_var default_consumer_proxy_;
00228 
00229   /// We talk to the EC (as a consumer) using this proxy. We receive the events
00230   /// from these proxy
00231   RtecEventChannelAdmin::ProxyPushSupplier_var supplier_proxy_;
00232 
00233   /// The consumer ec control which controls the behaviour in case of a
00234   /// misbehaving consumer ec
00235   TAO_ECG_ConsumerEC_Control* ec_control_;
00236 
00237   /// The Gateway IIOP Factory for all the settings
00238   TAO_EC_Gateway_IIOP_Factory* factory_;
00239 
00240   /// If 1, we use the TTL flags, if 0, we just ignore TTL
00241   int use_ttl_;
00242 
00243   /// The flag for using the consumer proxy map. With 1 the consumer proxy map
00244   /// is used, meaning that for each unique source id we use a different
00245   /// proxy push consumer, if 0, we only use one proxy push consumer (the
00246   /// default) for all source ids.
00247   int use_consumer_proxy_map_;
00248 
00249 };
00250 
00251 TAO_END_VERSIONED_NAMESPACE_DECL
00252 
00253 #include /**/ "ace/post.h"
00254 
00255 #endif /* ACE_EC_GATEWAY_IIOP_H */

Generated on Thu Nov 9 13:11:07 2006 for TAO_RTEvent by doxygen 1.3.6