TAO_ECG_CDR_Message_Receiver Class Reference

Receives UDP and Multicast messages. More...

#include <ECG_CDR_Message_Receiver.h>

Collaboration diagram for TAO_ECG_CDR_Message_Receiver:

Collaboration graph
[legend]
List of all members.

Public Member Functions

int handle_input (ACE_SOCK_Dgram &dgram, TAO_ECG_CDR_Processor *cdr_processor)
 TAO_ECG_CDR_Message_Receiver (CORBA::Boolean check_crc)
 Initialization and termination methods.
 ~TAO_ECG_CDR_Message_Receiver (void)
 Initialization and termination methods.
void init (TAO_ECG_Refcounted_Endpoint ignore_from)
void shutdown (void)
 Initialization and termination methods.

Static Public Attributes

static TAO_ECG_UDP_Request_Entry Request_Completed_

Private Types

typedef ACE_Hash_Map_Manager<
ACE_INET_Addr, Requests *,
ACE_Null_Mutex
Request_Map
 ECG_DEFAULT_MAX_FRAGMENTED_REQUESTS = 1024
 ECG_DEFAULT_FRAGMENTED_REQUESTS_MIN_PURGE_COUNT = 32
enum  { ECG_DEFAULT_MAX_FRAGMENTED_REQUESTS = 1024, ECG_DEFAULT_FRAGMENTED_REQUESTS_MIN_PURGE_COUNT = 32 }

Private Member Functions

int mark_received (const ACE_INET_Addr &from, CORBA::ULong request_id)
int process_fragment (const ACE_INET_Addr &from, const Mcast_Header &header, char *data_buf, TAO_ECG_CDR_Processor *cdr_processor)
Request_Map::ENTRYget_source_entry (const ACE_INET_Addr &from)

Private Attributes

TAO_ECG_Refcounted_Endpoint ignore_from_
 Ignore any events coming from this IP address.
Request_Map request_map_
size_t max_requests_
 Serializes use of <request_map_>.
size_t min_purge_count_
CORBA::Boolean check_crc_
 Flag to indicate whether CRC should be computed and checked.

Classes

struct  Mcast_Header
 Helper for decoding, validating and storing mcast header. More...
class  Requests

Detailed Description

Receives UDP and Multicast messages.

Todo:
Update class documentation below.
5) Make status array size and purge_count configurable.

This class receives UDP and Multicast message fragments, assembles them (described in detail below), and passes complete messages in the form of cdr streams to the calling classes.

This class is used by various Gateway classes (Senders/Receivers) responsible for federating Event Channels with UDP/Mcast.

= REASSEMBLY Fragmentation is described in ECG_CDR_Message_Sender.h Whenever an incomplete fragment is received (one with fragment_count > 1) we allocate an entry for the message in an map indexed by (host,port,request_id). The entry contains the buffer, a bit vector to keep track of the fragments received so far, and a timeout counter. This timeout counter is set to 0 on each (new) fragment arrival, and incremented on a regular basis. If the counter reaches a maximum value the message is dropped. Once all the fragments have been received the message is sent up to the calling classes, and the memory reclaimed.

Definition at line 143 of file ECG_CDR_Message_Receiver.h.


Member Typedef Documentation

typedef ACE_Hash_Map_Manager<ACE_INET_Addr, Requests*, ACE_Null_Mutex> TAO_ECG_CDR_Message_Receiver::Request_Map [private]

Definition at line 188 of file ECG_CDR_Message_Receiver.h.


Member Enumeration Documentation

anonymous enum [private]

Enumerator:
ECG_DEFAULT_MAX_FRAGMENTED_REQUESTS 
ECG_DEFAULT_FRAGMENTED_REQUESTS_MIN_PURGE_COUNT 

Definition at line 182 of file ECG_CDR_Message_Receiver.h.


Constructor & Destructor Documentation

TAO_ECG_CDR_Message_Receiver::TAO_ECG_CDR_Message_Receiver ( CORBA::Boolean  check_crc  ) 

Initialization and termination methods.

TAO_ECG_CDR_Message_Receiver::~TAO_ECG_CDR_Message_Receiver ( void   ) 

