ECG_CDR_Message_Receiver.cpp

Go to the documentation of this file.
00001 // ECG_CDR_Message_Receiver.cpp,v 1.14 2006/03/14 06:14:25 jtc Exp
00002 
00003 #include "orbsvcs/Event/ECG_CDR_Message_Receiver.h"
00004 #include "orbsvcs/Event/ECG_CDR_Message_Sender.h"
00005 
00006 #include "tao/Exception.h"
00007 
00008 #include "ace/SOCK_Dgram.h"
00009 #include "ace/ACE.h"
00010 #include "ace/OS_NS_string.h"
00011 
00012 #if !defined(__ACE_INLINE__)
00013 #include "orbsvcs/Event/ECG_CDR_Message_Receiver.i"
00014 #endif /* __ACE_INLINE__ */
00015 
00016 ACE_RCSID (Event,
00017            ECG_CDR_Message_Receiver,
00018            "ECG_CDR_Message_Receiver.cpp,v 1.14 2006/03/14 06:14:25 jtc Exp")
00019 
00020 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00021 
00022 TAO_ECG_CDR_Processor::~TAO_ECG_CDR_Processor (void)
00023 {
00024 }
00025 // ****************************************************************
00026 
00027 TAO_ECG_UDP_Request_Entry::~TAO_ECG_UDP_Request_Entry (void)
00028 {
00029   if (this->own_received_fragments_)
00030     {
00031       this->own_received_fragments_ = 0;
00032       delete[] this->received_fragments_;
00033     }
00034 }
00035 
00036 TAO_ECG_UDP_Request_Entry::
00037 TAO_ECG_UDP_Request_Entry (CORBA::Boolean byte_order,
00038                            CORBA::ULong request_id,
00039                            CORBA::ULong request_size,
00040                            CORBA::ULong fragment_count)
00041   : byte_order_ (byte_order)
00042   , request_id_ (request_id)
00043   , request_size_ (request_size)
00044   , fragment_count_ (fragment_count)
00045 {
00046   ACE_CDR::grow (&this->payload_, this->request_size_);
00047   this->payload_.wr_ptr (request_size_);
00048 
00049   this->received_fragments_ = this->default_received_fragments_;
00050   this->own_received_fragments_ = 0;
00051   const int bits_per_ulong = sizeof(CORBA::ULong) * CHAR_BIT;
00052   this->received_fragments_size_ =
00053     this->fragment_count_ / bits_per_ulong + 1;
00054   if (this->received_fragments_size_ > ECG_DEFAULT_FRAGMENT_BUFSIZ)
00055     {
00056       ACE_NEW (this->received_fragments_,
00057                CORBA::ULong[this->received_fragments_size_]);
00058       this->own_received_fragments_ = 1;
00059     }
00060 
00061   for (CORBA::ULong i = 0; i < this->received_fragments_size_; ++i)
00062     this->received_fragments_[i] = 0;
00063   CORBA::ULong idx = this->fragment_count_ / bits_per_ulong;
00064   CORBA::ULong bit = this->fragment_count_ % bits_per_ulong;
00065   this->received_fragments_[idx] = (0xFFFFFFFF << bit);
00066 }
00067 
00068 int
00069 TAO_ECG_UDP_Request_Entry::validate_fragment (CORBA::Boolean byte_order,
00070                                               CORBA::ULong request_size,
00071                                               CORBA::ULong fragment_size,
00072                                               CORBA::ULong fragment_offset,
00073                                               CORBA::ULong /* fragment_id */,
00074                                               CORBA::ULong fragment_count) const
00075 {
00076   if (byte_order != this->byte_order_
00077       || request_size != this->request_size_
00078       || fragment_count != this->fragment_count_)
00079     return 0;
00080 
00081   if (fragment_offset >= request_size
00082       || fragment_offset + fragment_size > request_size)
00083     return 0;
00084 
00085   return 1;
00086 }
00087 
00088 int
00089 TAO_ECG_UDP_Request_Entry::test_received (CORBA::ULong fragment_id) const
00090 {
00091   // Assume out-of-range fragments as received, so they are dropped...
00092   if (fragment_id > this->fragment_count_)
00093     return 1;
00094 
00095   const int bits_per_ulong = sizeof(CORBA::ULong) * CHAR_BIT;
00096   CORBA::ULong idx = fragment_id / bits_per_ulong;
00097   CORBA::ULong bit = fragment_id % bits_per_ulong;
00098   return ACE_BIT_ENABLED (this->received_fragments_[idx], 1<<bit);
00099 }
00100 
00101 void
00102 TAO_ECG_UDP_Request_Entry::mark_received (CORBA::ULong fragment_id)
00103 {
00104   // Assume out-of-range fragments as received, so they are dropped...
00105   if (fragment_id > this->fragment_count_)
00106     return;
00107 
00108   const int bits_per_ulong = sizeof(CORBA::ULong) * CHAR_BIT;
00109   CORBA::ULong idx = fragment_id / bits_per_ulong;
00110   CORBA::ULong bit = fragment_id % bits_per_ulong;
00111   ACE_SET_BITS (this->received_fragments_[idx], 1<<bit);
00112 }
00113 
00114 int
00115 TAO_ECG_UDP_Request_Entry::complete (void) const
00116 {
00117   for (CORBA::ULong i = 0;
00118        i < this->received_fragments_size_;
00119        ++i)
00120     {
00121       if (this->received_fragments_[i] != 0xFFFFFFFF)
00122         return 0;
00123     }
00124   return 1;
00125 }
00126 
00127 char*
00128 TAO_ECG_UDP_Request_Entry::fragment_buffer (CORBA::ULong fragment_offset)
00129 {
00130   return this->payload_.rd_ptr () + fragment_offset;
00131 }
00132 // ****************************************************************
00133 
00134 int
00135 TAO_ECG_CDR_Message_Receiver::Requests::init (size_t size,
00136                                               size_t min_purge_count)
00137 {
00138   // Already initialized.
00139   if (this->fragmented_requests_)
00140     return -1;
00141 
00142   ACE_NEW_RETURN (this->fragmented_requests_,
00143                   TAO_ECG_UDP_Request_Entry*[size],
00144                   -1);
00145 
00146   this->size_ = size;
00147   this->id_range_low_ = 0;
00148   this->id_range_high_ = size - 1;
00149   this->min_purge_count_ = min_purge_count;
00150 
00151   for (size_t i = 0; i < size; ++i)
00152     {
00153       this->fragmented_requests_[i] = 0;
00154     }
00155 
00156   return 0;
00157 }
00158 
00159 TAO_ECG_CDR_Message_Receiver::Requests::~Requests (void)
00160 {
00161   for (size_t i = 0; i < this->size_; ++i)
00162     {
00163       TAO_ECG_UDP_Request_Entry* request =
00164         this->fragmented_requests_[i];
00165 
00166       if (request != &TAO_ECG_CDR_Message_Receiver::Request_Completed_)
00167         delete request;
00168     }
00169 
00170   delete [] this->fragmented_requests_;
00171 
00172   this->fragmented_requests_ = 0;
00173   this->size_ = 0;
00174   this->id_range_low_ = 0;
00175   this->id_range_high_ = 0;
00176 }
00177 
00178 TAO_ECG_UDP_Request_Entry **
00179 TAO_ECG_CDR_Message_Receiver::Requests::get_request (CORBA::ULong request_id)
00180 {
00181   if (request_id < this->id_range_low_)
00182     // <request_id> is below the current range.
00183     {
00184       return 0;
00185     }
00186 
00187   if (request_id > this->id_range_high_)
00188     // <request_id> is above the current range - need to shift the range
00189     // to include it.
00190     {
00191       CORBA::ULong new_slots_needed = request_id - this->id_range_high_;
00192 
00193       if (new_slots_needed < this->min_purge_count_)
00194         new_slots_needed = this->min_purge_count_;
00195 
00196       if (new_slots_needed > this->size_)
00197         // Shifting the range by more than the size of array.
00198         {
00199           this->purge_requests (this->id_range_low_, this->id_range_high_);
00200           this->id_range_high_ = request_id;
00201           this->id_range_low_ = request_id - this->size_ + 1;
00202         }
00203       else
00204         {
00205           this->purge_requests (this->id_range_low_,
00206                                 this->id_range_low_ + new_slots_needed - 1);
00207           this->id_range_high_ += new_slots_needed;
00208           this->id_range_low_ += new_slots_needed;
00209         }
00210     }
00211 
00212   // Return array location for <request_id>.
00213   int index = request_id % this->size_;
00214   return this->fragmented_requests_ + index;
00215 }
00216 
00217 
00218 void
00219 TAO_ECG_CDR_Message_Receiver::Requests::purge_requests (
00220                                             CORBA::ULong purge_first,
00221                                             CORBA::ULong purge_last)
00222 {
00223   for (CORBA::ULong i = purge_first; i <= purge_last; ++i)
00224     {
00225       size_t index = i % this->size_;
00226       if (this->fragmented_requests_[index]
00227           != &TAO_ECG_CDR_Message_Receiver::Request_Completed_)
00228         {
00229           delete this->fragmented_requests_[index];
00230         }
00231       this->fragmented_requests_[index] = 0;
00232     }
00233 }
00234 
00235 // ****************************************************************
00236 
00237 TAO_ECG_UDP_Request_Entry
00238 TAO_ECG_CDR_Message_Receiver::Request_Completed_ (0, 0, 0, 0);
00239 
00240 int
00241 TAO_ECG_CDR_Message_Receiver::handle_input (
00242                                  ACE_SOCK_Dgram& dgram,
00243                                  TAO_ECG_CDR_Processor *cdr_processor)
00244 {
00245   char nonaligned_header[TAO_ECG_CDR_Message_Sender::ECG_HEADER_SIZE
00246                         + ACE_CDR::MAX_ALIGNMENT];
00247   char *header_buf = ACE_ptr_align_binary (nonaligned_header,
00248                                            ACE_CDR::MAX_ALIGNMENT);
00249 
00250   char nonaligned_data[ACE_MAX_DGRAM_SIZE + ACE_CDR::MAX_ALIGNMENT];
00251   char *data_buf = ACE_ptr_align_binary (nonaligned_data,
00252                                          ACE_CDR::MAX_ALIGNMENT);
00253 
00254   // Read the message from dgram.
00255 
00256   const int iovcnt = 2;
00257   iovec iov[iovcnt];
00258   iov[0].iov_base = header_buf;
00259   iov[0].iov_len  = TAO_ECG_CDR_Message_Sender::ECG_HEADER_SIZE;
00260   iov[1].iov_base = data_buf;
00261   iov[1].iov_len  = ACE_MAX_DGRAM_SIZE;
00262 
00263   ACE_INET_Addr from;
00264   ssize_t n = dgram.recv (iov, iovcnt, from);
00265 
00266   if (n == -1)
00267     {
00268       if (errno == EWOULDBLOCK)
00269         return 0;
00270 
00271       ACE_ERROR_RETURN ((LM_ERROR, "Error reading mcast fragment (%m).\n"),
00272                         -1);
00273     }
00274 
00275   if (n == 0)
00276     {
00277       ACE_ERROR_RETURN ((LM_ERROR, "Trying to read mcast fragment: "
00278                                   "read 0 bytes from socket.\n"),
00279                         0);
00280     }
00281 
00282   if (n < TAO_ECG_CDR_Message_Sender::ECG_HEADER_SIZE)
00283     {
00284       ACE_ERROR_RETURN ((LM_ERROR, "Trying to read mcast fragment: "
00285                                    "# of bytes read < mcast header size.\n"),
00286                          -1);
00287     }
00288 
00289   u_int crc = 0;
00290 
00291   if (this->check_crc_)
00292     {
00293       iov[1].iov_len = n - iov[0].iov_len;
00294       iov[0].iov_len -= 4;  // don't include crc
00295 
00296       crc = ACE::crc32 (iov, 2);
00297     }
00298   // Check whether the message is a loopback message.
00299   if (this->ignore_from_.get () != 0
00300       && this->ignore_from_->is_loopback (from))
00301     {
00302       return 0;
00303     }
00304 
00305   // Decode and validate mcast header.
00306   Mcast_Header header;
00307   if (header.read (header_buf, n, this->check_crc_) == -1)
00308     return -1;
00309 
00310   if ( this->check_crc_ && header.crc != crc)
00311     {
00312       static unsigned int err_count = 0;
00313       ACE_ERROR ((LM_DEBUG,
00314                   "******************************\n"));
00315 
00316       ACE_ERROR ((LM_DEBUG,
00317                   "ERROR DETECTED \n"));
00318 
00319       if (crc == 0)
00320         {
00321           ACE_ERROR ((LM_DEBUG,
00322                       "Sending process may not have computed CRC \n"));
00323         }
00324       else
00325         {
00326           ACE_ERROR ((LM_DEBUG,
00327                       " NETWORK CRC CHECKSUM FAILED\n"));
00328         }
00329 
00330       ACE_ERROR ((LM_ERROR,
00331                   "Message was received from [%s:%s:%d] \n",
00332                   from.get_host_name (),
00333                   from.get_host_addr (),
00334                   from.get_port_number()));
00335 
00336       ACE_ERROR ((LM_ERROR,
00337                   "Num errors = %d \n",
00338                   ++err_count));
00339       ACE_ERROR ((LM_ERROR,
00340                   "This is a bad thing. Attempting to ignore ..\n"));
00341 
00342       return 0;
00343     }
00344 
00345   // Process received data.
00346   if (header.fragment_count == 1)
00347     {
00348       // Update <request_map_> to mark this request as completed. (Not
00349       // needed if we don't care about duplicates.)
00350       int result = this->mark_received (from, header.request_id);
00351       if (result != 1)
00352         return result;
00353 
00354       TAO_InputCDR cdr (data_buf, header.request_size, header.byte_order);
00355       if (cdr_processor->decode (cdr) == -1)
00356         return -1;
00357       else
00358         return 1;
00359     }
00360 
00361   return this->process_fragment (from, header, data_buf, cdr_processor);
00362 }
00363 
00364 int
00365 TAO_ECG_CDR_Message_Receiver::mark_received (const ACE_INET_Addr &from,
00366                                              CORBA::ULong request_id)
00367 {
00368   // ACE_GUARD_RETURN (ACE_Lock, guard, *this->lock_, -1);
00369 
00370   Request_Map::ENTRY * entry = this->get_source_entry (from);
00371   if (!entry)
00372     return -1;
00373 
00374   TAO_ECG_UDP_Request_Entry ** request =
00375     entry->int_id_->get_request (request_id);
00376 
00377   if (request == 0)
00378     {
00379       ACE_DEBUG ((LM_WARNING, "Received mcast request with sequence"
00380                               "below currently expected range.\n"));
00381       return 0;
00382     }
00383   if (*request == &Request_Completed_)
00384     {
00385       ACE_DEBUG ((LM_INFO, "Received duplicate mcast fragment. "
00386                            "(Request already complete).\n"));
00387       return 0;
00388     }
00389   if (*request != 0)
00390     {
00391       ACE_ERROR_RETURN ((LM_ERROR, "Inconsistent fragments for "
00392                                    "mcast request.\n"),
00393                          -1);
00394     }
00395 
00396   *request = &Request_Completed_;
00397   return 1;
00398 }
00399 
00400 int
00401 TAO_ECG_CDR_Message_Receiver::process_fragment (
00402                                    const ACE_INET_Addr &from,
00403                                    const Mcast_Header &header,
00404                                    char * data_buf,
00405                                    TAO_ECG_CDR_Processor *cdr_processor)
00406 {
00407   // ACE_GUARD_RETURN (ACE_Lock, guard, *this->lock_, -1);
00408 
00409   Request_Map::ENTRY * source_entry = this->get_source_entry (from);
00410   if (!source_entry)
00411     return -1;
00412 
00413   TAO_ECG_UDP_Request_Entry ** request =
00414     source_entry->int_id_->get_request (header.request_id);
00415 
00416   if (request == 0)
00417     {
00418       ACE_DEBUG ((LM_WARNING, "Received mcast request with sequence "
00419                               "below currently expected range.\n"));
00420       return 0;
00421     }
00422   if (*request == &Request_Completed_)
00423     {
00424       ACE_DEBUG ((LM_INFO, "Received duplicate mcast fragment. "
00425                            "(Request already complete).\n"));
00426       return 0;
00427     }
00428   if (*request == 0)
00429     // Entry for this request has not yet been allocated.
00430     {
00431       ACE_NEW_RETURN (*request,
00432                       TAO_ECG_UDP_Request_Entry (header.byte_order,
00433                                                  header.request_id,
00434                                                  header.request_size,
00435                                                  header.fragment_count),
00436                       -1);
00437     }
00438 
00439   // Validate the fragment.
00440   if ((*request)->validate_fragment (header.byte_order,
00441                                      header.request_size,
00442                                      header.fragment_size,
00443                                      header.fragment_offset,
00444                                      header.fragment_id,
00445                                      header.fragment_count) == 0)
00446     {
00447       ACE_ERROR_RETURN ((LM_ERROR,
00448                          "Received invalid mcast fragment.\n"),
00449                         -1);
00450     }
00451 
00452   // Check whether this fragment was already received.
00453   if ((*request)->test_received (header.fragment_id) == 1)
00454     {
00455       ACE_DEBUG ((LM_INFO, "Received duplicate mcast fragment.\n"));
00456       return 0;
00457     }
00458 
00459   // Add the fragment to the request entry.
00460   (*request)->mark_received (header.fragment_id);
00461   ACE_OS::memcpy ((*request)->fragment_buffer (header.fragment_offset),
00462                   data_buf,
00463                   header.fragment_size);
00464 
00465   // The request is not yet complete.
00466   if (!(*request)->complete ())
00467     {
00468       return 0;
00469     }
00470 
00471   // The request is complete - decode it.
00472   TAO_InputCDR cdr ((*request)->fragment_buffer (0),
00473                     header.request_size,
00474                     header.byte_order);
00475 
00476   if (cdr_processor->decode (cdr) == -1)
00477     return -1;
00478 
00479   delete *request;
00480   *request = &Request_Completed_;
00481   return 1;
00482 }
00483 
00484 TAO_ECG_CDR_Message_Receiver::Request_Map::ENTRY*
00485 TAO_ECG_CDR_Message_Receiver::get_source_entry (const ACE_INET_Addr &from)
00486 {
00487   // Get the entry for <from> from the <request_map_>.
00488   Request_Map::ENTRY * entry;
00489 
00490   if (this->request_map_.find (from, entry) == -1)
00491     {
00492       // Create an entry if one doesn't exist.
00493       Requests *requests = 0;
00494       ACE_NEW_RETURN (requests,
00495                       Requests,
00496                       0);
00497       auto_ptr<Requests> requests_aptr (requests);
00498 
00499       if (requests->init (this->max_requests_, this->min_purge_count_) == -1
00500           || this->request_map_.bind (from, requests, entry) == -1)
00501         {
00502           ACE_ERROR_RETURN ((LM_ERROR, "Unable to create hash map "
00503                                        "entry for a new request.\n"),
00504                             0);
00505         }
00506       requests_aptr.release ();
00507     }
00508 
00509   return entry;
00510 }
00511 
00512 void
00513 TAO_ECG_CDR_Message_Receiver::shutdown (void)
00514 {
00515   // ACE_GUARD (ACE_Lock, guard, *this->lock_);
00516 
00517   Request_Map::iterator end = this->request_map_.end ();
00518   for (Request_Map::iterator i =  this->request_map_.begin ();
00519        i != end;
00520        ++i)
00521     {
00522       delete (*i).int_id_;
00523       (*i).int_id_ = 0;
00524     }
00525 
00526   this->ignore_from_.reset ();
00527 }
00528 
00529 // ****************************************************************
00530 int
00531 TAO_ECG_CDR_Message_Receiver::Mcast_Header::read (char *header,
00532                                                   size_t bytes_received,
00533                                                   CORBA::Boolean checkcrc)
00534 {
00535   // Decode.
00536   this->byte_order = header[0];
00537   if(this->byte_order != 0 && this->byte_order != 1)
00538     {
00539       ACE_ERROR_RETURN ((LM_ERROR, "Reading mcast packet header: byte "
00540                                    "order is neither 0 nor 1, it is %d.\n",
00541                          this->byte_order),
00542                         -1);
00543     }
00544 
00545   TAO_InputCDR header_cdr (header,
00546                            TAO_ECG_CDR_Message_Sender::ECG_HEADER_SIZE,
00547                            byte_order);
00548   CORBA::Boolean unused;
00549   CORBA::Octet a, b, c;
00550   if (!header_cdr.read_boolean (unused)
00551       || !header_cdr.read_octet (a)
00552       || !header_cdr.read_octet (b)
00553       || !header_cdr.read_octet (c)
00554       || a != 'A' || b != 'B' || c != 'C')
00555     {
00556       ACE_ERROR_RETURN ((LM_ERROR, "Error reading magic bytes "
00557                                    "in mcast packet header.\n"),
00558                         -1);
00559     }
00560 
00561   if (!header_cdr.read_ulong (this->request_id)
00562       || !header_cdr.read_ulong (this->request_size)
00563       || !header_cdr.read_ulong (this->fragment_size)
00564       || !header_cdr.read_ulong (this->fragment_offset)
00565       || !header_cdr.read_ulong (this->fragment_id)
00566       || !header_cdr.read_ulong (this->fragment_count))
00567     {
00568       ACE_ERROR_RETURN ((LM_ERROR,
00569                         "Error decoding mcast packet header.\n"),
00570                         -1);
00571     }
00572 
00573   if (checkcrc)
00574     {
00575       CORBA::Octet padding[4];
00576       header_cdr.read_octet_array (padding, 4);
00577 
00578       unsigned char *crcparts = (unsigned char *)(&this->crc);
00579 
00580       for (int cnt=0; cnt != 4; ++cnt)
00581         {
00582           crcparts[cnt] = padding[cnt];
00583         }
00584 
00585       this->crc = ntohl (this->crc);
00586     }
00587 
00588   // Validate.
00589   size_t data_bytes_received =
00590     bytes_received - TAO_ECG_CDR_Message_Sender::ECG_HEADER_SIZE;
00591 
00592   if (this->request_size < this->fragment_size
00593       || this->fragment_offset >= this->request_size
00594       || this->fragment_id >= this->fragment_count
00595       || (this->fragment_count == 1
00596           && (this->fragment_size != this->request_size
00597               || this->request_size != data_bytes_received)))
00598     {
00599       ACE_ERROR_RETURN ((LM_ERROR,
00600                         "Invalid mcast fragment: "
00601                         "inconsistent header fields.\n"),
00602                         -1);
00603     }
00604 
00605   return 0;
00606 }
00607 
00608 TAO_END_VERSIONED_NAMESPACE_DECL

Generated on Thu Nov 9 13:11:10 2006 for TAO_RTEvent by doxygen 1.3.6