TAO_ECG_CDR_Message_Receiver Class Reference

Receives UDP and Multicast messages. More...

#include <ECG_CDR_Message_Receiver.h>

Collaboration diagram for TAO_ECG_CDR_Message_Receiver:

Collaboration graph
[legend]
List of all members.

Public Member Functions

int handle_input (ACE_SOCK_Dgram &dgram, TAO_ECG_CDR_Processor *cdr_processor)
 TAO_ECG_CDR_Message_Receiver (CORBA::Boolean check_crc)
 ~TAO_ECG_CDR_Message_Receiver (void)
void init (TAO_ECG_Refcounted_Endpoint ignore_from)
void shutdown (void)

Static Public Attributes

TAO_ECG_UDP_Request_Entry Request_Completed_

Private Types

typedef ACE_Hash_Map_Manager<
ACE_INET_Addr, Requests *,
ACE_Null_Mutex
Request_Map
enum  { ECG_DEFAULT_MAX_FRAGMENTED_REQUESTS = 1024, ECG_DEFAULT_FRAGMENTED_REQUESTS_MIN_PURGE_COUNT = 32 }

Private Member Functions

int mark_received (const ACE_INET_Addr &from, CORBA::ULong request_id)
int process_fragment (const ACE_INET_Addr &from, const Mcast_Header &header, char *data_buf, TAO_ECG_CDR_Processor *cdr_processor)
Request_Map::ENTRYget_source_entry (const ACE_INET_Addr &from)

Private Attributes

TAO_ECG_Refcounted_Endpoint ignore_from_
 Ignore any events coming from this IP address.

Request_Map request_map_
size_t max_requests_
 Serializes use of .

size_t min_purge_count_
CORBA::Boolean check_crc_
 Flag to indicate whether CRC should be computed and checked.


Detailed Description

Receives UDP and Multicast messages.

Todo:
Update class documentation below.
5) Make status array size and purge_count configurable.

This class receives UDP and Multicast message fragments, assembles them (described in detail below), and passes complete messages in the form of cdr streams to the calling classes.

This class is used by various Gateway classes (Senders/Receivers) responsible for federating Event Channels with UDP/Mcast.

= REASSEMBLY Fragmentation is described in ECG_CDR_Message_Sender.h Whenever an incomplete fragment is received (one with fragment_count > 1) we allocate an entry for the message in an map indexed by (host,port,request_id). The entry contains the buffer, a bit vector to keep track of the fragments received so far, and a timeout counter. This timeout counter is set to 0 on each (new) fragment arrival, and incremented on a regular basis. If the counter reaches a maximum value the message is dropped. Once all the fragments have been received the message is sent up to the calling classes, and the memory reclaimed.

Definition at line 144 of file ECG_CDR_Message_Receiver.h.


Member Typedef Documentation

typedef ACE_Hash_Map_Manager<ACE_INET_Addr, Requests*, ACE_Null_Mutex> TAO_ECG_CDR_Message_Receiver::Request_Map [private]
 

Definition at line 193 of file ECG_CDR_Message_Receiver.h.


Member Enumeration Documentation

anonymous enum [private]
 

Enumeration values:
ECG_DEFAULT_MAX_FRAGMENTED_REQUESTS 
ECG_DEFAULT_FRAGMENTED_REQUESTS_MIN_PURGE_COUNT 

Definition at line 183 of file ECG_CDR_Message_Receiver.h.


Constructor & Destructor Documentation

ACE_INLINE TAO_ECG_CDR_Message_Receiver::TAO_ECG_CDR_Message_Receiver CORBA::Boolean  check_crc  ) 
 

Definition at line 20 of file ECG_CDR_Message_Receiver.i.

References check_crc_, ECG_DEFAULT_FRAGMENTED_REQUESTS_MIN_PURGE_COUNT, ECG_DEFAULT_MAX_FRAGMENTED_REQUESTS, ignore_from_, max_requests_, min_purge_count_, and request_map_.

00021   : ignore_from_ ()
00022   , request_map_ ()
00023   /* , lock_ (0) */
00024   , max_requests_ (ECG_DEFAULT_MAX_FRAGMENTED_REQUESTS)
00025   , min_purge_count_ (ECG_DEFAULT_FRAGMENTED_REQUESTS_MIN_PURGE_COUNT)
00026   , check_crc_ (crc)
00027 {
00028 //    ACE_NEW (this->lock_,
00029 //             ACE_Lock_Adapter<ACE_Null_Mutex>);
00030 }

