TAO_ECG_CDR_Message_Sender Class Reference

Sends CDR messages using UDP. NOT THREAD-SAFE. More...

#include <ECG_CDR_Message_Sender.h>

Collaboration diagram for TAO_ECG_CDR_Message_Sender:

Collaboration graph
[legend]
List of all members.

Public Types

 ECG_HEADER_SIZE = 32
 ECG_MIN_MTU = 32 + 8
 ECG_MAX_MTU = 65536
 ECG_DEFAULT_MTU = 1024
enum  { ECG_HEADER_SIZE = 32, ECG_MIN_MTU = 32 + 8, ECG_MAX_MTU = 65536, ECG_DEFAULT_MTU = 1024 }

Public Member Functions

void send_message (const TAO_OutputCDR &cdr, const ACE_INET_Addr &addr)
 The main method - send a CDR message.
 TAO_ECG_CDR_Message_Sender (CORBA::Boolean crc=0)
 Initialization and termination methods.
void init (TAO_ECG_Refcounted_Endpoint endpoint_rptr)
 Set the endpoint for sending messages.
void shutdown (void)
 Initialization and termination methods.
int get_local_addr (ACE_INET_Addr &addr)
 Get the local endpoint used to send the events.
int mtu (CORBA::ULong mtu)
CORBA::ULong mtu (void) const
 Get the local endpoint used to send the events.

Private Member Functions

ACE_SOCK_Dgramdgram (void)
 Return the datagram...
void send_fragment (const ACE_INET_Addr &addr, CORBA::ULong request_id, CORBA::ULong request_size, CORBA::ULong fragment_size, CORBA::ULong fragment_offset, CORBA::ULong fragment_id, CORBA::ULong fragment_count, iovec iov[], int iovcnt)
CORBA::ULong compute_fragment_count (const ACE_Message_Block *begin, const ACE_Message_Block *end, int iov_size, CORBA::ULong max_fragment_payload, CORBA::ULong &total_length)

Private Attributes

TAO_ECG_Refcounted_Endpoint endpoint_rptr_
 The datagram used for sendto ().
CORBA::ULong mtu_
 The MTU for this sender...
CORBA::Boolean checksum_
 Should crc checksum be caluclated and sent?

Detailed Description

Sends CDR messages using UDP. NOT THREAD-SAFE.

This class breaks up a CDR message into fragments and sends each fragment with a header (described below) using UDP. The UDP address can be a normal IP address or it can be a multicast group. The UDP address is obtained from a RtecUDPAdmin::AddrServer class.

This class is used by various Gateway (Senders/Receivers) classes responsible for federating Event Channels with UDP/Mcast.

MESSAGE FORMAT

