ECG_CDR_Message_Sender.cpp

Go to the documentation of this file.
00001 // ECG_CDR_Message_Sender.cpp,v 1.10 2006/03/14 06:14:25 jtc Exp
00002 
00003 #include "orbsvcs/Event/ECG_CDR_Message_Sender.h"
00004 #include "tao/CDR.h"
00005 #include "ace/SOCK_Dgram.h"
00006 #include "ace/INET_Addr.h"
00007 #include "ace/ACE.h"
00008 
00009 #if !defined(__ACE_INLINE__)
00010 #include "orbsvcs/Event/ECG_CDR_Message_Sender.i"
00011 #endif /* __ACE_INLINE__ */
00012 
00013 ACE_RCSID(Event, ECG_CDR_Message_Sender, "ECG_CDR_Message_Sender.cpp,v 1.10 2006/03/14 06:14:25 jtc Exp")
00014 
00015 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00016 
00017 void
00018 TAO_ECG_CDR_Message_Sender::init (
00019       TAO_ECG_Refcounted_Endpoint endpoint_rptr
00020       ACE_ENV_ARG_DECL)
00021   ACE_THROW_SPEC ((CORBA::SystemException))
00022 {
00023   if (endpoint_rptr.get () == 0
00024       || endpoint_rptr->dgram ().get_handle () == ACE_INVALID_HANDLE)
00025     {
00026       ACE_ERROR ((LM_ERROR, "TAO_ECG_CDR_Message_Sender::init(): "
00027                             "nil or unitialized endpoint argument.\n"));
00028       ACE_THROW (CORBA::INTERNAL ());
00029     }
00030 
00031   this->endpoint_rptr_ = endpoint_rptr;
00032 }
00033 
00034 void
00035 TAO_ECG_CDR_Message_Sender::send_message  (const TAO_OutputCDR &cdr,
00036                                            const ACE_INET_Addr &addr
00037                                            ACE_ENV_ARG_DECL)
00038   ACE_THROW_SPEC ((CORBA::SystemException))
00039 {
00040   if (this->endpoint_rptr_.get () == 0)
00041     {
00042       ACE_ERROR ((LM_ERROR, "Attempt to invoke send_message() "
00043                             "on non-initialized sender object.\n"));
00044       ACE_THROW (CORBA::INTERNAL ());
00045     }
00046 
00047   CORBA::ULong max_fragment_payload = this->mtu () -
00048     TAO_ECG_CDR_Message_Sender::ECG_HEADER_SIZE;
00049   // ACE_ASSERT (max_fragment_payload != 0);
00050 
00051 #if defined (ACE_HAS_BROKEN_DGRAM_SENDV)
00052   const int TAO_WRITEV_MAX = ACE_IOV_MAX - 1;
00053 #else
00054   const int TAO_WRITEV_MAX = ACE_IOV_MAX;
00055 #endif /* ACE_HAS_BROKEN_DGRAM_SENDV */
00056   iovec iov[TAO_WRITEV_MAX];
00057 
00058   CORBA::ULong total_length;
00059   CORBA::ULong fragment_count =
00060     this->compute_fragment_count (cdr.begin (),
00061                                   cdr.end (),
00062                                   TAO_WRITEV_MAX,
00063                                   max_fragment_payload,
00064                                   total_length);
00065 
00066   CORBA::ULong request_id = this->endpoint_rptr_->next_request_id ();
00067 
00068   // Reserve the first iovec for the header...
00069   int iovcnt = 1;
00070   CORBA::ULong fragment_id = 0;
00071   CORBA::ULong fragment_offset = 0;
00072   CORBA::ULong fragment_size = 0;
00073   for (const ACE_Message_Block* b = cdr.begin ();
00074        b != cdr.end ();
00075        b = b->cont ())
00076     {
00077       CORBA::ULong l = b->length ();
00078 
00079       char* rd_ptr = b->rd_ptr ();
00080 
00081       iov[iovcnt].iov_base = rd_ptr;
00082       iov[iovcnt].iov_len  = l;
00083       fragment_size += l;
00084       ++iovcnt;
00085       while (fragment_size > max_fragment_payload)
00086         {
00087           // This fragment is full, we have to send it...
00088 
00089           // First adjust the last iov entry:
00090           CORBA::ULong last_mb_length =
00091             max_fragment_payload - (fragment_size - l);
00092           iov[iovcnt - 1].iov_len = last_mb_length;
00093 
00094           this->send_fragment (addr,
00095                                request_id,
00096                                total_length,
00097                                max_fragment_payload,
00098                                fragment_offset,
00099                                fragment_id,
00100                                fragment_count,
00101                                iov,
00102                                iovcnt
00103                                ACE_ENV_ARG_PARAMETER);
00104           ACE_CHECK;
00105           ++fragment_id;
00106           fragment_offset += max_fragment_payload;
00107 
00108           // Reset, but don't forget that the last Message_Block
00109               // may need to be sent in multiple fragments..
00110           l -= last_mb_length;
00111           rd_ptr += last_mb_length;
00112           iov[1].iov_base = rd_ptr;
00113           iov[1].iov_len = l;
00114           fragment_size = l;
00115           iovcnt = 2;
00116         }
00117       if (fragment_size == max_fragment_payload)
00118         {
00119           // We filled a fragment, but this time it was filled
00120           // exactly, the treatment is a little different from the
00121           // loop above...
00122           this->send_fragment (addr,
00123                                request_id,
00124                                total_length,
00125                                max_fragment_payload,
00126                                fragment_offset,
00127                                fragment_id,
00128                                fragment_count,
00129                                iov,
00130                                iovcnt
00131                                ACE_ENV_ARG_PARAMETER);
00132           ACE_CHECK;
00133           ++fragment_id;
00134           fragment_offset += max_fragment_payload;
00135 
00136           iovcnt = 1;
00137           fragment_size = 0;
00138         }
00139       if (iovcnt == TAO_WRITEV_MAX)
00140         {
00141           // Now we ran out of space in the iovec, we must send a
00142           // fragment to work around that....
00143           this->send_fragment (addr,
00144                                request_id,
00145                                total_length,
00146                                fragment_size,
00147                                fragment_offset,
00148                                fragment_id,
00149                                fragment_count,
00150                                iov,
00151                                iovcnt
00152                                ACE_ENV_ARG_PARAMETER);
00153           ACE_CHECK;
00154           ++fragment_id;
00155           fragment_offset += fragment_size;
00156 
00157           iovcnt = 1;
00158           fragment_size = 0;
00159         }
00160     }
00161   // There is something left in the iovvec that we must send
00162   // also...
00163   if (iovcnt != 1)
00164     {
00165       // Now we ran out of space in the iovec, we must send a
00166       // fragment to work around that....
00167       this->send_fragment (addr,
00168                            request_id,
00169                            total_length,
00170                            fragment_size,
00171                            fragment_offset,
00172                            fragment_id,
00173                            fragment_count,
00174                            iov,
00175                            iovcnt
00176                            ACE_ENV_ARG_PARAMETER);
00177       ACE_CHECK;
00178       ++fragment_id;
00179       fragment_offset += fragment_size;
00180 
00181       // reset, not needed here...
00182       // iovcnt = 1;
00183       // fragment_size = 0;
00184     }
00185   // ACE_ASSERT (total_length == fragment_offset);
00186   // ACE_ASSERT (fragment_id == fragment_count);
00187 
00188 }
00189 
00190 
00191 void
00192 TAO_ECG_CDR_Message_Sender::send_fragment (const ACE_INET_Addr &addr,
00193                                            CORBA::ULong request_id,
00194                                            CORBA::ULong request_size,
00195                                            CORBA::ULong fragment_size,
00196                                            CORBA::ULong fragment_offset,
00197                                            CORBA::ULong fragment_id,
00198                                            CORBA::ULong fragment_count,
00199                                            iovec iov[],
00200                                            int iovcnt
00201                                            ACE_ENV_ARG_DECL)
00202 {
00203   CORBA::ULong header[TAO_ECG_CDR_Message_Sender::ECG_HEADER_SIZE
00204                      / sizeof(CORBA::ULong)
00205                      + ACE_CDR::MAX_ALIGNMENT];
00206   char* buf = reinterpret_cast<char*> (header);
00207   TAO_OutputCDR cdr (buf, sizeof(header));
00208   cdr.write_boolean (TAO_ENCAP_BYTE_ORDER);
00209   // Insert some known values in the padding bytes, so we can smoke
00210   // test the message on the receiving end.
00211   cdr.write_octet ('A'); cdr.write_octet ('B'); cdr.write_octet ('C');
00212   cdr.write_ulong (request_id);
00213   cdr.write_ulong (request_size);
00214   cdr.write_ulong (fragment_size);
00215   cdr.write_ulong (fragment_offset);
00216   cdr.write_ulong (fragment_id);
00217   cdr.write_ulong (fragment_count);
00218   CORBA::Octet padding[4];
00219 
00220 
00221   // MRH
00222   if (checksum_)
00223   {
00224     // Compute CRC
00225      iov[0].iov_base = cdr.begin ()->rd_ptr ();
00226      iov[0].iov_len  = cdr.begin ()->length ();
00227      unsigned int crc = 0;
00228      unsigned char *crc_parts = (unsigned char *)(&crc);
00229      if (iovcnt > 1)
00230        {
00231          crc = ACE::crc32 (iov, iovcnt);
00232          crc = htonl (crc);
00233        }
00234      for (int cnt=0; cnt<4; ++cnt)
00235        {
00236          padding[cnt] = crc_parts[cnt];
00237        }
00238    }
00239    else
00240    {
00241      for (int cnt=0; cnt<4; ++cnt)
00242      {
00243        padding[cnt] = 0;
00244      }
00245    }
00246    //End MRH
00247   cdr.write_octet_array (padding, 4);
00248 
00249   iov[0].iov_base = cdr.begin ()->rd_ptr ();
00250   iov[0].iov_len  = cdr.begin ()->length ();
00251 
00252   ssize_t n = this->dgram ().send (iov,
00253                                    iovcnt,
00254                                    addr);
00255   size_t expected_n = 0;
00256   for (int i = 0; i < iovcnt; ++i)
00257     expected_n += iov[i].iov_len;
00258   if (n > 0 && size_t(n) != expected_n)
00259     {
00260       ACE_DEBUG ((LM_ERROR, ("Sent only %d out of %d bytes "
00261                               "for mcast fragment.\n"),
00262                   n,
00263                   expected_n));
00264     }
00265 
00266   if (n == -1)
00267     {
00268       if (errno == EWOULDBLOCK)
00269         {
00270           ACE_DEBUG ((LM_ERROR, "Send of mcast fragment failed (%m).\n"));
00271           // @@ TODO Use a Event Channel specific exception
00272           ACE_THROW (CORBA::COMM_FAILURE ());
00273         }
00274       else
00275         {
00276           ACE_DEBUG ((LM_WARNING, "Send of mcast fragment blocked (%m).\n"));
00277         }
00278     }
00279   else if (n == 0)
00280     {
00281       ACE_DEBUG ((LM_WARNING, "EOF on send of mcast fragment (%m).\n"));
00282     }
00283 }
00284 
00285 
00286 CORBA::ULong
00287 TAO_ECG_CDR_Message_Sender::compute_fragment_count (const ACE_Message_Block* begin,
00288                                                     const ACE_Message_Block* end,
00289                                                     int iov_size,
00290                                                     CORBA::ULong max_fragment_payload,
00291                                                     CORBA::ULong& total_length)
00292 {
00293   CORBA::ULong fragment_count = 0;
00294   total_length = 0;
00295 
00296   CORBA::ULong fragment_size = 0;
00297   // Reserve the first iovec for the header...
00298   int iovcnt = 1;
00299   for (const ACE_Message_Block* b = begin;
00300        b != end;
00301        b = b->cont ())
00302     {
00303       CORBA::ULong l = b->length ();
00304       total_length += l;
00305       fragment_size += l;
00306       ++iovcnt;
00307       while (fragment_size > max_fragment_payload)
00308         {
00309           // Ran out of space, must create a fragment...
00310           ++fragment_count;
00311 
00312           // The next iovector will contain what remains of this
00313           // buffer, but also consider
00314           iovcnt = 2;
00315           l -= max_fragment_payload - (fragment_size - l);
00316           fragment_size = l;
00317         }
00318       if (fragment_size == max_fragment_payload)
00319         {
00320           ++fragment_count;
00321           iovcnt = 1;
00322           fragment_size = 0;
00323         }
00324       if (iovcnt >= iov_size)
00325         {
00326           // Ran out of space in the iovector....
00327           ++fragment_count;
00328           iovcnt = 1;
00329           fragment_size = 0;
00330         }
00331     }
00332   if (iovcnt != 1)
00333     {
00334       // Send the remaining data in another fragment
00335       ++fragment_count;
00336     }
00337   return fragment_count;
00338 }
00339 
00340 TAO_END_VERSIONED_NAMESPACE_DECL

Generated on Thu Nov 9 13:11:10 2006 for TAO_RTEvent by doxygen 1.3.6