#include <ECG_UDP_Sender.h>
Inheritance diagram for TAO_ECG_UDP_Sender:


[NOHEADER] | |
| ~TAO_ECG_UDP_Sender (void) | |
| void | init (RtecEventChannelAdmin::EventChannel_ptr lcl_ec, RtecUDPAdmin::AddrServer_ptr addr_server, TAO_ECG_Refcounted_Endpoint endpoint_rptr) |
| void | connect (const RtecEventChannelAdmin::ConsumerQOS &sub) |
| Connect or reconnect to the EC with the given subscriptions. | |
| void | shutdown () |
| TAO_EC_Servant_Var< TAO_ECG_UDP_Sender > | create (CORBA::Boolean crc=0) |
Public Member Functions | |
| int | mtu (CORBA::ULong mtu) |
| CORBA::ULong | mtu (void) const |
| int | get_local_addr (ACE_INET_Addr &addr) |
| Get the local endpoint used to send the events. | |
| virtual void | disconnect_push_consumer () throw (CORBA::SystemException) |
| virtual void | push (const RtecEventComm::EventSet &events) throw (CORBA::SystemException) |
Protected Member Functions | |
| TAO_ECG_UDP_Sender (CORBA::Boolean crc=0) | |
Private Types | |
| typedef TAO_EC_Auto_Command< TAO_ECG_UDP_Sender_Disconnect_Command > | ECG_Sender_Auto_Proxy_Disconnect |
Private Member Functions | |
| void | new_connect (const RtecEventChannelAdmin::ConsumerQOS &sub) |
| void | reconnect (const RtecEventChannelAdmin::ConsumerQOS &sub) |
Private Attributes | |
| RtecEventChannelAdmin::ProxyPushSupplier_var | supplier_proxy_ |
| Proxy used to receive events from the Event Channel. | |
| RtecEventChannelAdmin::EventChannel_var | lcl_ec_ |
| Event Channel to which we act as a consumer. | |
| RtecUDPAdmin::AddrServer_var | addr_server_ |
| We query this object to determine where the events should be sent. | |
| TAO_ECG_CDR_Message_Sender | cdr_sender_ |
| Helper for fragmenting and sending cdr-encoded events using udp. | |
| ECG_Sender_Auto_Proxy_Disconnect | auto_proxy_disconnect_ |
| Manages our connection to Supplier Proxy. | |
Definition at line 103 of file ECG_UDP_Sender.h.
|
|
Definition at line 218 of file ECG_UDP_Sender.h. Referenced by new_connect(). |
|
|
Definition at line 28 of file ECG_UDP_Sender.cpp.
00029 {
00030 }
|
|
|
Constructor (protected). Clients can create new TAO_ECG_UDP_Sender objects using the static create() method. |
|
|
Connect or reconnect to the EC with the given subscriptions. NOTE: if we are already connected to the EC and a reconnection is necessary, the EC must have reconnects enabled in order for this method to succeed. Definition at line 63 of file ECG_UDP_Sender.cpp. References ACE_CHECK, ACE_ENV_ARG_PARAMETER, ACE_ERROR, ACE_THROW, CORBA::is_nil(), LM_ERROR, new_connect(), and reconnect().
00065 {
00066 if (CORBA::is_nil (this->lcl_ec_.in ()))
00067 {
00068 ACE_ERROR ((LM_ERROR, "Error initializing TAO_ECG_UDP_Sender: "
00069 "init() has not been called before connect()."));
00070 ACE_THROW (CORBA::INTERNAL ());
00071 }
00072
00073 if (sub.dependencies.length () == 0)
00074 {
00075 ACE_ERROR ((LM_ERROR, "TAO_ECG_UDP_Sender::connect(): "
00076 "0-length subscriptions argument."));
00077 ACE_THROW (CORBA::INTERNAL ());
00078 }
00079
00080 if (CORBA::is_nil (this->supplier_proxy_.in ()))
00081 {
00082 this->new_connect (sub ACE_ENV_ARG_PARAMETER);
00083 ACE_CHECK;
00084 }
00085 else
00086 {
00087 this->reconnect (sub ACE_ENV_ARG_PARAMETER);
00088 ACE_CHECK;
00089 }
00090 }
|
|
|
Create a new TAO_ECG_UDP_Sender object. (Constructor access is restricted to ensure that all TAO_ECG_UDP_Sender objects are heap-allocated.) |
|
|
Invokes shutdown (), which may result in the object being deleted, if refcounting is used to manage its lifetime. Definition at line 156 of file ECG_UDP_Sender.cpp. References ACE_ENV_SINGLE_ARG_PARAMETER.
00158 {
00159 // Prevent attempts to disconnect.
00160 this->auto_proxy_disconnect_.disallow_command ();
00161
00162 this->shutdown (ACE_ENV_SINGLE_ARG_PARAMETER);
00163 }
|
|
|
Get the local endpoint used to send the events.
|
|
||||||||||||||||
|
Definition at line 33 of file ECG_UDP_Sender.cpp. References ACE_CHECK, ACE_ENV_ARG_PARAMETER, ACE_ERROR, ACE_THROW, cdr_sender_, TAO_ECG_CDR_Message_Sender::init(), CORBA::is_nil(), LM_ERROR, and TAO_ECG_Refcounted_Endpoint.
00037 {
00038 if (CORBA::is_nil (lcl_ec))
00039 {
00040 ACE_ERROR ((LM_ERROR, "TAO_ECG_UDP_Sender::init(): "
00041 "<lcl_ec> argument is nil."));
00042 ACE_THROW (CORBA::INTERNAL ());
00043 }
00044
00045 if (CORBA::is_nil (addr_server))
00046 {
00047 ACE_ERROR ((LM_ERROR, "TAO_ECG_UDP_Sender::init(): "
00048 "address server argument is nil."));
00049 ACE_THROW (CORBA::INTERNAL ());
00050 }
00051
00052 this->cdr_sender_.init (endpoint_rptr ACE_ENV_ARG_PARAMETER);
00053 ACE_CHECK;
00054
00055 this->lcl_ec_ =
00056 RtecEventChannelAdmin::EventChannel::_duplicate (lcl_ec);
00057
00058 this->addr_server_ =
00059 RtecUDPAdmin::AddrServer::_duplicate (addr_server);
00060 }
|
|
|
The sender may need to fragment the message; otherwise the network may drop the packets. Setting the MTU can fail if the value is too small (at least the header + 8 bytes must fit). |
|
|
The sender may need to fragment the message; otherwise the network may drop the packets. Setting the MTU can fail if the value is too small (at least the header + 8 bytes must fit). |
|
|
Definition at line 93 of file ECG_UDP_Sender.cpp. References ACE_CHECK, ACE_ENV_ARG_PARAMETER, ACE_ENV_SINGLE_ARG_PARAMETER, activate(), ECG_Sender_Auto_Proxy_Disconnect, TAO_EC_Auto_Command< T >::set_command(), and TAO_EC_Deactivated_Object::set_deactivator(). Referenced by connect().
00095 {
00096 // Activate with poa.
00097 RtecEventComm::PushConsumer_var consumer_ref;
00098 PortableServer::POA_var poa = this->_default_POA ();
00099
00100 TAO_EC_Object_Deactivator deactivator;
00101 activate (consumer_ref,
00102 poa.in (),
00103 this,
00104 deactivator
00105 ACE_ENV_ARG_PARAMETER);
00106 ACE_CHECK;
00107
00108 // Connect as a consumer to the local EC.
00109 RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin =
00110 this->lcl_ec_->for_consumers (ACE_ENV_SINGLE_ARG_PARAMETER);
00111 ACE_CHECK;
00112
00113 RtecEventChannelAdmin::ProxyPushSupplier_var proxy =
00114 consumer_admin->obtain_push_supplier (ACE_ENV_SINGLE_ARG_PARAMETER);
00115 ACE_CHECK;
00116 ECG_Sender_Auto_Proxy_Disconnect new_proxy_disconnect (proxy.in ());
00117
00118 proxy->connect_push_consumer (consumer_ref.in (),
00119 sub
00120 ACE_ENV_ARG_PARAMETER);
00121 ACE_CHECK;
00122
00123 // Update resource managers.
00124 this->supplier_proxy_ = proxy._retn ();
00125 this->auto_proxy_disconnect_.set_command (new_proxy_disconnect);
00126 this->set_deactivator (deactivator);
00127 }
|
|
|
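Sends the given events over UDP. An empty event set is ignored. Each event whose TTL is greater than zero is marshaled as a one-element sequence, with a copy of its header whose TTL has been decremented, and is sent as a separate message to the address the address server returns for that header; events with TTL <= 0 are skipped to avoid loops. Definition at line 182 of file ECG_UDP_Sender.cpp. References ACE_CHECK, ACE_ENV_ARG_PARAMETER, ACE_THROW, RtecEventComm::Event::data, RtecEventComm::EventSet, RtecEventComm::Event::header, RtecUDPAdmin::UDP_Addr::ipaddr, RtecUDPAdmin::UDP_Addr::port, RtecEventComm::EventHeader::ttl, and ACE_OutputCDR::write_ulong().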
00185 {
00186 if (events.length () == 0)
00187 {
00188 // ACE_DEBUG ((EC_FORMAT (DEBUG,
00189 // "Nothing to multicast: "
00190 // "0-length EventSet.")));
00191 return;
00192 }
00193
00194 // Send each event in a separate message.
00195 // @@ TODO It is interesting to group events destined to the
00196 // same mcast group in a single message.
00197 for (u_int i = 0; i < events.length (); ++i)
00198 {
00199 // To avoid loops we keep a TTL field on the events and skip the
00200 // events with TTL <= 0
00201 if (events[i].header.ttl <= 0)
00202 continue;
00203
00204 const RtecEventComm::Event& e = events[i];
00205
00206 // We need to modify the TTL field, but copying the entire event
00207 // would be wasteful; instead we create a new header and only
00208 // modify the header portion.
00209 RtecEventComm::EventHeader header = e.header;
00210 header.ttl--;
00211
00212 // Start building the message
00213 TAO_OutputCDR cdr;
00214
00215 // Marshal as if it was a sequence of one element, notice how we
00216 // marshal a modified version of the header, but the payload is
00217 // marshal without any extra copies.
00218 cdr.write_ulong (1);
00219 if (!(cdr << header)
00220 || !(cdr << e.data))
00221 ACE_THROW (CORBA::MARSHAL ());
00222
00223 // Grab the right mcast group for this event...
00224 RtecUDPAdmin::UDP_Addr udp_addr;
00225 this->addr_server_->get_addr (header, udp_addr ACE_ENV_ARG_PARAMETER);
00226 ACE_CHECK;
00227
00228 ACE_INET_Addr inet_addr (udp_addr.port,
00229 udp_addr.ipaddr);
00230
00231 this->cdr_sender_.send_message (cdr, inet_addr ACE_ENV_ARG_PARAMETER);
00232 ACE_CHECK;
00233 }
00234 }
|
|
|
Definition at line 130 of file ECG_UDP_Sender.cpp. References ACE_CHECK, ACE_ENV_ARG_PARAMETER, ACE_THROW, and CORBA::is_nil(). Referenced by connect().
00132 {
00133 // Obtain our object reference from the POA.
00134 RtecEventComm::PushConsumer_var consumer_ref;
00135 PortableServer::POA_var poa = this->_default_POA ();
00136
00137 CORBA::Object_var obj = poa->servant_to_reference (this ACE_ENV_ARG_PARAMETER);
00138 ACE_CHECK;
00139 consumer_ref =
00140 RtecEventComm::PushConsumer::_narrow (obj.in () ACE_ENV_ARG_PARAMETER);
00141 ACE_CHECK;
00142
00143 if (CORBA::is_nil (consumer_ref.in ()))
00144 {
00145 ACE_THROW (CORBA::INTERNAL ());
00146 }
00147
00148 // Reconnect.
00149 this->supplier_proxy_->connect_push_consumer (consumer_ref.in (),
00150 sub
00151 ACE_ENV_ARG_PARAMETER);
00152 ACE_CHECK;
00153 }
|
|
|
Calling this method may cause the reference count to be decremented (due to deactivation) and the object to be deleted. Definition at line 166 of file ECG_UDP_Sender.cpp. References ACE_CHECK, ACE_ENV_SINGLE_ARG_PARAMETER, cdr_sender_, TAO_EC_Object_Deactivator::deactivate(), TAO_EC_Auto_Command< T >::execute(), and TAO_ECG_CDR_Message_Sender::shutdown().
00167 {
00168 this->supplier_proxy_ =
00169 RtecEventChannelAdmin::ProxyPushSupplier::_nil ();
00170
00171 this->auto_proxy_disconnect_.execute ();
00172
00173 this->addr_server_ = RtecUDPAdmin::AddrServer::_nil ();
00174 this->lcl_ec_ = RtecEventChannelAdmin::EventChannel::_nil ();
00175
00176 this->deactivator_.deactivate ();
00177 this->cdr_sender_.shutdown (ACE_ENV_SINGLE_ARG_PARAMETER);
00178 ACE_CHECK;
00179 }
|
|
|
We query this object to determine where the events should be sent.
Definition at line 212 of file ECG_UDP_Sender.h. |
|
|
Manages our connection to Supplier Proxy.
Definition at line 220 of file ECG_UDP_Sender.h. |
|
|
Helper for fragmenting and sending cdr-encoded events using udp.
Definition at line 215 of file ECG_UDP_Sender.h. Referenced by init(), and shutdown(). |
|
|
Event Channel to which we act as a consumer.
Definition at line 209 of file ECG_UDP_Sender.h. |
|
|
Proxy used to receive events from the Event Channel.
Definition at line 206 of file ECG_UDP_Sender.h. |
1.3.6