ACE_INLINE TAO_ECG_CDR_Message_Receiver::~TAO_ECG_CDR_Message_Receiver void   ) 
 

Definition at line 33 of file ECG_CDR_Message_Receiver.i.

References shutdown().

00034 {
00035   this->shutdown ();
00036 }


Member Function Documentation

TAO_ECG_CDR_Message_Receiver::Request_Map::ENTRY * TAO_ECG_CDR_Message_Receiver::get_source_entry const ACE_INET_Addr from  )  [private]
 

Definition at line 485 of file ECG_CDR_Message_Receiver.cpp.

References ACE_ERROR_RETURN, ACE_NEW_RETURN, ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, ACE_Hash< EXT_ID >, ACE_Equal_To< EXT_ID >, ACE_LOCK >::bind(), ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, ACE_Hash< EXT_ID >, ACE_Equal_To< EXT_ID >, ACE_LOCK >::find(), TAO_ECG_CDR_Message_Receiver::Requests::init(), LM_ERROR, ACE_Auto_Basic_Ptr< X >::release(), and request_map_.

Referenced by mark_received(), and process_fragment().

00486 {
00487   // Get the entry for <from> from the <request_map_>.
00488   Request_Map::ENTRY * entry;
00489 
00490   if (this->request_map_.find (from, entry) == -1)
00491     {
00492       // Create an entry if one doesn't exist.
00493       Requests *requests = 0;
00494       ACE_NEW_RETURN (requests,
00495                       Requests,
00496                       0);
00497       auto_ptr<Requests> requests_aptr (requests);
00498 
00499       if (requests->init (this->max_requests_, this->min_purge_count_) == -1
00500           || this->request_map_.bind (from, requests, entry) == -1)
00501         {
00502           ACE_ERROR_RETURN ((LM_ERROR, "Unable to create hash map "
00503                                        "entry for a new request.\n"),
00504                             0);
00505         }
00506       requests_aptr.release ();
00507     }
00508 
00509   return entry;
00510 }

int TAO_ECG_CDR_Message_Receiver::handle_input ACE_SOCK_Dgram dgram,
TAO_ECG_CDR_Processor cdr_processor
 

Returns 1 if data was read successfully and accepted by without errors. Returns 0 if there were no errors, but no data has been passed to , either due to request being incomplete (not all fragments received), or it being a duplicate. Returns -1 if there were errors.

Definition at line 241 of file ECG_CDR_Message_Receiver.cpp.

References ACE_ERROR, ACE_ERROR_RETURN, ACE_MAX_DGRAM_SIZE, ACE_ptr_align_binary, TAO_ECG_CDR_Message_Receiver::Mcast_Header::byte_order, check_crc_, TAO_ECG_CDR_Message_Receiver::Mcast_Header::crc, ACE::crc32(), TAO_ECG_CDR_Processor::decode(), EWOULDBLOCK, TAO_ECG_CDR_Message_Receiver::Mcast_Header::fragment_count, ACE_Refcounted_Auto_Ptr< X, ACE_LOCK >::get(), ACE_INET_Addr::get_host_addr(), ACE_INET_Addr::get_host_name(), ACE_INET_Addr::get_port_number(), ignore_from_, iovec::iov_base, iovec::iov_len, LM_DEBUG, LM_ERROR, mark_received(), process_fragment(), TAO_ECG_CDR_Message_Receiver::Mcast_Header::read(), ACE_SOCK_Dgram::recv(), TAO_ECG_CDR_Message_Receiver::Mcast_Header::request_id, TAO_ECG_CDR_Message_Receiver::Mcast_Header::request_size, and ssize_t.

Referenced by TAO_ECG_UDP_Receiver::handle_input().