Initialization and termination methods.


Member Function Documentation

TAO_ECG_CDR_Message_Receiver::Request_Map::ENTRY * TAO_ECG_CDR_Message_Receiver::get_source_entry ( const ACE_INET_Addr from  )  [private]

Definition at line 484 of file ECG_CDR_Message_Receiver.cpp.

References ACE_ERROR_RETURN, ACE_NEW_RETURN, ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::bind(), LM_ERROR, ACE_Auto_Basic_Ptr< X >::release(), and request_map_.

Referenced by mark_received(), and process_fragment().

00485 {
00486   // Get the entry for <from> from the <request_map_>.
00487   Request_Map::ENTRY * entry = 0;
00488 
00489   if (this->request_map_.find (from, entry) == -1)
00490     {
00491       // Create an entry if one doesn't exist.
00492       Requests *requests = 0;
00493       ACE_NEW_RETURN (requests,
00494                       Requests,
00495                       0);
00496       auto_ptr<Requests> requests_aptr (requests);
00497 
00498       if (requests->init (this->max_requests_, this->min_purge_count_) == -1
00499           || this->request_map_.bind (from, requests, entry) == -1)
00500         {
00501           ACE_ERROR_RETURN ((LM_ERROR, "Unable to create hash map "
00502                                        "entry for a new request.\n"),
00503                             0);
00504         }
00505       requests_aptr.release ();
00506     }
00507 
00508   return entry;
00509 }

int TAO_ECG_CDR_Message_Receiver::handle_input ( ACE_SOCK_Dgram dgram,
TAO_ECG_CDR_Processor cdr_processor 
)

Returns 1 if data was read successfully and accepted by <cdr_processor> without errors. Returns 0 if there were no errors, but no data has been passed to <cdr_processor>, either due to request being incomplete (not all fragments received), or it being a duplicate. Returns -1 if there were errors.

Definition at line 241 of file ECG_CDR_Message_Receiver.cpp.

References ACE_ERROR, ACE_ERROR_RETURN, ACE_MAX_DGRAM_SIZE, ACE_ptr_align_binary(), TAO_ECG_CDR_Message_Receiver::Mcast_Header::byte_order, TAO_ECG_CDR_Message_Receiver::Mcast_Header::crc, ACE::crc32(), TAO_ECG_CDR_Processor::decode(), TAO_ECG_CDR_Message_Sender::ECG_HEADER_SIZE, TAO_ECG_CDR_Message_Receiver::Mcast_Header::fragment_count, ACE_INET_Addr::get_host_addr(), ACE_INET_Addr::get_host_name(), ACE_INET_Addr::get_port_number(), ignore_from_, LM_ERROR, mark_received(), ACE_CDR::MAX_ALIGNMENT, process_fragment(), TAO_ECG_CDR_Message_Receiver::Mcast_Header::read(), ACE_SOCK_Dgram::recv(), TAO_ECG_CDR_Message_Receiver::Mcast_Header::request_id, and TAO_ECG_CDR_Message_Receiver::Mcast_Header::request_size.

Referenced by TAO_ECG_UDP_Receiver::handle_input().

