TAO_ECG_UDP_Sender Class Reference

Send events received from a "local" EC using UDP. NOT THREAD-SAFE. This class connect as a consumer to an EventChannel and forwards the events it receives from that EC using UDP. More...

#include <ECG_UDP_Sender.h>

Inheritance diagram for TAO_ECG_UDP_Sender:

Inheritance graph
[legend]
Collaboration diagram for TAO_ECG_UDP_Sender:

Collaboration graph
[legend]
List of all members.

[NOHEADER]

 ~TAO_ECG_UDP_Sender (void)
void init (RtecEventChannelAdmin::EventChannel_ptr lcl_ec, RtecUDPAdmin::AddrServer_ptr addr_server, TAO_ECG_Refcounted_Endpoint endpoint_rptr)
void connect (const RtecEventChannelAdmin::ConsumerQOS &sub)
 Connect or reconnect to the EC with the given subscriptions.

void shutdown ()
TAO_EC_Servant_Var< TAO_ECG_UDP_Sendercreate (CORBA::Boolean crc=0)

Public Member Functions

int mtu (CORBA::ULong mtu)
CORBA::ULong mtu (void) const
int get_local_addr (ACE_INET_Addr &addr)
 Get the local endpoint used to send the events.

virtual void disconnect_push_consumer () throw (CORBA::SystemException)
virtual void push (const RtecEventComm::EventSet &events) throw (CORBA::SystemException)

Protected Member Functions

 TAO_ECG_UDP_Sender (CORBA::Boolean crc=0)

Private Types

typedef TAO_EC_Auto_Command<
TAO_ECG_UDP_Sender_Disconnect_Command
ECG_Sender_Auto_Proxy_Disconnect

Private Member Functions

void new_connect (const RtecEventChannelAdmin::ConsumerQOS &sub)
void reconnect (const RtecEventChannelAdmin::ConsumerQOS &sub)

Private Attributes

RtecEventChannelAdmin::ProxyPushSupplier_var supplier_proxy_
 Proxy used to receive events from the Event Channel.

RtecEventChannelAdmin::EventChannel_var lcl_ec_
 Event Channel to which we act as a consumer.

RtecUDPAdmin::AddrServer_var addr_server_
 We query this object to determine where the events should be sent.

TAO_ECG_CDR_Message_Sender cdr_sender_
 Helper for fragmenting and sending cdr-encoded events using udp.

ECG_Sender_Auto_Proxy_Disconnect auto_proxy_disconnect_
 Manages our connection to Supplier Proxy.


Detailed Description

Send events received from a "local" EC using UDP. NOT THREAD-SAFE. This class connect as a consumer to an EventChannel and forwards the events it receives from that EC using UDP.

Definition at line 103 of file ECG_UDP_Sender.h.


Member Typedef Documentation

typedef TAO_EC_Auto_Command<TAO_ECG_UDP_Sender_Disconnect_Command> TAO_ECG_UDP_Sender::ECG_Sender_Auto_Proxy_Disconnect [private]
 

Definition at line 218 of file ECG_UDP_Sender.h.

Referenced by new_connect().


Constructor & Destructor Documentation

TAO_BEGIN_VERSIONED_NAMESPACE_DECL TAO_ECG_UDP_Sender::~TAO_ECG_UDP_Sender void   ) 
 

Definition at line 28 of file ECG_UDP_Sender.cpp.

00029 {
00030 }

TAO_ECG_UDP_Sender::TAO_ECG_UDP_Sender CORBA::Boolean  crc = 0  )  [protected]
 

Constructor (protected). Clients can create new TAO_ECG_UDP_Sender objects using the static create() method.


Member Function Documentation

void TAO_ECG_UDP_Sender::connect const RtecEventChannelAdmin::ConsumerQOS sub  ) 
 

Connect or reconnect to the EC with the given subscriptions.

NOTE: if we are already connected to EC and a reconnection is necessary, the EC must have reconnects enabled in order for this method to succeed.

Definition at line 63 of file ECG_UDP_Sender.cpp.

References ACE_CHECK, ACE_ENV_ARG_PARAMETER, ACE_ERROR, ACE_THROW, CORBA::is_nil(), LM_ERROR, new_connect(), and reconnect().

