00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012 #include "orbsvcs/Event/ECG_UDP_Sender.h"
00013 #include "orbsvcs/Event_Utilities.h"
00014 #include "tao/CDR.h"
00015
00016 #if !defined(__ACE_INLINE__)
00017 #include "orbsvcs/Event/ECG_UDP_Sender.inl"
00018 #endif
00019
00020 ACE_RCSID (Event,
00021 ECG_UDP_Sender,
00022 "ECG_UDP_Sender.cpp,v 1.16 2006/03/14 06:14:25 jtc Exp")
00023
00024
00025
00026 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00027
00028 TAO_ECG_UDP_Sender::~TAO_ECG_UDP_Sender (void)
00029 {
00030 }
00031
00032 void
00033 TAO_ECG_UDP_Sender::init (RtecEventChannelAdmin::EventChannel_ptr lcl_ec,
00034 RtecUDPAdmin::AddrServer_ptr addr_server,
00035 TAO_ECG_Refcounted_Endpoint endpoint_rptr
00036 ACE_ENV_ARG_DECL)
00037 {
00038 if (CORBA::is_nil (lcl_ec))
00039 {
00040 ACE_ERROR ((LM_ERROR, "TAO_ECG_UDP_Sender::init(): "
00041 "<lcl_ec> argument is nil."));
00042 ACE_THROW (CORBA::INTERNAL ());
00043 }
00044
00045 if (CORBA::is_nil (addr_server))
00046 {
00047 ACE_ERROR ((LM_ERROR, "TAO_ECG_UDP_Sender::init(): "
00048 "address server argument is nil."));
00049 ACE_THROW (CORBA::INTERNAL ());
00050 }
00051
00052 this->cdr_sender_.init (endpoint_rptr ACE_ENV_ARG_PARAMETER);
00053 ACE_CHECK;
00054
00055 this->lcl_ec_ =
00056 RtecEventChannelAdmin::EventChannel::_duplicate (lcl_ec);
00057
00058 this->addr_server_ =
00059 RtecUDPAdmin::AddrServer::_duplicate (addr_server);
00060 }
00061
00062 void
00063 TAO_ECG_UDP_Sender::connect (const RtecEventChannelAdmin::ConsumerQOS& sub
00064 ACE_ENV_ARG_DECL)
00065 {
00066 if (CORBA::is_nil (this->lcl_ec_.in ()))
00067 {
00068 ACE_ERROR ((LM_ERROR, "Error initializing TAO_ECG_UDP_Sender: "
00069 "init() has not been called before connect()."));
00070 ACE_THROW (CORBA::INTERNAL ());
00071 }
00072
00073 if (sub.dependencies.length () == 0)
00074 {
00075 ACE_ERROR ((LM_ERROR, "TAO_ECG_UDP_Sender::connect(): "
00076 "0-length subscriptions argument."));
00077 ACE_THROW (CORBA::INTERNAL ());
00078 }
00079
00080 if (CORBA::is_nil (this->supplier_proxy_.in ()))
00081 {
00082 this->new_connect (sub ACE_ENV_ARG_PARAMETER);
00083 ACE_CHECK;
00084 }
00085 else
00086 {
00087 this->reconnect (sub ACE_ENV_ARG_PARAMETER);
00088 ACE_CHECK;
00089 }
00090 }
00091
00092 void
00093 TAO_ECG_UDP_Sender::new_connect (const RtecEventChannelAdmin::ConsumerQOS& sub
00094 ACE_ENV_ARG_DECL)
00095 {
00096
00097 RtecEventComm::PushConsumer_var consumer_ref;
00098 PortableServer::POA_var poa = this->_default_POA ();
00099
00100 TAO_EC_Object_Deactivator deactivator;
00101 activate (consumer_ref,
00102 poa.in (),
00103 this,
00104 deactivator
00105 ACE_ENV_ARG_PARAMETER);
00106 ACE_CHECK;
00107
00108
00109 RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin =
00110 this->lcl_ec_->for_consumers (ACE_ENV_SINGLE_ARG_PARAMETER);
00111 ACE_CHECK;
00112
00113 RtecEventChannelAdmin::ProxyPushSupplier_var proxy =
00114 consumer_admin->obtain_push_supplier (ACE_ENV_SINGLE_ARG_PARAMETER);
00115 ACE_CHECK;
00116 ECG_Sender_Auto_Proxy_Disconnect new_proxy_disconnect (proxy.in ());
00117
00118 proxy->connect_push_consumer (consumer_ref.in (),
00119 sub
00120 ACE_ENV_ARG_PARAMETER);
00121 ACE_CHECK;
00122
00123
00124 this->supplier_proxy_ = proxy._retn ();
00125 this->auto_proxy_disconnect_.set_command (new_proxy_disconnect);
00126 this->set_deactivator (deactivator);
00127 }
00128
00129 void
00130 TAO_ECG_UDP_Sender::reconnect (const RtecEventChannelAdmin::ConsumerQOS& sub
00131 ACE_ENV_ARG_DECL)
00132 {
00133
00134 RtecEventComm::PushConsumer_var consumer_ref;
00135 PortableServer::POA_var poa = this->_default_POA ();
00136
00137 CORBA::Object_var obj = poa->servant_to_reference (this ACE_ENV_ARG_PARAMETER);
00138 ACE_CHECK;
00139 consumer_ref =
00140 RtecEventComm::PushConsumer::_narrow (obj.in () ACE_ENV_ARG_PARAMETER);
00141 ACE_CHECK;
00142
00143 if (CORBA::is_nil (consumer_ref.in ()))
00144 {
00145 ACE_THROW (CORBA::INTERNAL ());
00146 }
00147
00148
00149 this->supplier_proxy_->connect_push_consumer (consumer_ref.in (),
00150 sub
00151 ACE_ENV_ARG_PARAMETER);
00152 ACE_CHECK;
00153 }
00154
00155 void
00156 TAO_ECG_UDP_Sender::disconnect_push_consumer (ACE_ENV_SINGLE_ARG_DECL)
00157 ACE_THROW_SPEC ((CORBA::SystemException))
00158 {
00159
00160 this->auto_proxy_disconnect_.disallow_command ();
00161
00162 this->shutdown (ACE_ENV_SINGLE_ARG_PARAMETER);
00163 }
00164
00165 void
00166 TAO_ECG_UDP_Sender::shutdown (ACE_ENV_SINGLE_ARG_DECL)
00167 {
00168 this->supplier_proxy_ =
00169 RtecEventChannelAdmin::ProxyPushSupplier::_nil ();
00170
00171 this->auto_proxy_disconnect_.execute ();
00172
00173 this->addr_server_ = RtecUDPAdmin::AddrServer::_nil ();
00174 this->lcl_ec_ = RtecEventChannelAdmin::EventChannel::_nil ();
00175
00176 this->deactivator_.deactivate ();
00177 this->cdr_sender_.shutdown (ACE_ENV_SINGLE_ARG_PARAMETER);
00178 ACE_CHECK;
00179 }
00180
00181 void
00182 TAO_ECG_UDP_Sender::push (const RtecEventComm::EventSet &events
00183 ACE_ENV_ARG_DECL)
00184 ACE_THROW_SPEC ((CORBA::SystemException))
00185 {
00186 if (events.length () == 0)
00187 {
00188
00189
00190
00191 return;
00192 }
00193
00194
00195
00196
00197 for (u_int i = 0; i < events.length (); ++i)
00198 {
00199
00200
00201 if (events[i].header.ttl <= 0)
00202 continue;
00203
00204 const RtecEventComm::Event& e = events[i];
00205
00206
00207
00208
00209 RtecEventComm::EventHeader header = e.header;
00210 header.ttl--;
00211
00212
00213 TAO_OutputCDR cdr;
00214
00215
00216
00217
00218 cdr.write_ulong (1);
00219 if (!(cdr << header)
00220 || !(cdr << e.data))
00221 ACE_THROW (CORBA::MARSHAL ());
00222
00223
00224 RtecUDPAdmin::UDP_Addr udp_addr;
00225 this->addr_server_->get_addr (header, udp_addr ACE_ENV_ARG_PARAMETER);
00226 ACE_CHECK;
00227
00228 ACE_INET_Addr inet_addr (udp_addr.port,
00229 udp_addr.ipaddr);
00230
00231 this->cdr_sender_.send_message (cdr, inet_addr ACE_ENV_ARG_PARAMETER);
00232 ACE_CHECK;
00233 }
00234 }
00235
00236 TAO_END_VERSIONED_NAMESPACE_DECL