#include <ECG_CDR_Message_Sender.h>
Collaboration diagram for TAO_ECG_CDR_Message_Sender:
Public Types | |
ECG_HEADER_SIZE = 32 | |
ECG_MIN_MTU = 32 + 8 | |
ECG_MAX_MTU = 65536 | |
ECG_DEFAULT_MTU = 1024 | |
enum | { ECG_HEADER_SIZE = 32, ECG_MIN_MTU = 32 + 8, ECG_MAX_MTU = 65536, ECG_DEFAULT_MTU = 1024 } |
Public Member Functions | |
void | send_message (const TAO_OutputCDR &cdr, const ACE_INET_Addr &addr) |
The main method - send a CDR message. | |
TAO_ECG_CDR_Message_Sender (CORBA::Boolean crc=0) | |
Initialization and termination methods. | |
void | init (TAO_ECG_Refcounted_Endpoint endpoint_rptr) |
Set the endpoint for sending messages. | |
void | shutdown (void) |
Initialization and termination methods. | |
int | get_local_addr (ACE_INET_Addr &addr) |
Get the local endpoint used to send the events. | |
int | mtu (CORBA::ULong mtu) |
CORBA::ULong | mtu (void) const |
Return the MTU currently configured for this sender. | |
Private Member Functions | |
ACE_SOCK_Dgram & | dgram (void) |
Return the datagram... | |
void | send_fragment (const ACE_INET_Addr &addr, CORBA::ULong request_id, CORBA::ULong request_size, CORBA::ULong fragment_size, CORBA::ULong fragment_offset, CORBA::ULong fragment_id, CORBA::ULong fragment_count, iovec iov[], int iovcnt) |
CORBA::ULong | compute_fragment_count (const ACE_Message_Block *begin, const ACE_Message_Block *end, int iov_size, CORBA::ULong max_fragment_payload, CORBA::ULong &total_length) |
Private Attributes | |
TAO_ECG_Refcounted_Endpoint | endpoint_rptr_ |
The datagram used for sendto (). | |
CORBA::ULong | mtu_ |
The MTU for this sender... | |
CORBA::Boolean | checksum_ |
Should the CRC checksum be calculated and sent? |
This class breaks up a CDR message into fragments and sends each fragment with a header (described below) using UDP. The UDP address can be a normal IP address or it can be a multicast group. The UDP address is obtained from a RtecUDPAdmin::AddrServer class.
This class is used by various Gateway (Senders/Receivers) classes responsible for federating Event Channels with UDP/Mcast.
Message headers are encapsulated using CDR, with the following format:
struct Header {
  octet byte_order_flags;
  // bit 0 represents the byte order as in GIOP 1.1
  // bit 1 is set if this is the last fragment
  unsigned long request_id;
  // The request ID, senders must not send two requests with
  // the same ID, senders can be distinguished using recvfrom..
  unsigned long request_size;
  // The size of this request, this can be used to pre-allocate
  // the request buffer.
  unsigned long fragment_size;
  // The size of this fragment, excluding the header...
  unsigned long fragment_offset;
  // Where does this fragment fit in the complete message...
  unsigned long fragment_id;
  // The ID of this fragment...
  unsigned long fragment_count;
  // The total number of fragments to expect in this request
//
Definition at line 75 of file ECG_CDR_Message_Sender.h.
anonymous enum |
Definition at line 79 of file ECG_CDR_Message_Sender.h.
00079 { 00080 ECG_HEADER_SIZE = 32, 00081 ECG_MIN_MTU = 32 + 8, 00082 ECG_MAX_MTU = 65536, // Really optimistic... 00083 ECG_DEFAULT_MTU = 1024 00084 };
TAO_ECG_CDR_Message_Sender::TAO_ECG_CDR_Message_Sender | ( | CORBA::Boolean | crc = 0 |
) |
Initialization and termination methods.
CORBA::ULong TAO_ECG_CDR_Message_Sender::compute_fragment_count | ( | const ACE_Message_Block * | begin, | |
const ACE_Message_Block * | end, | |||
int | iov_size, | |||
CORBA::ULong | max_fragment_payload, | |||
CORBA::ULong & | total_length | |||
) | [private] |
Count the number of fragments that will be required to send the message blocks in the range [begin,end). The maximum fragment payload (i.e. the size without the header) is also required; <total_length> returns the total message size.
Definition at line 274 of file ECG_CDR_Message_Sender.cpp.
References ACE_Message_Block::cont().
Referenced by send_message().
00279 { 00280 CORBA::ULong fragment_count = 0; 00281 total_length = 0; 00282 00283 CORBA::ULong fragment_size = 0; 00284 // Reserve the first iovec for the header... 00285 int iovcnt = 1; 00286 for (const ACE_Message_Block* b = begin; 00287 b != end; 00288 b = b->cont ()) 00289 { 00290 CORBA::ULong l = b->length (); 00291 total_length += l; 00292 fragment_size += l; 00293 ++iovcnt; 00294 while (fragment_size > max_fragment_payload) 00295 { 00296 // Ran out of space, must create a fragment... 00297 ++fragment_count; 00298 00299 // The next iovector will contain what remains of this 00300 // buffer, but also consider 00301 iovcnt = 2; 00302 l -= max_fragment_payload - (fragment_size - l); 00303 fragment_size = l; 00304 } 00305 if (fragment_size == max_fragment_payload) 00306 { 00307 ++fragment_count; 00308 iovcnt = 1; 00309 fragment_size = 0; 00310 } 00311 if (iovcnt >= iov_size) 00312 { 00313 // Ran out of space in the iovector.... 00314 ++fragment_count; 00315 iovcnt = 1; 00316 fragment_size = 0; 00317 } 00318 } 00319 if (iovcnt != 1) 00320 { 00321 // Send the remaining data in another fragment 00322 ++fragment_count; 00323 } 00324 return fragment_count; 00325 }
ACE_SOCK_Dgram& TAO_ECG_CDR_Message_Sender::dgram | ( | void | ) | [private] |
int TAO_ECG_CDR_Message_Sender::get_local_addr | ( | ACE_INET_Addr & | addr | ) |
Get the local endpoint used to send the events.
TAO_BEGIN_VERSIONED_NAMESPACE_DECL void TAO_ECG_CDR_Message_Sender::init | ( | TAO_ECG_Refcounted_Endpoint | endpoint_rptr | ) |
Set the endpoint for sending messages.
If init () is successful, shutdown () must be called when the sender is no longer needed. If shutdown () is not called by the user, cleanup activities will be performed by the destructor.
Definition at line 18 of file ECG_CDR_Message_Sender.cpp.
References ACE_ERROR, and LM_ERROR.
Referenced by TAO_ECG_UDP_Sender::init().
00020 { 00021 if (endpoint_rptr.get () == 0 00022 || endpoint_rptr->dgram ().get_handle () == ACE_INVALID_HANDLE) 00023 { 00024 ACE_ERROR ((LM_ERROR, "TAO_ECG_CDR_Message_Sender::init(): " 00025 "nil or unitialized endpoint argument.\n")); 00026 throw CORBA::INTERNAL (); 00027 } 00028 00029 this->endpoint_rptr_ = endpoint_rptr; 00030 }
CORBA::ULong TAO_ECG_CDR_Message_Sender::mtu | ( | void | ) | const |
int TAO_ECG_CDR_Message_Sender::mtu | ( | CORBA::ULong | mtu | ) |
The sender may need to fragment the message, otherwise the network may drop the packets. Setting the MTU can fail if the value is too small (at least the header + 8 bytes must fit).
void TAO_ECG_CDR_Message_Sender::send_fragment | ( | const ACE_INET_Addr & | addr, | |
CORBA::ULong | request_id, | |||
CORBA::ULong | request_size, | |||
CORBA::ULong | fragment_size, | |||
CORBA::ULong | fragment_offset, | |||
CORBA::ULong | fragment_id, | |||
CORBA::ULong | fragment_count, | |||
iovec | iov[], | |||
int | iovcnt | |||
) | [private] |
Send one fragment. The first entry in the iovec is used to send the header; the rest of the iovec array should contain pointers to the actual data.
Definition at line 180 of file ECG_CDR_Message_Sender.cpp.
References ACE_DEBUG, ACE_ERROR, ACE_HTONL, checksum_, ACE::crc32(), dgram(), ECG_HEADER_SIZE, LM_ERROR, LM_WARNING, ACE_CDR::MAX_ALIGNMENT, ACE_SOCK_Dgram::send(), and TAO_ENCAP_BYTE_ORDER.
Referenced by send_message().
00189 { 00190 CORBA::ULong header[TAO_ECG_CDR_Message_Sender::ECG_HEADER_SIZE 00191 / sizeof(CORBA::ULong) 00192 + ACE_CDR::MAX_ALIGNMENT]; 00193 char* buf = reinterpret_cast<char*> (header); 00194 TAO_OutputCDR cdr (buf, sizeof(header)); 00195 cdr.write_boolean (TAO_ENCAP_BYTE_ORDER); 00196 // Insert some known values in the padding bytes, so we can smoke 00197 // test the message on the receiving end. 00198 cdr.write_octet ('A'); cdr.write_octet ('B'); cdr.write_octet ('C'); 00199 cdr.write_ulong (request_id); 00200 cdr.write_ulong (request_size); 00201 cdr.write_ulong (fragment_size); 00202 cdr.write_ulong (fragment_offset); 00203 cdr.write_ulong (fragment_id); 00204 cdr.write_ulong (fragment_count); 00205 CORBA::Octet padding[4]; 00206 00207 00208 // MRH 00209 if (checksum_) 00210 { 00211 // Compute CRC 00212 iov[0].iov_base = cdr.begin ()->rd_ptr (); 00213 iov[0].iov_len = cdr.begin ()->length (); 00214 unsigned int crc = 0; 00215 unsigned char *crc_parts = (unsigned char *)(&crc); 00216 if (iovcnt > 1) 00217 { 00218 crc = ACE::crc32 (iov, iovcnt); 00219 crc = ACE_HTONL (crc); 00220 } 00221 for (int cnt=0; cnt<4; ++cnt) 00222 { 00223 padding[cnt] = crc_parts[cnt]; 00224 } 00225 } 00226 else 00227 { 00228 for (int cnt=0; cnt<4; ++cnt) 00229 { 00230 padding[cnt] = 0; 00231 } 00232 } 00233 //End MRH 00234 cdr.write_octet_array (padding, 4); 00235 00236 iov[0].iov_base = cdr.begin ()->rd_ptr (); 00237 iov[0].iov_len = cdr.begin ()->length (); 00238 00239 ssize_t n = this->dgram ().send (iov, 00240 iovcnt, 00241 addr); 00242 size_t expected_n = 0; 00243 for (int i = 0; i < iovcnt; ++i) 00244 expected_n += iov[i].iov_len; 00245 if (n > 0 && size_t(n) != expected_n) 00246 { 00247 ACE_ERROR ((LM_ERROR, ("Sent only %d out of %d bytes " 00248 "for mcast fragment.\n"), 00249 n, 00250 expected_n)); 00251 } 00252 00253 if (n == -1) 00254 { 00255 if (errno == EWOULDBLOCK) 00256 { 00257 ACE_ERROR ((LM_ERROR, "Send of mcast fragment failed (%m).\n")); 00258 // @@ TODO Use a 
Event Channel specific exception 00259 throw CORBA::COMM_FAILURE (); 00260 } 00261 else 00262 { 00263 ACE_DEBUG ((LM_WARNING, "Send of mcast fragment blocked (%m).\n")); 00264 } 00265 } 00266 else if (n == 0) 00267 { 00268 ACE_DEBUG ((LM_WARNING, "EOF on send of mcast fragment (%m).\n")); 00269 } 00270 }
void TAO_ECG_CDR_Message_Sender::send_message | ( | const TAO_OutputCDR & | cdr, | |
const ACE_INET_Addr & | addr | |||
) |
The main method - send a CDR message.
Definition at line 33 of file ECG_CDR_Message_Sender.cpp.
References ACE_ERROR, ACE_IOV_MAX, ACE_OutputCDR::begin(), compute_fragment_count(), ACE_Message_Block::cont(), ECG_HEADER_SIZE, ACE_OutputCDR::end(), endpoint_rptr_, LM_ERROR, mtu(), and send_fragment().
Referenced by TAO_ECG_UDP_Sender::push().
00035 { 00036 if (this->endpoint_rptr_.get () == 0) 00037 { 00038 ACE_ERROR ((LM_ERROR, "Attempt to invoke send_message() " 00039 "on non-initialized sender object.\n")); 00040 throw CORBA::INTERNAL (); 00041 } 00042 00043 CORBA::ULong max_fragment_payload = this->mtu () - 00044 TAO_ECG_CDR_Message_Sender::ECG_HEADER_SIZE; 00045 // ACE_ASSERT (max_fragment_payload != 0); 00046 00047 #if defined (ACE_HAS_BROKEN_DGRAM_SENDV) 00048 const int TAO_WRITEV_MAX = ACE_IOV_MAX - 1; 00049 #else 00050 const int TAO_WRITEV_MAX = ACE_IOV_MAX; 00051 #endif /* ACE_HAS_BROKEN_DGRAM_SENDV */ 00052 iovec iov[TAO_WRITEV_MAX]; 00053 00054 CORBA::ULong total_length; 00055 CORBA::ULong fragment_count = 00056 this->compute_fragment_count (cdr.begin (), 00057 cdr.end (), 00058 TAO_WRITEV_MAX, 00059 max_fragment_payload, 00060 total_length); 00061 00062 CORBA::ULong request_id = this->endpoint_rptr_->next_request_id (); 00063 00064 // Reserve the first iovec for the header... 00065 int iovcnt = 1; 00066 CORBA::ULong fragment_id = 0; 00067 CORBA::ULong fragment_offset = 0; 00068 CORBA::ULong fragment_size = 0; 00069 for (const ACE_Message_Block* b = cdr.begin (); 00070 b != cdr.end (); 00071 b = b->cont ()) 00072 { 00073 CORBA::ULong l = b->length (); 00074 00075 char* rd_ptr = b->rd_ptr (); 00076 00077 iov[iovcnt].iov_base = rd_ptr; 00078 iov[iovcnt].iov_len = l; 00079 fragment_size += l; 00080 ++iovcnt; 00081 while (fragment_size > max_fragment_payload) 00082 { 00083 // This fragment is full, we have to send it... 
00084 00085 // First adjust the last iov entry: 00086 CORBA::ULong last_mb_length = 00087 max_fragment_payload - (fragment_size - l); 00088 iov[iovcnt - 1].iov_len = last_mb_length; 00089 00090 this->send_fragment (addr, 00091 request_id, 00092 total_length, 00093 max_fragment_payload, 00094 fragment_offset, 00095 fragment_id, 00096 fragment_count, 00097 iov, 00098 iovcnt); 00099 ++fragment_id; 00100 fragment_offset += max_fragment_payload; 00101 00102 // Reset, but don't forget that the last Message_Block 00103 // may need to be sent in multiple fragments.. 00104 l -= last_mb_length; 00105 rd_ptr += last_mb_length; 00106 iov[1].iov_base = rd_ptr; 00107 iov[1].iov_len = l; 00108 fragment_size = l; 00109 iovcnt = 2; 00110 } 00111 if (fragment_size == max_fragment_payload) 00112 { 00113 // We filled a fragment, but this time it was filled 00114 // exactly, the treatment is a little different from the 00115 // loop above... 00116 this->send_fragment (addr, 00117 request_id, 00118 total_length, 00119 max_fragment_payload, 00120 fragment_offset, 00121 fragment_id, 00122 fragment_count, 00123 iov, 00124 iovcnt); 00125 ++fragment_id; 00126 fragment_offset += max_fragment_payload; 00127 00128 iovcnt = 1; 00129 fragment_size = 0; 00130 } 00131 if (iovcnt == TAO_WRITEV_MAX) 00132 { 00133 // Now we ran out of space in the iovec, we must send a 00134 // fragment to work around that.... 00135 this->send_fragment (addr, 00136 request_id, 00137 total_length, 00138 fragment_size, 00139 fragment_offset, 00140 fragment_id, 00141 fragment_count, 00142 iov, 00143 iovcnt); 00144 ++fragment_id; 00145 fragment_offset += fragment_size; 00146 00147 iovcnt = 1; 00148 fragment_size = 0; 00149 } 00150 } 00151 // There is something left in the iovvec that we must send 00152 // also... 00153 if (iovcnt != 1) 00154 { 00155 // Now we ran out of space in the iovec, we must send a 00156 // fragment to work around that.... 
00157 this->send_fragment (addr, 00158 request_id, 00159 total_length, 00160 fragment_size, 00161 fragment_offset, 00162 fragment_id, 00163 fragment_count, 00164 iov, 00165 iovcnt); 00166 ++fragment_id; 00167 fragment_offset += fragment_size; 00168 00169 // reset, not needed here... 00170 // iovcnt = 1; 00171 // fragment_size = 0; 00172 } 00173 // ACE_ASSERT (total_length == fragment_offset); 00174 // ACE_ASSERT (fragment_id == fragment_count); 00175 00176 }
void TAO_ECG_CDR_Message_Sender::shutdown | ( | void | ) |
Should the CRC checksum be calculated and sent?
Definition at line 173 of file ECG_CDR_Message_Sender.h.
Referenced by send_fragment().
The datagram used for sendto ().
Definition at line 167 of file ECG_CDR_Message_Sender.h.
Referenced by send_message().
CORBA::ULong TAO_ECG_CDR_Message_Sender::mtu_ [private] |