00244 {
00245   char nonaligned_header[TAO_ECG_CDR_Message_Sender::ECG_HEADER_SIZE
00246                         + ACE_CDR::MAX_ALIGNMENT];
00247   char *header_buf = ACE_ptr_align_binary (nonaligned_header,
00248                                            ACE_CDR::MAX_ALIGNMENT);
00249   char nonaligned_data[ACE_MAX_DGRAM_SIZE + ACE_CDR::MAX_ALIGNMENT];
00250   char *data_buf = ACE_ptr_align_binary (nonaligned_data,
00251                                          ACE_CDR::MAX_ALIGNMENT);
00252 
00253   // Read the message from dgram.
00254 
00255   const int iovcnt = 2;
00256   iovec iov[iovcnt];
00257   iov[0].iov_base = header_buf;
00258   iov[0].iov_len  = TAO_ECG_CDR_Message_Sender::ECG_HEADER_SIZE;
00259   iov[1].iov_base = data_buf;
00260   iov[1].iov_len  = ACE_MAX_DGRAM_SIZE;
00261 
00262   ACE_INET_Addr from;
00263   ssize_t n = dgram.recv (iov, iovcnt, from);
00264 
00265   if (n == -1)
00266     {
00267       if (errno == EWOULDBLOCK)
00268         return 0;
00269 
00270       ACE_ERROR_RETURN ((LM_ERROR, "Error reading mcast fragment (%m).\n"),
00271                         -1);
00272     }
00273 
00274   if (n == 0)
00275     {
00276       ACE_ERROR_RETURN ((LM_ERROR, "Trying to read mcast fragment: "
00277                                   "read 0 bytes from socket.\n"),
00278                         0);
00279     }
00280 
00281   if (n < TAO_ECG_CDR_Message_Sender::ECG_HEADER_SIZE)
00282     {
00283       ACE_ERROR_RETURN ((LM_ERROR, "Trying to read mcast fragment: "
00284                                    "# of bytes read < mcast header size.\n"),
00285                          -1);
00286     }
00287 
00288   u_int crc = 0;
00289 
00290   if (this->check_crc_)
00291     {
00292       iov[1].iov_len = n - iov[0].iov_len;
00293       iov[0].iov_len -= 4;  // don't include crc
00294 
00295       crc = ACE::crc32 (iov, 2);
00296     }
00297   // Check whether the message is a loopback message.
00298   if (this->ignore_from_.get () != 0
00299       && this->ignore_from_->is_loopback (from))
00300     {
00301       return 0;
00302     }
00303 
00304   // Decode and validate mcast header.
00305   Mcast_Header header;
00306   if (header.read (header_buf, n, this->check_crc_) == -1)
00307     return -1;
00308 
00309   if ( this->check_crc_ && header.crc != crc)
00310     {
00311       static unsigned int err_count = 0;
00312       ACE_ERROR ((LM_ERROR,
00313                   "******************************\n"));
00314 
00315       ACE_ERROR ((LM_ERROR,
00316                   "ERROR DETECTED \n"));
00317 
00318       if (crc == 0)
00319         {
00320           ACE_ERROR ((LM_ERROR,
00321                       "Sending process may not have computed CRC \n"));
00322         }
00323       else
00324         {
00325           ACE_ERROR ((LM_ERROR,
00326                       " NETWORK CRC CHECKSUM FAILED\n"));
00327         }
00328 
00329       ACE_ERROR ((LM_ERROR,
00330                   "Message was received from [%s:%s:%d] \n",
00331                   from.get_host_name (),
00332                   from.get_host_addr (),
00333                   from.get_port_number()));
00334 
00335       ACE_ERROR ((LM_ERROR,
00336                   "Num errors = %d \n",
00337                   ++err_count));
00338       ACE_ERROR ((LM_ERROR,
00339                   "This is a bad thing. Attempting to ignore ..\n"));
00340 
00341       return 0;
00342     }
00343 
00344   // Process received data.
00345   if (header.fragment_count == 1)
00346     {
00347       // Update <request_map_> to mark this request as completed. (Not
00348       // needed if we don't care about duplicates.)
00349       int const result = this->mark_received (from, header.request_id);
00350       if (result != 1)
00351         return result;
00352 
00353       TAO_InputCDR cdr (data_buf, header.request_size, header.byte_order);
00354       if (cdr_processor->decode (cdr) == -1)
00355         return -1;
00356       else
00357         return 1;
00358     }
00359 
00360   return this->process_fragment (from, header, data_buf, cdr_processor);
00361 }

void TAO_ECG_CDR_Message_Receiver::init ( TAO_ECG_Refcounted_Endpoint  ignore_from  ) 

Parameters:
ignore_from Endpoint used to remove events generated by the same process.

Referenced by TAO_ECG_UDP_Receiver::init().

int TAO_ECG_CDR_Message_Receiver::mark_received ( const ACE_INET_Addr from,
CORBA::ULong  request_id 
) [private]

Returns 1 on success, 0 if <request_id> has already been received or is below current request range, and -1 on error.

