00001
00002
00003 #include "orbsvcs/Event/ECG_CDR_Message_Sender.h"
00004 #include "tao/CDR.h"
00005 #include "ace/SOCK_Dgram.h"
00006 #include "ace/INET_Addr.h"
00007 #include "ace/ACE.h"
00008
00009 #if !defined(__ACE_INLINE__)
00010 #include "orbsvcs/Event/ECG_CDR_Message_Sender.inl"
00011 #endif
00012
00013 ACE_RCSID(Event, ECG_CDR_Message_Sender, "$Id: ECG_CDR_Message_Sender.cpp 77001 2007-02-12 07:54:49Z johnnyw $")
00014
00015 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00016
00017 void
00018 TAO_ECG_CDR_Message_Sender::init (
00019 TAO_ECG_Refcounted_Endpoint endpoint_rptr)
00020 {
00021 if (endpoint_rptr.get () == 0
00022 || endpoint_rptr->dgram ().get_handle () == ACE_INVALID_HANDLE)
00023 {
00024 ACE_ERROR ((LM_ERROR, "TAO_ECG_CDR_Message_Sender::init(): "
00025 "nil or unitialized endpoint argument.\n"));
00026 throw CORBA::INTERNAL ();
00027 }
00028
00029 this->endpoint_rptr_ = endpoint_rptr;
00030 }
00031
00032 void
00033 TAO_ECG_CDR_Message_Sender::send_message (const TAO_OutputCDR &cdr,
00034 const ACE_INET_Addr &addr)
00035 {
00036 if (this->endpoint_rptr_.get () == 0)
00037 {
00038 ACE_ERROR ((LM_ERROR, "Attempt to invoke send_message() "
00039 "on non-initialized sender object.\n"));
00040 throw CORBA::INTERNAL ();
00041 }
00042
00043 CORBA::ULong max_fragment_payload = this->mtu () -
00044 TAO_ECG_CDR_Message_Sender::ECG_HEADER_SIZE;
00045
00046
00047 #if defined (ACE_HAS_BROKEN_DGRAM_SENDV)
00048 const int TAO_WRITEV_MAX = ACE_IOV_MAX - 1;
00049 #else
00050 const int TAO_WRITEV_MAX = ACE_IOV_MAX;
00051 #endif
00052 iovec iov[TAO_WRITEV_MAX];
00053
00054 CORBA::ULong total_length;
00055 CORBA::ULong fragment_count =
00056 this->compute_fragment_count (cdr.begin (),
00057 cdr.end (),
00058 TAO_WRITEV_MAX,
00059 max_fragment_payload,
00060 total_length);
00061
00062 CORBA::ULong request_id = this->endpoint_rptr_->next_request_id ();
00063
00064
00065 int iovcnt = 1;
00066 CORBA::ULong fragment_id = 0;
00067 CORBA::ULong fragment_offset = 0;
00068 CORBA::ULong fragment_size = 0;
00069 for (const ACE_Message_Block* b = cdr.begin ();
00070 b != cdr.end ();
00071 b = b->cont ())
00072 {
00073 CORBA::ULong l = b->length ();
00074
00075 char* rd_ptr = b->rd_ptr ();
00076
00077 iov[iovcnt].iov_base = rd_ptr;
00078 iov[iovcnt].iov_len = l;
00079 fragment_size += l;
00080 ++iovcnt;
00081 while (fragment_size > max_fragment_payload)
00082 {
00083
00084
00085
00086 CORBA::ULong last_mb_length =
00087 max_fragment_payload - (fragment_size - l);
00088 iov[iovcnt - 1].iov_len = last_mb_length;
00089
00090 this->send_fragment (addr,
00091 request_id,
00092 total_length,
00093 max_fragment_payload,
00094 fragment_offset,
00095 fragment_id,
00096 fragment_count,
00097 iov,
00098 iovcnt);
00099 ++fragment_id;
00100 fragment_offset += max_fragment_payload;
00101
00102
00103
00104 l -= last_mb_length;
00105 rd_ptr += last_mb_length;
00106 iov[1].iov_base = rd_ptr;
00107 iov[1].iov_len = l;
00108 fragment_size = l;
00109 iovcnt = 2;
00110 }
00111 if (fragment_size == max_fragment_payload)
00112 {
00113
00114
00115
00116 this->send_fragment (addr,
00117 request_id,
00118 total_length,
00119 max_fragment_payload,
00120 fragment_offset,
00121 fragment_id,
00122 fragment_count,
00123 iov,
00124 iovcnt);
00125 ++fragment_id;
00126 fragment_offset += max_fragment_payload;
00127
00128 iovcnt = 1;
00129 fragment_size = 0;
00130 }
00131 if (iovcnt == TAO_WRITEV_MAX)
00132 {
00133
00134
00135 this->send_fragment (addr,
00136 request_id,
00137 total_length,
00138 fragment_size,
00139 fragment_offset,
00140 fragment_id,
00141 fragment_count,
00142 iov,
00143 iovcnt);
00144 ++fragment_id;
00145 fragment_offset += fragment_size;
00146
00147 iovcnt = 1;
00148 fragment_size = 0;
00149 }
00150 }
00151
00152
00153 if (iovcnt != 1)
00154 {
00155
00156
00157 this->send_fragment (addr,
00158 request_id,
00159 total_length,
00160 fragment_size,
00161 fragment_offset,
00162 fragment_id,
00163 fragment_count,
00164 iov,
00165 iovcnt);
00166 ++fragment_id;
00167 fragment_offset += fragment_size;
00168
00169
00170
00171
00172 }
00173
00174
00175
00176 }
00177
00178
00179 void
00180 TAO_ECG_CDR_Message_Sender::send_fragment (const ACE_INET_Addr &addr,
00181 CORBA::ULong request_id,
00182 CORBA::ULong request_size,
00183 CORBA::ULong fragment_size,
00184 CORBA::ULong fragment_offset,
00185 CORBA::ULong fragment_id,
00186 CORBA::ULong fragment_count,
00187 iovec iov[],
00188 int iovcnt)
00189 {
00190 CORBA::ULong header[TAO_ECG_CDR_Message_Sender::ECG_HEADER_SIZE
00191 / sizeof(CORBA::ULong)
00192 + ACE_CDR::MAX_ALIGNMENT];
00193 char* buf = reinterpret_cast<char*> (header);
00194 TAO_OutputCDR cdr (buf, sizeof(header));
00195 cdr.write_boolean (TAO_ENCAP_BYTE_ORDER);
00196
00197
00198 cdr.write_octet ('A'); cdr.write_octet ('B'); cdr.write_octet ('C');
00199 cdr.write_ulong (request_id);
00200 cdr.write_ulong (request_size);
00201 cdr.write_ulong (fragment_size);
00202 cdr.write_ulong (fragment_offset);
00203 cdr.write_ulong (fragment_id);
00204 cdr.write_ulong (fragment_count);
00205 CORBA::Octet padding[4];
00206
00207
00208
00209 if (checksum_)
00210 {
00211
00212 iov[0].iov_base = cdr.begin ()->rd_ptr ();
00213 iov[0].iov_len = cdr.begin ()->length ();
00214 unsigned int crc = 0;
00215 unsigned char *crc_parts = (unsigned char *)(&crc);
00216 if (iovcnt > 1)
00217 {
00218 crc = ACE::crc32 (iov, iovcnt);
00219 crc = ACE_HTONL (crc);
00220 }
00221 for (int cnt=0; cnt<4; ++cnt)
00222 {
00223 padding[cnt] = crc_parts[cnt];
00224 }
00225 }
00226 else
00227 {
00228 for (int cnt=0; cnt<4; ++cnt)
00229 {
00230 padding[cnt] = 0;
00231 }
00232 }
00233
00234 cdr.write_octet_array (padding, 4);
00235
00236 iov[0].iov_base = cdr.begin ()->rd_ptr ();
00237 iov[0].iov_len = cdr.begin ()->length ();
00238
00239 ssize_t n = this->dgram ().send (iov,
00240 iovcnt,
00241 addr);
00242 size_t expected_n = 0;
00243 for (int i = 0; i < iovcnt; ++i)
00244 expected_n += iov[i].iov_len;
00245 if (n > 0 && size_t(n) != expected_n)
00246 {
00247 ACE_ERROR ((LM_ERROR, ("Sent only %d out of %d bytes "
00248 "for mcast fragment.\n"),
00249 n,
00250 expected_n));
00251 }
00252
00253 if (n == -1)
00254 {
00255 if (errno == EWOULDBLOCK)
00256 {
00257 ACE_ERROR ((LM_ERROR, "Send of mcast fragment failed (%m).\n"));
00258
00259 throw CORBA::COMM_FAILURE ();
00260 }
00261 else
00262 {
00263 ACE_DEBUG ((LM_WARNING, "Send of mcast fragment blocked (%m).\n"));
00264 }
00265 }
00266 else if (n == 0)
00267 {
00268 ACE_DEBUG ((LM_WARNING, "EOF on send of mcast fragment (%m).\n"));
00269 }
00270 }
00271
00272
00273 CORBA::ULong
00274 TAO_ECG_CDR_Message_Sender::compute_fragment_count (const ACE_Message_Block* begin,
00275 const ACE_Message_Block* end,
00276 int iov_size,
00277 CORBA::ULong max_fragment_payload,
00278 CORBA::ULong& total_length)
00279 {
00280 CORBA::ULong fragment_count = 0;
00281 total_length = 0;
00282
00283 CORBA::ULong fragment_size = 0;
00284
00285 int iovcnt = 1;
00286 for (const ACE_Message_Block* b = begin;
00287 b != end;
00288 b = b->cont ())
00289 {
00290 CORBA::ULong l = b->length ();
00291 total_length += l;
00292 fragment_size += l;
00293 ++iovcnt;
00294 while (fragment_size > max_fragment_payload)
00295 {
00296
00297 ++fragment_count;
00298
00299
00300
00301 iovcnt = 2;
00302 l -= max_fragment_payload - (fragment_size - l);
00303 fragment_size = l;
00304 }
00305 if (fragment_size == max_fragment_payload)
00306 {
00307 ++fragment_count;
00308 iovcnt = 1;
00309 fragment_size = 0;
00310 }
00311 if (iovcnt >= iov_size)
00312 {
00313
00314 ++fragment_count;
00315 iovcnt = 1;
00316 fragment_size = 0;
00317 }
00318 }
00319 if (iovcnt != 1)
00320 {
00321
00322 ++fragment_count;
00323 }
00324 return fragment_count;
00325 }
00326
00327 TAO_END_VERSIONED_NAMESPACE_DECL