00001
00002
00003 #include "orbsvcs/Event/ECG_CDR_Message_Sender.h"
00004 #include "tao/CDR.h"
00005 #include "ace/SOCK_Dgram.h"
00006 #include "ace/INET_Addr.h"
00007 #include "ace/ACE.h"
00008
00009 #if !defined(__ACE_INLINE__)
00010 #include "orbsvcs/Event/ECG_CDR_Message_Sender.i"
00011 #endif
00012
00013 ACE_RCSID(Event, ECG_CDR_Message_Sender, "ECG_CDR_Message_Sender.cpp,v 1.10 2006/03/14 06:14:25 jtc Exp")
00014
00015 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00016
00017 void
00018 TAO_ECG_CDR_Message_Sender::init (
00019 TAO_ECG_Refcounted_Endpoint endpoint_rptr
00020 ACE_ENV_ARG_DECL)
00021 ACE_THROW_SPEC ((CORBA::SystemException))
00022 {
00023 if (endpoint_rptr.get () == 0
00024 || endpoint_rptr->dgram ().get_handle () == ACE_INVALID_HANDLE)
00025 {
00026 ACE_ERROR ((LM_ERROR, "TAO_ECG_CDR_Message_Sender::init(): "
00027 "nil or unitialized endpoint argument.\n"));
00028 ACE_THROW (CORBA::INTERNAL ());
00029 }
00030
00031 this->endpoint_rptr_ = endpoint_rptr;
00032 }
00033
00034 void
00035 TAO_ECG_CDR_Message_Sender::send_message (const TAO_OutputCDR &cdr,
00036 const ACE_INET_Addr &addr
00037 ACE_ENV_ARG_DECL)
00038 ACE_THROW_SPEC ((CORBA::SystemException))
00039 {
00040 if (this->endpoint_rptr_.get () == 0)
00041 {
00042 ACE_ERROR ((LM_ERROR, "Attempt to invoke send_message() "
00043 "on non-initialized sender object.\n"));
00044 ACE_THROW (CORBA::INTERNAL ());
00045 }
00046
00047 CORBA::ULong max_fragment_payload = this->mtu () -
00048 TAO_ECG_CDR_Message_Sender::ECG_HEADER_SIZE;
00049
00050
00051 #if defined (ACE_HAS_BROKEN_DGRAM_SENDV)
00052 const int TAO_WRITEV_MAX = ACE_IOV_MAX - 1;
00053 #else
00054 const int TAO_WRITEV_MAX = ACE_IOV_MAX;
00055 #endif
00056 iovec iov[TAO_WRITEV_MAX];
00057
00058 CORBA::ULong total_length;
00059 CORBA::ULong fragment_count =
00060 this->compute_fragment_count (cdr.begin (),
00061 cdr.end (),
00062 TAO_WRITEV_MAX,
00063 max_fragment_payload,
00064 total_length);
00065
00066 CORBA::ULong request_id = this->endpoint_rptr_->next_request_id ();
00067
00068
00069 int iovcnt = 1;
00070 CORBA::ULong fragment_id = 0;
00071 CORBA::ULong fragment_offset = 0;
00072 CORBA::ULong fragment_size = 0;
00073 for (const ACE_Message_Block* b = cdr.begin ();
00074 b != cdr.end ();
00075 b = b->cont ())
00076 {
00077 CORBA::ULong l = b->length ();
00078
00079 char* rd_ptr = b->rd_ptr ();
00080
00081 iov[iovcnt].iov_base = rd_ptr;
00082 iov[iovcnt].iov_len = l;
00083 fragment_size += l;
00084 ++iovcnt;
00085 while (fragment_size > max_fragment_payload)
00086 {
00087
00088
00089
00090 CORBA::ULong last_mb_length =
00091 max_fragment_payload - (fragment_size - l);
00092 iov[iovcnt - 1].iov_len = last_mb_length;
00093
00094 this->send_fragment (addr,
00095 request_id,
00096 total_length,
00097 max_fragment_payload,
00098 fragment_offset,
00099 fragment_id,
00100 fragment_count,
00101 iov,
00102 iovcnt
00103 ACE_ENV_ARG_PARAMETER);
00104 ACE_CHECK;
00105 ++fragment_id;
00106 fragment_offset += max_fragment_payload;
00107
00108
00109
00110 l -= last_mb_length;
00111 rd_ptr += last_mb_length;
00112 iov[1].iov_base = rd_ptr;
00113 iov[1].iov_len = l;
00114 fragment_size = l;
00115 iovcnt = 2;
00116 }
00117 if (fragment_size == max_fragment_payload)
00118 {
00119
00120
00121
00122 this->send_fragment (addr,
00123 request_id,
00124 total_length,
00125 max_fragment_payload,
00126 fragment_offset,
00127 fragment_id,
00128 fragment_count,
00129 iov,
00130 iovcnt
00131 ACE_ENV_ARG_PARAMETER);
00132 ACE_CHECK;
00133 ++fragment_id;
00134 fragment_offset += max_fragment_payload;
00135
00136 iovcnt = 1;
00137 fragment_size = 0;
00138 }
00139 if (iovcnt == TAO_WRITEV_MAX)
00140 {
00141
00142
00143 this->send_fragment (addr,
00144 request_id,
00145 total_length,
00146 fragment_size,
00147 fragment_offset,
00148 fragment_id,
00149 fragment_count,
00150 iov,
00151 iovcnt
00152 ACE_ENV_ARG_PARAMETER);
00153 ACE_CHECK;
00154 ++fragment_id;
00155 fragment_offset += fragment_size;
00156
00157 iovcnt = 1;
00158 fragment_size = 0;
00159 }
00160 }
00161
00162
00163 if (iovcnt != 1)
00164 {
00165
00166
00167 this->send_fragment (addr,
00168 request_id,
00169 total_length,
00170 fragment_size,
00171 fragment_offset,
00172 fragment_id,
00173 fragment_count,
00174 iov,
00175 iovcnt
00176 ACE_ENV_ARG_PARAMETER);
00177 ACE_CHECK;
00178 ++fragment_id;
00179 fragment_offset += fragment_size;
00180
00181
00182
00183
00184 }
00185
00186
00187
00188 }
00189
00190
00191 void
00192 TAO_ECG_CDR_Message_Sender::send_fragment (const ACE_INET_Addr &addr,
00193 CORBA::ULong request_id,
00194 CORBA::ULong request_size,
00195 CORBA::ULong fragment_size,
00196 CORBA::ULong fragment_offset,
00197 CORBA::ULong fragment_id,
00198 CORBA::ULong fragment_count,
00199 iovec iov[],
00200 int iovcnt
00201 ACE_ENV_ARG_DECL)
00202 {
00203 CORBA::ULong header[TAO_ECG_CDR_Message_Sender::ECG_HEADER_SIZE
00204 / sizeof(CORBA::ULong)
00205 + ACE_CDR::MAX_ALIGNMENT];
00206 char* buf = reinterpret_cast<char*> (header);
00207 TAO_OutputCDR cdr (buf, sizeof(header));
00208 cdr.write_boolean (TAO_ENCAP_BYTE_ORDER);
00209
00210
00211 cdr.write_octet ('A'); cdr.write_octet ('B'); cdr.write_octet ('C');
00212 cdr.write_ulong (request_id);
00213 cdr.write_ulong (request_size);
00214 cdr.write_ulong (fragment_size);
00215 cdr.write_ulong (fragment_offset);
00216 cdr.write_ulong (fragment_id);
00217 cdr.write_ulong (fragment_count);
00218 CORBA::Octet padding[4];
00219
00220
00221
00222 if (checksum_)
00223 {
00224
00225 iov[0].iov_base = cdr.begin ()->rd_ptr ();
00226 iov[0].iov_len = cdr.begin ()->length ();
00227 unsigned int crc = 0;
00228 unsigned char *crc_parts = (unsigned char *)(&crc);
00229 if (iovcnt > 1)
00230 {
00231 crc = ACE::crc32 (iov, iovcnt);
00232 crc = htonl (crc);
00233 }
00234 for (int cnt=0; cnt<4; ++cnt)
00235 {
00236 padding[cnt] = crc_parts[cnt];
00237 }
00238 }
00239 else
00240 {
00241 for (int cnt=0; cnt<4; ++cnt)
00242 {
00243 padding[cnt] = 0;
00244 }
00245 }
00246
00247 cdr.write_octet_array (padding, 4);
00248
00249 iov[0].iov_base = cdr.begin ()->rd_ptr ();
00250 iov[0].iov_len = cdr.begin ()->length ();
00251
00252 ssize_t n = this->dgram ().send (iov,
00253 iovcnt,
00254 addr);
00255 size_t expected_n = 0;
00256 for (int i = 0; i < iovcnt; ++i)
00257 expected_n += iov[i].iov_len;
00258 if (n > 0 && size_t(n) != expected_n)
00259 {
00260 ACE_DEBUG ((LM_ERROR, ("Sent only %d out of %d bytes "
00261 "for mcast fragment.\n"),
00262 n,
00263 expected_n));
00264 }
00265
00266 if (n == -1)
00267 {
00268 if (errno == EWOULDBLOCK)
00269 {
00270 ACE_DEBUG ((LM_ERROR, "Send of mcast fragment failed (%m).\n"));
00271
00272 ACE_THROW (CORBA::COMM_FAILURE ());
00273 }
00274 else
00275 {
00276 ACE_DEBUG ((LM_WARNING, "Send of mcast fragment blocked (%m).\n"));
00277 }
00278 }
00279 else if (n == 0)
00280 {
00281 ACE_DEBUG ((LM_WARNING, "EOF on send of mcast fragment (%m).\n"));
00282 }
00283 }
00284
00285
00286 CORBA::ULong
00287 TAO_ECG_CDR_Message_Sender::compute_fragment_count (const ACE_Message_Block* begin,
00288 const ACE_Message_Block* end,
00289 int iov_size,
00290 CORBA::ULong max_fragment_payload,
00291 CORBA::ULong& total_length)
00292 {
00293 CORBA::ULong fragment_count = 0;
00294 total_length = 0;
00295
00296 CORBA::ULong fragment_size = 0;
00297
00298 int iovcnt = 1;
00299 for (const ACE_Message_Block* b = begin;
00300 b != end;
00301 b = b->cont ())
00302 {
00303 CORBA::ULong l = b->length ();
00304 total_length += l;
00305 fragment_size += l;
00306 ++iovcnt;
00307 while (fragment_size > max_fragment_payload)
00308 {
00309
00310 ++fragment_count;
00311
00312
00313
00314 iovcnt = 2;
00315 l -= max_fragment_payload - (fragment_size - l);
00316 fragment_size = l;
00317 }
00318 if (fragment_size == max_fragment_payload)
00319 {
00320 ++fragment_count;
00321 iovcnt = 1;
00322 fragment_size = 0;
00323 }
00324 if (iovcnt >= iov_size)
00325 {
00326
00327 ++fragment_count;
00328 iovcnt = 1;
00329 fragment_size = 0;
00330 }
00331 }
00332 if (iovcnt != 1)
00333 {
00334
00335 ++fragment_count;
00336 }
00337 return fragment_count;
00338 }
00339
00340 TAO_END_VERSIONED_NAMESPACE_DECL