Definition at line 364 of file ECG_CDR_Message_Receiver.cpp.

References ACE_DEBUG, ACE_ERROR_RETURN, get_source_entry(), LM_ERROR, LM_INFO, LM_WARNING, and Request_Completed_.

Referenced by handle_input().

00366 {
00367   // ACE_GUARD_RETURN (ACE_Lock, guard, *this->lock_, -1);
00368 
00369   Request_Map::ENTRY * entry = this->get_source_entry (from);
00370   if (!entry)
00371     return -1;
00372 
00373   TAO_ECG_UDP_Request_Entry ** request =
00374     entry->int_id_->get_request (request_id);
00375 
00376   if (request == 0)
00377     {
00378       ACE_DEBUG ((LM_WARNING, "Received mcast request with sequence"
00379                               "below currently expected range.\n"));
00380       return 0;
00381     }
00382   if (*request == &Request_Completed_)
00383     {
00384       ACE_DEBUG ((LM_INFO, "Received duplicate mcast fragment. "
00385                            "(Request already complete).\n"));
00386       return 0;
00387     }
00388   if (*request != 0)
00389     {
00390       ACE_ERROR_RETURN ((LM_ERROR, "Inconsistent fragments for "
00391                                    "mcast request.\n"),
00392                          -1);
00393     }
00394 
00395   *request = &Request_Completed_;
00396   return 1;
00397 }

int TAO_ECG_CDR_Message_Receiver::process_fragment ( const ACE_INET_Addr from,
const Mcast_Header header,
char *  data_buf,
TAO_ECG_CDR_Processor cdr_processor 
) [private]

Returns 1 if complete request is received and <event> is populated, 0 if request has only partially been received or is a duplicate, and -1 on error.

Definition at line 400 of file ECG_CDR_Message_Receiver.cpp.

References ACE_DEBUG, ACE_ERROR_RETURN, ACE_NEW_RETURN, TAO_ECG_CDR_Message_Receiver::Mcast_Header::byte_order, TAO_ECG_CDR_Processor::decode(), TAO_ECG_CDR_Message_Receiver::Mcast_Header::fragment_count, TAO_ECG_CDR_Message_Receiver::Mcast_Header::fragment_id, TAO_ECG_CDR_Message_Receiver::Mcast_Header::fragment_offset, TAO_ECG_CDR_Message_Receiver::Mcast_Header::fragment_size, get_source_entry(), LM_ERROR, LM_INFO, LM_WARNING, ACE_OS::memcpy(), Request_Completed_, TAO_ECG_CDR_Message_Receiver::Mcast_Header::request_id, and TAO_ECG_CDR_Message_Receiver::Mcast_Header::request_size.

Referenced by handle_input().

00405 {
00406   // ACE_GUARD_RETURN (ACE_Lock, guard, *this->lock_, -1);
00407 
00408   Request_Map::ENTRY * source_entry = this->get_source_entry (from);
00409   if (!source_entry)
00410     return -1;
00411 
00412   TAO_ECG_UDP_Request_Entry ** request =
00413     source_entry->int_id_->get_request (header.request_id);
00414 
00415   if (request == 0)
00416     {
00417       ACE_DEBUG ((LM_WARNING, "Received mcast request with sequence "
00418                               "below currently expected range.\n"));
00419       return 0;
00420     }
00421   if (*request == &Request_Completed_)
00422     {
00423       ACE_DEBUG ((LM_INFO, "Received duplicate mcast fragment. "
00424                            "(Request already complete).\n"));
00425       return 0;
00426     }
00427   if (*request == 0)
00428     // Entry for this request has not yet been allocated.
00429     {
00430       ACE_NEW_RETURN (*request,
00431                       TAO_ECG_UDP_Request_Entry (header.byte_order,
00432                                                  header.request_id,
00433                                                  header.request_size,
00434                                                  header.fragment_count),
00435                       -1);
00436     }
00437 
00438   // Validate the fragment.
00439   if ((*request)->validate_fragment (header.byte_order,
00440                                      header.request_size,
00441                                      header.fragment_size,
00442                                      header.fragment_offset,
00443                                      header.fragment_id,
00444                                      header.fragment_count) == 0)
00445     {
00446       ACE_ERROR_RETURN ((LM_ERROR,
00447                          "Received invalid mcast fragment.\n"),
00448                         -1);
00449     }
00450 
00451   // Check whether this fragment was already received.
00452   if ((*request)->test_received (header.fragment_id) == 1)
00453     {
00454       ACE_DEBUG ((LM_INFO, "Received duplicate mcast fragment.\n"));
00455       return 0;
00456     }
00457 
00458   // Add the fragment to the request entry.
00459   (*request)->mark_received (header.fragment_id);
00460   ACE_OS::memcpy ((*request)->fragment_buffer (header.fragment_offset),
00461                   data_buf,
00462                   header.fragment_size);
00463 
00464   // The request is not yet complete.
00465   if (!(*request)->complete ())
00466     {
00467       return 0;
00468     }
00469 
00470   // The request is complete - decode it.
00471   TAO_InputCDR cdr ((*request)->fragment_buffer (0),
00472                     header.request_size,
00473                     header.byte_order);
00474 
00475   if (cdr_processor->decode (cdr) == -1)
00476     return -1;
00477 
00478   delete *request;
00479   *request = &Request_Completed_;
00480   return 1;
00481 }

