00001
00002
00003 #include "orbsvcs/Event/ECG_CDR_Message_Receiver.h"
00004 #include "orbsvcs/Event/ECG_CDR_Message_Sender.h"
00005
00006 #include "tao/Exception.h"
00007
00008 #include "ace/SOCK_Dgram.h"
00009 #include "ace/ACE.h"
00010 #include "ace/OS_NS_string.h"
00011
00012 #if !defined(__ACE_INLINE__)
00013 #include "orbsvcs/Event/ECG_CDR_Message_Receiver.inl"
00014 #endif
00015
00016 ACE_RCSID (Event,
00017 ECG_CDR_Message_Receiver,
00018 "$Id: ECG_CDR_Message_Receiver.cpp 76842 2007-02-01 23:15:16Z mitza $")
00019
00020 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00021
00022 TAO_ECG_CDR_Processor::~TAO_ECG_CDR_Processor (void)
00023 {
00024 }
00025
00026
00027 TAO_ECG_UDP_Request_Entry::~TAO_ECG_UDP_Request_Entry (void)
00028 {
00029 if (this->own_received_fragments_)
00030 {
00031 this->own_received_fragments_ = 0;
00032 delete[] this->received_fragments_;
00033 }
00034 }
00035
00036 TAO_ECG_UDP_Request_Entry::
00037 TAO_ECG_UDP_Request_Entry (CORBA::Boolean byte_order,
00038 CORBA::ULong request_id,
00039 CORBA::ULong request_size,
00040 CORBA::ULong fragment_count)
00041 : byte_order_ (byte_order)
00042 , request_id_ (request_id)
00043 , request_size_ (request_size)
00044 , fragment_count_ (fragment_count)
00045 {
00046 ACE_CDR::grow (&this->payload_, this->request_size_);
00047 this->payload_.wr_ptr (request_size_);
00048
00049 this->received_fragments_ = this->default_received_fragments_;
00050 this->own_received_fragments_ = 0;
00051 const int bits_per_ulong = sizeof(CORBA::ULong) * CHAR_BIT;
00052 this->received_fragments_size_ =
00053 this->fragment_count_ / bits_per_ulong + 1;
00054 if (this->received_fragments_size_ > ECG_DEFAULT_FRAGMENT_BUFSIZ)
00055 {
00056 ACE_NEW (this->received_fragments_,
00057 CORBA::ULong[this->received_fragments_size_]);
00058 this->own_received_fragments_ = 1;
00059 }
00060
00061 for (CORBA::ULong i = 0; i < this->received_fragments_size_; ++i)
00062 this->received_fragments_[i] = 0;
00063 CORBA::ULong idx = this->fragment_count_ / bits_per_ulong;
00064 CORBA::ULong bit = this->fragment_count_ % bits_per_ulong;
00065 this->received_fragments_[idx] = (0xFFFFFFFF << bit);
00066 }
00067
00068 int
00069 TAO_ECG_UDP_Request_Entry::validate_fragment (CORBA::Boolean byte_order,
00070 CORBA::ULong request_size,
00071 CORBA::ULong fragment_size,
00072 CORBA::ULong fragment_offset,
00073 CORBA::ULong ,
00074 CORBA::ULong fragment_count) const
00075 {
00076 if (byte_order != this->byte_order_
00077 || request_size != this->request_size_
00078 || fragment_count != this->fragment_count_)
00079 return 0;
00080
00081 if (fragment_offset >= request_size
00082 || fragment_offset + fragment_size > request_size)
00083 return 0;
00084
00085 return 1;
00086 }
00087
00088 int
00089 TAO_ECG_UDP_Request_Entry::test_received (CORBA::ULong fragment_id) const
00090 {
00091
00092 if (fragment_id > this->fragment_count_)
00093 return 1;
00094
00095 const int bits_per_ulong = sizeof(CORBA::ULong) * CHAR_BIT;
00096 CORBA::ULong idx = fragment_id / bits_per_ulong;
00097 CORBA::ULong bit = fragment_id % bits_per_ulong;
00098 return ACE_BIT_ENABLED (this->received_fragments_[idx], 1<<bit);
00099 }
00100
00101 void
00102 TAO_ECG_UDP_Request_Entry::mark_received (CORBA::ULong fragment_id)
00103 {
00104
00105 if (fragment_id > this->fragment_count_)
00106 return;
00107
00108 const int bits_per_ulong = sizeof(CORBA::ULong) * CHAR_BIT;
00109 CORBA::ULong idx = fragment_id / bits_per_ulong;
00110 CORBA::ULong bit = fragment_id % bits_per_ulong;
00111 ACE_SET_BITS (this->received_fragments_[idx], 1<<bit);
00112 }
00113
00114 int
00115 TAO_ECG_UDP_Request_Entry::complete (void) const
00116 {
00117 for (CORBA::ULong i = 0;
00118 i < this->received_fragments_size_;
00119 ++i)
00120 {
00121 if (this->received_fragments_[i] != 0xFFFFFFFF)
00122 return 0;
00123 }
00124 return 1;
00125 }
00126
00127 char*
00128 TAO_ECG_UDP_Request_Entry::fragment_buffer (CORBA::ULong fragment_offset)
00129 {
00130 return this->payload_.rd_ptr () + fragment_offset;
00131 }
00132
00133
00134 int
00135 TAO_ECG_CDR_Message_Receiver::Requests::init (size_t size,
00136 size_t min_purge_count)
00137 {
00138
00139 if (this->fragmented_requests_)
00140 return -1;
00141
00142 ACE_NEW_RETURN (this->fragmented_requests_,
00143 TAO_ECG_UDP_Request_Entry*[size],
00144 -1);
00145
00146 this->size_ = size;
00147 this->id_range_low_ = 0;
00148 this->id_range_high_ = size - 1;
00149 this->min_purge_count_ = min_purge_count;
00150
00151 for (size_t i = 0; i < size; ++i)
00152 {
00153 this->fragmented_requests_[i] = 0;
00154 }
00155
00156 return 0;
00157 }
00158
00159 TAO_ECG_CDR_Message_Receiver::Requests::~Requests (void)
00160 {
00161 for (size_t i = 0; i < this->size_; ++i)
00162 {
00163 TAO_ECG_UDP_Request_Entry* request =
00164 this->fragmented_requests_[i];
00165
00166 if (request != &TAO_ECG_CDR_Message_Receiver::Request_Completed_)
00167 delete request;
00168 }
00169
00170 delete [] this->fragmented_requests_;
00171
00172 this->fragmented_requests_ = 0;
00173 this->size_ = 0;
00174 this->id_range_low_ = 0;
00175 this->id_range_high_ = 0;
00176 }
00177
00178 TAO_ECG_UDP_Request_Entry **
00179 TAO_ECG_CDR_Message_Receiver::Requests::get_request (CORBA::ULong request_id)
00180 {
00181 if (request_id < this->id_range_low_)
00182
00183 {
00184 return 0;
00185 }
00186
00187 if (request_id > this->id_range_high_)
00188
00189
00190 {
00191 CORBA::ULong new_slots_needed = request_id - this->id_range_high_;
00192
00193 if (new_slots_needed < this->min_purge_count_)
00194 new_slots_needed = this->min_purge_count_;
00195
00196 if (new_slots_needed > this->size_)
00197
00198 {
00199 this->purge_requests (this->id_range_low_, this->id_range_high_);
00200 this->id_range_high_ = request_id;
00201 this->id_range_low_ = request_id - this->size_ + 1;
00202 }
00203 else
00204 {
00205 this->purge_requests (this->id_range_low_,
00206 this->id_range_low_ + new_slots_needed - 1);
00207 this->id_range_high_ += new_slots_needed;
00208 this->id_range_low_ += new_slots_needed;
00209 }
00210 }
00211
00212
00213 int index = request_id % this->size_;
00214 return this->fragmented_requests_ + index;
00215 }
00216
00217
00218 void
00219 TAO_ECG_CDR_Message_Receiver::Requests::purge_requests (
00220 CORBA::ULong purge_first,
00221 CORBA::ULong purge_last)
00222 {
00223 for (CORBA::ULong i = purge_first; i <= purge_last; ++i)
00224 {
00225 size_t index = i % this->size_;
00226 if (this->fragmented_requests_[index]
00227 != &TAO_ECG_CDR_Message_Receiver::Request_Completed_)
00228 {
00229 delete this->fragmented_requests_[index];
00230 }
00231 this->fragmented_requests_[index] = 0;
00232 }
00233 }
00234
00235
00236
00237 TAO_ECG_UDP_Request_Entry
00238 TAO_ECG_CDR_Message_Receiver::Request_Completed_ (0, 0, 0, 0);
00239
00240 int
00241 TAO_ECG_CDR_Message_Receiver::handle_input (
00242 ACE_SOCK_Dgram& dgram,
00243 TAO_ECG_CDR_Processor *cdr_processor)
00244 {
00245 char nonaligned_header[TAO_ECG_CDR_Message_Sender::ECG_HEADER_SIZE
00246 + ACE_CDR::MAX_ALIGNMENT];
00247 char *header_buf = ACE_ptr_align_binary (nonaligned_header,
00248 ACE_CDR::MAX_ALIGNMENT);
00249 char nonaligned_data[ACE_MAX_DGRAM_SIZE + ACE_CDR::MAX_ALIGNMENT];
00250 char *data_buf = ACE_ptr_align_binary (nonaligned_data,
00251 ACE_CDR::MAX_ALIGNMENT);
00252
00253
00254
00255 const int iovcnt = 2;
00256 iovec iov[iovcnt];
00257 iov[0].iov_base = header_buf;
00258 iov[0].iov_len = TAO_ECG_CDR_Message_Sender::ECG_HEADER_SIZE;
00259 iov[1].iov_base = data_buf;
00260 iov[1].iov_len = ACE_MAX_DGRAM_SIZE;
00261
00262 ACE_INET_Addr from;
00263 ssize_t n = dgram.recv (iov, iovcnt, from);
00264
00265 if (n == -1)
00266 {
00267 if (errno == EWOULDBLOCK)
00268 return 0;
00269
00270 ACE_ERROR_RETURN ((LM_ERROR, "Error reading mcast fragment (%m).\n"),
00271 -1);
00272 }
00273
00274 if (n == 0)
00275 {
00276 ACE_ERROR_RETURN ((LM_ERROR, "Trying to read mcast fragment: "
00277 "read 0 bytes from socket.\n"),
00278 0);
00279 }
00280
00281 if (n < TAO_ECG_CDR_Message_Sender::ECG_HEADER_SIZE)
00282 {
00283 ACE_ERROR_RETURN ((LM_ERROR, "Trying to read mcast fragment: "
00284 "# of bytes read < mcast header size.\n"),
00285 -1);
00286 }
00287
00288 u_int crc = 0;
00289
00290 if (this->check_crc_)
00291 {
00292 iov[1].iov_len = n - iov[0].iov_len;
00293 iov[0].iov_len -= 4;
00294
00295 crc = ACE::crc32 (iov, 2);
00296 }
00297
00298 if (this->ignore_from_.get () != 0
00299 && this->ignore_from_->is_loopback (from))
00300 {
00301 return 0;
00302 }
00303
00304
00305 Mcast_Header header;
00306 if (header.read (header_buf, n, this->check_crc_) == -1)
00307 return -1;
00308
00309 if ( this->check_crc_ && header.crc != crc)
00310 {
00311 static unsigned int err_count = 0;
00312 ACE_ERROR ((LM_ERROR,
00313 "******************************\n"));
00314
00315 ACE_ERROR ((LM_ERROR,
00316 "ERROR DETECTED \n"));
00317
00318 if (crc == 0)
00319 {
00320 ACE_ERROR ((LM_ERROR,
00321 "Sending process may not have computed CRC \n"));
00322 }
00323 else
00324 {
00325 ACE_ERROR ((LM_ERROR,
00326 " NETWORK CRC CHECKSUM FAILED\n"));
00327 }
00328
00329 ACE_ERROR ((LM_ERROR,
00330 "Message was received from [%s:%s:%d] \n",
00331 from.get_host_name (),
00332 from.get_host_addr (),
00333 from.get_port_number()));
00334
00335 ACE_ERROR ((LM_ERROR,
00336 "Num errors = %d \n",
00337 ++err_count));
00338 ACE_ERROR ((LM_ERROR,
00339 "This is a bad thing. Attempting to ignore ..\n"));
00340
00341 return 0;
00342 }
00343
00344
00345 if (header.fragment_count == 1)
00346 {
00347
00348
00349 int const result = this->mark_received (from, header.request_id);
00350 if (result != 1)
00351 return result;
00352
00353 TAO_InputCDR cdr (data_buf, header.request_size, header.byte_order);
00354 if (cdr_processor->decode (cdr) == -1)
00355 return -1;
00356 else
00357 return 1;
00358 }
00359
00360 return this->process_fragment (from, header, data_buf, cdr_processor);
00361 }
00362
00363 int
00364 TAO_ECG_CDR_Message_Receiver::mark_received (const ACE_INET_Addr &from,
00365 CORBA::ULong request_id)
00366 {
00367
00368
00369 Request_Map::ENTRY * entry = this->get_source_entry (from);
00370 if (!entry)
00371 return -1;
00372
00373 TAO_ECG_UDP_Request_Entry ** request =
00374 entry->int_id_->get_request (request_id);
00375
00376 if (request == 0)
00377 {
00378 ACE_DEBUG ((LM_WARNING, "Received mcast request with sequence"
00379 "below currently expected range.\n"));
00380 return 0;
00381 }
00382 if (*request == &Request_Completed_)
00383 {
00384 ACE_DEBUG ((LM_INFO, "Received duplicate mcast fragment. "
00385 "(Request already complete).\n"));
00386 return 0;
00387 }
00388 if (*request != 0)
00389 {
00390 ACE_ERROR_RETURN ((LM_ERROR, "Inconsistent fragments for "
00391 "mcast request.\n"),
00392 -1);
00393 }
00394
00395 *request = &Request_Completed_;
00396 return 1;
00397 }
00398
00399 int
00400 TAO_ECG_CDR_Message_Receiver::process_fragment (
00401 const ACE_INET_Addr &from,
00402 const Mcast_Header &header,
00403 char * data_buf,
00404 TAO_ECG_CDR_Processor *cdr_processor)
00405 {
00406
00407
00408 Request_Map::ENTRY * source_entry = this->get_source_entry (from);
00409 if (!source_entry)
00410 return -1;
00411
00412 TAO_ECG_UDP_Request_Entry ** request =
00413 source_entry->int_id_->get_request (header.request_id);
00414
00415 if (request == 0)
00416 {
00417 ACE_DEBUG ((LM_WARNING, "Received mcast request with sequence "
00418 "below currently expected range.\n"));
00419 return 0;
00420 }
00421 if (*request == &Request_Completed_)
00422 {
00423 ACE_DEBUG ((LM_INFO, "Received duplicate mcast fragment. "
00424 "(Request already complete).\n"));
00425 return 0;
00426 }
00427 if (*request == 0)
00428
00429 {
00430 ACE_NEW_RETURN (*request,
00431 TAO_ECG_UDP_Request_Entry (header.byte_order,
00432 header.request_id,
00433 header.request_size,
00434 header.fragment_count),
00435 -1);
00436 }
00437
00438
00439 if ((*request)->validate_fragment (header.byte_order,
00440 header.request_size,
00441 header.fragment_size,
00442 header.fragment_offset,
00443 header.fragment_id,
00444 header.fragment_count) == 0)
00445 {
00446 ACE_ERROR_RETURN ((LM_ERROR,
00447 "Received invalid mcast fragment.\n"),
00448 -1);
00449 }
00450
00451
00452 if ((*request)->test_received (header.fragment_id) == 1)
00453 {
00454 ACE_DEBUG ((LM_INFO, "Received duplicate mcast fragment.\n"));
00455 return 0;
00456 }
00457
00458
00459 (*request)->mark_received (header.fragment_id);
00460 ACE_OS::memcpy ((*request)->fragment_buffer (header.fragment_offset),
00461 data_buf,
00462 header.fragment_size);
00463
00464
00465 if (!(*request)->complete ())
00466 {
00467 return 0;
00468 }
00469
00470
00471 TAO_InputCDR cdr ((*request)->fragment_buffer (0),
00472 header.request_size,
00473 header.byte_order);
00474
00475 if (cdr_processor->decode (cdr) == -1)
00476 return -1;
00477
00478 delete *request;
00479 *request = &Request_Completed_;
00480 return 1;
00481 }
00482
00483 TAO_ECG_CDR_Message_Receiver::Request_Map::ENTRY*
00484 TAO_ECG_CDR_Message_Receiver::get_source_entry (const ACE_INET_Addr &from)
00485 {
00486
00487 Request_Map::ENTRY * entry = 0;
00488
00489 if (this->request_map_.find (from, entry) == -1)
00490 {
00491
00492 Requests *requests = 0;
00493 ACE_NEW_RETURN (requests,
00494 Requests,
00495 0);
00496 auto_ptr<Requests> requests_aptr (requests);
00497
00498 if (requests->init (this->max_requests_, this->min_purge_count_) == -1
00499 || this->request_map_.bind (from, requests, entry) == -1)
00500 {
00501 ACE_ERROR_RETURN ((LM_ERROR, "Unable to create hash map "
00502 "entry for a new request.\n"),
00503 0);
00504 }
00505 requests_aptr.release ();
00506 }
00507
00508 return entry;
00509 }
00510
00511 void
00512 TAO_ECG_CDR_Message_Receiver::shutdown (void)
00513 {
00514
00515
00516 Request_Map::iterator end = this->request_map_.end ();
00517 for (Request_Map::iterator i = this->request_map_.begin ();
00518 i != end;
00519 ++i)
00520 {
00521 delete (*i).int_id_;
00522 (*i).int_id_ = 0;
00523 }
00524
00525 this->ignore_from_.reset ();
00526 }
00527
00528
00529 int
00530 TAO_ECG_CDR_Message_Receiver::Mcast_Header::read (char *header,
00531 size_t bytes_received,
00532 CORBA::Boolean checkcrc)
00533 {
00534
00535 this->byte_order = header[0];
00536 if(this->byte_order != 0 && this->byte_order != 1)
00537 {
00538 ACE_ERROR_RETURN ((LM_ERROR, "Reading mcast packet header: byte "
00539 "order is neither 0 nor 1, it is %d.\n",
00540 this->byte_order),
00541 -1);
00542 }
00543
00544 TAO_InputCDR header_cdr (header,
00545 TAO_ECG_CDR_Message_Sender::ECG_HEADER_SIZE,
00546 byte_order);
00547 CORBA::Boolean unused;
00548 CORBA::Octet a, b, c;
00549 if (!header_cdr.read_boolean (unused)
00550 || !header_cdr.read_octet (a)
00551 || !header_cdr.read_octet (b)
00552 || !header_cdr.read_octet (c)
00553 || a != 'A' || b != 'B' || c != 'C')
00554 {
00555 ACE_ERROR_RETURN ((LM_ERROR, "Error reading magic bytes "
00556 "in mcast packet header.\n"),
00557 -1);
00558 }
00559
00560 if (!header_cdr.read_ulong (this->request_id)
00561 || !header_cdr.read_ulong (this->request_size)
00562 || !header_cdr.read_ulong (this->fragment_size)
00563 || !header_cdr.read_ulong (this->fragment_offset)
00564 || !header_cdr.read_ulong (this->fragment_id)
00565 || !header_cdr.read_ulong (this->fragment_count))
00566 {
00567 ACE_ERROR_RETURN ((LM_ERROR,
00568 "Error decoding mcast packet header.\n"),
00569 -1);
00570 }
00571
00572 if (checkcrc)
00573 {
00574 CORBA::Octet padding[4];
00575 header_cdr.read_octet_array (padding, 4);
00576
00577 unsigned char *crcparts = (unsigned char *)(&this->crc);
00578
00579 for (int cnt=0; cnt != 4; ++cnt)
00580 {
00581 crcparts[cnt] = padding[cnt];
00582 }
00583
00584 this->crc = ACE_NTOHL (this->crc);
00585 }
00586
00587
00588 size_t const data_bytes_received =
00589 bytes_received - TAO_ECG_CDR_Message_Sender::ECG_HEADER_SIZE;
00590
00591 if (this->request_size < this->fragment_size
00592 || this->fragment_offset >= this->request_size
00593 || this->fragment_id >= this->fragment_count
00594 || (this->fragment_count == 1
00595 && (this->fragment_size != this->request_size
00596 || this->request_size != data_bytes_received)))
00597 {
00598 ACE_ERROR_RETURN ((LM_ERROR,
00599 "Invalid mcast fragment: "
00600 "inconsistent header fields.\n"),
00601 -1);
00602 }
00603
00604 return 0;
00605 }
00606
00607 TAO_END_VERSIONED_NAMESPACE_DECL