ECG_CDR_Message_Receiver.cpp

Go to the documentation of this file.
00001 // $Id: ECG_CDR_Message_Receiver.cpp 76842 2007-02-01 23:15:16Z mitza $
00002 
00003 #include "orbsvcs/Event/ECG_CDR_Message_Receiver.h"
00004 #include "orbsvcs/Event/ECG_CDR_Message_Sender.h"
00005 
00006 #include "tao/Exception.h"
00007 
00008 #include "ace/SOCK_Dgram.h"
00009 #include "ace/ACE.h"
00010 #include "ace/OS_NS_string.h"
00011 
00012 #if !defined(__ACE_INLINE__)
00013 #include "orbsvcs/Event/ECG_CDR_Message_Receiver.inl"
00014 #endif /* __ACE_INLINE__ */
00015 
00016 ACE_RCSID (Event,
00017            ECG_CDR_Message_Receiver,
00018            "$Id: ECG_CDR_Message_Receiver.cpp 76842 2007-02-01 23:15:16Z mitza $")
00019 
00020 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00021 
00022 TAO_ECG_CDR_Processor::~TAO_ECG_CDR_Processor (void)
00023 {
00024 }
00025 // ****************************************************************
00026 
00027 TAO_ECG_UDP_Request_Entry::~TAO_ECG_UDP_Request_Entry (void)
00028 {
00029   if (this->own_received_fragments_)
00030     {
00031       this->own_received_fragments_ = 0;
00032       delete[] this->received_fragments_;
00033     }
00034 }
00035 
00036 TAO_ECG_UDP_Request_Entry::
00037 TAO_ECG_UDP_Request_Entry (CORBA::Boolean byte_order,
00038                            CORBA::ULong request_id,
00039                            CORBA::ULong request_size,
00040                            CORBA::ULong fragment_count)
00041   : byte_order_ (byte_order)
00042   , request_id_ (request_id)
00043   , request_size_ (request_size)
00044   , fragment_count_ (fragment_count)
00045 {
00046   ACE_CDR::grow (&this->payload_, this->request_size_);
00047   this->payload_.wr_ptr (request_size_);
00048 
00049   this->received_fragments_ = this->default_received_fragments_;
00050   this->own_received_fragments_ = 0;
00051   const int bits_per_ulong = sizeof(CORBA::ULong) * CHAR_BIT;
00052   this->received_fragments_size_ =
00053     this->fragment_count_ / bits_per_ulong + 1;
00054   if (this->received_fragments_size_ > ECG_DEFAULT_FRAGMENT_BUFSIZ)
00055     {
00056       ACE_NEW (this->received_fragments_,
00057                CORBA::ULong[this->received_fragments_size_]);
00058       this->own_received_fragments_ = 1;
00059     }
00060 
00061   for (CORBA::ULong i = 0; i < this->received_fragments_size_; ++i)
00062     this->received_fragments_[i] = 0;
00063   CORBA::ULong idx = this->fragment_count_ / bits_per_ulong;
00064   CORBA::ULong bit = this->fragment_count_ % bits_per_ulong;
00065   this->received_fragments_[idx] = (0xFFFFFFFF << bit);
00066 }
00067 
00068 int
00069 TAO_ECG_UDP_Request_Entry::validate_fragment (CORBA::Boolean byte_order,
00070                                               CORBA::ULong request_size,
00071                                               CORBA::ULong fragment_size,
00072                                               CORBA::ULong fragment_offset,
00073                                               CORBA::ULong /* fragment_id */,
00074                                               CORBA::ULong fragment_count) const
00075 {
00076   if (byte_order != this->byte_order_
00077       || request_size != this->request_size_
00078       || fragment_count != this->fragment_count_)
00079     return 0;
00080 
00081   if (fragment_offset >= request_size
00082       || fragment_offset + fragment_size > request_size)
00083     return 0;
00084 
00085   return 1;
00086 }
00087 
00088 int
00089 TAO_ECG_UDP_Request_Entry::test_received (CORBA::ULong fragment_id) const
00090 {
00091   // Assume out-of-range fragments as received, so they are dropped...
00092   if (fragment_id > this->fragment_count_)
00093     return 1;
00094 
00095   const int bits_per_ulong = sizeof(CORBA::ULong) * CHAR_BIT;
00096   CORBA::ULong idx = fragment_id / bits_per_ulong;
00097   CORBA::ULong bit = fragment_id % bits_per_ulong;
00098   return ACE_BIT_ENABLED (this->received_fragments_[idx], 1<<bit);
00099 }
00100 
00101 void
00102 TAO_ECG_UDP_Request_Entry::mark_received (CORBA::ULong fragment_id)
00103 {
00104   // Assume out-of-range fragments as received, so they are dropped...
00105   if (fragment_id > this->fragment_count_)
00106     return;
00107 
00108   const int bits_per_ulong = sizeof(CORBA::ULong) * CHAR_BIT;
00109   CORBA::ULong idx = fragment_id / bits_per_ulong;
00110   CORBA::ULong bit = fragment_id % bits_per_ulong;
00111   ACE_SET_BITS (this->received_fragments_[idx], 1<<bit);
00112 }
00113 
00114 int
00115 TAO_ECG_UDP_Request_Entry::complete (void) const
00116 {
00117   for (CORBA::ULong i = 0;
00118        i < this->received_fragments_size_;
00119        ++i)
00120     {
00121       if (this->received_fragments_[i] != 0xFFFFFFFF)
00122         return 0;
00123     }
00124   return 1;
00125 }
00126 
00127 char*
00128 TAO_ECG_UDP_Request_Entry::fragment_buffer (CORBA::ULong fragment_offset)
00129 {
00130   return this->payload_.rd_ptr () + fragment_offset;
00131 }
00132 // ****************************************************************
00133 
00134 int
00135 TAO_ECG_CDR_Message_Receiver::Requests::init (size_t size,
00136                                               size_t min_purge_count)
00137 {
00138   // Already initialized.
00139   if (this->fragmented_requests_)
00140     return -1;
00141 
00142   ACE_NEW_RETURN (this->fragmented_requests_,
00143                   TAO_ECG_UDP_Request_Entry*[size],
00144                   -1);
00145 
00146   this->size_ = size;
00147   this->id_range_low_ = 0;
00148   this->id_range_high_ = size - 1;
00149   this->min_purge_count_ = min_purge_count;
00150 
00151   for (size_t i = 0; i < size; ++i)
00152     {
00153       this->fragmented_requests_[i] = 0;
00154     }
00155 
00156   return 0;
00157 }
00158 
00159 TAO_ECG_CDR_Message_Receiver::Requests::~Requests (void)
00160 {
00161   for (size_t i = 0; i < this->size_; ++i)
00162     {
00163       TAO_ECG_UDP_Request_Entry* request =
00164         this->fragmented_requests_[i];
00165 
00166       if (request != &TAO_ECG_CDR_Message_Receiver::Request_Completed_)
00167         delete request;
00168     }
00169 
00170   delete [] this->fragmented_requests_;
00171 
00172   this->fragmented_requests_ = 0;
00173   this->size_ = 0;
00174   this->id_range_low_ = 0;
00175   this->id_range_high_ = 0;
00176 }
00177 
00178 TAO_ECG_UDP_Request_Entry **
00179 TAO_ECG_CDR_Message_Receiver::Requests::get_request (CORBA::ULong request_id)
00180 {
00181   if (request_id < this->id_range_low_)
00182     // <request_id> is below the current range.
00183     {
00184       return 0;
00185     }
00186 
00187   if (request_id > this->id_range_high_)
00188     // <request_id> is above the current range - need to shift the range
00189     // to include it.
00190     {
00191       CORBA::ULong new_slots_needed = request_id - this->id_range_high_;
00192 
00193       if (new_slots_needed < this->min_purge_count_)
00194         new_slots_needed = this->min_purge_count_;
00195 
00196       if (new_slots_needed > this->size_)
00197         // Shifting the range by more than the size of array.
00198         {
00199           this->purge_requests (this->id_range_low_, this->id_range_high_);
00200           this->id_range_high_ = request_id;
00201           this->id_range_low_ = request_id - this->size_ + 1;
00202         }
00203       else
00204         {
00205           this->purge_requests (this->id_range_low_,
00206                                 this->id_range_low_ + new_slots_needed - 1);
00207           this->id_range_high_ += new_slots_needed;
00208           this->id_range_low_ += new_slots_needed;
00209         }
00210     }
00211 
00212   // Return array location for <request_id>.
00213   int index = request_id % this->size_;
00214   return this->fragmented_requests_ + index;
00215 }
00216 
00217 
00218 void
00219 TAO_ECG_CDR_Message_Receiver::Requests::purge_requests (
00220                                             CORBA::ULong purge_first,
00221                                             CORBA::ULong purge_last)
00222 {
00223   for (CORBA::ULong i = purge_first; i <= purge_last; ++i)
00224     {
00225       size_t index = i % this->size_;
00226       if (this->fragmented_requests_[index]
00227           != &TAO_ECG_CDR_Message_Receiver::Request_Completed_)
00228         {
00229           delete this->fragmented_requests_[index];
00230         }
00231       this->fragmented_requests_[index] = 0;
00232     }
00233 }
00234 
00235 // ****************************************************************
00236 
00237 TAO_ECG_UDP_Request_Entry
00238 TAO_ECG_CDR_Message_Receiver::Request_Completed_ (0, 0, 0, 0);
00239 
00240 int
00241 TAO_ECG_CDR_Message_Receiver::handle_input (
00242                                  ACE_SOCK_Dgram& dgram,
00243                                  TAO_ECG_CDR_Processor *cdr_processor)
00244 {
00245   char nonaligned_header[TAO_ECG_CDR_Message_Sender::ECG_HEADER_SIZE
00246                         + ACE_CDR::MAX_ALIGNMENT];
00247   char *header_buf = ACE_ptr_align_binary (nonaligned_header,
00248                                            ACE_CDR::MAX_ALIGNMENT);
00249   char nonaligned_data[ACE_MAX_DGRAM_SIZE + ACE_CDR::MAX_ALIGNMENT];
00250   char *data_buf = ACE_ptr_align_binary (nonaligned_data,
00251                                          ACE_CDR::MAX_ALIGNMENT);
00252 
00253   // Read the message from dgram.
00254 
00255   const int iovcnt = 2;
00256   iovec iov[iovcnt];
00257   iov[0].iov_base = header_buf;
00258   iov[0].iov_len  = TAO_ECG_CDR_Message_Sender::ECG_HEADER_SIZE;
00259   iov[1].iov_base = data_buf;
00260   iov[1].iov_len  = ACE_MAX_DGRAM_SIZE;
00261 
00262   ACE_INET_Addr from;
00263   ssize_t n = dgram.recv (iov, iovcnt, from);
00264 
00265   if (n == -1)
00266     {
00267       if (errno == EWOULDBLOCK)
00268         return 0;
00269 
00270       ACE_ERROR_RETURN ((LM_ERROR, "Error reading mcast fragment (%m).\n"),
00271                         -1);
00272     }
00273 
00274   if (n == 0)
00275     {
00276       ACE_ERROR_RETURN ((LM_ERROR, "Trying to read mcast fragment: "
00277                                   "read 0 bytes from socket.\n"),
00278                         0);
00279     }
00280 
00281   if (n < TAO_ECG_CDR_Message_Sender::ECG_HEADER_SIZE)
00282     {
00283       ACE_ERROR_RETURN ((LM_ERROR, "Trying to read mcast fragment: "
00284                                    "# of bytes read < mcast header size.\n"),
00285                          -1);
00286     }
00287 
00288   u_int crc = 0;
00289 
00290   if (this->check_crc_)
00291     {
00292       iov[1].iov_len = n - iov[0].iov_len;
00293       iov[0].iov_len -= 4;  // don't include crc
00294 
00295       crc = ACE::crc32 (iov, 2);
00296     }
00297   // Check whether the message is a loopback message.
00298   if (this->ignore_from_.get () != 0
00299       && this->ignore_from_->is_loopback (from))
00300     {
00301       return 0;
00302     }
00303 
00304   // Decode and validate mcast header.
00305   Mcast_Header header;
00306   if (header.read (header_buf, n, this->check_crc_) == -1)
00307     return -1;
00308 
00309   if ( this->check_crc_ && header.crc != crc)
00310     {
00311       static unsigned int err_count = 0;
00312       ACE_ERROR ((LM_ERROR,
00313                   "******************************\n"));
00314 
00315       ACE_ERROR ((LM_ERROR,
00316                   "ERROR DETECTED \n"));
00317 
00318       if (crc == 0)
00319         {
00320           ACE_ERROR ((LM_ERROR,
00321                       "Sending process may not have computed CRC \n"));
00322         }
00323       else
00324         {
00325           ACE_ERROR ((LM_ERROR,
00326                       " NETWORK CRC CHECKSUM FAILED\n"));
00327         }
00328 
00329       ACE_ERROR ((LM_ERROR,
00330                   "Message was received from [%s:%s:%d] \n",
00331                   from.get_host_name (),
00332                   from.get_host_addr (),
00333                   from.get_port_number()));
00334 
00335       ACE_ERROR ((LM_ERROR,
00336                   "Num errors = %d \n",
00337                   ++err_count));
00338       ACE_ERROR ((LM_ERROR,
00339                   "This is a bad thing. Attempting to ignore ..\n"));
00340 
00341       return 0;
00342     }
00343 
00344   // Process received data.
00345   if (header.fragment_count == 1)
00346     {
00347       // Update <request_map_> to mark this request as completed. (Not
00348       // needed if we don't care about duplicates.)
00349       int const result = this->mark_received (from, header.request_id);
00350       if (result != 1)
00351         return result;
00352 
00353       TAO_InputCDR cdr (data_buf, header.request_size, header.byte_order);
00354       if (cdr_processor->decode (cdr) == -1)
00355         return -1;
00356       else
00357         return 1;
00358     }
00359 
00360   return this->process_fragment (from, header, data_buf, cdr_processor);
00361 }
00362 
00363 int
00364 TAO_ECG_CDR_Message_Receiver::mark_received (const ACE_INET_Addr &from,
00365                                              CORBA::ULong request_id)
00366 {
00367   // ACE_GUARD_RETURN (ACE_Lock, guard, *this->lock_, -1);
00368 
00369   Request_Map::ENTRY * entry = this->get_source_entry (from);
00370   if (!entry)
00371     return -1;
00372 
00373   TAO_ECG_UDP_Request_Entry ** request =
00374     entry->int_id_->get_request (request_id);
00375 
00376   if (request == 0)
00377     {
00378       ACE_DEBUG ((LM_WARNING, "Received mcast request with sequence"
00379                               "below currently expected range.\n"));
00380       return 0;
00381     }
00382   if (*request == &Request_Completed_)
00383     {
00384       ACE_DEBUG ((LM_INFO, "Received duplicate mcast fragment. "
00385                            "(Request already complete).\n"));
00386       return 0;
00387     }
00388   if (*request != 0)
00389     {
00390       ACE_ERROR_RETURN ((LM_ERROR, "Inconsistent fragments for "
00391                                    "mcast request.\n"),
00392                          -1);
00393     }
00394 
00395   *request = &Request_Completed_;
00396   return 1;
00397 }
00398 
00399 int
00400 TAO_ECG_CDR_Message_Receiver::process_fragment (
00401                                    const ACE_INET_Addr &from,
00402                                    const Mcast_Header &header,
00403                                    char * data_buf,
00404                                    TAO_ECG_CDR_Processor *cdr_processor)
00405 {
00406   // ACE_GUARD_RETURN (ACE_Lock, guard, *this->lock_, -1);
00407 
00408   Request_Map::ENTRY * source_entry = this->get_source_entry (from);
00409   if (!source_entry)
00410     return -1;
00411 
00412   TAO_ECG_UDP_Request_Entry ** request =
00413     source_entry->int_id_->get_request (header.request_id);
00414 
00415   if (request == 0)
00416     {
00417       ACE_DEBUG ((LM_WARNING, "Received mcast request with sequence "
00418                               "below currently expected range.\n"));
00419       return 0;
00420     }
00421   if (*request == &Request_Completed_)
00422     {
00423       ACE_DEBUG ((LM_INFO, "Received duplicate mcast fragment. "
00424                            "(Request already complete).\n"));
00425       return 0;
00426     }
00427   if (*request == 0)
00428     // Entry for this request has not yet been allocated.
00429     {
00430       ACE_NEW_RETURN (*request,
00431                       TAO_ECG_UDP_Request_Entry (header.byte_order,
00432                                                  header.request_id,
00433                                                  header.request_size,
00434                                                  header.fragment_count),
00435                       -1);
00436     }
00437 
00438   // Validate the fragment.
00439   if ((*request)->validate_fragment (header.byte_order,
00440                                      header.request_size,
00441                                      header.fragment_size,
00442                                      header.fragment_offset,
00443                                      header.fragment_id,
00444                                      header.fragment_count) == 0)
00445     {
00446       ACE_ERROR_RETURN ((LM_ERROR,
00447                          "Received invalid mcast fragment.\n"),
00448                         -1);
00449     }
00450 
00451   // Check whether this fragment was already received.
00452   if ((*request)->test_received (header.fragment_id) == 1)
00453     {
00454       ACE_DEBUG ((LM_INFO, "Received duplicate mcast fragment.\n"));
00455       return 0;
00456     }
00457 
00458   // Add the fragment to the request entry.
00459   (*request)->mark_received (header.fragment_id);
00460   ACE_OS::memcpy ((*request)->fragment_buffer (header.fragment_offset),
00461                   data_buf,
00462                   header.fragment_size);
00463 
00464   // The request is not yet complete.
00465   if (!(*request)->complete ())
00466     {
00467       return 0;
00468     }
00469 
00470   // The request is complete - decode it.
00471   TAO_InputCDR cdr ((*request)->fragment_buffer (0),
00472                     header.request_size,
00473                     header.byte_order);
00474 
00475   if (cdr_processor->decode (cdr) == -1)
00476     return -1;
00477 
00478   delete *request;
00479   *request = &Request_Completed_;
00480   return 1;
00481 }
00482 
00483 TAO_ECG_CDR_Message_Receiver::Request_Map::ENTRY*
00484 TAO_ECG_CDR_Message_Receiver::get_source_entry (const ACE_INET_Addr &from)
00485 {
00486   // Get the entry for <from> from the <request_map_>.
00487   Request_Map::ENTRY * entry = 0;
00488 
00489   if (this->request_map_.find (from, entry) == -1)
00490     {
00491       // Create an entry if one doesn't exist.
00492       Requests *requests = 0;
00493       ACE_NEW_RETURN (requests,
00494                       Requests,
00495                       0);
00496       auto_ptr<Requests> requests_aptr (requests);
00497 
00498       if (requests->init (this->max_requests_, this->min_purge_count_) == -1
00499           || this->request_map_.bind (from, requests, entry) == -1)
00500         {
00501           ACE_ERROR_RETURN ((LM_ERROR, "Unable to create hash map "
00502                                        "entry for a new request.\n"),
00503                             0);
00504         }
00505       requests_aptr.release ();
00506     }
00507 
00508   return entry;
00509 }
00510 
00511 void
00512 TAO_ECG_CDR_Message_Receiver::shutdown (void)
00513 {
00514   // ACE_GUARD (ACE_Lock, guard, *this->lock_);
00515 
00516   Request_Map::iterator end = this->request_map_.end ();
00517   for (Request_Map::iterator i =  this->request_map_.begin ();
00518        i != end;
00519        ++i)
00520     {
00521       delete (*i).int_id_;
00522       (*i).int_id_ = 0;
00523     }
00524 
00525   this->ignore_from_.reset ();
00526 }
00527 
00528 // ****************************************************************
00529 int
00530 TAO_ECG_CDR_Message_Receiver::Mcast_Header::read (char *header,
00531                                                   size_t bytes_received,
00532                                                   CORBA::Boolean checkcrc)
00533 {
00534   // Decode.
00535   this->byte_order = header[0];
00536   if(this->byte_order != 0 && this->byte_order != 1)
00537     {
00538       ACE_ERROR_RETURN ((LM_ERROR, "Reading mcast packet header: byte "
00539                                    "order is neither 0 nor 1, it is %d.\n",
00540                          this->byte_order),
00541                         -1);
00542     }
00543 
00544   TAO_InputCDR header_cdr (header,
00545                            TAO_ECG_CDR_Message_Sender::ECG_HEADER_SIZE,
00546                            byte_order);
00547   CORBA::Boolean unused;
00548   CORBA::Octet a, b, c;
00549   if (!header_cdr.read_boolean (unused)
00550       || !header_cdr.read_octet (a)
00551       || !header_cdr.read_octet (b)
00552       || !header_cdr.read_octet (c)
00553       || a != 'A' || b != 'B' || c != 'C')
00554     {
00555       ACE_ERROR_RETURN ((LM_ERROR, "Error reading magic bytes "
00556                                    "in mcast packet header.\n"),
00557                         -1);
00558     }
00559 
00560   if (!header_cdr.read_ulong (this->request_id)
00561       || !header_cdr.read_ulong (this->request_size)
00562       || !header_cdr.read_ulong (this->fragment_size)
00563       || !header_cdr.read_ulong (this->fragment_offset)
00564       || !header_cdr.read_ulong (this->fragment_id)
00565       || !header_cdr.read_ulong (this->fragment_count))
00566     {
00567       ACE_ERROR_RETURN ((LM_ERROR,
00568                         "Error decoding mcast packet header.\n"),
00569                         -1);
00570     }
00571 
00572   if (checkcrc)
00573     {
00574       CORBA::Octet padding[4];
00575       header_cdr.read_octet_array (padding, 4);
00576 
00577       unsigned char *crcparts = (unsigned char *)(&this->crc);
00578 
00579       for (int cnt=0; cnt != 4; ++cnt)
00580         {
00581           crcparts[cnt] = padding[cnt];
00582         }
00583 
00584       this->crc = ACE_NTOHL (this->crc);
00585     }
00586 
00587   // Validate.
00588   size_t const data_bytes_received =
00589     bytes_received - TAO_ECG_CDR_Message_Sender::ECG_HEADER_SIZE;
00590 
00591   if (this->request_size < this->fragment_size
00592       || this->fragment_offset >= this->request_size
00593       || this->fragment_id >= this->fragment_count
00594       || (this->fragment_count == 1
00595           && (this->fragment_size != this->request_size
00596               || this->request_size != data_bytes_received)))
00597     {
00598       ACE_ERROR_RETURN ((LM_ERROR,
00599                         "Invalid mcast fragment: "
00600                         "inconsistent header fields.\n"),
00601                         -1);
00602     }
00603 
00604   return 0;
00605 }
00606 
00607 TAO_END_VERSIONED_NAMESPACE_DECL

Generated on Tue Feb 2 17:44:06 2010 for TAO_RTEvent by  doxygen 1.4.7