ECG_UDP_Sender.cpp

Go to the documentation of this file.
00001 /**
00002  * @file ECG_UDP_Sender.cpp
00003  *
00004  * $Id: ECG_UDP_Sender.cpp 78820 2007-07-07 20:06:46Z sowayaa $
00005  *
00006  * @author Carlos O'Ryan <coryan@uci.edu>
00007  *
00008  * http://doc.ece.uci.edu/~coryan/EC/index.html
00009  *
00010  */
00011 
00012 #include "orbsvcs/Event/ECG_UDP_Sender.h"
00013 #include "orbsvcs/Event_Utilities.h"
00014 #include "tao/CDR.h"
00015 
00016 #if !defined(__ACE_INLINE__)
00017 #include "orbsvcs/Event/ECG_UDP_Sender.inl"
00018 #endif /* __ACE_INLINE__ */
00019 
00020 ACE_RCSID (Event,
00021            ECG_UDP_Sender,
00022            "$Id: ECG_UDP_Sender.cpp 78820 2007-07-07 20:06:46Z sowayaa $")
00023 
00024 // ****************************************************************
00025 
00026 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00027 
00028 TAO_ECG_UDP_Sender::~TAO_ECG_UDP_Sender (void)
00029 {
00030 }
00031 
00032 void
00033 TAO_ECG_UDP_Sender::init (RtecEventChannelAdmin::EventChannel_ptr lcl_ec,
00034                           RtecUDPAdmin::AddrServer_ptr addr_server,
00035                           TAO_ECG_Refcounted_Endpoint endpoint_rptr)
00036 {
00037   if (CORBA::is_nil (lcl_ec))
00038     {
00039       ACE_ERROR ((LM_ERROR, "TAO_ECG_UDP_Sender::init(): "
00040                             "<lcl_ec> argument is nil."));
00041       throw CORBA::INTERNAL ();
00042     }
00043 
00044   if (CORBA::is_nil (addr_server))
00045     {
00046       ACE_ERROR ((LM_ERROR, "TAO_ECG_UDP_Sender::init(): "
00047                             "address server argument is nil."));
00048       throw CORBA::INTERNAL ();
00049     }
00050 
00051   this->cdr_sender_.init (endpoint_rptr);
00052 
00053   this->lcl_ec_ =
00054     RtecEventChannelAdmin::EventChannel::_duplicate (lcl_ec);
00055 
00056   this->addr_server_ =
00057     RtecUDPAdmin::AddrServer::_duplicate (addr_server);
00058 }
00059 
00060 void
00061 TAO_ECG_UDP_Sender::connect (const RtecEventChannelAdmin::ConsumerQOS& sub)
00062 {
00063   if (CORBA::is_nil (this->lcl_ec_.in ()))
00064     {
00065       //FUZZ: disable check_for_lack_ACE_OS
00066       ACE_ERROR ((LM_ERROR, "Error initializing TAO_ECG_UDP_Sender: "
00067                             "init() has not been called before connect()."));
00068       //FUZZ: enable check_for_lack_ACE_OS
00069 
00070       throw CORBA::INTERNAL ();
00071     }
00072 
00073   if (sub.dependencies.length () == 0)
00074     {
00075       ACE_ERROR ((LM_ERROR, "TAO_ECG_UDP_Sender::connect(): "
00076                             "0-length subscriptions argument."));
00077       throw CORBA::INTERNAL ();
00078     }
00079 
00080   if (CORBA::is_nil (this->supplier_proxy_.in ()))
00081     {
00082       this->new_connect (sub);
00083     }
00084   else
00085     {
00086       this->reconnect (sub);
00087     }
00088 }
00089 
00090 void
00091 TAO_ECG_UDP_Sender::new_connect (const RtecEventChannelAdmin::ConsumerQOS& sub)
00092 {
00093   // Activate with poa.
00094   RtecEventComm::PushConsumer_var consumer_ref;
00095   PortableServer::POA_var poa = this->_default_POA ();
00096 
00097   TAO_EC_Object_Deactivator deactivator;
00098   activate (consumer_ref,
00099             poa.in (),
00100             this,
00101             deactivator);
00102 
00103   // Connect as a consumer to the local EC.
00104   RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin =
00105     this->lcl_ec_->for_consumers ();
00106 
00107   RtecEventChannelAdmin::ProxyPushSupplier_var proxy =
00108     consumer_admin->obtain_push_supplier ();
00109   ECG_Sender_Auto_Proxy_Disconnect new_proxy_disconnect (proxy.in ());
00110 
00111   proxy->connect_push_consumer (consumer_ref.in (),
00112                                 sub);
00113 
00114   // Update resource managers.
00115   this->supplier_proxy_ = proxy._retn ();
00116   this->auto_proxy_disconnect_.set_command (new_proxy_disconnect);
00117   this->set_deactivator (deactivator);
00118 }
00119 
00120 void
00121 TAO_ECG_UDP_Sender::reconnect (const RtecEventChannelAdmin::ConsumerQOS& sub)
00122 {
00123   // Obtain our object reference from the POA.
00124   RtecEventComm::PushConsumer_var consumer_ref;
00125   PortableServer::POA_var poa = this->_default_POA ();
00126 
00127   CORBA::Object_var obj = poa->servant_to_reference (this);
00128   consumer_ref =
00129     RtecEventComm::PushConsumer::_narrow (obj.in ());
00130 
00131   if (CORBA::is_nil (consumer_ref.in ()))
00132     {
00133       throw CORBA::INTERNAL ();
00134     }
00135 
00136   // Reconnect.
00137   this->supplier_proxy_->connect_push_consumer (consumer_ref.in (),
00138                                                 sub);
00139 }
00140 
00141 void
00142 TAO_ECG_UDP_Sender::disconnect_push_consumer (void)
00143 {
00144   // Prevent attempts to disconnect.
00145   this->auto_proxy_disconnect_.disallow_command ();
00146 
00147   this->shutdown ();
00148 }
00149 
00150 void
00151 TAO_ECG_UDP_Sender::shutdown (void)
00152 {
00153   this->supplier_proxy_ =
00154     RtecEventChannelAdmin::ProxyPushSupplier::_nil ();
00155 
00156   this->auto_proxy_disconnect_.execute ();
00157 
00158   this->addr_server_ = RtecUDPAdmin::AddrServer::_nil ();
00159   this->lcl_ec_ = RtecEventChannelAdmin::EventChannel::_nil ();
00160 
00161   this->deactivator_.deactivate ();
00162   this->cdr_sender_.shutdown ();
00163 }
00164 
00165 void
00166 TAO_ECG_UDP_Sender::push (const RtecEventComm::EventSet &events)
00167 {
00168   if (events.length () == 0)
00169     {
00170       // ACE_DEBUG ((EC_FORMAT (DEBUG,
00171       //                        "Nothing to multicast: "
00172       //                        "0-length EventSet.")));
00173       return;
00174     }
00175 
00176   // Send each event in a separate message.
00177   // @@ TODO It is interesting to group events destined to the
00178   // same mcast group in a single message.
00179   for (u_int i = 0; i < events.length (); ++i)
00180     {
00181       // To avoid loops we keep a TTL field on the events and skip the
00182       // events with TTL <= 0
00183       if (events[i].header.ttl <= 0)
00184         continue;
00185 
00186       const RtecEventComm::Event& e = events[i];
00187 
00188       // We need to modify the TTL field, but copying the entire event
00189       // would be wasteful; instead we create a new header and only
00190       // modify the header portion.
00191       RtecEventComm::EventHeader header = e.header;
00192       header.ttl--;
00193 
00194       // Start building the message
00195       TAO_OutputCDR cdr;
00196 
00197       // Marshal as if it was a sequence of one element, notice how we
00198       // marshal a modified version of the header, but the payload is
00199       // marshal without any extra copies.
00200       cdr.write_ulong (1);
00201       if (!(cdr << header)
00202           || !(cdr << e.data))
00203         throw CORBA::MARSHAL ();
00204 
00205       ACE_INET_Addr inet_addr;
00206       try
00207         {
00208           // Grab the right mcast group for this event...
00209           RtecUDPAdmin::UDP_Address_var udp_addr;
00210 
00211           this->addr_server_->get_address (header, udp_addr.out());
00212           switch (udp_addr->_d())
00213             {
00214             case RtecUDPAdmin::Rtec_inet:
00215               inet_addr.set(udp_addr->v4_addr().port,
00216                             udp_addr->v4_addr().ipaddr);
00217               break;
00218             case RtecUDPAdmin::Rtec_inet6:
00219 #if defined (ACE_HAS_IPV6)
00220               inet_addr.set_type(PF_INET6);
00221 #endif
00222               inet_addr.set_address(udp_addr->v6_addr().ipaddr,16,0);
00223               inet_addr.set_port_number(udp_addr->v6_addr().port);
00224               break;
00225             }
00226         }
00227       catch (const ::CORBA::BAD_OPERATION &)
00228         {
00229           // server only supports IPv4
00230            // Grab the right mcast group for this event...
00231           RtecUDPAdmin::UDP_Addr udp_addr;
00232           this->addr_server_->get_addr (header, udp_addr);
00233           inet_addr.set (udp_addr.port, udp_addr.ipaddr);
00234         }
00235 
00236       this->cdr_sender_.send_message (cdr, inet_addr);
00237     }
00238 }
00239 
00240 TAO_END_VERSIONED_NAMESPACE_DECL

Generated on Tue Feb 2 17:44:06 2010 for TAO_RTEvent by  doxygen 1.4.7