00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012 #include "orbsvcs/Event/ECG_UDP_Sender.h"
00013 #include "orbsvcs/Event_Utilities.h"
00014 #include "tao/CDR.h"
00015
00016 #if !defined(__ACE_INLINE__)
00017 #include "orbsvcs/Event/ECG_UDP_Sender.inl"
00018 #endif
00019
00020 ACE_RCSID (Event,
00021 ECG_UDP_Sender,
00022 "$Id: ECG_UDP_Sender.cpp 78820 2007-07-07 20:06:46Z sowayaa $")
00023
00024
00025
00026 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00027
00028 TAO_ECG_UDP_Sender::~TAO_ECG_UDP_Sender (void)
00029 {
00030 }
00031
00032 void
00033 TAO_ECG_UDP_Sender::init (RtecEventChannelAdmin::EventChannel_ptr lcl_ec,
00034 RtecUDPAdmin::AddrServer_ptr addr_server,
00035 TAO_ECG_Refcounted_Endpoint endpoint_rptr)
00036 {
00037 if (CORBA::is_nil (lcl_ec))
00038 {
00039 ACE_ERROR ((LM_ERROR, "TAO_ECG_UDP_Sender::init(): "
00040 "<lcl_ec> argument is nil."));
00041 throw CORBA::INTERNAL ();
00042 }
00043
00044 if (CORBA::is_nil (addr_server))
00045 {
00046 ACE_ERROR ((LM_ERROR, "TAO_ECG_UDP_Sender::init(): "
00047 "address server argument is nil."));
00048 throw CORBA::INTERNAL ();
00049 }
00050
00051 this->cdr_sender_.init (endpoint_rptr);
00052
00053 this->lcl_ec_ =
00054 RtecEventChannelAdmin::EventChannel::_duplicate (lcl_ec);
00055
00056 this->addr_server_ =
00057 RtecUDPAdmin::AddrServer::_duplicate (addr_server);
00058 }
00059
00060 void
00061 TAO_ECG_UDP_Sender::connect (const RtecEventChannelAdmin::ConsumerQOS& sub)
00062 {
00063 if (CORBA::is_nil (this->lcl_ec_.in ()))
00064 {
00065
00066 ACE_ERROR ((LM_ERROR, "Error initializing TAO_ECG_UDP_Sender: "
00067 "init() has not been called before connect()."));
00068
00069
00070 throw CORBA::INTERNAL ();
00071 }
00072
00073 if (sub.dependencies.length () == 0)
00074 {
00075 ACE_ERROR ((LM_ERROR, "TAO_ECG_UDP_Sender::connect(): "
00076 "0-length subscriptions argument."));
00077 throw CORBA::INTERNAL ();
00078 }
00079
00080 if (CORBA::is_nil (this->supplier_proxy_.in ()))
00081 {
00082 this->new_connect (sub);
00083 }
00084 else
00085 {
00086 this->reconnect (sub);
00087 }
00088 }
00089
00090 void
00091 TAO_ECG_UDP_Sender::new_connect (const RtecEventChannelAdmin::ConsumerQOS& sub)
00092 {
00093
00094 RtecEventComm::PushConsumer_var consumer_ref;
00095 PortableServer::POA_var poa = this->_default_POA ();
00096
00097 TAO_EC_Object_Deactivator deactivator;
00098 activate (consumer_ref,
00099 poa.in (),
00100 this,
00101 deactivator);
00102
00103
00104 RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin =
00105 this->lcl_ec_->for_consumers ();
00106
00107 RtecEventChannelAdmin::ProxyPushSupplier_var proxy =
00108 consumer_admin->obtain_push_supplier ();
00109 ECG_Sender_Auto_Proxy_Disconnect new_proxy_disconnect (proxy.in ());
00110
00111 proxy->connect_push_consumer (consumer_ref.in (),
00112 sub);
00113
00114
00115 this->supplier_proxy_ = proxy._retn ();
00116 this->auto_proxy_disconnect_.set_command (new_proxy_disconnect);
00117 this->set_deactivator (deactivator);
00118 }
00119
00120 void
00121 TAO_ECG_UDP_Sender::reconnect (const RtecEventChannelAdmin::ConsumerQOS& sub)
00122 {
00123
00124 RtecEventComm::PushConsumer_var consumer_ref;
00125 PortableServer::POA_var poa = this->_default_POA ();
00126
00127 CORBA::Object_var obj = poa->servant_to_reference (this);
00128 consumer_ref =
00129 RtecEventComm::PushConsumer::_narrow (obj.in ());
00130
00131 if (CORBA::is_nil (consumer_ref.in ()))
00132 {
00133 throw CORBA::INTERNAL ();
00134 }
00135
00136
00137 this->supplier_proxy_->connect_push_consumer (consumer_ref.in (),
00138 sub);
00139 }
00140
00141 void
00142 TAO_ECG_UDP_Sender::disconnect_push_consumer (void)
00143 {
00144
00145 this->auto_proxy_disconnect_.disallow_command ();
00146
00147 this->shutdown ();
00148 }
00149
00150 void
00151 TAO_ECG_UDP_Sender::shutdown (void)
00152 {
00153 this->supplier_proxy_ =
00154 RtecEventChannelAdmin::ProxyPushSupplier::_nil ();
00155
00156 this->auto_proxy_disconnect_.execute ();
00157
00158 this->addr_server_ = RtecUDPAdmin::AddrServer::_nil ();
00159 this->lcl_ec_ = RtecEventChannelAdmin::EventChannel::_nil ();
00160
00161 this->deactivator_.deactivate ();
00162 this->cdr_sender_.shutdown ();
00163 }
00164
00165 void
00166 TAO_ECG_UDP_Sender::push (const RtecEventComm::EventSet &events)
00167 {
00168 if (events.length () == 0)
00169 {
00170
00171
00172
00173 return;
00174 }
00175
00176
00177
00178
00179 for (u_int i = 0; i < events.length (); ++i)
00180 {
00181
00182
00183 if (events[i].header.ttl <= 0)
00184 continue;
00185
00186 const RtecEventComm::Event& e = events[i];
00187
00188
00189
00190
00191 RtecEventComm::EventHeader header = e.header;
00192 header.ttl--;
00193
00194
00195 TAO_OutputCDR cdr;
00196
00197
00198
00199
00200 cdr.write_ulong (1);
00201 if (!(cdr << header)
00202 || !(cdr << e.data))
00203 throw CORBA::MARSHAL ();
00204
00205 ACE_INET_Addr inet_addr;
00206 try
00207 {
00208
00209 RtecUDPAdmin::UDP_Address_var udp_addr;
00210
00211 this->addr_server_->get_address (header, udp_addr.out());
00212 switch (udp_addr->_d())
00213 {
00214 case RtecUDPAdmin::Rtec_inet:
00215 inet_addr.set(udp_addr->v4_addr().port,
00216 udp_addr->v4_addr().ipaddr);
00217 break;
00218 case RtecUDPAdmin::Rtec_inet6:
00219 #if defined (ACE_HAS_IPV6)
00220 inet_addr.set_type(PF_INET6);
00221 #endif
00222 inet_addr.set_address(udp_addr->v6_addr().ipaddr,16,0);
00223 inet_addr.set_port_number(udp_addr->v6_addr().port);
00224 break;
00225 }
00226 }
00227 catch (const ::CORBA::BAD_OPERATION &)
00228 {
00229
00230
00231 RtecUDPAdmin::UDP_Addr udp_addr;
00232 this->addr_server_->get_addr (header, udp_addr);
00233 inet_addr.set (udp_addr.port, udp_addr.ipaddr);
00234 }
00235
00236 this->cdr_sender_.send_message (cdr, inet_addr);
00237 }
00238 }
00239
00240 TAO_END_VERSIONED_NAMESPACE_DECL