#include <ECG_CDR_Message_Receiver.h>
Collaboration diagram for TAO_ECG_CDR_Message_Receiver:
This class receives UDP and Multicast message fragments, assembles them (described in detail below), and passes complete messages in the form of cdr streams to the calling classes.
This class is used by various Gateway classes (Senders/Receivers) responsible for federating Event Channels with UDP/Mcast.
REASSEMBLY: Fragmentation is described in ECG_CDR_Message_Sender.h. Whenever a fragment of a multi-fragment message is received (one with fragment_count > 1), we allocate an entry for the message in a map indexed by (host, port, request_id). The entry contains the buffer, a bit vector to keep track of the fragments received so far, and a timeout counter. This timeout counter is set to 0 on each (new) fragment arrival, and incremented on a regular basis. If the counter reaches a maximum value, the message is dropped. Once all the fragments have been received, the message is sent up to the calling classes and the memory is reclaimed.
Definition at line 144 of file ECG_CDR_Message_Receiver.h.
|
Definition at line 193 of file ECG_CDR_Message_Receiver.h. |
|
Definition at line 183 of file ECG_CDR_Message_Receiver.h.
00183 { 00184 ECG_DEFAULT_MAX_FRAGMENTED_REQUESTS = 1024, 00185 ECG_DEFAULT_FRAGMENTED_REQUESTS_MIN_PURGE_COUNT = 32 00186 }; |
|
Definition at line 20 of file ECG_CDR_Message_Receiver.i. References check_crc_, ECG_DEFAULT_FRAGMENTED_REQUESTS_MIN_PURGE_COUNT, ECG_DEFAULT_MAX_FRAGMENTED_REQUESTS, ignore_from_, max_requests_, min_purge_count_, and request_map_.
00021 : ignore_from_ () 00022 , request_map_ () 00023 /* , lock_ (0) */ 00024 , max_requests_ (ECG_DEFAULT_MAX_FRAGMENTED_REQUESTS) 00025 , min_purge_count_ (ECG_DEFAULT_FRAGMENTED_REQUESTS_MIN_PURGE_COUNT) 00026 , check_crc_ (crc) 00027 { 00028 // ACE_NEW (this->lock_, 00029 // ACE_Lock_Adapter<ACE_Null_Mutex>); 00030 } |
|
Definition at line 33 of file ECG_CDR_Message_Receiver.i. References shutdown().
00034 { 00035 this->shutdown (); 00036 } |
|
Definition at line 485 of file ECG_CDR_Message_Receiver.cpp. References ACE_ERROR_RETURN, ACE_NEW_RETURN, ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, ACE_Hash< EXT_ID >, ACE_Equal_To< EXT_ID >, ACE_LOCK >::bind(), ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, ACE_Hash< EXT_ID >, ACE_Equal_To< EXT_ID >, ACE_LOCK >::find(), TAO_ECG_CDR_Message_Receiver::Requests::init(), LM_ERROR, ACE_Auto_Basic_Ptr< X >::release(), and request_map_. Referenced by mark_received(), and process_fragment().
00486 { 00487 // Get the entry for <from> from the <request_map_>. 00488 Request_Map::ENTRY * entry; 00489 00490 if (this->request_map_.find (from, entry) == -1) 00491 { 00492 // Create an entry if one doesn't exist. 00493 Requests *requests = 0; 00494 ACE_NEW_RETURN (requests, 00495 Requests, 00496 0); 00497 auto_ptr<Requests> requests_aptr (requests); 00498 00499 if (requests->init (this->max_requests_, this->min_purge_count_) == -1 00500 || this->request_map_.bind (from, requests, entry) == -1) 00501 { 00502 ACE_ERROR_RETURN ((LM_ERROR, "Unable to create hash map " 00503 "entry for a new request.\n"), 00504 0); 00505 } 00506 requests_aptr.release (); 00507 } 00508 00509 return entry; 00510 } |
|
Returns 1 if data was read successfully and accepted by cdr_processor without errors. Returns 0 if there were no errors, but no data has been passed to cdr_processor, either due to the request being incomplete (not all fragments received), or it being a duplicate. Returns -1 if there were errors. Definition at line 241 of file ECG_CDR_Message_Receiver.cpp. References ACE_ERROR, ACE_ERROR_RETURN, ACE_MAX_DGRAM_SIZE, ACE_ptr_align_binary, TAO_ECG_CDR_Message_Receiver::Mcast_Header::byte_order, check_crc_, TAO_ECG_CDR_Message_Receiver::Mcast_Header::crc, ACE::crc32(), TAO_ECG_CDR_Processor::decode(), EWOULDBLOCK, TAO_ECG_CDR_Message_Receiver::Mcast_Header::fragment_count, ACE_Refcounted_Auto_Ptr< X, ACE_LOCK >::get(), ACE_INET_Addr::get_host_addr(), ACE_INET_Addr::get_host_name(), ACE_INET_Addr::get_port_number(), ignore_from_, iovec::iov_base, iovec::iov_len, LM_DEBUG, LM_ERROR, mark_received(), process_fragment(), TAO_ECG_CDR_Message_Receiver::Mcast_Header::read(), ACE_SOCK_Dgram::recv(), TAO_ECG_CDR_Message_Receiver::Mcast_Header::request_id, TAO_ECG_CDR_Message_Receiver::Mcast_Header::request_size, and ssize_t. Referenced by TAO_ECG_UDP_Receiver::handle_input().
00244 { 00245 char nonaligned_header[TAO_ECG_CDR_Message_Sender::ECG_HEADER_SIZE 00246 + ACE_CDR::MAX_ALIGNMENT]; 00247 char *header_buf = ACE_ptr_align_binary (nonaligned_header, 00248 ACE_CDR::MAX_ALIGNMENT); 00249 00250 char nonaligned_data[ACE_MAX_DGRAM_SIZE + ACE_CDR::MAX_ALIGNMENT]; 00251 char *data_buf = ACE_ptr_align_binary (nonaligned_data, 00252 ACE_CDR::MAX_ALIGNMENT); 00253 00254 // Read the message from dgram. 00255 00256 const int iovcnt = 2; 00257 iovec iov[iovcnt]; 00258 iov[0].iov_base = header_buf; 00259 iov[0].iov_len = TAO_ECG_CDR_Message_Sender::ECG_HEADER_SIZE; 00260 iov[1].iov_base = data_buf; 00261 iov[1].iov_len = ACE_MAX_DGRAM_SIZE; 00262 00263 ACE_INET_Addr from; 00264 ssize_t n = dgram.recv (iov, iovcnt, from); 00265 00266 if (n == -1) 00267 { 00268 if (errno == EWOULDBLOCK) 00269 return 0; 00270 00271 ACE_ERROR_RETURN ((LM_ERROR, "Error reading mcast fragment (%m).\n"), 00272 -1); 00273 } 00274 00275 if (n == 0) 00276 { 00277 ACE_ERROR_RETURN ((LM_ERROR, "Trying to read mcast fragment: " 00278 "read 0 bytes from socket.\n"), 00279 0); 00280 } 00281 00282 if (n < TAO_ECG_CDR_Message_Sender::ECG_HEADER_SIZE) 00283 { 00284 ACE_ERROR_RETURN ((LM_ERROR, "Trying to read mcast fragment: " 00285 "# of bytes read < mcast header size.\n"), 00286 -1); 00287 } 00288 00289 u_int crc = 0; 00290 00291 if (this->check_crc_) 00292 { 00293 iov[1].iov_len = n - iov[0].iov_len; 00294 iov[0].iov_len -= 4; // don't include crc 00295 00296 crc = ACE::crc32 (iov, 2); 00297 } 00298 // Check whether the message is a loopback message. 00299 if (this->ignore_from_.get () != 0 00300 && this->ignore_from_->is_loopback (from)) 00301 { 00302 return 0; 00303 } 00304 00305 // Decode and validate mcast header. 
00306 Mcast_Header header; 00307 if (header.read (header_buf, n, this->check_crc_) == -1) 00308 return -1; 00309 00310 if ( this->check_crc_ && header.crc != crc) 00311 { 00312 static unsigned int err_count = 0; 00313 ACE_ERROR ((LM_DEBUG, 00314 "******************************\n")); 00315 00316 ACE_ERROR ((LM_DEBUG, 00317 "ERROR DETECTED \n")); 00318 00319 if (crc == 0) 00320 { 00321 ACE_ERROR ((LM_DEBUG, 00322 "Sending process may not have computed CRC \n")); 00323 } 00324 else 00325 { 00326 ACE_ERROR ((LM_DEBUG, 00327 " NETWORK CRC CHECKSUM FAILED\n")); 00328 } 00329 00330 ACE_ERROR ((LM_ERROR, 00331 "Message was received from [%s:%s:%d] \n", 00332 from.get_host_name (), 00333 from.get_host_addr (), 00334 from.get_port_number())); 00335 00336 ACE_ERROR ((LM_ERROR, 00337 "Num errors = %d \n", 00338 ++err_count)); 00339 ACE_ERROR ((LM_ERROR, 00340 "This is a bad thing. Attempting to ignore ..\n")); 00341 00342 return 0; 00343 } 00344 00345 // Process received data. 00346 if (header.fragment_count == 1) 00347 { 00348 // Update <request_map_> to mark this request as completed. (Not 00349 // needed if we don't care about duplicates.) 00350 int result = this->mark_received (from, header.request_id); 00351 if (result != 1) 00352 return result; 00353 00354 TAO_InputCDR cdr (data_buf, header.request_size, header.byte_order); 00355 if (cdr_processor->decode (cdr) == -1) 00356 return -1; 00357 else 00358 return 1; 00359 } 00360 00361 return this->process_fragment (from, header, data_buf, cdr_processor); 00362 } |
|
Definition at line 39 of file ECG_CDR_Message_Receiver.i. References ignore_from_, and TAO_ECG_Refcounted_Endpoint. Referenced by TAO_ECG_UDP_Receiver::init().
00041 { 00042 this->ignore_from_ = ignore_from; 00043 00044 // if (lock) 00045 // { 00046 // delete this->lock_; 00047 // this->lock_ = lock; 00048 // } 00049 } |
|
Returns 1 on success, 0 if request_id has already been received or is below the current request range, and -1 on error. Definition at line 365 of file ECG_CDR_Message_Receiver.cpp. References ACE_DEBUG, ACE_ERROR_RETURN, get_source_entry(), LM_ERROR, LM_INFO, LM_WARNING, and Request_Completed_. Referenced by handle_input().
00367 { 00368 // ACE_GUARD_RETURN (ACE_Lock, guard, *this->lock_, -1); 00369 00370 Request_Map::ENTRY * entry = this->get_source_entry (from); 00371 if (!entry) 00372 return -1; 00373 00374 TAO_ECG_UDP_Request_Entry ** request = 00375 entry->int_id_->get_request (request_id); 00376 00377 if (request == 0) 00378 { 00379 ACE_DEBUG ((LM_WARNING, "Received mcast request with sequence" 00380 "below currently expected range.\n")); 00381 return 0; 00382 } 00383 if (*request == &Request_Completed_) 00384 { 00385 ACE_DEBUG ((LM_INFO, "Received duplicate mcast fragment. " 00386 "(Request already complete).\n")); 00387 return 0; 00388 } 00389 if (*request != 0) 00390 { 00391 ACE_ERROR_RETURN ((LM_ERROR, "Inconsistent fragments for " 00392 "mcast request.\n"), 00393 -1); 00394 } 00395 00396 *request = &Request_Completed_; 00397 return 1; 00398 } |
|
Returns 1 if the complete request has been received and decoded by cdr_processor, 0 if the request has only partially been received or is a duplicate, and -1 on error. Definition at line 401 of file ECG_CDR_Message_Receiver.cpp. References ACE_DEBUG, ACE_ERROR_RETURN, ACE_NEW_RETURN, TAO_ECG_CDR_Message_Receiver::Mcast_Header::byte_order, TAO_ECG_CDR_Processor::decode(), TAO_ECG_CDR_Message_Receiver::Mcast_Header::fragment_count, TAO_ECG_CDR_Message_Receiver::Mcast_Header::fragment_id, TAO_ECG_CDR_Message_Receiver::Mcast_Header::fragment_offset, TAO_ECG_CDR_Message_Receiver::Mcast_Header::fragment_size, get_source_entry(), LM_ERROR, LM_INFO, LM_WARNING, ACE_OS::memcpy(), Request_Completed_, TAO_ECG_CDR_Message_Receiver::Mcast_Header::request_id, and TAO_ECG_CDR_Message_Receiver::Mcast_Header::request_size. Referenced by handle_input().
00406 { 00407 // ACE_GUARD_RETURN (ACE_Lock, guard, *this->lock_, -1); 00408 00409 Request_Map::ENTRY * source_entry = this->get_source_entry (from); 00410 if (!source_entry) 00411 return -1; 00412 00413 TAO_ECG_UDP_Request_Entry ** request = 00414 source_entry->int_id_->get_request (header.request_id); 00415 00416 if (request == 0) 00417 { 00418 ACE_DEBUG ((LM_WARNING, "Received mcast request with sequence " 00419 "below currently expected range.\n")); 00420 return 0; 00421 } 00422 if (*request == &Request_Completed_) 00423 { 00424 ACE_DEBUG ((LM_INFO, "Received duplicate mcast fragment. " 00425 "(Request already complete).\n")); 00426 return 0; 00427 } 00428 if (*request == 0) 00429 // Entry for this request has not yet been allocated. 00430 { 00431 ACE_NEW_RETURN (*request, 00432 TAO_ECG_UDP_Request_Entry (header.byte_order, 00433 header.request_id, 00434 header.request_size, 00435 header.fragment_count), 00436 -1); 00437 } 00438 00439 // Validate the fragment. 00440 if ((*request)->validate_fragment (header.byte_order, 00441 header.request_size, 00442 header.fragment_size, 00443 header.fragment_offset, 00444 header.fragment_id, 00445 header.fragment_count) == 0) 00446 { 00447 ACE_ERROR_RETURN ((LM_ERROR, 00448 "Received invalid mcast fragment.\n"), 00449 -1); 00450 } 00451 00452 // Check whether this fragment was already received. 00453 if ((*request)->test_received (header.fragment_id) == 1) 00454 { 00455 ACE_DEBUG ((LM_INFO, "Received duplicate mcast fragment.\n")); 00456 return 0; 00457 } 00458 00459 // Add the fragment to the request entry. 00460 (*request)->mark_received (header.fragment_id); 00461 ACE_OS::memcpy ((*request)->fragment_buffer (header.fragment_offset), 00462 data_buf, 00463 header.fragment_size); 00464 00465 // The request is not yet complete. 00466 if (!(*request)->complete ()) 00467 { 00468 return 0; 00469 } 00470 00471 // The request is complete - decode it. 
00472 TAO_InputCDR cdr ((*request)->fragment_buffer (0), 00473 header.request_size, 00474 header.byte_order); 00475 00476 if (cdr_processor->decode (cdr) == -1) 00477 return -1; 00478 00479 delete *request; 00480 *request = &Request_Completed_; 00481 return 1; 00482 } |
|
Definition at line 513 of file ECG_CDR_Message_Receiver.cpp. References ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, ACE_Hash< EXT_ID >, ACE_Equal_To< EXT_ID >, ACE_LOCK >::begin(), ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, ACE_Hash< EXT_ID >, ACE_Equal_To< EXT_ID >, ACE_LOCK >::end(), ignore_from_, request_map_, and ACE_Refcounted_Auto_Ptr< X, ACE_LOCK >::reset(). Referenced by TAO_ECG_UDP_Receiver::shutdown(), and ~TAO_ECG_CDR_Message_Receiver().
00514 { 00515 // ACE_GUARD (ACE_Lock, guard, *this->lock_); 00516 00517 Request_Map::iterator end = this->request_map_.end (); 00518 for (Request_Map::iterator i = this->request_map_.begin (); 00519 i != end; 00520 ++i) 00521 { 00522 delete (*i).int_id_; 00523 (*i).int_id_ = 0; 00524 } 00525 00526 this->ignore_from_.reset (); 00527 } |
|
Flag to indicate whether CRC should be computed and checked.
Definition at line 235 of file ECG_CDR_Message_Receiver.h. Referenced by handle_input(), and TAO_ECG_CDR_Message_Receiver(). |
|
Ignore any events coming from this IP address.
Definition at line 216 of file ECG_CDR_Message_Receiver.h. Referenced by handle_input(), init(), shutdown(), and TAO_ECG_CDR_Message_Receiver(). |
|
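Serializes use of request_map_ (presumably; the referenced name was lost in extraction, and this sentence appears to describe the lock_ member, which is commented out in the code). Size of a fragmented requests array, i.e., max number of partially received requests kept at any given time per source. Definition at line 227 of file ECG_CDR_Message_Receiver.h. Referenced by TAO_ECG_CDR_Message_Receiver(). |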
|
Minimum number of requests purged from a fragmented requests array when the range of requests represented there needs to be shifted. Definition at line 232 of file ECG_CDR_Message_Receiver.h. Referenced by TAO_ECG_CDR_Message_Receiver(). |
|
Represents any request that has been fully received and serviced, to simplify the internal logic. Referenced by mark_received(), process_fragment(), TAO_ECG_CDR_Message_Receiver::Requests::purge_requests(), and TAO_ECG_CDR_Message_Receiver::Requests::~Requests(). |
|
The map containing all the incoming requests which have been partially received. Definition at line 220 of file ECG_CDR_Message_Receiver.h. Referenced by get_source_entry(), shutdown(), and TAO_ECG_CDR_Message_Receiver(). |