#include <ECG_UDP_Sender.h>
Inheritance diagram for TAO_ECG_UDP_Sender:
[NOHEADER] | |
~TAO_ECG_UDP_Sender (void) | |
void | init (RtecEventChannelAdmin::EventChannel_ptr lcl_ec, RtecUDPAdmin::AddrServer_ptr addr_server, TAO_ECG_Refcounted_Endpoint endpoint_rptr) |
void | connect (const RtecEventChannelAdmin::ConsumerQOS &sub) |
Connect or reconnect to the EC with the given subscriptions. | |
void | shutdown () |
TAO_EC_Servant_Var< TAO_ECG_UDP_Sender > | create (CORBA::Boolean crc=0) |
Public Member Functions | |
int | mtu (CORBA::ULong mtu) |
CORBA::ULong | mtu (void) const |
int | get_local_addr (ACE_INET_Addr &addr) |
Get the local endpoint used to send the events. | |
virtual void | disconnect_push_consumer () throw (CORBA::SystemException) |
virtual void | push (const RtecEventComm::EventSet &events) throw (CORBA::SystemException) |
Protected Member Functions | |
TAO_ECG_UDP_Sender (CORBA::Boolean crc=0) | |
Private Types | |
typedef TAO_EC_Auto_Command< TAO_ECG_UDP_Sender_Disconnect_Command > | ECG_Sender_Auto_Proxy_Disconnect |
Private Member Functions | |
void | new_connect (const RtecEventChannelAdmin::ConsumerQOS &sub) |
void | reconnect (const RtecEventChannelAdmin::ConsumerQOS &sub) |
Private Attributes | |
RtecEventChannelAdmin::ProxyPushSupplier_var | supplier_proxy_ |
Proxy used to receive events from the Event Channel. | |
RtecEventChannelAdmin::EventChannel_var | lcl_ec_ |
Event Channel to which we act as a consumer. | |
RtecUDPAdmin::AddrServer_var | addr_server_ |
We query this object to determine where the events should be sent. | |
TAO_ECG_CDR_Message_Sender | cdr_sender_ |
Helper for fragmenting and sending cdr-encoded events using udp. | |
ECG_Sender_Auto_Proxy_Disconnect | auto_proxy_disconnect_ |
Manages our connection to Supplier Proxy. |
Definition at line 103 of file ECG_UDP_Sender.h.
|
Definition at line 218 of file ECG_UDP_Sender.h. Referenced by new_connect(). |
|
Definition at line 28 of file ECG_UDP_Sender.cpp.
00029 { 00030 } |
|
Constructor (protected). Clients can create new TAO_ECG_UDP_Sender objects using the static create() method. |
|
Connect or reconnect to the EC with the given subscriptions. NOTE: if we are already connected to the EC and a reconnection is necessary, the EC must have reconnects enabled in order for this method to succeed. Definition at line 63 of file ECG_UDP_Sender.cpp. References ACE_CHECK, ACE_ENV_ARG_PARAMETER, ACE_ERROR, ACE_THROW, CORBA::is_nil(), LM_ERROR, new_connect(), and reconnect().
00065 { 00066 if (CORBA::is_nil (this->lcl_ec_.in ())) 00067 { 00068 ACE_ERROR ((LM_ERROR, "Error initializing TAO_ECG_UDP_Sender: " 00069 "init() has not been called before connect().")); 00070 ACE_THROW (CORBA::INTERNAL ()); 00071 } 00072 00073 if (sub.dependencies.length () == 0) 00074 { 00075 ACE_ERROR ((LM_ERROR, "TAO_ECG_UDP_Sender::connect(): " 00076 "0-length subscriptions argument.")); 00077 ACE_THROW (CORBA::INTERNAL ()); 00078 } 00079 00080 if (CORBA::is_nil (this->supplier_proxy_.in ())) 00081 { 00082 this->new_connect (sub ACE_ENV_ARG_PARAMETER); 00083 ACE_CHECK; 00084 } 00085 else 00086 { 00087 this->reconnect (sub ACE_ENV_ARG_PARAMETER); 00088 ACE_CHECK; 00089 } 00090 } |
|
Create a new TAO_ECG_UDP_Sender object. (Constructor access is restricted to ensure that all TAO_ECG_UDP_Sender objects are heap-allocated.) |
|
Invokes shutdown (), which may result in the object being deleted, if refcounting is used to manage its lifetime. Definition at line 156 of file ECG_UDP_Sender.cpp. References ACE_ENV_SINGLE_ARG_PARAMETER.
00158 { 00159 // Prevent attempts to disconnect. 00160 this->auto_proxy_disconnect_.disallow_command (); 00161 00162 this->shutdown (ACE_ENV_SINGLE_ARG_PARAMETER); 00163 } |
|
Get the local endpoint used to send the events.
|
|
Definition at line 33 of file ECG_UDP_Sender.cpp. References ACE_CHECK, ACE_ENV_ARG_PARAMETER, ACE_ERROR, ACE_THROW, cdr_sender_, TAO_ECG_CDR_Message_Sender::init(), CORBA::is_nil(), LM_ERROR, and TAO_ECG_Refcounted_Endpoint.
00037 { 00038 if (CORBA::is_nil (lcl_ec)) 00039 { 00040 ACE_ERROR ((LM_ERROR, "TAO_ECG_UDP_Sender::init(): " 00041 "<lcl_ec> argument is nil.")); 00042 ACE_THROW (CORBA::INTERNAL ()); 00043 } 00044 00045 if (CORBA::is_nil (addr_server)) 00046 { 00047 ACE_ERROR ((LM_ERROR, "TAO_ECG_UDP_Sender::init(): " 00048 "address server argument is nil.")); 00049 ACE_THROW (CORBA::INTERNAL ()); 00050 } 00051 00052 this->cdr_sender_.init (endpoint_rptr ACE_ENV_ARG_PARAMETER); 00053 ACE_CHECK; 00054 00055 this->lcl_ec_ = 00056 RtecEventChannelAdmin::EventChannel::_duplicate (lcl_ec); 00057 00058 this->addr_server_ = 00059 RtecUDPAdmin::AddrServer::_duplicate (addr_server); 00060 } |
|
The sender may need to fragment the message; otherwise, the network may drop the packets. Setting the MTU can fail if the value is too small (at least the header + 8 bytes must fit). |
|
The sender may need to fragment the message; otherwise, the network may drop the packets. Setting the MTU can fail if the value is too small (at least the header + 8 bytes must fit). |
|
Definition at line 93 of file ECG_UDP_Sender.cpp. References ACE_CHECK, ACE_ENV_ARG_PARAMETER, ACE_ENV_SINGLE_ARG_PARAMETER, activate(), ECG_Sender_Auto_Proxy_Disconnect, TAO_EC_Auto_Command< T >::set_command(), and TAO_EC_Deactivated_Object::set_deactivator(). Referenced by connect().
00095 { 00096 // Activate with poa. 00097 RtecEventComm::PushConsumer_var consumer_ref; 00098 PortableServer::POA_var poa = this->_default_POA (); 00099 00100 TAO_EC_Object_Deactivator deactivator; 00101 activate (consumer_ref, 00102 poa.in (), 00103 this, 00104 deactivator 00105 ACE_ENV_ARG_PARAMETER); 00106 ACE_CHECK; 00107 00108 // Connect as a consumer to the local EC. 00109 RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin = 00110 this->lcl_ec_->for_consumers (ACE_ENV_SINGLE_ARG_PARAMETER); 00111 ACE_CHECK; 00112 00113 RtecEventChannelAdmin::ProxyPushSupplier_var proxy = 00114 consumer_admin->obtain_push_supplier (ACE_ENV_SINGLE_ARG_PARAMETER); 00115 ACE_CHECK; 00116 ECG_Sender_Auto_Proxy_Disconnect new_proxy_disconnect (proxy.in ()); 00117 00118 proxy->connect_push_consumer (consumer_ref.in (), 00119 sub 00120 ACE_ENV_ARG_PARAMETER); 00121 ACE_CHECK; 00122 00123 // Update resource managers. 00124 this->supplier_proxy_ = proxy._retn (); 00125 this->auto_proxy_disconnect_.set_command (new_proxy_disconnect); 00126 this->set_deactivator (deactivator); 00127 } |
|
Sends each event in the set as a separate UDP message to the address returned by the address server for that event. Events whose TTL is <= 0 are skipped to avoid loops; the header sent with each remaining event carries a decremented TTL. Definition at line 182 of file ECG_UDP_Sender.cpp. References ACE_CHECK, ACE_ENV_ARG_PARAMETER, ACE_THROW, RtecEventComm::Event::data, RtecEventComm::EventSet, RtecEventComm::Event::header, RtecUDPAdmin::UDP_Addr::ipaddr, RtecUDPAdmin::UDP_Addr::port, RtecEventComm::EventHeader::ttl, and ACE_OutputCDR::write_ulong().
00185 { 00186 if (events.length () == 0) 00187 { 00188 // ACE_DEBUG ((EC_FORMAT (DEBUG, 00189 // "Nothing to multicast: " 00190 // "0-length EventSet."))); 00191 return; 00192 } 00193 00194 // Send each event in a separate message. 00195 // @@ TODO It is interesting to group events destined to the 00196 // same mcast group in a single message. 00197 for (u_int i = 0; i < events.length (); ++i) 00198 { 00199 // To avoid loops we keep a TTL field on the events and skip the 00200 // events with TTL <= 0 00201 if (events[i].header.ttl <= 0) 00202 continue; 00203 00204 const RtecEventComm::Event& e = events[i]; 00205 00206 // We need to modify the TTL field, but copying the entire event 00207 // would be wasteful; instead we create a new header and only 00208 // modify the header portion. 00209 RtecEventComm::EventHeader header = e.header; 00210 header.ttl--; 00211 00212 // Start building the message 00213 TAO_OutputCDR cdr; 00214 00215 // Marshal as if it was a sequence of one element, notice how we 00216 // marshal a modified version of the header, but the payload is 00217 // marshal without any extra copies. 00218 cdr.write_ulong (1); 00219 if (!(cdr << header) 00220 || !(cdr << e.data)) 00221 ACE_THROW (CORBA::MARSHAL ()); 00222 00223 // Grab the right mcast group for this event... 00224 RtecUDPAdmin::UDP_Addr udp_addr; 00225 this->addr_server_->get_addr (header, udp_addr ACE_ENV_ARG_PARAMETER); 00226 ACE_CHECK; 00227 00228 ACE_INET_Addr inet_addr (udp_addr.port, 00229 udp_addr.ipaddr); 00230 00231 this->cdr_sender_.send_message (cdr, inet_addr ACE_ENV_ARG_PARAMETER); 00232 ACE_CHECK; 00233 } 00234 } |
|
Definition at line 130 of file ECG_UDP_Sender.cpp. References ACE_CHECK, ACE_ENV_ARG_PARAMETER, ACE_THROW, and CORBA::is_nil(). Referenced by connect().
00132 { 00133 // Obtain our object reference from the POA. 00134 RtecEventComm::PushConsumer_var consumer_ref; 00135 PortableServer::POA_var poa = this->_default_POA (); 00136 00137 CORBA::Object_var obj = poa->servant_to_reference (this ACE_ENV_ARG_PARAMETER); 00138 ACE_CHECK; 00139 consumer_ref = 00140 RtecEventComm::PushConsumer::_narrow (obj.in () ACE_ENV_ARG_PARAMETER); 00141 ACE_CHECK; 00142 00143 if (CORBA::is_nil (consumer_ref.in ())) 00144 { 00145 ACE_THROW (CORBA::INTERNAL ()); 00146 } 00147 00148 // Reconnect. 00149 this->supplier_proxy_->connect_push_consumer (consumer_ref.in (), 00150 sub 00151 ACE_ENV_ARG_PARAMETER); 00152 ACE_CHECK; 00153 } |
|
Calling this method may decrement the reference count (due to deactivation) and delete the object. Definition at line 166 of file ECG_UDP_Sender.cpp. References ACE_CHECK, ACE_ENV_SINGLE_ARG_PARAMETER, cdr_sender_, TAO_EC_Object_Deactivator::deactivate(), TAO_EC_Auto_Command< T >::execute(), and TAO_ECG_CDR_Message_Sender::shutdown().
00167 { 00168 this->supplier_proxy_ = 00169 RtecEventChannelAdmin::ProxyPushSupplier::_nil (); 00170 00171 this->auto_proxy_disconnect_.execute (); 00172 00173 this->addr_server_ = RtecUDPAdmin::AddrServer::_nil (); 00174 this->lcl_ec_ = RtecEventChannelAdmin::EventChannel::_nil (); 00175 00176 this->deactivator_.deactivate (); 00177 this->cdr_sender_.shutdown (ACE_ENV_SINGLE_ARG_PARAMETER); 00178 ACE_CHECK; 00179 } |
|
We query this object to determine where the events should be sent.
Definition at line 212 of file ECG_UDP_Sender.h. |
|
Manages our connection to Supplier Proxy.
Definition at line 220 of file ECG_UDP_Sender.h. |
|
Helper for fragmenting and sending cdr-encoded events using udp.
Definition at line 215 of file ECG_UDP_Sender.h. Referenced by init(), and shutdown(). |
|
Event Channel to which we act as a consumer.
Definition at line 209 of file ECG_UDP_Sender.h. |
|
Proxy used to receive events from the Event Channel.
Definition at line 206 of file ECG_UDP_Sender.h. |