#include <ECG_CDR_Message_Receiver.h>
Collaboration diagram for TAO_ECG_CDR_Message_Receiver:
This class receives UDP and Multicast message fragments, assembles them (described in detail below), and passes complete messages in the form of cdr streams to the calling classes.
This class is used by various Gateway classes (Senders/Receivers) responsible for federating Event Channels with UDP/Mcast.
= REASSEMBLY Fragmentation is described in ECG_CDR_Message_Sender.h Whenever an incomplete fragment is received (one with fragment_count > 1) we allocate an entry for the message in an map indexed by (host,port,request_id). The entry contains the buffer, a bit vector to keep track of the fragments received so far, and a timeout counter. This timeout counter is set to 0 on each (new) fragment arrival, and incremented on a regular basis. If the counter reaches a maximum value the message is dropped. Once all the fragments have been received the message is sent up to the calling classes, and the memory reclaimed.
Definition at line 143 of file ECG_CDR_Message_Receiver.h.
|
Definition at line 192 of file ECG_CDR_Message_Receiver.h. |
|
Definition at line 182 of file ECG_CDR_Message_Receiver.h.
00182 { 00183 ECG_DEFAULT_MAX_FRAGMENTED_REQUESTS = 1024, 00184 ECG_DEFAULT_FRAGMENTED_REQUESTS_MIN_PURGE_COUNT = 32 00185 }; |
|
|
|
|
|
Definition at line 484 of file ECG_CDR_Message_Receiver.cpp. References ACE_ERROR_RETURN, ACE_NEW_RETURN, ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, ACE_Hash< EXT_ID >, ACE_Equal_To< EXT_ID >, ACE_LOCK >::bind(), ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, ACE_Hash< EXT_ID >, ACE_Equal_To< EXT_ID >, ACE_LOCK >::find(), TAO_ECG_CDR_Message_Receiver::Requests::init(), LM_ERROR, ACE_Auto_Basic_Ptr< X >::release(), and request_map_. Referenced by mark_received(), and process_fragment().
00485 { 00486 // Get the entry for <from> from the <request_map_>. 00487 Request_Map::ENTRY * entry = 0; 00488 00489 if (this->request_map_.find (from, entry) == -1) 00490 { 00491 // Create an entry if one doesn't exist. 00492 Requests *requests = 0; 00493 ACE_NEW_RETURN (requests, 00494 Requests, 00495 0); 00496 auto_ptr<Requests> requests_aptr (requests); 00497 00498 if (requests->init (this->max_requests_, this->min_purge_count_) == -1 00499 || this->request_map_.bind (from, requests, entry) == -1) 00500 { 00501 ACE_ERROR_RETURN ((LM_ERROR, "Unable to create hash map " 00502 "entry for a new request.\n"), 00503 0); 00504 } 00505 requests_aptr.release (); 00506 } 00507 00508 return entry; 00509 } |
|
Returns 1 if data was read successfully and accepted by without errors. Returns 0 if there were no errors, but no data has been passed to , either due to request being incomplete (not all fragments received), or it being a duplicate. Returns -1 if there were errors. Definition at line 241 of file ECG_CDR_Message_Receiver.cpp. References ACE_ERROR, ACE_ERROR_RETURN, ACE_MAX_DGRAM_SIZE, ACE_ptr_align_binary(), TAO_ECG_CDR_Message_Receiver::Mcast_Header::byte_order, check_crc_, TAO_ECG_CDR_Message_Receiver::Mcast_Header::crc, ACE::crc32(), TAO_ECG_CDR_Processor::decode(), EWOULDBLOCK, TAO_ECG_CDR_Message_Receiver::Mcast_Header::fragment_count, ACE_Refcounted_Auto_Ptr< X, ACE_LOCK >::get(), ACE_INET_Addr::get_host_addr(), ACE_INET_Addr::get_host_name(), ACE_INET_Addr::get_port_number(), ignore_from_, LM_ERROR, mark_received(), process_fragment(), TAO_ECG_CDR_Message_Receiver::Mcast_Header::read(), ACE_SOCK_Dgram::recv(), TAO_ECG_CDR_Message_Receiver::Mcast_Header::request_id, TAO_ECG_CDR_Message_Receiver::Mcast_Header::request_size, and ssize_t. Referenced by TAO_ECG_UDP_Receiver::handle_input().
00244 { 00245 char nonaligned_header[TAO_ECG_CDR_Message_Sender::ECG_HEADER_SIZE 00246 + ACE_CDR::MAX_ALIGNMENT]; 00247 char *header_buf = ACE_ptr_align_binary (nonaligned_header, 00248 ACE_CDR::MAX_ALIGNMENT); 00249 char nonaligned_data[ACE_MAX_DGRAM_SIZE + ACE_CDR::MAX_ALIGNMENT]; 00250 char *data_buf = ACE_ptr_align_binary (nonaligned_data, 00251 ACE_CDR::MAX_ALIGNMENT); 00252 00253 // Read the message from dgram. 00254 00255 const int iovcnt = 2; 00256 iovec iov[iovcnt]; 00257 iov[0].iov_base = header_buf; 00258 iov[0].iov_len = TAO_ECG_CDR_Message_Sender::ECG_HEADER_SIZE; 00259 iov[1].iov_base = data_buf; 00260 iov[1].iov_len = ACE_MAX_DGRAM_SIZE; 00261 00262 ACE_INET_Addr from; 00263 ssize_t n = dgram.recv (iov, iovcnt, from); 00264 00265 if (n == -1) 00266 { 00267 if (errno == EWOULDBLOCK) 00268 return 0; 00269 00270 ACE_ERROR_RETURN ((LM_ERROR, "Error reading mcast fragment (%m).\n"), 00271 -1); 00272 } 00273 00274 if (n == 0) 00275 { 00276 ACE_ERROR_RETURN ((LM_ERROR, "Trying to read mcast fragment: " 00277 "read 0 bytes from socket.\n"), 00278 0); 00279 } 00280 00281 if (n < TAO_ECG_CDR_Message_Sender::ECG_HEADER_SIZE) 00282 { 00283 ACE_ERROR_RETURN ((LM_ERROR, "Trying to read mcast fragment: " 00284 "# of bytes read < mcast header size.\n"), 00285 -1); 00286 } 00287 00288 u_int crc = 0; 00289 00290 if (this->check_crc_) 00291 { 00292 iov[1].iov_len = n - iov[0].iov_len; 00293 iov[0].iov_len -= 4; // don't include crc 00294 00295 crc = ACE::crc32 (iov, 2); 00296 } 00297 // Check whether the message is a loopback message. 00298 if (this->ignore_from_.get () != 0 00299 && this->ignore_from_->is_loopback (from)) 00300 { 00301 return 0; 00302 } 00303 00304 // Decode and validate mcast header. 00305 Mcast_Header header; 00306 if (header.read (header_buf, n, this->check_crc_) == -1) 00307 return -1; 00308 00309 if ( this->check_crc_ && header.crc != crc) 00310 { 00311 static unsigned int err_count = 0; 00312 ACE_ERROR ((LM_ERROR, 00313 "******************************\n")); 00314 00315 ACE_ERROR ((LM_ERROR, 00316 "ERROR DETECTED \n")); 00317 00318 if (crc == 0) 00319 { 00320 ACE_ERROR ((LM_ERROR, 00321 "Sending process may not have computed CRC \n")); 00322 } 00323 else 00324 { 00325 ACE_ERROR ((LM_ERROR, 00326 " NETWORK CRC CHECKSUM FAILED\n")); 00327 } 00328 00329 ACE_ERROR ((LM_ERROR, 00330 "Message was received from [%s:%s:%d] \n", 00331 from.get_host_name (), 00332 from.get_host_addr (), 00333 from.get_port_number())); 00334 00335 ACE_ERROR ((LM_ERROR, 00336 "Num errors = %d \n", 00337 ++err_count)); 00338 ACE_ERROR ((LM_ERROR, 00339 "This is a bad thing. Attempting to ignore ..\n")); 00340 00341 return 0; 00342 } 00343 00344 // Process received data. 00345 if (header.fragment_count == 1) 00346 { 00347 // Update <request_map_> to mark this request as completed. (Not 00348 // needed if we don't care about duplicates.) 00349 int const result = this->mark_received (from, header.request_id); 00350 if (result != 1) 00351 return result; 00352 00353 TAO_InputCDR cdr (data_buf, header.request_size, header.byte_order); 00354 if (cdr_processor->decode (cdr) == -1) 00355 return -1; 00356 else 00357 return 1; 00358 } 00359 00360 return this->process_fragment (from, header, data_buf, cdr_processor); 00361 } |
|
Referenced by TAO_ECG_UDP_Receiver::init(). |
|
Returns 1 on success, 0 if has already been received or is below current request range, and -1 on error. Definition at line 364 of file ECG_CDR_Message_Receiver.cpp. References ACE_DEBUG, ACE_ERROR_RETURN, get_source_entry(), LM_ERROR, LM_INFO, LM_WARNING, and Request_Completed_. Referenced by handle_input().
00366 { 00367 // ACE_GUARD_RETURN (ACE_Lock, guard, *this->lock_, -1); 00368 00369 Request_Map::ENTRY * entry = this->get_source_entry (from); 00370 if (!entry) 00371 return -1; 00372 00373 TAO_ECG_UDP_Request_Entry ** request = 00374 entry->int_id_->get_request (request_id); 00375 00376 if (request == 0) 00377 { 00378 ACE_DEBUG ((LM_WARNING, "Received mcast request with sequence" 00379 "below currently expected range.\n")); 00380 return 0; 00381 } 00382 if (*request == &Request_Completed_) 00383 { 00384 ACE_DEBUG ((LM_INFO, "Received duplicate mcast fragment. " 00385 "(Request already complete).\n")); 00386 return 0; 00387 } 00388 if (*request != 0) 00389 { 00390 ACE_ERROR_RETURN ((LM_ERROR, "Inconsistent fragments for " 00391 "mcast request.\n"), 00392 -1); 00393 } 00394 00395 *request = &Request_Completed_; 00396 return 1; 00397 } |
|
Returns 1 if complete request is received and is populated, 0 if request has only partially been received or is a duplicate, and -1 on error. Definition at line 400 of file ECG_CDR_Message_Receiver.cpp. References ACE_DEBUG, ACE_ERROR_RETURN, ACE_NEW_RETURN, TAO_ECG_CDR_Message_Receiver::Mcast_Header::byte_order, TAO_ECG_CDR_Processor::decode(), TAO_ECG_CDR_Message_Receiver::Mcast_Header::fragment_count, TAO_ECG_CDR_Message_Receiver::Mcast_Header::fragment_id, TAO_ECG_CDR_Message_Receiver::Mcast_Header::fragment_offset, TAO_ECG_CDR_Message_Receiver::Mcast_Header::fragment_size, get_source_entry(), LM_ERROR, LM_INFO, LM_WARNING, ACE_OS::memcpy(), Request_Completed_, TAO_ECG_CDR_Message_Receiver::Mcast_Header::request_id, and TAO_ECG_CDR_Message_Receiver::Mcast_Header::request_size. Referenced by handle_input().
00405 { 00406 // ACE_GUARD_RETURN (ACE_Lock, guard, *this->lock_, -1); 00407 00408 Request_Map::ENTRY * source_entry = this->get_source_entry (from); 00409 if (!source_entry) 00410 return -1; 00411 00412 TAO_ECG_UDP_Request_Entry ** request = 00413 source_entry->int_id_->get_request (header.request_id); 00414 00415 if (request == 0) 00416 { 00417 ACE_DEBUG ((LM_WARNING, "Received mcast request with sequence " 00418 "below currently expected range.\n")); 00419 return 0; 00420 } 00421 if (*request == &Request_Completed_) 00422 { 00423 ACE_DEBUG ((LM_INFO, "Received duplicate mcast fragment. " 00424 "(Request already complete).\n")); 00425 return 0; 00426 } 00427 if (*request == 0) 00428 // Entry for this request has not yet been allocated. 00429 { 00430 ACE_NEW_RETURN (*request, 00431 TAO_ECG_UDP_Request_Entry (header.byte_order, 00432 header.request_id, 00433 header.request_size, 00434 header.fragment_count), 00435 -1); 00436 } 00437 00438 // Validate the fragment. 00439 if ((*request)->validate_fragment (header.byte_order, 00440 header.request_size, 00441 header.fragment_size, 00442 header.fragment_offset, 00443 header.fragment_id, 00444 header.fragment_count) == 0) 00445 { 00446 ACE_ERROR_RETURN ((LM_ERROR, 00447 "Received invalid mcast fragment.\n"), 00448 -1); 00449 } 00450 00451 // Check whether this fragment was already received. 00452 if ((*request)->test_received (header.fragment_id) == 1) 00453 { 00454 ACE_DEBUG ((LM_INFO, "Received duplicate mcast fragment.\n")); 00455 return 0; 00456 } 00457 00458 // Add the fragment to the request entry. 00459 (*request)->mark_received (header.fragment_id); 00460 ACE_OS::memcpy ((*request)->fragment_buffer (header.fragment_offset), 00461 data_buf, 00462 header.fragment_size); 00463 00464 // The request is not yet complete. 00465 if (!(*request)->complete ()) 00466 { 00467 return 0; 00468 } 00469 00470 // The request is complete - decode it. 00471 TAO_InputCDR cdr ((*request)->fragment_buffer (0), 00472 header.request_size, 00473 header.byte_order); 00474 00475 if (cdr_processor->decode (cdr) == -1) 00476 return -1; 00477 00478 delete *request; 00479 *request = &Request_Completed_; 00480 return 1; 00481 } |
|
Definition at line 512 of file ECG_CDR_Message_Receiver.cpp. References ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, ACE_Hash< EXT_ID >, ACE_Equal_To< EXT_ID >, ACE_LOCK >::begin(), ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, ACE_Hash< EXT_ID >, ACE_Equal_To< EXT_ID >, ACE_LOCK >::end(), ignore_from_, request_map_, and ACE_Refcounted_Auto_Ptr< X, ACE_LOCK >::reset(). Referenced by TAO_ECG_UDP_Receiver::shutdown().
00513 { 00514 // ACE_GUARD (ACE_Lock, guard, *this->lock_); 00515 00516 Request_Map::iterator end = this->request_map_.end (); 00517 for (Request_Map::iterator i = this->request_map_.begin (); 00518 i != end; 00519 ++i) 00520 { 00521 delete (*i).int_id_; 00522 (*i).int_id_ = 0; 00523 } 00524 00525 this->ignore_from_.reset (); 00526 } |
|
Flag to indicate whether CRC should be computed and checked.
Definition at line 234 of file ECG_CDR_Message_Receiver.h. Referenced by handle_input(). |
|
Ignore any events coming from this IP address.
Definition at line 215 of file ECG_CDR_Message_Receiver.h. Referenced by handle_input(), and shutdown(). |
|
Serializes use of . Size of a fragmented requests array, i.e., max number of partially received requests kept at any given time per source. Definition at line 226 of file ECG_CDR_Message_Receiver.h. |
|
Minimum number of requests purged from a fragmented requests array when the range of requests represented there needs to be shifted. Definition at line 231 of file ECG_CDR_Message_Receiver.h. |
|
Represents any request that has been fully received and serviced, to simplify the internal logic. Referenced by mark_received(), process_fragment(), TAO_ECG_CDR_Message_Receiver::Requests::purge_requests(), and TAO_ECG_CDR_Message_Receiver::Requests::~Requests(). |
|
The map containing all the incoming requests which have been partially received. Definition at line 219 of file ECG_CDR_Message_Receiver.h. Referenced by get_source_entry(), and shutdown(). |