00065 {
00066   if (CORBA::is_nil (this->lcl_ec_.in ()))
00067     {
00068       ACE_ERROR ((LM_ERROR, "Error initializing TAO_ECG_UDP_Sender: "
00069                             "init() has not been called before connect()."));
00070       ACE_THROW (CORBA::INTERNAL ());
00071     }
00072 
00073   if (sub.dependencies.length () == 0)
00074     {
00075       ACE_ERROR ((LM_ERROR, "TAO_ECG_UDP_Sender::connect(): "
00076                             "0-length subscriptions argument."));
00077       ACE_THROW (CORBA::INTERNAL ());
00078     }
00079 
00080   if (CORBA::is_nil (this->supplier_proxy_.in ()))
00081     {
00082       this->new_connect (sub ACE_ENV_ARG_PARAMETER);
00083       ACE_CHECK;
00084     }
00085   else
00086     {
00087       this->reconnect (sub ACE_ENV_ARG_PARAMETER);
00088       ACE_CHECK;
00089     }
00090 }

TAO_EC_Servant_Var<TAO_ECG_UDP_Sender> TAO_ECG_UDP_Sender::create CORBA::Boolean  crc = 0  )  [static]
 

Create a new TAO_ECG_UDP_Sender object. (Constructor access is restricted to insure that all TAO_ECG_UDP_Sender objects are heap-allocated.)

void TAO_ECG_UDP_Sender::disconnect_push_consumer  )  throw (CORBA::SystemException) [virtual]
 

Invokes shutdown (), which may result in the object being deleted, if refcounting is used to manage its lifetime.

Definition at line 156 of file ECG_UDP_Sender.cpp.

References ACE_ENV_SINGLE_ARG_PARAMETER.

00158 {
00159   // Prevent attempts to disconnect.
00160   this->auto_proxy_disconnect_.disallow_command ();
00161 
00162   this->shutdown (ACE_ENV_SINGLE_ARG_PARAMETER);
00163 }

int TAO_ECG_UDP_Sender::get_local_addr ACE_INET_Addr addr  ) 
 

Get the local endpoint used to send the events.

void TAO_ECG_UDP_Sender::init RtecEventChannelAdmin::EventChannel_ptr  lcl_ec,
RtecUDPAdmin::AddrServer_ptr  addr_server,
TAO_ECG_Refcounted_Endpoint  endpoint_rptr
 

Parameters:
lcl_ec Event Channel to which we will act as a consumer of events.
addr_server Address server used to obtain event type to multicast group mapping.
endpoint_rptr Endpoint for sending udp/multicast messages. Endpoint's dgram must be open!
To insure proper resource clean up, if init () is successful, shutdown () must be called when the sender is no longer needed. This is done by disconnect_push_consumer() method. If disconnect_push_consumer() will not be called, it is the responsibility of the user. Furthermore, if shutdown() is not explicitly called by either disconnect_push_consumer () or the user, the sender will clean up the resources in its destructor, however, certain entities involved in cleanup must still exist at that point, e.g., POA.

Definition at line 33 of file ECG_UDP_Sender.cpp.

References ACE_CHECK, ACE_ENV_ARG_PARAMETER, ACE_ERROR, ACE_THROW, cdr_sender_, TAO_ECG_CDR_Message_Sender::init(), CORBA::is_nil(), LM_ERROR, and TAO_ECG_Refcounted_Endpoint.

00037 {
00038   if (CORBA::is_nil (lcl_ec))
00039     {
00040       ACE_ERROR ((LM_ERROR, "TAO_ECG_UDP_Sender::init(): "
00041                             "<lcl_ec> argument is nil."));
00042       ACE_THROW (CORBA::INTERNAL ());
00043     }
00044 
00045   if (CORBA::is_nil (addr_server))
00046     {
00047       ACE_ERROR ((LM_ERROR, "TAO_ECG_UDP_Sender::init(): "
00048                             "address server argument is nil."));
00049       ACE_THROW (CORBA::INTERNAL ());
00050     }
00051 
00052   this->cdr_sender_.init (endpoint_rptr ACE_ENV_ARG_PARAMETER);
00053   ACE_CHECK;
00054 
00055   this->lcl_ec_ =
00056     RtecEventChannelAdmin::EventChannel::_duplicate (lcl_ec);
00057 
00058   this->addr_server_ =
00059     RtecUDPAdmin::AddrServer::_duplicate (addr_server);
00060 }

CORBA::ULong TAO_ECG_UDP_Sender::mtu void   )  const
 

