00001
00002
00003 #include "orbsvcs/Event/ECG_CDR_Message_Receiver.h"
00004 #include "orbsvcs/Event/ECG_CDR_Message_Sender.h"
00005
00006 #include "tao/Exception.h"
00007
00008 #include "ace/SOCK_Dgram.h"
00009 #include "ace/ACE.h"
00010 #include "ace/OS_NS_string.h"
00011
00012 #if !defined(__ACE_INLINE__)
00013 #include "orbsvcs/Event/ECG_CDR_Message_Receiver.i"
00014 #endif
00015
00016 ACE_RCSID (Event,
00017 ECG_CDR_Message_Receiver,
00018 "ECG_CDR_Message_Receiver.cpp,v 1.14 2006/03/14 06:14:25 jtc Exp")
00019
00020 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00021
00022 TAO_ECG_CDR_Processor::~TAO_ECG_CDR_Processor (void)
00023 {
00024 }
00025
00026
00027 TAO_ECG_UDP_Request_Entry::~TAO_ECG_UDP_Request_Entry (void)
00028 {
00029 if (this->own_received_fragments_)
00030 {
00031 this->own_received_fragments_ = 0;
00032 delete[] this->received_fragments_;
00033 }
00034 }
00035
00036 TAO_ECG_UDP_Request_Entry::
00037 TAO_ECG_UDP_Request_Entry (CORBA::Boolean byte_order,
00038 CORBA::ULong request_id,
00039 CORBA::ULong request_size,
00040 CORBA::ULong fragment_count)
00041 : byte_order_ (byte_order)
00042 , request_id_ (request_id)
00043 , request_size_ (request_size)
00044 , fragment_count_ (fragment_count)
00045 {
00046 ACE_CDR::grow (&this->payload_, this->request_size_);
00047 this->payload_.wr_ptr (request_size_);
00048
00049 this->received_fragments_ = this->default_received_fragments_;
00050 this->own_received_fragments_ = 0;
00051 const int bits_per_ulong = sizeof(CORBA::ULong) * CHAR_BIT;
00052 this->received_fragments_size_ =
00053 this->fragment_count_ / bits_per_ulong + 1;
00054 if (this->received_fragments_size_ > ECG_DEFAULT_FRAGMENT_BUFSIZ)
00055 {
00056 ACE_NEW (this->received_fragments_,
00057 CORBA::ULong[this->received_fragments_size_]);
00058 this->own_received_fragments_ = 1;
00059 }
00060
00061 for (CORBA::ULong i = 0; i < this->received_fragments_size_; ++i)
00062 this->received_fragments_[i] = 0;
00063 CORBA::ULong idx = this->fragment_count_ / bits_per_ulong;
00064 CORBA::ULong bit = this->fragment_count_ % bits_per_ulong;
00065 this->received_fragments_[idx] = (0xFFFFFFFF << bit);
00066 }
00067
00068 int
00069 TAO_ECG_UDP_Request_Entry::validate_fragment (CORBA::Boolean byte_order,
00070 CORBA::ULong request_size,
00071 CORBA::ULong fragment_size,
00072 CORBA::ULong fragment_offset,
00073 CORBA::ULong ,
00074 CORBA::ULong fragment_count) const
00075 {
00076 if (byte_order != this->byte_order_
00077 || request_size != this->request_size_
00078 || fragment_count != this->fragment_count_)
00079 return 0;
00080
00081 if (fragment_offset >= request_size
00082 || fragment_offset + fragment_size > request_size)
00083 return 0;
00084
00085 return 1;
00086 }
00087
00088 int
00089 TAO_ECG_UDP_Request_Entry::test_received (CORBA::ULong fragment_id) const
00090 {
00091
00092 if (fragment_id > this->fragment_count_)
00093 return 1;
00094
00095 const int bits_per_ulong = sizeof(CORBA::ULong) * CHAR_BIT;
00096 CORBA::ULong idx = fragment_id / bits_per_ulong;
00097 CORBA::ULong bit = fragment_id % bits_per_ulong;
00098 return ACE_BIT_ENABLED (this->received_fragments_[idx], 1<<bit);
00099 }
00100
00101 void
00102 TAO_ECG_UDP_Request_Entry::mark_received (CORBA::ULong fragment_id)
00103 {
00104
00105 if (fragment_id > this->fragment_count_)
00106 return;
00107
00108 const int bits_per_ulong = sizeof(CORBA::ULong) * CHAR_BIT;
00109 CORBA::ULong idx = fragment_id / bits_per_ulong;
00110 CORBA::ULong bit = fragment_id % bits_per_ulong;
00111 ACE_SET_BITS (this->received_fragments_[idx], 1<<bit);
00112 }
00113
00114 int
00115 TAO_ECG_UDP_Request_Entry::complete (void) const
00116 {
00117 for (CORBA::ULong i = 0;
00118 i < this->received_fragments_size_;
00119 ++i)
00120 {
00121 if (this->received_fragments_[i] != 0xFFFFFFFF)
00122 return 0;
00123 }
00124 return 1;
00125 }
00126
00127 char*
00128 TAO_ECG_UDP_Request_Entry::fragment_buffer (CORBA::ULong fragment_offset)
00129 {
00130 return this->payload_.rd_ptr () + fragment_offset;
00131 }
00132
00133
00134 int
00135 TAO_ECG_CDR_Message_Receiver::Requests::init (size_t size,
00136 size_t min_purge_count)
00137 {
00138
00139 if (this->fragmented_requests_)
00140 return -1;
00141
00142 ACE_NEW_RETURN (this->fragmented_requests_,
00143 TAO_ECG_UDP_Request_Entry*[size],
00144 -1);
00145
00146 this->size_ = size;
00147 this->id_range_low_ = 0;
00148 this->id_range_high_ = size - 1;
00149 this->min_purge_count_ = min_purge_count;
00150
00151 for (size_t i = 0; i < size; ++i)
00152 {
00153 this->fragmented_requests_[i] = 0;
00154 }
00155
00156 return 0;
00157 }
00158
00159 TAO_ECG_CDR_Message_Receiver::Requests::~Requests (void)
00160 {
00161 for (size_t i = 0; i < this->size_; ++i)
00162 {
00163 TAO_ECG_UDP_Request_Entry* request =
00164 this->fragmented_requests_[i];
00165
00166 if (request != &TAO_ECG_CDR_Message_Receiver::Request_Completed_)
00167 delete request;
00168 }
00169
00170 delete [] this->fragmented_requests_;
00171
00172 this->fragmented_requests_ = 0;
00173 this->size_ = 0;
00174 this->id_range_low_ = 0;
00175 this->id_range_high_ = 0;
00176 }
00177
00178 TAO_ECG_UDP_Request_Entry **
00179 TAO_ECG_CDR_Message_Receiver::Requests::get_request (CORBA::ULong request_id)
00180 {
00181 if (request_id < this->id_range_low_)
00182
00183 {
00184 return 0;
00185 }
00186
00187 if (request_id > this->id_range_high_)
00188
00189
00190 {
00191 CORBA::ULong new_slots_needed = request_id - this->id_range_high_;
00192
00193 if (new_slots_needed < this->min_purge_count_)
00194 new_slots_needed = this->min_purge_count_;
00195
00196 if (new_slots_needed > this->size_)
00197
00198 {
00199 this->purge_requests (this->id_range_low_, this->id_range_high_);
00200 this->id_range_high_ = request_id;
00201 this->id_range_low_ = request_id - this->size_ + 1;
00202 }
00203 else
00204 {
00205 this->purge_requests (this->id_range_low_,
00206 this->id_range_low_ + new_slots_needed - 1);
00207 this->id_range_high_ += new_slots_needed;
00208 this->id_range_low_ += new_slots_needed;
00209 }
00210 }
00211
00212
00213 int index = request_id % this->size_;
00214 return this->fragmented_requests_ + index;
00215 }
00216
00217
00218 void
00219 TAO_ECG_CDR_Message_Receiver::Requests::purge_requests (
00220 CORBA::ULong purge_first,
00221 CORBA::ULong purge_last)
00222 {
00223 for (CORBA::ULong i = purge_first; i <= purge_last; ++i)
00224 {
00225 size_t index = i % this->size_;
00226 if (this->fragmented_requests_[index]
00227 != &TAO_ECG_CDR_Message_Receiver::Request_Completed_)
00228 {
00229 delete this->fragmented_requests_[index];
00230 }
00231 this->fragmented_requests_[index] = 0;
00232 }
00233 }
00234
00235
00236
00237 TAO_ECG_UDP_Request_Entry
00238 TAO_ECG_CDR_Message_Receiver::Request_Completed_ (0, 0, 0, 0);
00239
00240 int
00241 TAO_ECG_CDR_Message_Receiver::handle_input (
00242 ACE_SOCK_Dgram& dgram,
00243 TAO_ECG_CDR_Processor *cdr_processor)
00244 {
00245 char nonaligned_header[TAO_ECG_CDR_Message_Sender::ECG_HEADER_SIZE
00246 + ACE_CDR::MAX_ALIGNMENT];
00247 char *header_buf = ACE_ptr_align_binary (nonaligned_header,
00248 ACE_CDR::MAX_ALIGNMENT);
00249
00250 char nonaligned_data[ACE_MAX_DGRAM_SIZE + ACE_CDR::MAX_ALIGNMENT];
00251 char *data_buf = ACE_ptr_align_binary (nonaligned_data,
00252 ACE_CDR::MAX_ALIGNMENT);
00253
00254
00255
00256 const int iovcnt = 2;
00257 iovec iov[iovcnt];
00258 iov[0].iov_base = header_buf;
00259 iov[0].iov_len = TAO_ECG_CDR_Message_Sender::ECG_HEADER_SIZE;
00260 iov[1].iov_base = data_buf;
00261 iov[1].iov_len = ACE_MAX_DGRAM_SIZE;
00262
00263 ACE_INET_Addr from;
00264 ssize_t n = dgram.recv (iov, iovcnt, from);
00265
00266 if (n == -1)
00267 {
00268 if (errno == EWOULDBLOCK)
00269 return 0;
00270
00271 ACE_ERROR_RETURN ((LM_ERROR, "Error reading mcast fragment (%m).\n"),
00272 -1);
00273 }
00274
00275 if (n == 0)
00276 {
00277 ACE_ERROR_RETURN ((LM_ERROR, "Trying to read mcast fragment: "
00278 "read 0 bytes from socket.\n"),
00279 0);
00280 }
00281
00282 if (n < TAO_ECG_CDR_Message_Sender::ECG_HEADER_SIZE)
00283 {
00284 ACE_ERROR_RETURN ((LM_ERROR, "Trying to read mcast fragment: "
00285 "# of bytes read < mcast header size.\n"),
00286 -1);
00287 }
00288
00289 u_int crc = 0;
00290
00291 if (this->check_crc_)
00292 {
00293 iov[1].iov_len = n - iov[0].iov_len;
00294 iov[0].iov_len -= 4;
00295
00296 crc = ACE::crc32 (iov, 2);
00297 }
00298
00299 if (this->ignore_from_.get () != 0
00300 && this->ignore_from_->is_loopback (from))
00301 {
00302 return 0;
00303 }
00304
00305
00306 Mcast_Header header;
00307 if (header.read (header_buf, n, this->check_crc_) == -1)
00308 return -1;
00309
00310 if ( this->check_crc_ && header.crc != crc)
00311 {
00312 static unsigned int err_count = 0;
00313 ACE_ERROR ((LM_DEBUG,
00314 "******************************\n"));
00315
00316 ACE_ERROR ((LM_DEBUG,
00317 "ERROR DETECTED \n"));
00318
00319 if (crc == 0)
00320 {
00321 ACE_ERROR ((LM_DEBUG,
00322 "Sending process may not have computed CRC \n"));
00323 }
00324 else
00325 {
00326 ACE_ERROR ((LM_DEBUG,
00327 " NETWORK CRC CHECKSUM FAILED\n"));
00328 }
00329
00330 ACE_ERROR ((LM_ERROR,
00331 "Message was received from [%s:%s:%d] \n",
00332 from.get_host_name (),
00333 from.get_host_addr (),
00334 from.get_port_number()));
00335
00336 ACE_ERROR ((LM_ERROR,
00337 "Num errors = %d \n",
00338 ++err_count));
00339 ACE_ERROR ((LM_ERROR,
00340 "This is a bad thing. Attempting to ignore ..\n"));
00341
00342 return 0;
00343 }
00344
00345
00346 if (header.fragment_count == 1)
00347 {
00348
00349
00350 int result = this->mark_received (from, header.request_id);
00351 if (result != 1)
00352 return result;
00353
00354 TAO_InputCDR cdr (data_buf, header.request_size, header.byte_order);
00355 if (cdr_processor->decode (cdr) == -1)
00356 return -1;
00357 else
00358 return 1;
00359 }
00360
00361 return this->process_fragment (from, header, data_buf, cdr_processor);
00362 }
00363
00364 int
00365 TAO_ECG_CDR_Message_Receiver::mark_received (const ACE_INET_Addr &from,
00366 CORBA::ULong request_id)
00367 {
00368
00369
00370 Request_Map::ENTRY * entry = this->get_source_entry (from);
00371 if (!entry)
00372 return -1;
00373
00374 TAO_ECG_UDP_Request_Entry ** request =
00375 entry->int_id_->get_request (request_id);
00376
00377 if (request == 0)
00378 {
00379 ACE_DEBUG ((LM_WARNING, "Received mcast request with sequence"
00380 "below currently expected range.\n"));
00381 return 0;
00382 }
00383 if (*request == &Request_Completed_)
00384 {
00385 ACE_DEBUG ((LM_INFO, "Received duplicate mcast fragment. "
00386 "(Request already complete).\n"));
00387 return 0;
00388 }
00389 if (*request != 0)
00390 {
00391 ACE_ERROR_RETURN ((LM_ERROR, "Inconsistent fragments for "
00392 "mcast request.\n"),
00393 -1);
00394 }
00395
00396 *request = &Request_Completed_;
00397 return 1;
00398 }
00399
00400 int
00401 TAO_ECG_CDR_Message_Receiver::process_fragment (
00402 const ACE_INET_Addr &from,
00403 const Mcast_Header &header,
00404 char * data_buf,
00405 TAO_ECG_CDR_Processor *cdr_processor)
00406 {
00407
00408
00409 Request_Map::ENTRY * source_entry = this->get_source_entry (from);
00410 if (!source_entry)
00411 return -1;
00412
00413 TAO_ECG_UDP_Request_Entry ** request =
00414 source_entry->int_id_->get_request (header.request_id);
00415
00416 if (request == 0)
00417 {
00418 ACE_DEBUG ((LM_WARNING, "Received mcast request with sequence "
00419 "below currently expected range.\n"));
00420 return 0;
00421 }
00422 if (*request == &Request_Completed_)
00423 {
00424 ACE_DEBUG ((LM_INFO, "Received duplicate mcast fragment. "
00425 "(Request already complete).\n"));
00426 return 0;
00427 }
00428 if (*request == 0)
00429
00430 {
00431 ACE_NEW_RETURN (*request,
00432 TAO_ECG_UDP_Request_Entry (header.byte_order,
00433 header.request_id,
00434 header.request_size,
00435 header.fragment_count),
00436 -1);
00437 }
00438
00439
00440 if ((*request)->validate_fragment (header.byte_order,
00441 header.request_size,
00442 header.fragment_size,
00443 header.fragment_offset,
00444 header.fragment_id,
00445 header.fragment_count) == 0)
00446 {
00447 ACE_ERROR_RETURN ((LM_ERROR,
00448 "Received invalid mcast fragment.\n"),
00449 -1);
00450 }
00451
00452
00453 if ((*request)->test_received (header.fragment_id) == 1)
00454 {
00455 ACE_DEBUG ((LM_INFO, "Received duplicate mcast fragment.\n"));
00456 return 0;
00457 }
00458
00459
00460 (*request)->mark_received (header.fragment_id);
00461 ACE_OS::memcpy ((*request)->fragment_buffer (header.fragment_offset),
00462 data_buf,
00463 header.fragment_size);
00464
00465
00466 if (!(*request)->complete ())
00467 {
00468 return 0;
00469 }
00470
00471
00472 TAO_InputCDR cdr ((*request)->fragment_buffer (0),
00473 header.request_size,
00474 header.byte_order);
00475
00476 if (cdr_processor->decode (cdr) == -1)
00477 return -1;
00478
00479 delete *request;
00480 *request = &Request_Completed_;
00481 return 1;
00482 }
00483
00484 TAO_ECG_CDR_Message_Receiver::Request_Map::ENTRY*
00485 TAO_ECG_CDR_Message_Receiver::get_source_entry (const ACE_INET_Addr &from)
00486 {
00487
00488 Request_Map::ENTRY * entry;
00489
00490 if (this->request_map_.find (from, entry) == -1)
00491 {
00492
00493 Requests *requests = 0;
00494 ACE_NEW_RETURN (requests,
00495 Requests,
00496 0);
00497 auto_ptr<Requests> requests_aptr (requests);
00498
00499 if (requests->init (this->max_requests_, this->min_purge_count_) == -1
00500 || this->request_map_.bind (from, requests, entry) == -1)
00501 {
00502 ACE_ERROR_RETURN ((LM_ERROR, "Unable to create hash map "
00503 "entry for a new request.\n"),
00504 0);
00505 }
00506 requests_aptr.release ();
00507 }
00508
00509 return entry;
00510 }
00511
00512 void
00513 TAO_ECG_CDR_Message_Receiver::shutdown (void)
00514 {
00515
00516
00517 Request_Map::iterator end = this->request_map_.end ();
00518 for (Request_Map::iterator i = this->request_map_.begin ();
00519 i != end;
00520 ++i)
00521 {
00522 delete (*i).int_id_;
00523 (*i).int_id_ = 0;
00524 }
00525
00526 this->ignore_from_.reset ();
00527 }
00528
00529
00530 int
00531 TAO_ECG_CDR_Message_Receiver::Mcast_Header::read (char *header,
00532 size_t bytes_received,
00533 CORBA::Boolean checkcrc)
00534 {
00535
00536 this->byte_order = header[0];
00537 if(this->byte_order != 0 && this->byte_order != 1)
00538 {
00539 ACE_ERROR_RETURN ((LM_ERROR, "Reading mcast packet header: byte "
00540 "order is neither 0 nor 1, it is %d.\n",
00541 this->byte_order),
00542 -1);
00543 }
00544
00545 TAO_InputCDR header_cdr (header,
00546 TAO_ECG_CDR_Message_Sender::ECG_HEADER_SIZE,
00547 byte_order);
00548 CORBA::Boolean unused;
00549 CORBA::Octet a, b, c;
00550 if (!header_cdr.read_boolean (unused)
00551 || !header_cdr.read_octet (a)
00552 || !header_cdr.read_octet (b)
00553 || !header_cdr.read_octet (c)
00554 || a != 'A' || b != 'B' || c != 'C')
00555 {
00556 ACE_ERROR_RETURN ((LM_ERROR, "Error reading magic bytes "
00557 "in mcast packet header.\n"),
00558 -1);
00559 }
00560
00561 if (!header_cdr.read_ulong (this->request_id)
00562 || !header_cdr.read_ulong (this->request_size)
00563 || !header_cdr.read_ulong (this->fragment_size)
00564 || !header_cdr.read_ulong (this->fragment_offset)
00565 || !header_cdr.read_ulong (this->fragment_id)
00566 || !header_cdr.read_ulong (this->fragment_count))
00567 {
00568 ACE_ERROR_RETURN ((LM_ERROR,
00569 "Error decoding mcast packet header.\n"),
00570 -1);
00571 }
00572
00573 if (checkcrc)
00574 {
00575 CORBA::Octet padding[4];
00576 header_cdr.read_octet_array (padding, 4);
00577
00578 unsigned char *crcparts = (unsigned char *)(&this->crc);
00579
00580 for (int cnt=0; cnt != 4; ++cnt)
00581 {
00582 crcparts[cnt] = padding[cnt];
00583 }
00584
00585 this->crc = ntohl (this->crc);
00586 }
00587
00588
00589 size_t data_bytes_received =
00590 bytes_received - TAO_ECG_CDR_Message_Sender::ECG_HEADER_SIZE;
00591
00592 if (this->request_size < this->fragment_size
00593 || this->fragment_offset >= this->request_size
00594 || this->fragment_id >= this->fragment_count
00595 || (this->fragment_count == 1
00596 && (this->fragment_size != this->request_size
00597 || this->request_size != data_bytes_received)))
00598 {
00599 ACE_ERROR_RETURN ((LM_ERROR,
00600 "Invalid mcast fragment: "
00601 "inconsistent header fields.\n"),
00602 -1);
00603 }
00604
00605 return 0;
00606 }
00607
00608 TAO_END_VERSIONED_NAMESPACE_DECL