TAO_ECG_UDP_Sender Class Reference

Sends events received from a "local" EC using UDP. NOT THREAD-SAFE. This class connects as a consumer to an EventChannel and forwards the events it receives from that EC using UDP.

#include <ECG_UDP_Sender.h>


 ~TAO_ECG_UDP_Sender (void)
void init (RtecEventChannelAdmin::EventChannel_ptr lcl_ec, RtecUDPAdmin::AddrServer_ptr addr_server, TAO_ECG_Refcounted_Endpoint endpoint_rptr)
void connect (const RtecEventChannelAdmin::ConsumerQOS &sub)
 Connect or reconnect to the EC with the given subscriptions.

void shutdown (void)
static TAO_EC_Servant_Var< TAO_ECG_UDP_Sender > create (CORBA::Boolean crc=0)

Public Member Functions

int mtu (CORBA::ULong mtu)
CORBA::ULong mtu (void) const
int get_local_addr (ACE_INET_Addr &addr)
 Get the local endpoint used to send the events.

virtual void disconnect_push_consumer (void)
virtual void push (const RtecEventComm::EventSet &events)

Protected Member Functions

 TAO_ECG_UDP_Sender (CORBA::Boolean crc=0)

Private Types

typedef TAO_EC_Auto_Command< TAO_ECG_UDP_Sender_Disconnect_Command > ECG_Sender_Auto_Proxy_Disconnect

Private Member Functions

void new_connect (const RtecEventChannelAdmin::ConsumerQOS &sub)
void reconnect (const RtecEventChannelAdmin::ConsumerQOS &sub)

Private Attributes

RtecEventChannelAdmin::ProxyPushSupplier_var supplier_proxy_
 Proxy used to receive events from the Event Channel.

RtecEventChannelAdmin::EventChannel_var lcl_ec_
 Event Channel to which we act as a consumer.

RtecUDPAdmin::AddrServer_var addr_server_
 We query this object to determine where the events should be sent.

TAO_ECG_CDR_Message_Sender cdr_sender_
 Helper for fragmenting and sending cdr-encoded events using udp.

ECG_Sender_Auto_Proxy_Disconnect auto_proxy_disconnect_
 Manages our connection to Supplier Proxy.


Detailed Description

Sends events received from a "local" EC using UDP. NOT THREAD-SAFE. This class connects as a consumer to an EventChannel and forwards the events it receives from that EC using UDP.

Definition at line 103 of file ECG_UDP_Sender.h.


Member Typedef Documentation

typedef TAO_EC_Auto_Command<TAO_ECG_UDP_Sender_Disconnect_Command> TAO_ECG_UDP_Sender::ECG_Sender_Auto_Proxy_Disconnect [private]
 

Definition at line 211 of file ECG_UDP_Sender.h.

Referenced by new_connect().


Constructor & Destructor Documentation

TAO_ECG_UDP_Sender::~TAO_ECG_UDP_Sender (void)
 

Definition at line 28 of file ECG_UDP_Sender.cpp.

00029 {
00030 }

TAO_ECG_UDP_Sender::TAO_ECG_UDP_Sender (CORBA::Boolean crc = 0) [protected]
 

Constructor (protected). Clients can create new TAO_ECG_UDP_Sender objects using the static create() method.


Member Function Documentation

void TAO_ECG_UDP_Sender::connect (const RtecEventChannelAdmin::ConsumerQOS & sub)
 

Connect or reconnect to the EC with the given subscriptions.

NOTE: if we are already connected to the EC and a reconnection is necessary, the EC must have reconnects enabled for this method to succeed.

Definition at line 61 of file ECG_UDP_Sender.cpp.

References ACE_ERROR, RtecEventChannelAdmin::ConsumerQOS::dependencies, CORBA::is_nil(), LM_ERROR, new_connect(), and reconnect().

00062 {
00063   if (CORBA::is_nil (this->lcl_ec_.in ()))
00064     {
00065       //FUZZ: disable check_for_lack_ACE_OS
00066       ACE_ERROR ((LM_ERROR, "Error initializing TAO_ECG_UDP_Sender: "
00067                             "init() has not been called before connect()."));
00068       //FUZZ: enable check_for_lack_ACE_OS
00069 
00070       throw CORBA::INTERNAL ();
00071     }
00072 
00073   if (sub.dependencies.length () == 0)
00074     {
00075       ACE_ERROR ((LM_ERROR, "TAO_ECG_UDP_Sender::connect(): "
00076                             "0-length subscriptions argument."));
00077       throw CORBA::INTERNAL ();
00078     }
00079 
00080   if (CORBA::is_nil (this->supplier_proxy_.in ()))
00081     {
00082       this->new_connect (sub);
00083     }
00084   else
00085     {
00086       this->reconnect (sub);
00087     }
00088 }
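The dispatch performed by connect() can be illustrated without any CORBA dependencies. The following is a minimal sketch, not the TAO implementation: SenderModel, its Action enum and the two boolean flags are hypothetical stand-ins for the sender, for the new_connect()/reconnect() paths and for the nil checks on lcl_ec_ and supplier_proxy_. Standard exceptions replace CORBA::INTERNAL.

```cpp
#include <cassert>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical stand-in for the sender's connection state; not a TAO type.
class SenderModel {
public:
  enum class Action { NewConnect, Reconnect };

  // Models init(): records that an event channel has been supplied.
  void init() { has_channel_ = true; }

  // Models connect(): validates the state, then picks the connection path.
  Action connect(const std::vector<std::string>& subscriptions) {
    if (!has_channel_)
      throw std::logic_error("init() has not been called before connect()");
    if (subscriptions.empty())
      throw std::invalid_argument("0-length subscriptions argument");

    if (!has_proxy_) {
      has_proxy_ = true;         // first call: obtain and store a proxy
      return Action::NewConnect;
    }
    return Action::Reconnect;    // later calls: reuse the stored proxy
  }

private:
  bool has_channel_ = false;  // stands in for a non-nil lcl_ec_
  bool has_proxy_ = false;    // stands in for a non-nil supplier_proxy_
};
```

In this model, the first successful connect() after init() returns Action::NewConnect and every later call returns Action::Reconnect, which mirrors why the real EC must have reconnects enabled for a second connect() to succeed.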

TAO_EC_Servant_Var<TAO_ECG_UDP_Sender> TAO_ECG_UDP_Sender::create (CORBA::Boolean crc = 0) [static]
 

Create a new TAO_ECG_UDP_Sender object. (Constructor access is restricted to ensure that all TAO_ECG_UDP_Sender objects are heap-allocated.)
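The pattern of a protected constructor paired with a static factory can be sketched in plain C++. This is an illustration only: HeapOnlySender is a hypothetical class, and std::unique_ptr is used here in place of TAO_EC_Servant_Var, which is a different (reference-counting) smart pointer.

```cpp
#include <cassert>
#include <memory>

// Hypothetical stand-in class; std::unique_ptr replaces TAO_EC_Servant_Var.
class HeapOnlySender {
public:
  // The only public way to obtain an instance, so every object lives on the heap.
  static std::unique_ptr<HeapOnlySender> create(bool crc = false) {
    // The constructor is not public, so std::make_unique cannot be used here.
    return std::unique_ptr<HeapOnlySender>(new HeapOnlySender(crc));
  }

  bool crc() const { return crc_; }

protected:
  // Not reachable by clients: stack or static instances cannot be declared.
  explicit HeapOnlySender(bool crc) : crc_(crc) {}

private:
  bool crc_;
};
```

A client writes `auto sender = HeapOnlySender::create(true);` and a declaration such as `HeapOnlySender s(true);` fails to compile outside the class hierarchy.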

void TAO_ECG_UDP_Sender::disconnect_push_consumer (void) [virtual]
 

Invokes shutdown (), which may result in the object being deleted, if refcounting is used to manage its lifetime.

Definition at line 142 of file ECG_UDP_Sender.cpp.

References TAO_EC_Auto_Command< T >::disallow_command(), and shutdown().

00143 {
00144   // Prevent attempts to disconnect.
00145   this->auto_proxy_disconnect_.disallow_command ();
00146 
00147   this->shutdown ();
00148 }
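The interplay of disallow_command() and execute() seen above can be modelled with a small guard class. This is a sketch of the general idea only: AutoCommand is a hypothetical class written for this illustration, and its exact behavior is an assumption, not a copy of TAO_EC_Auto_Command.

```cpp
#include <cassert>
#include <functional>
#include <utility>

// Hypothetical stand-in for the auto-command idea; not the TAO class.
class AutoCommand {
public:
  AutoCommand() = default;
  explicit AutoCommand(std::function<void()> cmd)
    : cmd_(std::move(cmd)), allowed_(true) {}

  AutoCommand(const AutoCommand&) = delete;
  AutoCommand& operator=(const AutoCommand&) = delete;

  // Runs the command on destruction unless it already ran or was disallowed.
  ~AutoCommand() { execute(); }

  // Cancels the pending command, e.g. when the peer has already disconnected.
  void disallow_command() { allowed_ = false; }

  // Runs the command at most once.
  void execute() {
    if (allowed_) {
      allowed_ = false;
      if (cmd_) cmd_();
    }
  }

private:
  std::function<void()> cmd_;
  bool allowed_ = false;
};
```

Under this model, a sender that is told to disconnect by its peer first cancels the guard and then runs its normal shutdown path, so the shutdown path does not try to disconnect a second time.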

int TAO_ECG_UDP_Sender::get_local_addr (ACE_INET_Addr & addr)
 

Get the local endpoint used to send the events.

void TAO_ECG_UDP_Sender::init (RtecEventChannelAdmin::EventChannel_ptr lcl_ec,
RtecUDPAdmin::AddrServer_ptr addr_server,
TAO_ECG_Refcounted_Endpoint endpoint_rptr)
 

Parameters:
lcl_ec Event Channel to which we will act as a consumer of events.
addr_server Address server used to obtain event type to multicast group mapping.
endpoint_rptr Endpoint for sending udp/multicast messages. Endpoint's dgram must be open!
To ensure proper resource cleanup, if init () is successful, shutdown () must be called when the sender is no longer needed. This is done by the disconnect_push_consumer() method. If disconnect_push_consumer() will not be called, calling shutdown () is the responsibility of the user. If shutdown () is called by neither disconnect_push_consumer () nor the user, the sender cleans up its resources in its destructor; however, certain entities involved in the cleanup, e.g., the POA, must still exist at that point.

Definition at line 33 of file ECG_UDP_Sender.cpp.

References ACE_ERROR, cdr_sender_, TAO_ECG_CDR_Message_Sender::init(), CORBA::is_nil(), LM_ERROR, and TAO_ECG_Refcounted_Endpoint.

00036 {
00037   if (CORBA::is_nil (lcl_ec))
00038     {
00039       ACE_ERROR ((LM_ERROR, "TAO_ECG_UDP_Sender::init(): "
00040                             "<lcl_ec> argument is nil."));
00041       throw CORBA::INTERNAL ();
00042     }
00043 
00044   if (CORBA::is_nil (addr_server))
00045     {
00046       ACE_ERROR ((LM_ERROR, "TAO_ECG_UDP_Sender::init(): "
00047                             "address server argument is nil."));
00048       throw CORBA::INTERNAL ();
00049     }
00050 
00051   this->cdr_sender_.init (endpoint_rptr);
00052 
00053   this->lcl_ec_ =
00054     RtecEventChannelAdmin::EventChannel::_duplicate (lcl_ec);
00055 
00056   this->addr_server_ =
00057     RtecUDPAdmin::AddrServer::_duplicate (addr_server);
00058 }

CORBA::ULong TAO_ECG_UDP_Sender::mtu (void) const
 

The sender may need to fragment the message, otherwise the network may drop the packets. Setting the MTU can fail if the value is too small (at least the header + 8 bytes must fit).

int TAO_ECG_UDP_Sender::mtu (CORBA::ULong mtu)
 

The sender may need to fragment the message, otherwise the network may drop the packets. Setting the MTU can fail if the value is too small (at least the header + 8 bytes must fit).
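The constraint that at least the header plus 8 bytes must fit, and its effect on fragmentation, can be worked through numerically. The sketch below is a simplified model under stated assumptions: kHeaderSize is an illustrative value chosen for the example, not taken from this page, and the model ignores any alignment padding the real sender may apply to fragments. Both function names are hypothetical.

```cpp
#include <cassert>
#include <cstddef>
#include <stdexcept>

// Assumed header size, for illustration only; the real value is defined by TAO.
constexpr std::size_t kHeaderSize = 32;
// Minimum payload that must fit next to the header, as stated in the text.
constexpr std::size_t kMinPayload = 8;

// An MTU is acceptable only if the header plus 8 bytes of payload fit in it.
bool mtu_is_acceptable(std::size_t mtu) {
  return mtu >= kHeaderSize + kMinPayload;
}

// Number of datagrams needed to carry payload_size bytes when every
// datagram repeats the header. Throws if the MTU is too small.
std::size_t fragment_count(std::size_t payload_size, std::size_t mtu) {
  if (!mtu_is_acceptable(mtu))
    throw std::invalid_argument("MTU too small for header + 8 bytes");
  const std::size_t per_fragment = mtu - kHeaderSize;
  return (payload_size + per_fragment - 1) / per_fragment;  // ceiling division
}
```

With these assumed numbers, an MTU of 1024 leaves 992 payload bytes per datagram, so a 1000-byte message needs two datagrams while a 992-byte message fits in one.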

void TAO_ECG_UDP_Sender::new_connect (const RtecEventChannelAdmin::ConsumerQOS & sub) [private]
 

Definition at line 91 of file ECG_UDP_Sender.cpp.

References activate(), ECG_Sender_Auto_Proxy_Disconnect, TAO_EC_Auto_Command< T >::set_command(), and TAO_EC_Deactivated_Object::set_deactivator().

Referenced by connect().

00092 {
00093   // Activate with poa.
00094   RtecEventComm::PushConsumer_var consumer_ref;
00095   PortableServer::POA_var poa = this->_default_POA ();
00096 
00097   TAO_EC_Object_Deactivator deactivator;
00098   activate (consumer_ref,
00099             poa.in (),
00100             this,
00101             deactivator);
00102 
00103   // Connect as a consumer to the local EC.
00104   RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin =
00105     this->lcl_ec_->for_consumers ();
00106 
00107   RtecEventChannelAdmin::ProxyPushSupplier_var proxy =
00108     consumer_admin->obtain_push_supplier ();
00109   ECG_Sender_Auto_Proxy_Disconnect new_proxy_disconnect (proxy.in ());
00110 
00111   proxy->connect_push_consumer (consumer_ref.in (),
00112                                 sub);
00113 
00114   // Update resource managers.
00115   this->supplier_proxy_ = proxy._retn ();
00116   this->auto_proxy_disconnect_.set_command (new_proxy_disconnect);
00117   this->set_deactivator (deactivator);
00118 }

void TAO_ECG_UDP_Sender::push (const RtecEventComm::EventSet & events) [virtual]
 

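Forwards the received events over UDP. Each event with a positive TTL is marshaled into its own message, with the TTL in its header decremented, and sent to the address obtained from the address server. Events with TTL <= 0 are skipped, and a 0-length EventSet is ignored.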

Definition at line 166 of file ECG_UDP_Sender.cpp.

References cdr_sender_, RtecEventComm::Event::data, RtecEventComm::EventSet, RtecEventComm::Event::header, RtecUDPAdmin::UDP_Addr::ipaddr, RtecUDPAdmin::UDP_Addr::port, TAO_ECG_CDR_Message_Sender::send_message(), RtecEventComm::EventHeader::ttl, and ACE_OutputCDR::write_ulong().

00167 {
00168   if (events.length () == 0)
00169     {
00170       // ACE_DEBUG ((EC_FORMAT (DEBUG,
00171       //                        "Nothing to multicast: "
00172       //                        "0-length EventSet.")));
00173       return;
00174     }
00175 
00176   // Send each event in a separate message.
00177   // @@ TODO It is interesting to group events destined to the
00178   // same mcast group in a single message.
00179   for (u_int i = 0; i < events.length (); ++i)
00180     {
00181       // To avoid loops we keep a TTL field on the events and skip the
00182       // events with TTL <= 0
00183       if (events[i].header.ttl <= 0)
00184         continue;
00185 
00186       const RtecEventComm::Event& e = events[i];
00187 
00188       // We need to modify the TTL field, but copying the entire event
00189       // would be wasteful; instead we create a new header and only
00190       // modify the header portion.
00191       RtecEventComm::EventHeader header = e.header;
00192       header.ttl--;
00193 
00194       // Start building the message
00195       TAO_OutputCDR cdr;
00196 
00197       // Marshal as if it were a sequence of one element; notice how we
00198       // marshal a modified version of the header, but the payload is
00199       // marshaled without any extra copies.
00200       cdr.write_ulong (1);
00201       if (!(cdr << header)
00202           || !(cdr << e.data))
00203         throw CORBA::MARSHAL ();
00204 
00205       ACE_INET_Addr inet_addr;
00206       try
00207         {
00208           // Grab the right mcast group for this event...
00209           RtecUDPAdmin::UDP_Address_var udp_addr;
00210 
00211           this->addr_server_->get_address (header, udp_addr.out());
00212           switch (udp_addr->_d())
00213             {
00214             case RtecUDPAdmin::Rtec_inet:
00215               inet_addr.set(udp_addr->v4_addr().port,
00216                             udp_addr->v4_addr().ipaddr);
00217               break;
00218             case RtecUDPAdmin::Rtec_inet6:
00219 #if defined (ACE_HAS_IPV6)
00220               inet_addr.set_type(PF_INET6);
00221 #endif
00222               inet_addr.set_address(udp_addr->v6_addr().ipaddr,16,0);
00223               inet_addr.set_port_number(udp_addr->v6_addr().port);
00224               break;
00225             }
00226         }
00227       catch (const ::CORBA::BAD_OPERATION &)
00228         {
00229           // server only supports IPv4
00230            // Grab the right mcast group for this event...
00231           RtecUDPAdmin::UDP_Addr udp_addr;
00232           this->addr_server_->get_addr (header, udp_addr);
00233           inet_addr.set (udp_addr.port, udp_addr.ipaddr);
00234         }
00235 
00236       this->cdr_sender_.send_message (cdr, inet_addr);
00237     }
00238 }
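The TTL handling in push() (skip events with TTL <= 0, decrement the TTL on a copy of the header, leave the payload uncopied) can be isolated in a small sketch. The types Header, Event and Outgoing and the function prepare_for_send() are hypothetical stand-ins written for this illustration; they are not the RtecEventComm types, and no marshaling or sending is performed.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical stand-ins for the event types; not the RtecEventComm definitions.
struct Header { std::int32_t ttl; std::int32_t type; };
struct Event { Header header; std::string data; };

// What would be marshaled: a modified header copy plus the original payload.
struct Outgoing { Header header; const std::string* data; };

// Returns one entry per event that is still allowed to travel.
std::vector<Outgoing> prepare_for_send(const std::vector<Event>& events) {
  std::vector<Outgoing> out;
  for (const Event& e : events) {
    if (e.header.ttl <= 0)
      continue;              // expired: forwarding it could create a loop

    Header h = e.header;     // copy only the small header...
    --h.ttl;                 // ...and decrement the TTL on the copy

    out.push_back(Outgoing{h, &e.data});  // payload is referenced, not copied
  }
  return out;
}
```

The input events are left unchanged, which matches the const reference taken by push(); only the outgoing header carries the decremented TTL.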

void TAO_ECG_UDP_Sender::reconnect (const RtecEventChannelAdmin::ConsumerQOS & sub) [private]
 

Definition at line 121 of file ECG_UDP_Sender.cpp.

References CORBA::is_nil().

Referenced by connect().

00122 {
00123   // Obtain our object reference from the POA.
00124   RtecEventComm::PushConsumer_var consumer_ref;
00125   PortableServer::POA_var poa = this->_default_POA ();
00126 
00127   CORBA::Object_var obj = poa->servant_to_reference (this);
00128   consumer_ref =
00129     RtecEventComm::PushConsumer::_narrow (obj.in ());
00130 
00131   if (CORBA::is_nil (consumer_ref.in ()))
00132     {
00133       throw CORBA::INTERNAL ();
00134     }
00135 
00136   // Reconnect.
00137   this->supplier_proxy_->connect_push_consumer (consumer_ref.in (),
00138                                                 sub);
00139 }

void TAO_ECG_UDP_Sender::shutdown (void)
 

Calling this method may result in decrementing of the reference count (due to deactivation) and deletion of the object.

Definition at line 151 of file ECG_UDP_Sender.cpp.

References cdr_sender_, TAO_EC_Object_Deactivator::deactivate(), TAO_EC_Auto_Command< T >::execute(), and TAO_ECG_CDR_Message_Sender::shutdown().

Referenced by disconnect_push_consumer().

00152 {
00153   this->supplier_proxy_ =
00154     RtecEventChannelAdmin::ProxyPushSupplier::_nil ();
00155 
00156   this->auto_proxy_disconnect_.execute ();
00157 
00158   this->addr_server_ = RtecUDPAdmin::AddrServer::_nil ();
00159   this->lcl_ec_ = RtecEventChannelAdmin::EventChannel::_nil ();
00160 
00161   this->deactivator_.deactivate ();
00162   this->cdr_sender_.shutdown ();
00163 }


Member Data Documentation

RtecUDPAdmin::AddrServer_var TAO_ECG_UDP_Sender::addr_server_ [private]
 

We query this object to determine where the events should be sent.

Definition at line 205 of file ECG_UDP_Sender.h.

ECG_Sender_Auto_Proxy_Disconnect TAO_ECG_UDP_Sender::auto_proxy_disconnect_ [private]
 

Manages our connection to Supplier Proxy.

Definition at line 213 of file ECG_UDP_Sender.h.

TAO_ECG_CDR_Message_Sender TAO_ECG_UDP_Sender::cdr_sender_ [private]
 

Helper for fragmenting and sending cdr-encoded events using udp.

Definition at line 208 of file ECG_UDP_Sender.h.

Referenced by init(), push(), and shutdown().

RtecEventChannelAdmin::EventChannel_var TAO_ECG_UDP_Sender::lcl_ec_ [private]
 

Event Channel to which we act as a consumer.

Definition at line 202 of file ECG_UDP_Sender.h.

RtecEventChannelAdmin::ProxyPushSupplier_var TAO_ECG_UDP_Sender::supplier_proxy_ [private]
 

Proxy used to receive events from the Event Channel.

Definition at line 199 of file ECG_UDP_Sender.h.


The documentation for this class was generated from the following files:
ECG_UDP_Sender.h
ECG_UDP_Sender.cpp
Generated on Sun Jan 27 15:36:43 2008 for TAO_RTEvent by doxygen 1.3.6