The sender may need to fragment the message, otherwise the network may drop the packets. Setting the MTU can fail if the value is too small (at least the header + 8 bytes must fit).

int TAO_ECG_UDP_Sender::mtu CORBA::ULong  mtu  ) 
 

The sender may need to fragment the message, otherwise the network may drop the packets. Setting the MTU can fail if the value is too small (at least the header + 8 bytes must fit).

void TAO_ECG_UDP_Sender::new_connect const RtecEventChannelAdmin::ConsumerQOS sub  )  [private]
 

Definition at line 93 of file ECG_UDP_Sender.cpp.

References ACE_CHECK, ACE_ENV_ARG_PARAMETER, ACE_ENV_SINGLE_ARG_PARAMETER, activate(), ECG_Sender_Auto_Proxy_Disconnect, TAO_EC_Auto_Command< T >::set_command(), and TAO_EC_Deactivated_Object::set_deactivator().

Referenced by connect().

00095 {
00096   // Activate with poa.
00097   RtecEventComm::PushConsumer_var consumer_ref;
00098   PortableServer::POA_var poa = this->_default_POA ();
00099 
00100   TAO_EC_Object_Deactivator deactivator;
00101   activate (consumer_ref,
00102             poa.in (),
00103             this,
00104             deactivator
00105             ACE_ENV_ARG_PARAMETER);
00106   ACE_CHECK;
00107 
00108   // Connect as a consumer to the local EC.
00109   RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin =
00110     this->lcl_ec_->for_consumers (ACE_ENV_SINGLE_ARG_PARAMETER);
00111   ACE_CHECK;
00112 
00113   RtecEventChannelAdmin::ProxyPushSupplier_var proxy =
00114     consumer_admin->obtain_push_supplier (ACE_ENV_SINGLE_ARG_PARAMETER);
00115   ACE_CHECK;
00116   ECG_Sender_Auto_Proxy_Disconnect new_proxy_disconnect (proxy.in ());
00117 
00118   proxy->connect_push_consumer (consumer_ref.in (),
00119                                 sub
00120                                 ACE_ENV_ARG_PARAMETER);
00121   ACE_CHECK;
00122 
00123   // Update resource managers.
00124   this->supplier_proxy_ = proxy._retn ();
00125   this->auto_proxy_disconnect_.set_command (new_proxy_disconnect);
00126   this->set_deactivator (deactivator);
00127 }

void TAO_ECG_UDP_Sender::push const RtecEventComm::EventSet events  )  throw (CORBA::SystemException) [virtual]
 

Invokes shutdown (), which may result in the object being deleted, if refcounting is used to manage its lifetime.

Definition at line 182 of file ECG_UDP_Sender.cpp.

References ACE_CHECK, ACE_ENV_ARG_PARAMETER, ACE_THROW, RtecEventComm::Event::data, RtecEventComm::EventSet, RtecEventComm::Event::header, RtecUDPAdmin::UDP_Addr::ipaddr, RtecUDPAdmin::UDP_Addr::port, RtecEventComm::EventHeader::ttl, and ACE_OutputCDR::write_ulong().

00185 {
00186   if (events.length () == 0)
00187     {
00188       // ACE_DEBUG ((EC_FORMAT (DEBUG,
00189       //                        "Nothing to multicast: "
00190       //                        "0-length EventSet.")));
00191       return;
00192     }
00193 
00194   // Send each event in a separate message.
00195   // @@ TODO It is interesting to group events destined to the
00196   // same mcast group in a single message.
00197   for (u_int i = 0; i < events.length (); ++i)
00198     {
00199       // To avoid loops we keep a TTL field on the events and skip the
00200       // events with TTL <= 0
00201       if (events[i].header.ttl <= 0)
00202         continue;
00203 
00204       const RtecEventComm::Event& e = events[i];
00205 
00206       // We need to modify the TTL field, but copying the entire event
00207       // would be wasteful; instead we create a new header and only
00208       // modify the header portion.
00209       RtecEventComm::EventHeader header = e.header;
00210       header.ttl--;
00211 
00212       // Start building the message
00213       TAO_OutputCDR cdr;
00214 
00215       // Marshal as if it was a sequence of one element, notice how we
00216       // marshal a modified version of the header, but the payload is
00217       // marshal without any extra copies.
00218       cdr.write_ulong (1);
00219       if (!(cdr << header)
00220           || !(cdr << e.data))
00221         ACE_THROW (CORBA::MARSHAL ());
00222 
00223       // Grab the right mcast group for this event...
00224       RtecUDPAdmin::UDP_Addr udp_addr;
00225       this->addr_server_->get_addr (header, udp_addr ACE_ENV_ARG_PARAMETER);
00226       ACE_CHECK;
00227 
00228       ACE_INET_Addr inet_addr (udp_addr.port,
00229                                udp_addr.ipaddr);
00230 
00231       this->cdr_sender_.send_message (cdr, inet_addr ACE_ENV_ARG_PARAMETER);
00232       ACE_CHECK;
00233     }
00234 }

