ECG_CDR_Message_Sender.cpp

Go to the documentation of this file.
00001 // $Id: ECG_CDR_Message_Sender.cpp 77001 2007-02-12 07:54:49Z johnnyw $
00002 
00003 #include "orbsvcs/Event/ECG_CDR_Message_Sender.h"
00004 #include "tao/CDR.h"
00005 #include "ace/SOCK_Dgram.h"
00006 #include "ace/INET_Addr.h"
00007 #include "ace/ACE.h"
00008 
00009 #if !defined(__ACE_INLINE__)
00010 #include "orbsvcs/Event/ECG_CDR_Message_Sender.inl"
00011 #endif /* __ACE_INLINE__ */
00012 
00013 ACE_RCSID(Event, ECG_CDR_Message_Sender, "$Id: ECG_CDR_Message_Sender.cpp 77001 2007-02-12 07:54:49Z johnnyw $")
00014 
00015 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00016 
00017 void
00018 TAO_ECG_CDR_Message_Sender::init (
00019       TAO_ECG_Refcounted_Endpoint endpoint_rptr)
00020 {
00021   if (endpoint_rptr.get () == 0
00022       || endpoint_rptr->dgram ().get_handle () == ACE_INVALID_HANDLE)
00023     {
00024       ACE_ERROR ((LM_ERROR, "TAO_ECG_CDR_Message_Sender::init(): "
00025                             "nil or unitialized endpoint argument.\n"));
00026       throw CORBA::INTERNAL ();
00027     }
00028 
00029   this->endpoint_rptr_ = endpoint_rptr;
00030 }
00031 
00032 void
00033 TAO_ECG_CDR_Message_Sender::send_message  (const TAO_OutputCDR &cdr,
00034                                            const ACE_INET_Addr &addr)
00035 {
00036   if (this->endpoint_rptr_.get () == 0)
00037     {
00038       ACE_ERROR ((LM_ERROR, "Attempt to invoke send_message() "
00039                             "on non-initialized sender object.\n"));
00040       throw CORBA::INTERNAL ();
00041     }
00042 
00043   CORBA::ULong max_fragment_payload = this->mtu () -
00044     TAO_ECG_CDR_Message_Sender::ECG_HEADER_SIZE;
00045   // ACE_ASSERT (max_fragment_payload != 0);
00046 
00047 #if defined (ACE_HAS_BROKEN_DGRAM_SENDV)
00048   const int TAO_WRITEV_MAX = ACE_IOV_MAX - 1;
00049 #else
00050   const int TAO_WRITEV_MAX = ACE_IOV_MAX;
00051 #endif /* ACE_HAS_BROKEN_DGRAM_SENDV */
00052   iovec iov[TAO_WRITEV_MAX];
00053 
00054   CORBA::ULong total_length;
00055   CORBA::ULong fragment_count =
00056     this->compute_fragment_count (cdr.begin (),
00057                                   cdr.end (),
00058                                   TAO_WRITEV_MAX,
00059                                   max_fragment_payload,
00060                                   total_length);
00061 
00062   CORBA::ULong request_id = this->endpoint_rptr_->next_request_id ();
00063 
00064   // Reserve the first iovec for the header...
00065   int iovcnt = 1;
00066   CORBA::ULong fragment_id = 0;
00067   CORBA::ULong fragment_offset = 0;
00068   CORBA::ULong fragment_size = 0;
00069   for (const ACE_Message_Block* b = cdr.begin ();
00070        b != cdr.end ();
00071        b = b->cont ())
00072     {
00073       CORBA::ULong l = b->length ();
00074 
00075       char* rd_ptr = b->rd_ptr ();
00076 
00077       iov[iovcnt].iov_base = rd_ptr;
00078       iov[iovcnt].iov_len  = l;
00079       fragment_size += l;
00080       ++iovcnt;
00081       while (fragment_size > max_fragment_payload)
00082         {
00083           // This fragment is full, we have to send it...
00084 
00085           // First adjust the last iov entry:
00086           CORBA::ULong last_mb_length =
00087             max_fragment_payload - (fragment_size - l);
00088           iov[iovcnt - 1].iov_len = last_mb_length;
00089 
00090           this->send_fragment (addr,
00091                                request_id,
00092                                total_length,
00093                                max_fragment_payload,
00094                                fragment_offset,
00095                                fragment_id,
00096                                fragment_count,
00097                                iov,
00098                                iovcnt);
00099           ++fragment_id;
00100           fragment_offset += max_fragment_payload;
00101 
00102           // Reset, but don't forget that the last Message_Block
00103               // may need to be sent in multiple fragments..
00104           l -= last_mb_length;
00105           rd_ptr += last_mb_length;
00106           iov[1].iov_base = rd_ptr;
00107           iov[1].iov_len = l;
00108           fragment_size = l;
00109           iovcnt = 2;
00110         }
00111       if (fragment_size == max_fragment_payload)
00112         {
00113           // We filled a fragment, but this time it was filled
00114           // exactly, the treatment is a little different from the
00115           // loop above...
00116           this->send_fragment (addr,
00117                                request_id,
00118                                total_length,
00119                                max_fragment_payload,
00120                                fragment_offset,
00121                                fragment_id,
00122                                fragment_count,
00123                                iov,
00124                                iovcnt);
00125           ++fragment_id;
00126           fragment_offset += max_fragment_payload;
00127 
00128           iovcnt = 1;
00129           fragment_size = 0;
00130         }
00131       if (iovcnt == TAO_WRITEV_MAX)
00132         {
00133           // Now we ran out of space in the iovec, we must send a
00134           // fragment to work around that....
00135           this->send_fragment (addr,
00136                                request_id,
00137                                total_length,
00138                                fragment_size,
00139                                fragment_offset,
00140                                fragment_id,
00141                                fragment_count,
00142                                iov,
00143                                iovcnt);
00144           ++fragment_id;
00145           fragment_offset += fragment_size;
00146 
00147           iovcnt = 1;
00148           fragment_size = 0;
00149         }
00150     }
00151   // There is something left in the iovvec that we must send
00152   // also...
00153   if (iovcnt != 1)
00154     {
00155       // Now we ran out of space in the iovec, we must send a
00156       // fragment to work around that....
00157       this->send_fragment (addr,
00158                            request_id,
00159                            total_length,
00160                            fragment_size,
00161                            fragment_offset,
00162                            fragment_id,
00163                            fragment_count,
00164                            iov,
00165                            iovcnt);
00166       ++fragment_id;
00167       fragment_offset += fragment_size;
00168 
00169       // reset, not needed here...
00170       // iovcnt = 1;
00171       // fragment_size = 0;
00172     }
00173   // ACE_ASSERT (total_length == fragment_offset);
00174   // ACE_ASSERT (fragment_id == fragment_count);
00175 
00176 }
00177 
00178 
00179 void
00180 TAO_ECG_CDR_Message_Sender::send_fragment (const ACE_INET_Addr &addr,
00181                                            CORBA::ULong request_id,
00182                                            CORBA::ULong request_size,
00183                                            CORBA::ULong fragment_size,
00184                                            CORBA::ULong fragment_offset,
00185                                            CORBA::ULong fragment_id,
00186                                            CORBA::ULong fragment_count,
00187                                            iovec iov[],
00188                                            int iovcnt)
00189 {
00190   CORBA::ULong header[TAO_ECG_CDR_Message_Sender::ECG_HEADER_SIZE
00191                      / sizeof(CORBA::ULong)
00192                      + ACE_CDR::MAX_ALIGNMENT];
00193   char* buf = reinterpret_cast<char*> (header);
00194   TAO_OutputCDR cdr (buf, sizeof(header));
00195   cdr.write_boolean (TAO_ENCAP_BYTE_ORDER);
00196   // Insert some known values in the padding bytes, so we can smoke
00197   // test the message on the receiving end.
00198   cdr.write_octet ('A'); cdr.write_octet ('B'); cdr.write_octet ('C');
00199   cdr.write_ulong (request_id);
00200   cdr.write_ulong (request_size);
00201   cdr.write_ulong (fragment_size);
00202   cdr.write_ulong (fragment_offset);
00203   cdr.write_ulong (fragment_id);
00204   cdr.write_ulong (fragment_count);
00205   CORBA::Octet padding[4];
00206 
00207 
00208   // MRH
00209   if (checksum_)
00210   {
00211     // Compute CRC
00212      iov[0].iov_base = cdr.begin ()->rd_ptr ();
00213      iov[0].iov_len  = cdr.begin ()->length ();
00214      unsigned int crc = 0;
00215      unsigned char *crc_parts = (unsigned char *)(&crc);
00216      if (iovcnt > 1)
00217        {
00218          crc = ACE::crc32 (iov, iovcnt);
00219          crc = ACE_HTONL (crc);
00220        }
00221      for (int cnt=0; cnt<4; ++cnt)
00222        {
00223          padding[cnt] = crc_parts[cnt];
00224        }
00225    }
00226    else
00227    {
00228      for (int cnt=0; cnt<4; ++cnt)
00229      {
00230        padding[cnt] = 0;
00231      }
00232    }
00233    //End MRH
00234   cdr.write_octet_array (padding, 4);
00235 
00236   iov[0].iov_base = cdr.begin ()->rd_ptr ();
00237   iov[0].iov_len  = cdr.begin ()->length ();
00238 
00239   ssize_t n = this->dgram ().send (iov,
00240                                    iovcnt,
00241                                    addr);
00242   size_t expected_n = 0;
00243   for (int i = 0; i < iovcnt; ++i)
00244     expected_n += iov[i].iov_len;
00245   if (n > 0 && size_t(n) != expected_n)
00246     {
00247       ACE_ERROR ((LM_ERROR, ("Sent only %d out of %d bytes "
00248                               "for mcast fragment.\n"),
00249                   n,
00250                   expected_n));
00251     }
00252 
00253   if (n == -1)
00254     {
00255       if (errno == EWOULDBLOCK)
00256         {
00257           ACE_ERROR ((LM_ERROR, "Send of mcast fragment failed (%m).\n"));
00258           // @@ TODO Use a Event Channel specific exception
00259           throw CORBA::COMM_FAILURE ();
00260         }
00261       else
00262         {
00263           ACE_DEBUG ((LM_WARNING, "Send of mcast fragment blocked (%m).\n"));
00264         }
00265     }
00266   else if (n == 0)
00267     {
00268       ACE_DEBUG ((LM_WARNING, "EOF on send of mcast fragment (%m).\n"));
00269     }
00270 }
00271 
00272 
00273 CORBA::ULong
00274 TAO_ECG_CDR_Message_Sender::compute_fragment_count (const ACE_Message_Block* begin,
00275                                                     const ACE_Message_Block* end,
00276                                                     int iov_size,
00277                                                     CORBA::ULong max_fragment_payload,
00278                                                     CORBA::ULong& total_length)
00279 {
00280   CORBA::ULong fragment_count = 0;
00281   total_length = 0;
00282 
00283   CORBA::ULong fragment_size = 0;
00284   // Reserve the first iovec for the header...
00285   int iovcnt = 1;
00286   for (const ACE_Message_Block* b = begin;
00287        b != end;
00288        b = b->cont ())
00289     {
00290       CORBA::ULong l = b->length ();
00291       total_length += l;
00292       fragment_size += l;
00293       ++iovcnt;
00294       while (fragment_size > max_fragment_payload)
00295         {
00296           // Ran out of space, must create a fragment...
00297           ++fragment_count;
00298 
00299           // The next iovector will contain what remains of this
00300           // buffer, but also consider
00301           iovcnt = 2;
00302           l -= max_fragment_payload - (fragment_size - l);
00303           fragment_size = l;
00304         }
00305       if (fragment_size == max_fragment_payload)
00306         {
00307           ++fragment_count;
00308           iovcnt = 1;
00309           fragment_size = 0;
00310         }
00311       if (iovcnt >= iov_size)
00312         {
00313           // Ran out of space in the iovector....
00314           ++fragment_count;
00315           iovcnt = 1;
00316           fragment_size = 0;
00317         }
00318     }
00319   if (iovcnt != 1)
00320     {
00321       // Send the remaining data in another fragment
00322       ++fragment_count;
00323     }
00324   return fragment_count;
00325 }
00326 
00327 TAO_END_VERSIONED_NAMESPACE_DECL

Generated on Tue Feb 2 17:44:06 2010 for TAO_RTEvent by  doxygen 1.4.7