void TAO_ECG_CDR_Message_Receiver::shutdown ( void   ) 

Initialization and termination methods.

Definition at line 512 of file ECG_CDR_Message_Receiver.cpp.

References ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::end(), ignore_from_, request_map_, and ACE_Refcounted_Auto_Ptr< X, ACE_LOCK >::reset().

Referenced by TAO_ECG_UDP_Receiver::shutdown().

00513 {
00514   // ACE_GUARD (ACE_Lock, guard, *this->lock_);
00515 
00516   Request_Map::iterator end = this->request_map_.end ();
00517   for (Request_Map::iterator i =  this->request_map_.begin ();
00518        i != end;
00519        ++i)
00520     {
00521       delete (*i).int_id_;
00522       (*i).int_id_ = 0;
00523     }
00524 
00525   this->ignore_from_.reset ();
00526 }


Member Data Documentation

CORBA::Boolean TAO_ECG_CDR_Message_Receiver::check_crc_ [private]

Flag to indicate whether CRC should be computed and checked.

Definition at line 234 of file ECG_CDR_Message_Receiver.h.

TAO_ECG_Refcounted_Endpoint TAO_ECG_CDR_Message_Receiver::ignore_from_ [private]

Ignore any events coming from this IP address.

Definition at line 215 of file ECG_CDR_Message_Receiver.h.

Referenced by handle_input(), and shutdown().

size_t TAO_ECG_CDR_Message_Receiver::max_requests_ [private]

Serializes use of <request_map_>.

Size of a fragmented requests array, i.e., max number of partially received requests kept at any given time per source.

Definition at line 226 of file ECG_CDR_Message_Receiver.h.

size_t TAO_ECG_CDR_Message_Receiver::min_purge_count_ [private]

Minimum number of requests purged from a fragmented requests array when the range of requests represented there needs to be shifted.

Definition at line 231 of file ECG_CDR_Message_Receiver.h.

TAO_ECG_UDP_Request_Entry TAO_ECG_CDR_Message_Receiver::Request_Completed_ [static]

Represents any request that has been fully received and serviced, to simplify the internal logic.

Definition at line 178 of file ECG_CDR_Message_Receiver.h.

Referenced by mark_received(), process_fragment(), TAO_ECG_CDR_Message_Receiver::Requests::purge_requests(), and TAO_ECG_CDR_Message_Receiver::Requests::~Requests().

Request_Map TAO_ECG_CDR_Message_Receiver::request_map_ [private]

The map containing all the incoming requests which have been partially received.

Definition at line 219 of file ECG_CDR_Message_Receiver.h.

Referenced by get_source_entry(), and shutdown().


The documentation for this class was generated from the following files:
Generated on Tue Feb 2 17:44:38 2010 for TAO_RTEvent by  doxygen 1.4.7