#include <ECG_UDP_Sender.h>
Inheritance diagram for TAO_ECG_UDP_Sender:
[NOHEADER] | |
~TAO_ECG_UDP_Sender (void) | |
void | init (RtecEventChannelAdmin::EventChannel_ptr lcl_ec, RtecUDPAdmin::AddrServer_ptr addr_server, TAO_ECG_Refcounted_Endpoint endpoint_rptr) |
void | connect (const RtecEventChannelAdmin::ConsumerQOS &sub) |
Connect or reconnect to the EC with the given subscriptions. | |
void | shutdown (void) |
TAO_EC_Servant_Var< TAO_ECG_UDP_Sender > | create (CORBA::Boolean crc=0) |
Public Member Functions | |
int | mtu (CORBA::ULong mtu) |
CORBA::ULong | mtu (void) const |
int | get_local_addr (ACE_INET_Addr &addr) |
Get the local endpoint used to send the events. | |
virtual void | disconnect_push_consumer (void) |
virtual void | push (const RtecEventComm::EventSet &events) |
Protected Member Functions | |
TAO_ECG_UDP_Sender (CORBA::Boolean crc=0) | |
Private Types | |
typedef TAO_EC_Auto_Command< TAO_ECG_UDP_Sender_Disconnect_Command > | ECG_Sender_Auto_Proxy_Disconnect |
Private Member Functions | |
void | new_connect (const RtecEventChannelAdmin::ConsumerQOS &sub) |
void | reconnect (const RtecEventChannelAdmin::ConsumerQOS &sub) |
Private Attributes | |
RtecEventChannelAdmin::ProxyPushSupplier_var | supplier_proxy_ |
Proxy used to receive events from the Event Channel. | |
RtecEventChannelAdmin::EventChannel_var | lcl_ec_ |
Event Channel to which we act as a consumer. | |
RtecUDPAdmin::AddrServer_var | addr_server_ |
We query this object to determine where the events should be sent. | |
TAO_ECG_CDR_Message_Sender | cdr_sender_ |
Helper for fragmenting and sending cdr-encoded events using udp. | |
ECG_Sender_Auto_Proxy_Disconnect | auto_proxy_disconnect_ |
Manages our connection to Supplier Proxy. |
Definition at line 103 of file ECG_UDP_Sender.h.
|
Definition at line 211 of file ECG_UDP_Sender.h. Referenced by new_connect(). |
|
Definition at line 28 of file ECG_UDP_Sender.cpp.
00029 { 00030 } |
|
Constructor (protected). Clients can create new TAO_ECG_UDP_Sender objects using the static create() method. |
|
Connect or reconnect to the EC with the given subscriptions. NOTE: if we are already connected to EC and a reconnection is necessary, the EC must have reconnects enabled in order for this method to succeed. Definition at line 61 of file ECG_UDP_Sender.cpp. References ACE_ERROR, RtecEventChannelAdmin::ConsumerQOS::dependencies, CORBA::is_nil(), LM_ERROR, new_connect(), and reconnect().
00062 { 00063 if (CORBA::is_nil (this->lcl_ec_.in ())) 00064 { 00065 //FUZZ: disable check_for_lack_ACE_OS 00066 ACE_ERROR ((LM_ERROR, "Error initializing TAO_ECG_UDP_Sender: " 00067 "init() has not been called before connect().")); 00068 //FUZZ: enable check_for_lack_ACE_OS 00069 00070 throw CORBA::INTERNAL (); 00071 } 00072 00073 if (sub.dependencies.length () == 0) 00074 { 00075 ACE_ERROR ((LM_ERROR, "TAO_ECG_UDP_Sender::connect(): " 00076 "0-length subscriptions argument.")); 00077 throw CORBA::INTERNAL (); 00078 } 00079 00080 if (CORBA::is_nil (this->supplier_proxy_.in ())) 00081 { 00082 this->new_connect (sub); 00083 } 00084 else 00085 { 00086 this->reconnect (sub); 00087 } 00088 } |
|
Create a new TAO_ECG_UDP_Sender object. (Constructor access is restricted to insure that all TAO_ECG_UDP_Sender objects are heap-allocated.) |
|
Invokes shutdown (), which may result in the object being deleted, if refcounting is used to manage its lifetime. Definition at line 142 of file ECG_UDP_Sender.cpp. References TAO_EC_Auto_Command< T >::disallow_command(), and shutdown().
00143 { 00144 // Prevent attempts to disconnect. 00145 this->auto_proxy_disconnect_.disallow_command (); 00146 00147 this->shutdown (); 00148 } |
|
Get the local endpoint used to send the events.
|
|
Definition at line 33 of file ECG_UDP_Sender.cpp. References ACE_ERROR, cdr_sender_, TAO_ECG_CDR_Message_Sender::init(), CORBA::is_nil(), LM_ERROR, and TAO_ECG_Refcounted_Endpoint.
00036 { 00037 if (CORBA::is_nil (lcl_ec)) 00038 { 00039 ACE_ERROR ((LM_ERROR, "TAO_ECG_UDP_Sender::init(): " 00040 "<lcl_ec> argument is nil.")); 00041 throw CORBA::INTERNAL (); 00042 } 00043 00044 if (CORBA::is_nil (addr_server)) 00045 { 00046 ACE_ERROR ((LM_ERROR, "TAO_ECG_UDP_Sender::init(): " 00047 "address server argument is nil.")); 00048 throw CORBA::INTERNAL (); 00049 } 00050 00051 this->cdr_sender_.init (endpoint_rptr); 00052 00053 this->lcl_ec_ = 00054 RtecEventChannelAdmin::EventChannel::_duplicate (lcl_ec); 00055 00056 this->addr_server_ = 00057 RtecUDPAdmin::AddrServer::_duplicate (addr_server); 00058 } |
|
The sender may need to fragment the message, otherwise the network may drop the packets. Setting the MTU can fail if the value is too small (at least the header + 8 bytes must fit). |
|
The sender may need to fragment the message, otherwise the network may drop the packets. Setting the MTU can fail if the value is too small (at least the header + 8 bytes must fit). |
|
Definition at line 91 of file ECG_UDP_Sender.cpp. References activate(), ECG_Sender_Auto_Proxy_Disconnect, TAO_EC_Auto_Command< T >::set_command(), and TAO_EC_Deactivated_Object::set_deactivator(). Referenced by connect().
00092 { 00093 // Activate with poa. 00094 RtecEventComm::PushConsumer_var consumer_ref; 00095 PortableServer::POA_var poa = this->_default_POA (); 00096 00097 TAO_EC_Object_Deactivator deactivator; 00098 activate (consumer_ref, 00099 poa.in (), 00100 this, 00101 deactivator); 00102 00103 // Connect as a consumer to the local EC. 00104 RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin = 00105 this->lcl_ec_->for_consumers (); 00106 00107 RtecEventChannelAdmin::ProxyPushSupplier_var proxy = 00108 consumer_admin->obtain_push_supplier (); 00109 ECG_Sender_Auto_Proxy_Disconnect new_proxy_disconnect (proxy.in ()); 00110 00111 proxy->connect_push_consumer (consumer_ref.in (), 00112 sub); 00113 00114 // Update resource managers. 00115 this->supplier_proxy_ = proxy._retn (); 00116 this->auto_proxy_disconnect_.set_command (new_proxy_disconnect); 00117 this->set_deactivator (deactivator); 00118 } |
|
Invokes shutdown (), which may result in the object being deleted, if refcounting is used to manage its lifetime. Definition at line 166 of file ECG_UDP_Sender.cpp. References cdr_sender_, RtecEventComm::Event::data, RtecEventComm::EventSet, RtecEventComm::Event::header, RtecUDPAdmin::UDP_Addr::ipaddr, RtecUDPAdmin::UDP_Addr::port, TAO_ECG_CDR_Message_Sender::send_message(), RtecEventComm::EventHeader::ttl, and ACE_OutputCDR::write_ulong().
00167 { 00168 if (events.length () == 0) 00169 { 00170 // ACE_DEBUG ((EC_FORMAT (DEBUG, 00171 // "Nothing to multicast: " 00172 // "0-length EventSet."))); 00173 return; 00174 } 00175 00176 // Send each event in a separate message. 00177 // @@ TODO It is interesting to group events destined to the 00178 // same mcast group in a single message. 00179 for (u_int i = 0; i < events.length (); ++i) 00180 { 00181 // To avoid loops we keep a TTL field on the events and skip the 00182 // events with TTL <= 0 00183 if (events[i].header.ttl <= 0) 00184 continue; 00185 00186 const RtecEventComm::Event& e = events[i]; 00187 00188 // We need to modify the TTL field, but copying the entire event 00189 // would be wasteful; instead we create a new header and only 00190 // modify the header portion. 00191 RtecEventComm::EventHeader header = e.header; 00192 header.ttl--; 00193 00194 // Start building the message 00195 TAO_OutputCDR cdr; 00196 00197 // Marshal as if it was a sequence of one element, notice how we 00198 // marshal a modified version of the header, but the payload is 00199 // marshal without any extra copies. 00200 cdr.write_ulong (1); 00201 if (!(cdr << header) 00202 || !(cdr << e.data)) 00203 throw CORBA::MARSHAL (); 00204 00205 ACE_INET_Addr inet_addr; 00206 try 00207 { 00208 // Grab the right mcast group for this event... 00209 RtecUDPAdmin::UDP_Address_var udp_addr; 00210 00211 this->addr_server_->get_address (header, udp_addr.out()); 00212 switch (udp_addr->_d()) 00213 { 00214 case RtecUDPAdmin::Rtec_inet: 00215 inet_addr.set(udp_addr->v4_addr().port, 00216 udp_addr->v4_addr().ipaddr); 00217 break; 00218 case RtecUDPAdmin::Rtec_inet6: 00219 #if defined (ACE_HAS_IPV6) 00220 inet_addr.set_type(PF_INET6); 00221 #endif 00222 inet_addr.set_address(udp_addr->v6_addr().ipaddr,16,0); 00223 inet_addr.set_port_number(udp_addr->v6_addr().port); 00224 break; 00225 } 00226 } 00227 catch (const ::CORBA::BAD_OPERATION &) 00228 { 00229 // server only supports IPv4 00230 // Grab the right mcast group for this event... 00231 RtecUDPAdmin::UDP_Addr udp_addr; 00232 this->addr_server_->get_addr (header, udp_addr); 00233 inet_addr.set (udp_addr.port, udp_addr.ipaddr); 00234 } 00235 00236 this->cdr_sender_.send_message (cdr, inet_addr); 00237 } 00238 } |
|
Definition at line 121 of file ECG_UDP_Sender.cpp. References CORBA::is_nil(). Referenced by connect().
00122 { 00123 // Obtain our object reference from the POA. 00124 RtecEventComm::PushConsumer_var consumer_ref; 00125 PortableServer::POA_var poa = this->_default_POA (); 00126 00127 CORBA::Object_var obj = poa->servant_to_reference (this); 00128 consumer_ref = 00129 RtecEventComm::PushConsumer::_narrow (obj.in ()); 00130 00131 if (CORBA::is_nil (consumer_ref.in ())) 00132 { 00133 throw CORBA::INTERNAL (); 00134 } 00135 00136 // Reconnect. 00137 this->supplier_proxy_->connect_push_consumer (consumer_ref.in (), 00138 sub); 00139 } |
|
Calling this method may result in decrementing of the reference count (due to deactivation) and deletion of the object. Definition at line 151 of file ECG_UDP_Sender.cpp. References cdr_sender_, TAO_EC_Object_Deactivator::deactivate(), TAO_EC_Auto_Command< T >::execute(), and TAO_ECG_CDR_Message_Sender::shutdown(). Referenced by disconnect_push_consumer().
00152 { 00153 this->supplier_proxy_ = 00154 RtecEventChannelAdmin::ProxyPushSupplier::_nil (); 00155 00156 this->auto_proxy_disconnect_.execute (); 00157 00158 this->addr_server_ = RtecUDPAdmin::AddrServer::_nil (); 00159 this->lcl_ec_ = RtecEventChannelAdmin::EventChannel::_nil (); 00160 00161 this->deactivator_.deactivate (); 00162 this->cdr_sender_.shutdown (); 00163 } |
|
We query this object to determine where the events should be sent.
Definition at line 205 of file ECG_UDP_Sender.h. |
|
Manages our connection to Supplier Proxy.
Definition at line 213 of file ECG_UDP_Sender.h. |
|
Helper for fragmenting and sending cdr-encoded events using udp.
Definition at line 208 of file ECG_UDP_Sender.h. Referenced by init(), push(), and shutdown(). |
|
Event Channel to which we act as a consumer.
Definition at line 202 of file ECG_UDP_Sender.h. |
|
Proxy used to receive events from the Event Channel.
Definition at line 199 of file ECG_UDP_Sender.h. |