00244 {
00245   char nonaligned_header[TAO_ECG_CDR_Message_Sender::ECG_HEADER_SIZE
00246                         + ACE_CDR::MAX_ALIGNMENT];
00247   char *header_buf = ACE_ptr_align_binary (nonaligned_header,
00248                                            ACE_CDR::MAX_ALIGNMENT);
00249 
00250   char nonaligned_data[ACE_MAX_DGRAM_SIZE + ACE_CDR::MAX_ALIGNMENT];
00251   char *data_buf = ACE_ptr_align_binary (nonaligned_data,
00252                                          ACE_CDR::MAX_ALIGNMENT);
00253 
00254   // Read the message from dgram.
00255 
00256   const int iovcnt = 2;
00257   iovec iov[iovcnt];
00258   iov[0].iov_base = header_buf;
00259   iov[0].iov_len  = TAO_ECG_CDR_Message_Sender::ECG_HEADER_SIZE;
00260   iov[1].iov_base = data_buf;
00261   iov[1].iov_len  = ACE_MAX_DGRAM_SIZE;
00262 
00263   ACE_INET_Addr from;
00264   ssize_t n = dgram.recv (iov, iovcnt, from);
00265 
00266   if (n == -1)
00267     {
00268       if (errno == EWOULDBLOCK)
00269         return 0;
00270 
00271       ACE_ERROR_RETURN ((LM_ERROR, "Error reading mcast fragment (%m).\n"),
00272                         -1);
00273     }
00274 
00275   if (n == 0)
00276     {
00277       ACE_ERROR_RETURN ((LM_ERROR, "Trying to read mcast fragment: "
00278                                   "read 0 bytes from socket.\n"),
00279                         0);
00280     }
00281 
00282   if (n < TAO_ECG_CDR_Message_Sender::ECG_HEADER_SIZE)
00283     {
00284       ACE_ERROR_RETURN ((LM_ERROR, "Trying to read mcast fragment: "
00285                                    "# of bytes read < mcast header size.\n"),
00286                          -1);
00287     }
00288 
00289   u_int crc = 0;
00290 
00291   if (this->check_crc_)
00292     {
00293       iov[1].iov_len = n - iov[0].iov_len;
00294       iov[0].iov_len -= 4;  // don't include crc
00295 
00296       crc = ACE::crc32 (iov, 2);
00297     }
00298   // Check whether the message is a loopback message.
00299   if (this->ignore_from_.get () != 0
00300       && this->ignore_from_->is_loopback (from))
00301     {
00302       return 0;
00303     }
00304 
00305   // Decode and validate mcast header.
00306   Mcast_Header header;
00307   if (header.read (header_buf, n, this->check_crc_) == -1)
00308     return -1;
00309 
00310   if ( this->check_crc_ && header.crc != crc)
00311     {
00312       static unsigned int err_count = 0;
00313       ACE_ERROR ((LM_DEBUG,
00314                   "******************************\n"));
00315 
00316       ACE_ERROR ((LM_DEBUG,
00317                   "ERROR DETECTED \n"));
00318 
00319       if (crc == 0)
00320         {
00321           ACE_ERROR ((LM_DEBUG,
00322                       "Sending process may not have computed CRC \n"));
00323         }
00324       else
00325         {
00326           ACE_ERROR ((LM_DEBUG,
00327                       " NETWORK CRC CHECKSUM FAILED\n"));
00328         }
00329 
00330       ACE_ERROR ((LM_ERROR,
00331                   "Message was received from [%s:%s:%d] \n",
00332                   from.get_host_name (),
00333                   from.get_host_addr (),
00334                   from.get_port_number()));
00335 
00336       ACE_ERROR ((LM_ERROR,
00337                   "Num errors = %d \n",
00338                   ++err_count));
00339       ACE_ERROR ((LM_ERROR,
00340                   "This is a bad thing. Attempting to ignore ..\n"));
00341 
00342       return 0;
00343     }
00344 
00345   // Process received data.
00346   if (header.fragment_count == 1)
00347     {
00348       // Update <request_map_> to mark this request as completed. (Not
00349       // needed if we don't care about duplicates.)
00350       int result = this->mark_received (from, header.request_id);
00351       if (result != 1)
00352         return result;
00353 
00354       TAO_InputCDR cdr (data_buf, header.request_size, header.byte_order);
00355       if (cdr_processor->decode (cdr) == -1)
00356         return -1;
00357       else
00358         return 1;
00359     }
00360 
00361   return this->process_fragment (from, header, data_buf, cdr_processor);
00362 }

ACE_INLINE void TAO_ECG_CDR_Message_Receiver::init TAO_ECG_Refcounted_Endpoint  ignore_from  ) 
 

Parameters:
ignore_from Endpoint used to remove events generated by the same process.

Definition at line 39 of file ECG_CDR_Message_Receiver.i.

References ignore_from_, and TAO_ECG_Refcounted_Endpoint.

Referenced by TAO_ECG_UDP_Receiver::init().

00041 {
00042   this->ignore_from_ = ignore_from;
00043 
00044 //    if (lock)
00045 //      {
00046 //        delete this->lock_;
00047 //        this->lock_ = lock;
00048 //      }
00049 }

