ECG_UDP_Sender.cpp

Go to the documentation of this file.
00001 /**
00002  * @file ECG_UDP_Sender.cpp
00003  *
00004  * ECG_UDP_Sender.cpp,v 1.16 2006/03/14 06:14:25 jtc Exp
00005  *
00006  * @author Carlos O'Ryan <coryan@uci.edu>
00007  *
00008  * http://doc.ece.uci.edu/~coryan/EC/index.html
00009  *
00010  */
00011 
00012 #include "orbsvcs/Event/ECG_UDP_Sender.h"
00013 #include "orbsvcs/Event_Utilities.h"
00014 #include "tao/CDR.h"
00015 
00016 #if !defined(__ACE_INLINE__)
00017 #include "orbsvcs/Event/ECG_UDP_Sender.inl"
00018 #endif /* __ACE_INLINE__ */
00019 
00020 ACE_RCSID (Event,
00021            ECG_UDP_Sender,
00022            "ECG_UDP_Sender.cpp,v 1.16 2006/03/14 06:14:25 jtc Exp")
00023 
00024 // ****************************************************************
00025 
00026 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00027 
00028 // Destructor: the body is empty, no explicit teardown is done here.
00029 TAO_ECG_UDP_Sender::~TAO_ECG_UDP_Sender ()
00030 {}
00031 
00032 // Record the local event channel and the address server, and hand the
00033 // endpoint to the CDR sender.  A nil <lcl_ec> or <addr_server> is logged
00034 // and reported as CORBA::INTERNAL.
00035 void
00036 TAO_ECG_UDP_Sender::init (RtecEventChannelAdmin::EventChannel_ptr lcl_ec,
00037                           RtecUDPAdmin::AddrServer_ptr addr_server,
00038                           TAO_ECG_Refcounted_Endpoint endpoint_rptr
00039                           ACE_ENV_ARG_DECL)
00040 {
00041   if (CORBA::is_nil (lcl_ec))
00042     {
00043       // The trailing newline keeps this record from running into the next one.
00044       ACE_ERROR ((LM_ERROR, "TAO_ECG_UDP_Sender::init(): "
00045                             "<lcl_ec> argument is nil.\n"));
00046       ACE_THROW (CORBA::INTERNAL ());
00047     }
00048 
00049   if (CORBA::is_nil (addr_server))
00050     {
00051       ACE_ERROR ((LM_ERROR, "TAO_ECG_UDP_Sender::init(): "
00052                             "address server argument is nil.\n"));
00053       ACE_THROW (CORBA::INTERNAL ());
00054     }
00055   // The CDR sender is initialised before either reference is stored.
00056   this->cdr_sender_.init (endpoint_rptr ACE_ENV_ARG_PARAMETER);
00057   ACE_CHECK;
00058   this->lcl_ec_ = RtecEventChannelAdmin::EventChannel::_duplicate (lcl_ec);
00059   this->addr_server_ = RtecUDPAdmin::AddrServer::_duplicate (addr_server);
00060 }
00061 
00062 // Connect to the local event channel as a consumer with the subscriptions
00063 // in <sub>.  While no supplier proxy is held the call goes through
00064 // new_connect(); otherwise it goes through reconnect().  Throws
00065 // CORBA::INTERNAL if the channel reference is nil or <sub> is empty.
00066 void
00067 TAO_ECG_UDP_Sender::connect (const RtecEventChannelAdmin::ConsumerQOS& sub
00068                              ACE_ENV_ARG_DECL)
00069 {
00070   if (CORBA::is_nil (this->lcl_ec_.in ()))
00071     {
00072       ACE_ERROR ((LM_ERROR, "Error initializing TAO_ECG_UDP_Sender: "
00073                             "init() has not been called before connect().\n"));
00074       ACE_THROW (CORBA::INTERNAL ());
00075     }
00076 
00077   if (sub.dependencies.length () == 0)
00078     {
00079       ACE_ERROR ((LM_ERROR, "TAO_ECG_UDP_Sender::connect(): "
00080                             "0-length subscriptions argument.\n"));
00081       ACE_THROW (CORBA::INTERNAL ());
00082     }
00083 
00084   // No supplier proxy held yet: take the first-connection path.
00085   if (CORBA::is_nil (this->supplier_proxy_.in ()))
00086     this->new_connect (sub ACE_ENV_ARG_PARAMETER);
00087   else
00088     this->reconnect (sub ACE_ENV_ARG_PARAMETER);
00089   ACE_CHECK;
00090 }
00091 
00092 // First-time connection: activate this servant with its default POA, then
00093 // obtain a push supplier proxy from the local event channel and connect
00094 // to it as a consumer using the subscriptions in <sub>.
00095 void
00096 TAO_ECG_UDP_Sender::new_connect (const RtecEventChannelAdmin::ConsumerQOS& sub
00097                                  ACE_ENV_ARG_DECL)
00098 {
00099   // Step 1: activate with the default POA.
00100   RtecEventComm::PushConsumer_var self_ref;
00101   PortableServer::POA_var default_poa = this->_default_POA ();
00102   TAO_EC_Object_Deactivator activation_guard;
00103   activate (self_ref, default_poa.in (), this, activation_guard
00104             ACE_ENV_ARG_PARAMETER);
00105   ACE_CHECK;
00106 
00107   // Step 2: ask the local event channel for a push supplier proxy.
00108   RtecEventChannelAdmin::ConsumerAdmin_var admin =
00109     this->lcl_ec_->for_consumers (ACE_ENV_SINGLE_ARG_PARAMETER);
00110   ACE_CHECK;
00111 
00112   RtecEventChannelAdmin::ProxyPushSupplier_var supplier =
00113     admin->obtain_push_supplier (ACE_ENV_SINGLE_ARG_PARAMETER);
00114   ACE_CHECK;
00115   ECG_Sender_Auto_Proxy_Disconnect disconnect_cmd (supplier.in ());
00116 
00117   // Step 3: connect to that proxy as a consumer of <sub>.
00118   supplier->connect_push_consumer (self_ref.in (), sub
00119                                    ACE_ENV_ARG_PARAMETER);
00120   ACE_CHECK;
00121 
00122   // Hand the proxy, the disconnect command and the deactivator to the
00123   // members that manage them.
00124   this->supplier_proxy_ = supplier._retn ();
00125   this->auto_proxy_disconnect_.set_command (disconnect_cmd);
00126   this->set_deactivator (activation_guard);
00127 }
00128 
00129 // Re-issue connect_push_consumer() on the supplier proxy already held,
00130 // passing the reference the default POA reports for this servant.
00131 void
00132 TAO_ECG_UDP_Sender::reconnect (const RtecEventChannelAdmin::ConsumerQOS& sub
00133                                ACE_ENV_ARG_DECL)
00134 {
00135   RtecEventComm::PushConsumer_var self_ref;
00136   PortableServer::POA_var default_poa = this->_default_POA ();
00137 
00138   // Look up the reference for this servant and narrow it to PushConsumer.
00139   CORBA::Object_var servant_obj =
00140     default_poa->servant_to_reference (this ACE_ENV_ARG_PARAMETER);
00141   ACE_CHECK;
00142   self_ref = RtecEventComm::PushConsumer::_narrow (servant_obj.in ()
00143                                                    ACE_ENV_ARG_PARAMETER);
00144   ACE_CHECK;
00145 
00146   if (CORBA::is_nil (self_ref.in ()))
00147     ACE_THROW (CORBA::INTERNAL ());
00148 
00149   // Connect again on the existing proxy with the subscriptions in <sub>.
00150   this->supplier_proxy_->connect_push_consumer (self_ref.in (), sub
00151                                                 ACE_ENV_ARG_PARAMETER);
00152   ACE_CHECK;
00153 }
00154 
00155 // Disconnect request: disable the proxy-disconnect command, then shut down.
00156 void
00157 TAO_ECG_UDP_Sender::disconnect_push_consumer (ACE_ENV_SINGLE_ARG_DECL)
00158     ACE_THROW_SPEC ((CORBA::SystemException))
00159 {
00160   // Disallow the command first so that shutdown() makes no disconnect attempt.
00161   this->auto_proxy_disconnect_.disallow_command ();
00162   this->shutdown (ACE_ENV_SINGLE_ARG_PARAMETER);
00163 }
00164 
00165 // Tear-down: drop the proxy, address-server and event-channel references,
00166 // run the proxy-disconnect command, deactivate, shut down the CDR sender.
00167 void
00168 TAO_ECG_UDP_Sender::shutdown (ACE_ENV_SINGLE_ARG_DECL)
00169 {
00170   this->supplier_proxy_ = RtecEventChannelAdmin::ProxyPushSupplier::_nil ();
00171   this->auto_proxy_disconnect_.execute ();
00172 
00173   this->addr_server_ = RtecUDPAdmin::AddrServer::_nil ();
00174   this->lcl_ec_ = RtecEventChannelAdmin::EventChannel::_nil ();
00175 
00176   this->deactivator_.deactivate ();
00177   this->cdr_sender_.shutdown (ACE_ENV_SINGLE_ARG_PARAMETER);
00178   ACE_CHECK;
00179 }
00180 
00181 // Forward every event in <events>, each one in a separate message.
00182 //
00183 // Events whose TTL is already <= 0 are skipped, which is what keeps
00184 // events from looping; the others are sent with the TTL lowered by one
00185 // to the address the address server returns for their header.
00186 //
00187 // @@ TODO It is interesting to group events destined to the
00188 // same mcast group in a single message.
00189 void
00190 TAO_ECG_UDP_Sender::push (const RtecEventComm::EventSet &events
00191                           ACE_ENV_ARG_DECL)
00192     ACE_THROW_SPEC ((CORBA::SystemException))
00193 {
00194   // Nothing to send for a 0-length EventSet.
00195   if (events.length () == 0)
00196     return;
00197 
00198   for (u_int idx = 0; idx < events.length (); ++idx)
00199     {
00200       const RtecEventComm::Event& event = events[idx];
00201 
00202       // To avoid loops a TTL field is kept on the events; those whose
00203       // TTL is <= 0 are not forwarded.
00204       if (event.header.ttl <= 0)
00205         continue;
00206 
00207       // Copying the whole event just to change the TTL would be wasteful,
00208       // so only the header is copied and modified.
00209       RtecEventComm::EventHeader out_header = event.header;
00210       --out_header.ttl;
00211 
00212       // Marshal as a sequence of one element: the modified header first,
00213       // then the payload taken directly from the original event.
00214       TAO_OutputCDR cdr;
00215       cdr.write_ulong (1);
00216       const bool marshaled = (cdr << out_header) && (cdr << event.data);
00217       if (!marshaled)
00218         ACE_THROW (CORBA::MARSHAL ());
00219 
00220       // Look up the destination for this event.
00221       RtecUDPAdmin::UDP_Addr destination;
00222       this->addr_server_->get_addr (out_header, destination
00223                                     ACE_ENV_ARG_PARAMETER);
00224       ACE_CHECK;
00225 
00226       ACE_INET_Addr inet_addr (destination.port,
00227                                destination.ipaddr);
00228 
00229       // Hand the encoded event to the CDR sender.
00230       this->cdr_sender_.send_message (cdr, inet_addr
00231                                       ACE_ENV_ARG_PARAMETER);
00232       ACE_CHECK;
00233     }
00234 }
00235 
00236 TAO_END_VERSIONED_NAMESPACE_DECL

Generated on Thu Nov 9 13:11:11 2006 for TAO_RTEvent by doxygen 1.3.6