TAO_ECG_CDR_Message_Sender Class Reference

Sends CDR messages using UDP. NOT THREAD-SAFE.

#include <ECG_CDR_Message_Sender.h>


Public Types

enum  { ECG_HEADER_SIZE = 32, ECG_MIN_MTU = 32 + 8, ECG_MAX_MTU = 65536, ECG_DEFAULT_MTU = 1024 }

Public Member Functions

void send_message (const TAO_OutputCDR &cdr, const ACE_INET_Addr &addr) throw (CORBA::SystemException)
 The main method - send a CDR message.

 TAO_ECG_CDR_Message_Sender (CORBA::Boolean crc=0)
void init (TAO_ECG_Refcounted_Endpoint endpoint_rptr) throw (CORBA::SystemException)
 Set the endpoint for sending messages.

void shutdown ()
int get_local_addr (ACE_INET_Addr &addr)
 Get the local endpoint used to send the events.

int mtu (CORBA::ULong mtu)
CORBA::ULong mtu (void) const
 Get the MTU currently used by this sender.


Private Member Functions

ACE_SOCK_Dgram & dgram (void)
 Return the datagram...

void send_fragment (const ACE_INET_Addr &addr, CORBA::ULong request_id, CORBA::ULong request_size, CORBA::ULong fragment_size, CORBA::ULong fragment_offset, CORBA::ULong fragment_id, CORBA::ULong fragment_count, iovec iov[], int iovcnt)
CORBA::ULong compute_fragment_count (const ACE_Message_Block *begin, const ACE_Message_Block *end, int iov_size, CORBA::ULong max_fragment_payload, CORBA::ULong &total_length)

Private Attributes

TAO_ECG_Refcounted_Endpoint endpoint_rptr_
 The datagram used for sendto ().

CORBA::ULong mtu_
 The MTU for this sender...

CORBA::Boolean checksum_
 Should a CRC checksum be calculated and sent?


Detailed Description

Sends CDR messages using UDP. NOT THREAD-SAFE.

This class breaks up a CDR message into fragments and sends each fragment with a header (described below) using UDP. The UDP address can be a normal IP address or it can be a multicast group. The UDP address is obtained from a RtecUDPAdmin::AddrServer class.

This class is used by various Gateway (Senders/Receivers) classes responsible for federating Event Channels with UDP/Mcast.

MESSAGE FORMAT

Message headers are encapsulated using CDR, with the following format:

    struct Header {
      octet byte_order_flags;
      // bit 0 represents the byte order as in GIOP 1.1
      // bit 1 is set if this is the last fragment
      unsigned long request_id;
      // The request ID. Senders must not send two requests with
      // the same ID; senders can be distinguished using recvfrom.
      unsigned long request_size;
      // The size of this request. This can be used to pre-allocate
      // the request buffer.
      unsigned long fragment_size;
      // The size of this fragment, excluding the header.
      unsigned long fragment_offset;
      // Where this fragment fits in the complete message.
      unsigned long fragment_id;
      // The ID of this fragment.
      unsigned long fragment_count;
      // The total number of fragments to expect in this request.
      // Todo: This could be eliminated if efficient reassembly
      // could be implemented without it.
      octet padding[4];
      // Ensures the header ends at an 8-byte boundary.
    }; // size (in CDR stream) = 32
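The layout above can be sketched in plain C++ without ACE or TAO. This is only an illustration: `EcgHeaderFields`, `kHeaderSize` and `encode_header` are hypothetical names, values are copied in host byte order, and the 'A', 'B', 'C' filler bytes follow what send_fragment() writes into the three alignment bytes after the first octet.

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>

// Hypothetical plain-C++ mirror of the documented header fields.
struct EcgHeaderFields {
  std::uint8_t  byte_order_flags;
  std::uint32_t request_id;
  std::uint32_t request_size;
  std::uint32_t fragment_size;
  std::uint32_t fragment_offset;
  std::uint32_t fragment_id;
  std::uint32_t fragment_count;
  std::uint8_t  padding[4];
};

constexpr std::size_t kHeaderSize = 32;  // same value as ECG_HEADER_SIZE

// Lay the fields out the way the CDR stream aligns them: the octet at
// offset 0, three alignment bytes, six 4-byte unsigned longs at offsets
// 4..27, and the 4 padding octets at offsets 28..31.
// Assumption: values are copied in host byte order; a receiver would use
// bit 0 of byte_order_flags to decide whether to swap.
inline std::array<std::uint8_t, kHeaderSize>
encode_header(const EcgHeaderFields& h)
{
  std::array<std::uint8_t, kHeaderSize> out{};
  out[0] = h.byte_order_flags;
  out[1] = 'A';  // known filler bytes, as written by send_fragment()
  out[2] = 'B';
  out[3] = 'C';

  const std::uint32_t words[6] = {
    h.request_id, h.request_size, h.fragment_size,
    h.fragment_offset, h.fragment_id, h.fragment_count
  };
  std::size_t offset = 4;
  for (std::uint32_t w : words) {
    std::memcpy(out.data() + offset, &w, sizeof w);
    offset += sizeof w;
  }
  std::memcpy(out.data() + offset, h.padding, sizeof h.padding);
  return out;
}
```

One octet, three alignment bytes, six unsigned longs and four padding octets add up to 1 + 3 + 24 + 4 = 32 bytes, which is where the "size (in CDR stream) = 32" figure comes from.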

Definition at line 76 of file ECG_CDR_Message_Sender.h.


Member Enumeration Documentation

anonymous enum
 

Enumeration values:
ECG_HEADER_SIZE 
ECG_MIN_MTU 
ECG_MAX_MTU 
ECG_DEFAULT_MTU 

Definition at line 80 of file ECG_CDR_Message_Sender.h.

00080        {
00081     ECG_HEADER_SIZE = 32,
00082     ECG_MIN_MTU = 32 + 8,
00083     ECG_MAX_MTU = 65536, // Really optimistic...
00084     ECG_DEFAULT_MTU = 1024
00085   };


Constructor & Destructor Documentation

ACE_INLINE TAO_ECG_CDR_Message_Sender::TAO_ECG_CDR_Message_Sender (CORBA::Boolean crc = 0)
 

Definition at line 10 of file ECG_CDR_Message_Sender.i.

00011   :  endpoint_rptr_ ()
00012      , mtu_ (TAO_ECG_CDR_Message_Sender::ECG_DEFAULT_MTU)
00013      , checksum_ (crc)
00014 {
00015 }


Member Function Documentation

CORBA::ULong TAO_ECG_CDR_Message_Sender::compute_fragment_count (const ACE_Message_Block *begin,
const ACE_Message_Block *end,
int iov_size,
CORBA::ULong max_fragment_payload,
CORBA::ULong &total_length
) [private]
 

Counts the number of fragments required to send the message blocks in the range [begin, end). The maximum fragment payload (i.e. the size without the header) is also required. The return value is the fragment count; the total message size is returned through total_length.

Definition at line 287 of file ECG_CDR_Message_Sender.cpp.

References ACE_Message_Block::cont(), and ACE_Message_Block::length().

00292 {
00293   CORBA::ULong fragment_count = 0;
00294   total_length = 0;
00295 
00296   CORBA::ULong fragment_size = 0;
00297   // Reserve the first iovec for the header...
00298   int iovcnt = 1;
00299   for (const ACE_Message_Block* b = begin;
00300        b != end;
00301        b = b->cont ())
00302     {
00303       CORBA::ULong l = b->length ();
00304       total_length += l;
00305       fragment_size += l;
00306       ++iovcnt;
00307       while (fragment_size > max_fragment_payload)
00308         {
00309           // Ran out of space, must create a fragment...
00310           ++fragment_count;
00311 
00312           // The next iovector will contain what remains of this
00313           // buffer, but also consider
00314           iovcnt = 2;
00315           l -= max_fragment_payload - (fragment_size - l);
00316           fragment_size = l;
00317         }
00318       if (fragment_size == max_fragment_payload)
00319         {
00320           ++fragment_count;
00321           iovcnt = 1;
00322           fragment_size = 0;
00323         }
00324       if (iovcnt >= iov_size)
00325         {
00326           // Ran out of space in the iovector....
00327           ++fragment_count;
00328           iovcnt = 1;
00329           fragment_size = 0;
00330         }
00331     }
00332   if (iovcnt != 1)
00333     {
00334       // Send the remaining data in another fragment
00335       ++fragment_count;
00336     }
00337   return fragment_count;
00338 }
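To experiment with the counting logic without ACE, the same loop can be run over a plain list of block lengths. This is a sketch, not the library code: `count_fragments` is a hypothetical name, and a `std::vector` of lengths stands in for the ACE_Message_Block chain.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical stand-alone version of the counting loop above. Each vector
// element plays the role of ACE_Message_Block::length() for one block.
inline std::uint32_t count_fragments(const std::vector<std::uint32_t>& block_lengths,
                                     int iov_size,
                                     std::uint32_t max_fragment_payload,
                                     std::uint32_t& total_length)
{
  std::uint32_t fragment_count = 0;
  std::uint32_t fragment_size = 0;
  total_length = 0;
  int iovcnt = 1;  // slot 0 is reserved for the header

  for (std::uint32_t l : block_lengths) {
    total_length += l;
    fragment_size += l;
    ++iovcnt;
    while (fragment_size > max_fragment_payload) {
      // The current fragment is full; the rest of this block spills over.
      ++fragment_count;
      iovcnt = 2;
      l -= max_fragment_payload - (fragment_size - l);
      fragment_size = l;
    }
    if (fragment_size == max_fragment_payload) {
      // Exact fit: close the fragment and start an empty one.
      ++fragment_count;
      iovcnt = 1;
      fragment_size = 0;
    }
    if (iovcnt >= iov_size) {
      // No iovec slots left: close the fragment early.
      ++fragment_count;
      iovcnt = 1;
      fragment_size = 0;
    }
  }
  if (iovcnt != 1)
    ++fragment_count;  // whatever remains goes in one last fragment
  return fragment_count;
}
```

With the default MTU of 1024 the payload limit is 992 bytes, so a single 2500-byte block needs three fragments (992 + 992 + 516).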

ACE_INLINE ACE_SOCK_Dgram & TAO_ECG_CDR_Message_Sender::dgram (void) [private]
 

Return the datagram...

Definition at line 26 of file ECG_CDR_Message_Sender.i.

References ACE_ASSERT, and endpoint_rptr_.

Referenced by get_local_addr(), and send_fragment().

00027 {
00028   ACE_ASSERT (this->endpoint_rptr_.get ());
00029   return this->endpoint_rptr_->dgram ();
00030 }

ACE_INLINE int TAO_ECG_CDR_Message_Sender::get_local_addr (ACE_INET_Addr &addr)
 

Get the local endpoint used to send the events.

Definition at line 33 of file ECG_CDR_Message_Sender.i.

References dgram(), endpoint_rptr_, ACE_Refcounted_Auto_Ptr< X, ACE_LOCK >::get(), and ACE_SOCK::get_local_addr().

00034 {
00035   if (this->endpoint_rptr_.get () == 0)
00036     return -1;
00037   return this->dgram ().get_local_addr (addr);
00038 }

void TAO_ECG_CDR_Message_Sender::init (TAO_ECG_Refcounted_Endpoint endpoint_rptr) throw (CORBA::SystemException)
 

Set the endpoint for sending messages.

If init () is successful, shutdown () must be called when the sender is no longer needed. If shutdown () is not called by the user, cleanup activities will be performed by the destructor.

Definition at line 18 of file ECG_CDR_Message_Sender.cpp.

References ACE_ERROR, ACE_THROW, LM_ERROR, and TAO_ECG_Refcounted_Endpoint.

Referenced by TAO_ECG_UDP_Sender::init().

00022 {
00023   if (endpoint_rptr.get () == 0
00024       || endpoint_rptr->dgram ().get_handle () == ACE_INVALID_HANDLE)
00025     {
00026       ACE_ERROR ((LM_ERROR, "TAO_ECG_CDR_Message_Sender::init(): "
00027                             "nil or unitialized endpoint argument.\n"));
00028       ACE_THROW (CORBA::INTERNAL ());
00029     }
00030 
00031   this->endpoint_rptr_ = endpoint_rptr;
00032 }

ACE_INLINE CORBA::ULong TAO_ECG_CDR_Message_Sender::mtu (void) const
 

Get the MTU currently used by this sender.

Definition at line 41 of file ECG_CDR_Message_Sender.i.

References mtu_.

00042 {
00043   return this->mtu_;
00044 }

ACE_INLINE int TAO_ECG_CDR_Message_Sender::mtu (CORBA::ULong mtu)
 

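The sender may need to fragment the message, otherwise the network may drop the packets. Setting the MTU fails (returns -1) if the value is smaller than ECG_MIN_MTU (at least the header + 8 bytes must fit) or is ECG_MAX_MTU or larger. On success the new value is stored and 0 is returned.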

Definition at line 47 of file ECG_CDR_Message_Sender.i.

References ECG_MAX_MTU, ECG_MIN_MTU, and mtu_.

00048 {
00049   if (new_mtu < TAO_ECG_CDR_Message_Sender::ECG_MIN_MTU
00050       || new_mtu >= TAO_ECG_CDR_Message_Sender::ECG_MAX_MTU)
00051     return -1;
00052   this->mtu_ = new_mtu;
00053   return 0;
00054 }
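The bounds check and its effect on the usable payload can be tried in isolation. The sketch below copies the enum values from this page; `MtuHolder` and `max_fragment_payload()` are hypothetical names introduced for illustration, the latter mirroring the subtraction done at the top of send_message().

```cpp
#include <cassert>
#include <cstdint>

// Constants copied from the anonymous enum documented on this page.
enum : std::uint32_t {
  ECG_HEADER_SIZE = 32,
  ECG_MIN_MTU = 32 + 8,
  ECG_MAX_MTU = 65536,
  ECG_DEFAULT_MTU = 1024
};

// Hypothetical stand-in that holds only the sender's MTU state.
class MtuHolder {
public:
  // Same range check as the setter above: [ECG_MIN_MTU, ECG_MAX_MTU).
  int mtu(std::uint32_t new_mtu)
  {
    if (new_mtu < ECG_MIN_MTU || new_mtu >= ECG_MAX_MTU)
      return -1;  // rejected, the previous value is kept
    mtu_ = new_mtu;
    return 0;
  }

  std::uint32_t mtu() const { return mtu_; }

  // Payload bytes available per fragment once the header is accounted for.
  std::uint32_t max_fragment_payload() const { return mtu_ - ECG_HEADER_SIZE; }

private:
  std::uint32_t mtu_ = ECG_DEFAULT_MTU;
};
```

Because the setter never accepts a value below ECG_MIN_MTU, the payload computed this way is always at least 8 bytes.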

void TAO_ECG_CDR_Message_Sender::send_fragment (const ACE_INET_Addr &addr,
CORBA::ULong request_id,
CORBA::ULong request_size,
CORBA::ULong fragment_size,
CORBA::ULong fragment_offset,
CORBA::ULong fragment_id,
CORBA::ULong fragment_count,
iovec iov[],
int iovcnt
) [private]
 

Sends one fragment. The first entry in the iovec is used to send the header; the rest of the iovec array should contain pointers to the actual data.

Definition at line 192 of file ECG_CDR_Message_Sender.cpp.

References ACE_DEBUG, ACE_THROW, ACE_OutputCDR::begin(), checksum_, ACE::crc32(), dgram(), ECG_HEADER_SIZE, EWOULDBLOCK, iovec::iov_base, iovec::iov_len, ACE_Message_Block::length(), LM_ERROR, LM_WARNING, ACE_Message_Block::rd_ptr(), ACE_SOCK_Dgram::send(), ssize_t, TAO_ENCAP_BYTE_ORDER, ACE_OutputCDR::write_boolean(), ACE_OutputCDR::write_octet(), ACE_OutputCDR::write_octet_array(), and ACE_OutputCDR::write_ulong().

00202 {
00203   CORBA::ULong header[TAO_ECG_CDR_Message_Sender::ECG_HEADER_SIZE
00204                      / sizeof(CORBA::ULong)
00205                      + ACE_CDR::MAX_ALIGNMENT];
00206   char* buf = reinterpret_cast<char*> (header);
00207   TAO_OutputCDR cdr (buf, sizeof(header));
00208   cdr.write_boolean (TAO_ENCAP_BYTE_ORDER);
00209   // Insert some known values in the padding bytes, so we can smoke
00210   // test the message on the receiving end.
00211   cdr.write_octet ('A'); cdr.write_octet ('B'); cdr.write_octet ('C');
00212   cdr.write_ulong (request_id);
00213   cdr.write_ulong (request_size);
00214   cdr.write_ulong (fragment_size);
00215   cdr.write_ulong (fragment_offset);
00216   cdr.write_ulong (fragment_id);
00217   cdr.write_ulong (fragment_count);
00218   CORBA::Octet padding[4];
00219 
00220 
00221   // MRH
00222   if (checksum_)
00223   {
00224     // Compute CRC
00225      iov[0].iov_base = cdr.begin ()->rd_ptr ();
00226      iov[0].iov_len  = cdr.begin ()->length ();
00227      unsigned int crc = 0;
00228      unsigned char *crc_parts = (unsigned char *)(&crc);
00229      if (iovcnt > 1)
00230        {
00231          crc = ACE::crc32 (iov, iovcnt);
00232          crc = htonl (crc);
00233        }
00234      for (int cnt=0; cnt<4; ++cnt)
00235        {
00236          padding[cnt] = crc_parts[cnt];
00237        }
00238    }
00239    else
00240    {
00241      for (int cnt=0; cnt<4; ++cnt)
00242      {
00243        padding[cnt] = 0;
00244      }
00245    }
00246    //End MRH
00247   cdr.write_octet_array (padding, 4);
00248 
00249   iov[0].iov_base = cdr.begin ()->rd_ptr ();
00250   iov[0].iov_len  = cdr.begin ()->length ();
00251 
00252   ssize_t n = this->dgram ().send (iov,
00253                                    iovcnt,
00254                                    addr);
00255   size_t expected_n = 0;
00256   for (int i = 0; i < iovcnt; ++i)
00257     expected_n += iov[i].iov_len;
00258   if (n > 0 && size_t(n) != expected_n)
00259     {
00260       ACE_DEBUG ((LM_ERROR, ("Sent only %d out of %d bytes "
00261                               "for mcast fragment.\n"),
00262                   n,
00263                   expected_n));
00264     }
00265 
00266   if (n == -1)
00267     {
00268       if (errno == EWOULDBLOCK)
00269         {
00270           ACE_DEBUG ((LM_ERROR, "Send of mcast fragment failed (%m).\n"));
00271           // @@ TODO Use a Event Channel specific exception
00272           ACE_THROW (CORBA::COMM_FAILURE ());
00273         }
00274       else
00275         {
00276           ACE_DEBUG ((LM_WARNING, "Send of mcast fragment blocked (%m).\n"));
00277         }
00278     }
00279   else if (n == 0)
00280     {
00281       ACE_DEBUG ((LM_WARNING, "EOF on send of mcast fragment (%m).\n"));
00282     }
00283 }
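The handling of the four padding octets in the listing above can be summarized in a small, portable helper. This is a sketch under assumptions: `checksum_padding` is a hypothetical name, the CRC value is passed in rather than computed with ACE::crc32(), and explicit shifts stand in for the htonl() call followed by a byte-wise copy.

```cpp
#include <array>
#include <cassert>
#include <cstdint>

// Hypothetical helper: produce the 4 padding octets of the header.
// With checksums enabled and at least one data iovec (iovcnt > 1), the CRC
// is stored most significant byte first (network byte order); in every
// other case the padding is all zeros, as in the listing above.
inline std::array<std::uint8_t, 4> checksum_padding(bool checksum_enabled,
                                                    int iovcnt,
                                                    std::uint32_t crc)
{
  std::array<std::uint8_t, 4> padding{};  // zero-initialized
  if (checksum_enabled && iovcnt > 1) {
    padding[0] = static_cast<std::uint8_t>(crc >> 24);
    padding[1] = static_cast<std::uint8_t>(crc >> 16);
    padding[2] = static_cast<std::uint8_t>(crc >> 8);
    padding[3] = static_cast<std::uint8_t>(crc);
  }
  return padding;
}
```

A receiver that recomputes the CRC would presumably compare it against these four octets read back in the same byte order.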

void TAO_ECG_CDR_Message_Sender::send_message (const TAO_OutputCDR &cdr,
const ACE_INET_Addr &addr
) throw (CORBA::SystemException)
 

The main method - send a CDR message.

Todo:
Under some platforms, notably Linux, the fragmentation code in this method is woefully naive. The fragments are sent in a big burst; unfortunately, that can fill up the local kernel buffer before all the data is sent. In those circumstances some of the fragments are silently (gulp!) dropped by the kernel. Check the documentation for sendto(2), especially the ENOBUFS error condition. There is no easy solution that I know of, except "pacing" the fragments, i.e. never sending more than a prescribed number of bytes per second, sleeping before sending more, or queueing them to send later via the reactor.
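The "pacing" idea mentioned in this note can be sketched as a schedule computation. Nothing below exists in TAO: `pacing_schedule` and its parameters are hypothetical, and the sketch only works out when each fragment could be sent under a bytes-per-second budget. Actually sleeping, or queueing via the reactor, is left to the caller.

```cpp
#include <cassert>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical pacing helper: for each fragment, compute the earliest send
// time (as an offset from the start of the burst) such that the bytes sent
// before it never exceed max_bytes_per_second.
// Assumption: max_bytes_per_second is greater than zero.
inline std::vector<std::chrono::microseconds>
pacing_schedule(const std::vector<std::size_t>& fragment_bytes,
                std::uint64_t max_bytes_per_second)
{
  std::vector<std::chrono::microseconds> offsets;
  offsets.reserve(fragment_bytes.size());

  std::uint64_t bytes_before = 0;  // bytes scheduled ahead of this fragment
  for (std::size_t n : fragment_bytes) {
    const std::uint64_t usec = bytes_before * 1000000ULL / max_bytes_per_second;
    offsets.push_back(std::chrono::microseconds(
        static_cast<std::chrono::microseconds::rep>(usec)));
    bytes_before += n;
  }
  return offsets;
}
```

For three 1024-byte fragments and a budget of 1,024,000 bytes per second, the offsets come out as 0, 1000 and 2000 microseconds. A sender could sleep until each offset before sending the corresponding fragment.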

Definition at line 35 of file ECG_CDR_Message_Sender.cpp.

References ACE_CHECK, ACE_ENV_ARG_PARAMETER, ACE_ERROR, ACE_IOV_MAX, ACE_THROW, ACE_Message_Block::cont(), ECG_HEADER_SIZE, ACE_Message_Block::end(), iovec::iov_base, iovec::iov_len, ACE_Message_Block::length(), LM_ERROR, and ACE_Message_Block::rd_ptr().

00039 {
00040   if (this->endpoint_rptr_.get () == 0)
00041     {
00042       ACE_ERROR ((LM_ERROR, "Attempt to invoke send_message() "
00043                             "on non-initialized sender object.\n"));
00044       ACE_THROW (CORBA::INTERNAL ());
00045     }
00046 
00047   CORBA::ULong max_fragment_payload = this->mtu () -
00048     TAO_ECG_CDR_Message_Sender::ECG_HEADER_SIZE;
00049   // ACE_ASSERT (max_fragment_payload != 0);
00050 
00051 #if defined (ACE_HAS_BROKEN_DGRAM_SENDV)
00052   const int TAO_WRITEV_MAX = ACE_IOV_MAX - 1;
00053 #else
00054   const int TAO_WRITEV_MAX = ACE_IOV_MAX;
00055 #endif /* ACE_HAS_BROKEN_DGRAM_SENDV */
00056   iovec iov[TAO_WRITEV_MAX];
00057 
00058   CORBA::ULong total_length;
00059   CORBA::ULong fragment_count =
00060     this->compute_fragment_count (cdr.begin (),
00061                                   cdr.end (),
00062                                   TAO_WRITEV_MAX,
00063                                   max_fragment_payload,
00064                                   total_length);
00065 
00066   CORBA::ULong request_id = this->endpoint_rptr_->next_request_id ();
00067 
00068   // Reserve the first iovec for the header...
00069   int iovcnt = 1;
00070   CORBA::ULong fragment_id = 0;
00071   CORBA::ULong fragment_offset = 0;
00072   CORBA::ULong fragment_size = 0;
00073   for (const ACE_Message_Block* b = cdr.begin ();
00074        b != cdr.end ();
00075        b = b->cont ())
00076     {
00077       CORBA::ULong l = b->length ();
00078 
00079       char* rd_ptr = b->rd_ptr ();
00080 
00081       iov[iovcnt].iov_base = rd_ptr;
00082       iov[iovcnt].iov_len  = l;
00083       fragment_size += l;
00084       ++iovcnt;
00085       while (fragment_size > max_fragment_payload)
00086         {
00087           // This fragment is full, we have to send it...
00088 
00089           // First adjust the last iov entry:
00090           CORBA::ULong last_mb_length =
00091             max_fragment_payload - (fragment_size - l);
00092           iov[iovcnt - 1].iov_len = last_mb_length;
00093 
00094           this->send_fragment (addr,
00095                                request_id,
00096                                total_length,
00097                                max_fragment_payload,
00098                                fragment_offset,
00099                                fragment_id,
00100                                fragment_count,
00101                                iov,
00102                                iovcnt
00103                                ACE_ENV_ARG_PARAMETER);
00104           ACE_CHECK;
00105           ++fragment_id;
00106           fragment_offset += max_fragment_payload;
00107 
00108           // Reset, but don't forget that the last Message_Block
00109               // may need to be sent in multiple fragments..
00110           l -= last_mb_length;
00111           rd_ptr += last_mb_length;
00112           iov[1].iov_base = rd_ptr;
00113           iov[1].iov_len = l;
00114           fragment_size = l;
00115           iovcnt = 2;
00116         }
00117       if (fragment_size == max_fragment_payload)
00118         {
00119           // We filled a fragment, but this time it was filled
00120           // exactly, the treatment is a little different from the
00121           // loop above...
00122           this->send_fragment (addr,
00123                                request_id,
00124                                total_length,
00125                                max_fragment_payload,
00126                                fragment_offset,
00127                                fragment_id,
00128                                fragment_count,
00129                                iov,
00130                                iovcnt
00131                                ACE_ENV_ARG_PARAMETER);
00132           ACE_CHECK;
00133           ++fragment_id;
00134           fragment_offset += max_fragment_payload;
00135 
00136           iovcnt = 1;
00137           fragment_size = 0;
00138         }
00139       if (iovcnt == TAO_WRITEV_MAX)
00140         {
00141           // Now we ran out of space in the iovec, we must send a
00142           // fragment to work around that....
00143           this->send_fragment (addr,
00144                                request_id,
00145                                total_length,
00146                                fragment_size,
00147                                fragment_offset,
00148                                fragment_id,
00149                                fragment_count,
00150                                iov,
00151                                iovcnt
00152                                ACE_ENV_ARG_PARAMETER);
00153           ACE_CHECK;
00154           ++fragment_id;
00155           fragment_offset += fragment_size;
00156 
00157           iovcnt = 1;
00158           fragment_size = 0;
00159         }
00160     }
00161   // There is something left in the iovvec that we must send
00162   // also...
00163   if (iovcnt != 1)
00164     {
00165       // Now we ran out of space in the iovec, we must send a
00166       // fragment to work around that....
00167       this->send_fragment (addr,
00168                            request_id,
00169                            total_length,
00170                            fragment_size,
00171                            fragment_offset,
00172                            fragment_id,
00173                            fragment_count,
00174                            iov,
00175                            iovcnt
00176                            ACE_ENV_ARG_PARAMETER);
00177       ACE_CHECK;
00178       ++fragment_id;
00179       fragment_offset += fragment_size;
00180 
00181       // reset, not needed here...
00182       // iovcnt = 1;
00183       // fragment_size = 0;
00184     }
00185   // ACE_ASSERT (total_length == fragment_offset);
00186   // ACE_ASSERT (fragment_id == fragment_count);
00187 
00188 }
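The bookkeeping of fragment_id and fragment_offset in the listing above is easier to follow on a simplified model. The sketch below assumes one contiguous message and ignores the iovec limit, so it never cuts a fragment early the way the real method does when the iovec array fills up. `FragmentPlan` and `plan_fragments` are hypothetical names.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical record of the values passed to send_fragment() for one fragment.
struct FragmentPlan {
  std::uint32_t id;      // fragment_id
  std::uint32_t offset;  // fragment_offset within the whole message
  std::uint32_t size;    // fragment_size, excluding the header
};

// Split total_length bytes into fragments of at most max_fragment_payload
// bytes each, assigning consecutive ids and running offsets.
inline std::vector<FragmentPlan> plan_fragments(std::uint32_t total_length,
                                                std::uint32_t max_fragment_payload)
{
  std::vector<FragmentPlan> plan;
  if (max_fragment_payload == 0)
    return plan;  // nothing can be sent without payload space

  std::uint32_t offset = 0;
  std::uint32_t id = 0;
  while (offset < total_length) {
    const std::uint32_t remaining = total_length - offset;
    const std::uint32_t size =
        remaining < max_fragment_payload ? remaining : max_fragment_payload;
    plan.push_back(FragmentPlan{id, offset, size});
    ++id;
    offset += size;
  }
  return plan;
}
```

In this model the two commented-out assertions at the end of the listing hold by construction: the final offset plus size equals the total length, and the number of planned fragments equals the fragment count.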

void TAO_ECG_CDR_Message_Sender::shutdown ()
 

Referenced by TAO_ECG_UDP_Sender::shutdown().


Member Data Documentation

CORBA::Boolean TAO_ECG_CDR_Message_Sender::checksum_ [private]
 

Should a CRC checksum be calculated and sent?

Definition at line 179 of file ECG_CDR_Message_Sender.h.

Referenced by send_fragment().

TAO_ECG_Refcounted_Endpoint TAO_ECG_CDR_Message_Sender::endpoint_rptr_ [private]
 

The datagram used for sendto ().

Definition at line 173 of file ECG_CDR_Message_Sender.h.

Referenced by dgram(), and get_local_addr().

CORBA::ULong TAO_ECG_CDR_Message_Sender::mtu_ [private]
 

The MTU for this sender...

Definition at line 176 of file ECG_CDR_Message_Sender.h.

Referenced by mtu().


The documentation for this class was generated from the following files:
ECG_CDR_Message_Sender.h
ECG_CDR_Message_Sender.i
ECG_CDR_Message_Sender.cpp
Generated on Thu Nov 9 13:16:04 2006 for TAO_RTEvent by doxygen 1.3.6