void TAO_ECG_UDP_Sender::reconnect const RtecEventChannelAdmin::ConsumerQOS sub  )  [private]
 

Definition at line 130 of file ECG_UDP_Sender.cpp.

References ACE_CHECK, ACE_ENV_ARG_PARAMETER, ACE_THROW, and CORBA::is_nil().

Referenced by connect().

00132 {
00133   // Obtain our object reference from the POA.
00134   RtecEventComm::PushConsumer_var consumer_ref;
00135   PortableServer::POA_var poa = this->_default_POA ();
00136 
00137   CORBA::Object_var obj = poa->servant_to_reference (this ACE_ENV_ARG_PARAMETER);
00138   ACE_CHECK;
00139   consumer_ref =
00140     RtecEventComm::PushConsumer::_narrow (obj.in () ACE_ENV_ARG_PARAMETER);
00141   ACE_CHECK;
00142 
00143   if (CORBA::is_nil (consumer_ref.in ()))
00144     {
00145       ACE_THROW (CORBA::INTERNAL ());
00146     }
00147 
00148   // Reconnect.
00149   this->supplier_proxy_->connect_push_consumer (consumer_ref.in (),
00150                                                 sub
00151                                                 ACE_ENV_ARG_PARAMETER);
00152   ACE_CHECK;
00153 }

void TAO_ECG_UDP_Sender::shutdown  ) 
 

Calling this method may result in decrementing of the reference count (due to deactivation) and deletion of the object.

Definition at line 166 of file ECG_UDP_Sender.cpp.

References ACE_CHECK, ACE_ENV_SINGLE_ARG_PARAMETER, cdr_sender_, TAO_EC_Object_Deactivator::deactivate(), TAO_EC_Auto_Command< T >::execute(), and TAO_ECG_CDR_Message_Sender::shutdown().

00167 {
00168   this->supplier_proxy_ =
00169     RtecEventChannelAdmin::ProxyPushSupplier::_nil ();
00170 
00171   this->auto_proxy_disconnect_.execute ();
00172 
00173   this->addr_server_ = RtecUDPAdmin::AddrServer::_nil ();
00174   this->lcl_ec_ = RtecEventChannelAdmin::EventChannel::_nil ();
00175 
00176   this->deactivator_.deactivate ();
00177   this->cdr_sender_.shutdown (ACE_ENV_SINGLE_ARG_PARAMETER);
00178   ACE_CHECK;
00179 }


Member Data Documentation

RtecUDPAdmin::AddrServer_var TAO_ECG_UDP_Sender::addr_server_ [private]
 

We query this object to determine where the events should be sent.

Definition at line 212 of file ECG_UDP_Sender.h.

ECG_Sender_Auto_Proxy_Disconnect TAO_ECG_UDP_Sender::auto_proxy_disconnect_ [private]
 

Manages our connection to Supplier Proxy.

Definition at line 220 of file ECG_UDP_Sender.h.

TAO_ECG_CDR_Message_Sender TAO_ECG_UDP_Sender::cdr_sender_ [private]
 

Helper for fragmenting and sending cdr-encoded events using udp.

Definition at line 215 of file ECG_UDP_Sender.h.

Referenced by init(), and shutdown().

RtecEventChannelAdmin::EventChannel_var TAO_ECG_UDP_Sender::lcl_ec_ [private]
 

Event Channel to which we act as a consumer.

Definition at line 209 of file ECG_UDP_Sender.h.

RtecEventChannelAdmin::ProxyPushSupplier_var TAO_ECG_UDP_Sender::supplier_proxy_ [private]
 

Proxy used to receive events from the Event Channel.

Definition at line 206 of file ECG_UDP_Sender.h.


The documentation for this class was generated from the following files:
Generated on Thu Nov 9 13:16:28 2006 for TAO_RTEvent by doxygen 1.3.6