int TAO_ECG_CDR_Message_Receiver::mark_received const ACE_INET_Addr from,
CORBA::ULong  request_id
[private]
 

Returns 1 on success, 0 if has already been received or is below current request range, and -1 on error.

Definition at line 365 of file ECG_CDR_Message_Receiver.cpp.

References ACE_DEBUG, ACE_ERROR_RETURN, get_source_entry(), LM_ERROR, LM_INFO, LM_WARNING, and Request_Completed_.

Referenced by handle_input().

00367 {
00368   // ACE_GUARD_RETURN (ACE_Lock, guard, *this->lock_, -1);
00369 
00370   Request_Map::ENTRY * entry = this->get_source_entry (from);
00371   if (!entry)
00372     return -1;
00373 
00374   TAO_ECG_UDP_Request_Entry ** request =
00375     entry->int_id_->get_request (request_id);
00376 
00377   if (request == 0)
00378     {
00379       ACE_DEBUG ((LM_WARNING, "Received mcast request with sequence"
00380                               "below currently expected range.\n"));
00381       return 0;
00382     }
00383   if (*request == &Request_Completed_)
00384     {
00385       ACE_DEBUG ((LM_INFO, "Received duplicate mcast fragment. "
00386                            "(Request already complete).\n"));
00387       return 0;
00388     }
00389   if (*request != 0)
00390     {
00391       ACE_ERROR_RETURN ((LM_ERROR, "Inconsistent fragments for "
00392                                    "mcast request.\n"),
00393                          -1);
00394     }
00395 
00396   *request = &Request_Completed_;
00397   return 1;
00398 }

int TAO_ECG_CDR_Message_Receiver::process_fragment const ACE_INET_Addr from,
const Mcast_Header header,
char *  data_buf,
TAO_ECG_CDR_Processor cdr_processor
[private]
 

Returns 1 if complete request is received and is populated, 0 if request has only partially been received or is a duplicate, and -1 on error.

Definition at line 401 of file ECG_CDR_Message_Receiver.cpp.

References ACE_DEBUG, ACE_ERROR_RETURN, ACE_NEW_RETURN, TAO_ECG_CDR_Message_Receiver::Mcast_Header::byte_order, TAO_ECG_CDR_Processor::decode(), TAO_ECG_CDR_Message_Receiver::Mcast_Header::fragment_count, TAO_ECG_CDR_Message_Receiver::Mcast_Header::fragment_id, TAO_ECG_CDR_Message_Receiver::Mcast_Header::fragment_offset, TAO_ECG_CDR_Message_Receiver::Mcast_Header::fragment_size, get_source_entry(), LM_ERROR, LM_INFO, LM_WARNING, ACE_OS::memcpy(), Request_Completed_, TAO_ECG_CDR_Message_Receiver::Mcast_Header::request_id, and TAO_ECG_CDR_Message_Receiver::Mcast_Header::request_size.

Referenced by handle_input().

00406 {
00407   // ACE_GUARD_RETURN (ACE_Lock, guard, *this->lock_, -1);
00408 
00409   Request_Map::ENTRY * source_entry = this->get_source_entry (from);
00410   if (!source_entry)
00411     return -1;
00412 
00413   TAO_ECG_UDP_Request_Entry ** request =
00414     source_entry->int_id_->get_request (header.request_id);
00415 
00416   if (request == 0)
00417     {
00418       ACE_DEBUG ((LM_WARNING, "Received mcast request with sequence "
00419                               "below currently expected range.\n"));
00420       return 0;
00421     }
00422   if (*request == &Request_Completed_)
00423     {
00424       ACE_DEBUG ((LM_INFO, "Received duplicate mcast fragment. "
00425                            "(Request already complete).\n"));
00426       return 0;
00427     }
00428   if (*request == 0)
00429     // Entry for this request has not yet been allocated.
00430     {
00431       ACE_NEW_RETURN (*request,
00432                       TAO_ECG_UDP_Request_Entry (header.byte_order,
00433                                                  header.request_id,
00434                                                  header.request_size,
00435                                                  header.fragment_count),
00436                       -1);
00437     }
00438 
00439   // Validate the fragment.
00440   if ((*request)->validate_fragment (header.byte_order,
00441                                      header.request_size,
00442                                      header.fragment_size,
00443                                      header.fragment_offset,
00444                                      header.fragment_id,
00445                                      header.fragment_count) == 0)
00446     {
00447       ACE_ERROR_RETURN ((LM_ERROR,
00448                          "Received invalid mcast fragment.\n"),
00449                         -1);
00450     }
00451 
00452   // Check whether this fragment was already received.
00453   if ((*request)->test_received (header.fragment_id) == 1)
00454     {
00455       ACE_DEBUG ((LM_INFO, "Received duplicate mcast fragment.\n"));
00456       return 0;
00457     }
00458 
00459   // Add the fragment to the request entry.
00460   (*request)->mark_received (header.fragment_id);
00461   ACE_OS::memcpy ((*request)->fragment_buffer (header.fragment_offset),
00462                   data_buf,
00463                   header.fragment_size);
00464 
00465   // The request is not yet complete.
00466   if (!(*request)->complete ())
00467     {
00468       return 0;
00469     }
00470 
00471   // The request is complete - decode it.
00472   TAO_InputCDR cdr ((*request)->fragment_buffer (0),
00473                     header.request_size,
00474                     header.byte_order);
00475 
00476   if (cdr_processor->decode (cdr) == -1)
00477     return -1;
00478 
00479   delete *request;
00480   *request = &Request_Completed_;
00481   return 1;
00482 }