Message header are encapsulated using CDR, with the following format: struct Header { octet byte_order_flags; // bit 0 represents the byte order as in GIOP 1.1 // bit 1 is set if this is the last fragment unsigned long request_id; // The request ID, senders must not send two requests with // the same ID, senders can be distinguished using recvfrom.. unsigned long request_size; // The size of this request, this can be used to pre-allocate // the request buffer. unsgined long fragment_size; // The size of this fragment, excluding the header... unsigned long fragment_offset; // Where does this fragment fit in the complete message... unsigned long fragment_id; // The ID of this fragment... unsigned long fragment_count; // The total number of fragments to expect in this request

//

Todo:
This could be eliminated if efficient reassembly // could be implemented without it. octet padding[4];
// Ensures the header ends at an 8-byte boundary. }; // size (in CDR stream) = 32

Definition at line 75 of file ECG_CDR_Message_Sender.h.


Member Enumeration Documentation

anonymous enum

Enumerator:
ECG_HEADER_SIZE 
ECG_MIN_MTU 
ECG_MAX_MTU 
ECG_DEFAULT_MTU 

Definition at line 79 of file ECG_CDR_Message_Sender.h.

00079        {
00080     ECG_HEADER_SIZE = 32,
00081     ECG_MIN_MTU = 32 + 8,
00082     ECG_MAX_MTU = 65536, // Really optimistic...
00083     ECG_DEFAULT_MTU = 1024
00084   };


Constructor & Destructor Documentation

TAO_ECG_CDR_Message_Sender::TAO_ECG_CDR_Message_Sender ( CORBA::Boolean  crc = 0  ) 

Initialization and termination methods.


Member Function Documentation

CORBA::ULong TAO_ECG_CDR_Message_Sender::compute_fragment_count ( const ACE_Message_Block begin,
const ACE_Message_Block end,
int  iov_size,
CORBA::ULong  max_fragment_payload,
CORBA::ULong total_length 
) [private]

Count the number of fragments that will be required to send the message blocks in the range [begin,end) The maximum fragment payload (i.e. the size without the header is also required); <total_length> returns the total message size.

Definition at line 274 of file ECG_CDR_Message_Sender.cpp.

References ACE_Message_Block::cont().

Referenced by send_message().

00279 {
00280   CORBA::ULong fragment_count = 0;
00281   total_length = 0;
00282 
00283   CORBA::ULong fragment_size = 0;
00284   // Reserve the first iovec for the header...
00285   int iovcnt = 1;
00286   for (const ACE_Message_Block* b = begin;
00287        b != end;
00288        b = b->cont ())
00289     {
00290       CORBA::ULong l = b->length ();
00291       total_length += l;
00292       fragment_size += l;
00293       ++iovcnt;
00294       while (fragment_size > max_fragment_payload)
00295         {
00296           // Ran out of space, must create a fragment...
00297           ++fragment_count;
00298 
00299           // The next iovector will contain what remains of this
00300           // buffer, but also consider
00301           iovcnt = 2;
00302           l -= max_fragment_payload - (fragment_size - l);
00303           fragment_size = l;
00304         }
00305       if (fragment_size == max_fragment_payload)
00306         {
00307           ++fragment_count;
00308           iovcnt = 1;
00309           fragment_size = 0;
00310         }
00311       if (iovcnt >= iov_size)
00312         {
00313           // Ran out of space in the iovector....
00314           ++fragment_count;
00315           iovcnt = 1;
00316           fragment_size = 0;
00317         }
00318     }
00319   if (iovcnt != 1)
00320     {
00321       // Send the remaining data in another fragment
00322       ++fragment_count;
00323     }
00324   return fragment_count;
00325 }

ACE_SOCK_Dgram& TAO_ECG_CDR_Message_Sender::dgram ( void   )  [private]

Return the datagram...

Referenced by send_fragment().

int TAO_ECG_CDR_Message_Sender::get_local_addr ( ACE_INET_Addr addr  ) 

Get the local endpoint used to send the events.

TAO_BEGIN_VERSIONED_NAMESPACE_DECL void TAO_ECG_CDR_Message_Sender::init ( TAO_ECG_Refcounted_Endpoint  endpoint_rptr  ) 

Set the endpoint for sending messages.

If init () is successful, shutdown () must be called when the sender is no longer needed. If shutdown () is not called by the user, cleanup activities will be performed by the destructor.

Definition at line 18 of file ECG_CDR_Message_Sender.cpp.

References ACE_ERROR, and LM_ERROR.

Referenced by TAO_ECG_UDP_Sender::init().

00020 {
00021   if (endpoint_rptr.get () == 0
00022       || endpoint_rptr->dgram ().get_handle () == ACE_INVALID_HANDLE)
00023     {
00024       ACE_ERROR ((LM_ERROR, "TAO_ECG_CDR_Message_Sender::init(): "
00025                             "nil or unitialized endpoint argument.\n"));
00026       throw CORBA::INTERNAL ();
00027     }
00028 
00029   this->endpoint_rptr_ = endpoint_rptr;
00030 }

CORBA::ULong TAO_ECG_CDR_Message_Sender::mtu ( void   )  const

Get the local endpoint used to send the events.

Referenced by send_message().

int TAO_ECG_CDR_Message_Sender::mtu ( CORBA::ULong  mtu  ) 

The sender may need to fragment the message, otherwise the network may drop the packets. Setting the MTU can fail if the value is too small (at least the header + 8 bytes must fit).

void TAO_ECG_CDR_Message_Sender::send_fragment ( const ACE_INET_Addr addr,
CORBA::ULong  request_id,
CORBA::ULong  request_size,
CORBA::ULong  fragment_size,
CORBA::ULong  fragment_offset,
CORBA::ULong  fragment_id,
CORBA::ULong  fragment_count,
iovec  iov[],
int  iovcnt 
) [private]

Send one fragment, the first entry in the iovec is used to send the header, the rest of the iovec array should contain pointers to the actual data.

Definition at line 180 of file ECG_CDR_Message_Sender.cpp.

References ACE_DEBUG, ACE_ERROR, ACE_HTONL, checksum_, ACE::crc32(), dgram(), ECG_HEADER_SIZE, LM_ERROR, LM_WARNING, ACE_CDR::MAX_ALIGNMENT, ACE_SOCK_Dgram::send(), and TAO_ENCAP_BYTE_ORDER.

Referenced by send_message().

00189 {
00190   CORBA::ULong header[TAO_ECG_CDR_Message_Sender::ECG_HEADER_SIZE
00191                      / sizeof(CORBA::ULong)
00192                      + ACE_CDR::MAX_ALIGNMENT];
00193   char* buf = reinterpret_cast<char*> (header);
00194   TAO_OutputCDR cdr (buf, sizeof(header));
00195   cdr.write_boolean (TAO_ENCAP_BYTE_ORDER);
00196   // Insert some known values in the padding bytes, so we can smoke
00197   // test the message on the receiving end.
00198   cdr.write_octet ('A'); cdr.write_octet ('B'); cdr.write_octet ('C');
00199   cdr.write_ulong (request_id);
00200   cdr.write_ulong (request_size);
00201   cdr.write_ulong (fragment_size);
00202   cdr.write_ulong (fragment_offset);
00203   cdr.write_ulong (fragment_id);
00204   cdr.write_ulong (fragment_count);
00205   CORBA::Octet padding[4];
00206 
00207 
00208   // MRH
00209   if (checksum_)
00210   {
00211     // Compute CRC
00212      iov[0].iov_base = cdr.begin ()->rd_ptr ();
00213      iov[0].iov_len  = cdr.begin ()->length ();
00214      unsigned int crc = 0;
00215      unsigned char *crc_parts = (unsigned char *)(&crc);
00216      if (iovcnt > 1)
00217        {
00218          crc = ACE::crc32 (iov, iovcnt);
00219          crc = ACE_HTONL (crc);
00220        }
00221      for (int cnt=0; cnt<4; ++cnt)
00222        {
00223          padding[cnt] = crc_parts[cnt];
00224        }
00225    }
00226    else
00227    {
00228      for (int cnt=0; cnt<4; ++cnt)
00229      {
00230        padding[cnt] = 0;
00231      }
00232    }
00233    //End MRH
00234   cdr.write_octet_array (padding, 4);
00235 
00236   iov[0].iov_base = cdr.begin ()->rd_ptr ();
00237   iov[0].iov_len  = cdr.begin ()->length ();
00238 
00239   ssize_t n = this->dgram ().send (iov,
00240                                    iovcnt,
00241                                    addr);
00242   size_t expected_n = 0;
00243   for (int i = 0; i < iovcnt; ++i)
00244     expected_n += iov[i].iov_len;
00245   if (n > 0 && size_t(n) != expected_n)
00246     {
00247       ACE_ERROR ((LM_ERROR, ("Sent only %d out of %d bytes "
00248                               "for mcast fragment.\n"),
00249                   n,
00250                   expected_n));
00251     }
00252 
00253   if (n == -1)
00254     {
00255       if (errno == EWOULDBLOCK)
00256         {
00257           ACE_ERROR ((LM_ERROR, "Send of mcast fragment failed (%m).\n"));
00258           // @@ TODO Use a Event Channel specific exception
00259           throw CORBA::COMM_FAILURE ();
00260         }
00261       else
00262         {
00263           ACE_DEBUG ((LM_WARNING, "Send of mcast fragment blocked (%m).\n"));
00264         }
00265     }
00266   else if (n == 0)
00267     {
00268       ACE_DEBUG ((LM_WARNING, "EOF on send of mcast fragment (%m).\n"));
00269     }
00270 }

void TAO_ECG_CDR_Message_Sender::send_message ( const TAO_OutputCDR cdr,
const ACE_INET_Addr addr 
)

The main method - send a CDR message.

Todo:
Under some platforms, notably Linux, the fragmentation code in this method is woefully naive. The fragments are sent it a big burst, unfortunately, that can fill up the local kernel buffer before all the data is sent. In those circumstances some of the fragments are silently (gulp!) dropped by the kernel, check the documentation for sendto(2) specially the ENOBUFS error condition. There is no easy solution that I know off, except "pacing" the fragments, i.e. never sending more than a prescribed number of bytes per-second, sleeping before sending more or queueing them to send later via the reactor.

Definition at line 33 of file ECG_CDR_Message_Sender.cpp.

References ACE_ERROR, ACE_IOV_MAX, ACE_OutputCDR::begin(), compute_fragment_count(), ACE_Message_Block::cont(), ECG_HEADER_SIZE, ACE_OutputCDR::end(), endpoint_rptr_, LM_ERROR, mtu(), and send_fragment().

Referenced by TAO_ECG_UDP_Sender::push().

00035 {
00036   if (this->endpoint_rptr_.get () == 0)
00037     {
00038       ACE_ERROR ((LM_ERROR, "Attempt to invoke send_message() "
00039                             "on non-initialized sender object.\n"));
00040       throw CORBA::INTERNAL ();
00041     }
00042 
00043   CORBA::ULong max_fragment_payload = this->mtu () -
00044     TAO_ECG_CDR_Message_Sender::ECG_HEADER_SIZE;
00045   // ACE_ASSERT (max_fragment_payload != 0);
00046 
00047 #if defined (ACE_HAS_BROKEN_DGRAM_SENDV)
00048   const int TAO_WRITEV_MAX = ACE_IOV_MAX - 1;
00049 #else
00050   const int TAO_WRITEV_MAX = ACE_IOV_MAX;
00051 #endif /* ACE_HAS_BROKEN_DGRAM_SENDV */
00052   iovec iov[TAO_WRITEV_MAX];
00053 
00054   CORBA::ULong total_length;
00055   CORBA::ULong fragment_count =
00056     this->compute_fragment_count (cdr.begin (),
00057                                   cdr.end (),
00058                                   TAO_WRITEV_MAX,
00059                                   max_fragment_payload,
00060                                   total_length);
00061 
00062   CORBA::ULong request_id = this->endpoint_rptr_->next_request_id ();
00063 
00064   // Reserve the first iovec for the header...
00065   int iovcnt = 1;
00066   CORBA::ULong fragment_id = 0;
00067   CORBA::ULong fragment_offset = 0;
00068   CORBA::ULong fragment_size = 0;
00069   for (const ACE_Message_Block* b = cdr.begin ();
00070        b != cdr.end ();
00071        b = b->cont ())
00072     {
00073       CORBA::ULong l = b->length ();
00074 
00075       char* rd_ptr = b->rd_ptr ();
00076 
00077       iov[iovcnt].iov_base = rd_ptr;
00078       iov[iovcnt].iov_len  = l;
00079       fragment_size += l;
00080       ++iovcnt;
00081       while (fragment_size > max_fragment_payload)
00082         {
00083           // This fragment is full, we have to send it...
00084 
00085           // First adjust the last iov entry:
00086           CORBA::ULong last_mb_length =
00087             max_fragment_payload - (fragment_size - l);
00088           iov[iovcnt - 1].iov_len = last_mb_length;
00089 
00090           this->send_fragment (addr,
00091                                request_id,
00092                                total_length,
00093                                max_fragment_payload,
00094                                fragment_offset,
00095                                fragment_id,
00096                                fragment_count,
00097                                iov,
00098                                iovcnt);
00099           ++fragment_id;
00100           fragment_offset += max_fragment_payload;
00101 
00102           // Reset, but don't forget that the last Message_Block
00103               // may need to be sent in multiple fragments..
00104           l -= last_mb_length;
00105           rd_ptr += last_mb_length;
00106           iov[1].iov_base = rd_ptr;
00107           iov[1].iov_len = l;
00108           fragment_size = l;
00109           iovcnt = 2;
00110         }
00111       if (fragment_size == max_fragment_payload)
00112         {
00113           // We filled a fragment, but this time it was filled
00114           // exactly, the treatment is a little different from the
00115           // loop above...
00116           this->send_fragment (addr,
00117                                request_id,
00118                                total_length,
00119                                max_fragment_payload,
00120                                fragment_offset,
00121                                fragment_id,
00122                                fragment_count,
00123                                iov,
00124                                iovcnt);
00125           ++fragment_id;
00126           fragment_offset += max_fragment_payload;
00127 
00128           iovcnt = 1;
00129           fragment_size = 0;
00130         }
00131       if (iovcnt == TAO_WRITEV_MAX)
00132         {
00133           // Now we ran out of space in the iovec, we must send a
00134           // fragment to work around that....
00135           this->send_fragment (addr,
00136                                request_id,
00137                                total_length,
00138                                fragment_size,
00139                                fragment_offset,
00140                                fragment_id,
00141                                fragment_count,
00142                                iov,
00143                                iovcnt);
00144           ++fragment_id;
00145           fragment_offset += fragment_size;
00146 
00147           iovcnt = 1;
00148           fragment_size = 0;
00149         }
00150     }
00151   // There is something left in the iovvec that we must send
00152   // also...
00153   if (iovcnt != 1)
00154     {
00155       // Now we ran out of space in the iovec, we must send a
00156       // fragment to work around that....
00157       this->send_fragment (addr,
00158                            request_id,
00159                            total_length,
00160                            fragment_size,
00161                            fragment_offset,
00162                            fragment_id,
00163                            fragment_count,
00164                            iov,
00165                            iovcnt);
00166       ++fragment_id;
00167       fragment_offset += fragment_size;
00168 
00169       // reset, not needed here...
00170       // iovcnt = 1;
00171       // fragment_size = 0;
00172     }
00173   // ACE_ASSERT (total_length == fragment_offset);
00174   // ACE_ASSERT (fragment_id == fragment_count);
00175 
00176 }

void TAO_ECG_CDR_Message_Sender::shutdown ( void   ) 

Initialization and termination methods.

Referenced by TAO_ECG_UDP_Sender::shutdown().


Member Data Documentation

CORBA::Boolean TAO_ECG_CDR_Message_Sender::checksum_ [private]

Should crc checksum be caluclated and sent?

Definition at line 173 of file ECG_CDR_Message_Sender.h.

Referenced by send_fragment().

TAO_ECG_Refcounted_Endpoint TAO_ECG_CDR_Message_Sender::endpoint_rptr_ [private]

The datagram used for sendto ().

Definition at line 167 of file ECG_CDR_Message_Sender.h.

Referenced by send_message().

CORBA::ULong TAO_ECG_CDR_Message_Sender::mtu_ [private]

The MTU for this sender...

Definition at line 170 of file ECG_CDR_Message_Sender.h.


The documentation for this class was generated from the following files:
Generated on Tue Feb 2 17:44:38 2010 for TAO_RTEvent by  doxygen 1.4.7