void TAO_ECG_CDR_Message_Receiver::shutdown void   ) 
 

Definition at line 513 of file ECG_CDR_Message_Receiver.cpp.

References ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, ACE_Hash< EXT_ID >, ACE_Equal_To< EXT_ID >, ACE_LOCK >::begin(), ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, ACE_Hash< EXT_ID >, ACE_Equal_To< EXT_ID >, ACE_LOCK >::end(), ignore_from_, request_map_, and ACE_Refcounted_Auto_Ptr< X, ACE_LOCK >::reset().

Referenced by TAO_ECG_UDP_Receiver::shutdown(), and ~TAO_ECG_CDR_Message_Receiver().

00514 {
00515   // ACE_GUARD (ACE_Lock, guard, *this->lock_);
00516 
00517   Request_Map::iterator end = this->request_map_.end ();
00518   for (Request_Map::iterator i =  this->request_map_.begin ();
00519        i != end;
00520        ++i)
00521     {
00522       delete (*i).int_id_;
00523       (*i).int_id_ = 0;
00524     }
00525 
00526   this->ignore_from_.reset ();
00527 }


Member Data Documentation

CORBA::Boolean TAO_ECG_CDR_Message_Receiver::check_crc_ [private]
 

Flag to indicate whether CRC should be computed and checked.

Definition at line 235 of file ECG_CDR_Message_Receiver.h.

Referenced by handle_input(), and TAO_ECG_CDR_Message_Receiver().

TAO_ECG_Refcounted_Endpoint TAO_ECG_CDR_Message_Receiver::ignore_from_ [private]
 

Ignore any events coming from this IP address.

Definition at line 216 of file ECG_CDR_Message_Receiver.h.

Referenced by handle_input(), init(), shutdown(), and TAO_ECG_CDR_Message_Receiver().

size_t TAO_ECG_CDR_Message_Receiver::max_requests_ [private]
 

Serializes use of .

Size of a fragmented requests array, i.e., max number of partially received requests kept at any given time per source.

Definition at line 227 of file ECG_CDR_Message_Receiver.h.

Referenced by TAO_ECG_CDR_Message_Receiver().

size_t TAO_ECG_CDR_Message_Receiver::min_purge_count_ [private]
 

Minimum number of requests purged from a fragmented requests array when the range of requests represented there needs to be shifted.

Definition at line 232 of file ECG_CDR_Message_Receiver.h.

Referenced by TAO_ECG_CDR_Message_Receiver().

TAO_ECG_UDP_Request_Entry TAO_ECG_CDR_Message_Receiver::Request_Completed_ [static]
 

Represents any request that has been fully received and serviced, to simplify the internal logic.

Referenced by mark_received(), process_fragment(), TAO_ECG_CDR_Message_Receiver::Requests::purge_requests(), and TAO_ECG_CDR_Message_Receiver::Requests::~Requests().

Request_Map TAO_ECG_CDR_Message_Receiver::request_map_ [private]
 

The map containing all the incoming requests which have been partially received.

Definition at line 220 of file ECG_CDR_Message_Receiver.h.

Referenced by get_source_entry(), shutdown(), and TAO_ECG_CDR_Message_Receiver().


The documentation for this class was generated from the following files:
Generated on Thu Nov 9 13:16:03 2006 for TAO_RTEvent by doxygen 1.3.6