#include <ECG_CDR_Message_Sender.h>
Collaboration diagram for TAO_ECG_CDR_Message_Sender:
Public Types | |
enum | { ECG_HEADER_SIZE = 32, ECG_MIN_MTU = 32 + 8, ECG_MAX_MTU = 65536, ECG_DEFAULT_MTU = 1024 } |
Public Member Functions | |
void | send_message (const TAO_OutputCDR &cdr, const ACE_INET_Addr &addr) throw (CORBA::SystemException) |
The main method - send a CDR message. | |
TAO_ECG_CDR_Message_Sender (CORBA::Boolean crc=0) | |
void | init (TAO_ECG_Refcounted_Endpoint endpoint_rptr) throw (CORBA::SystemException) |
Set the endpoint for sending messages. | |
void | shutdown () |
int | get_local_addr (ACE_INET_Addr &addr) |
Get the local endpoint used to send the events. | |
int | mtu (CORBA::ULong mtu) |
CORBA::ULong | mtu (void) const |
Get the local endpoint used to send the events. | |
Private Member Functions | |
ACE_SOCK_Dgram & | dgram (void) |
Return the datagram... | |
void | send_fragment (const ACE_INET_Addr &addr, CORBA::ULong request_id, CORBA::ULong request_size, CORBA::ULong fragment_size, CORBA::ULong fragment_offset, CORBA::ULong fragment_id, CORBA::ULong fragment_count, iovec iov[], int iovcnt) |
CORBA::ULong | compute_fragment_count (const ACE_Message_Block *begin, const ACE_Message_Block *end, int iov_size, CORBA::ULong max_fragment_payload, CORBA::ULong &total_length) |
Private Attributes | |
TAO_ECG_Refcounted_Endpoint | endpoint_rptr_ |
The datagram used for sendto (). | |
CORBA::ULong | mtu_ |
The MTU for this sender... | |
CORBA::Boolean | checksum_ |
Should crc checksum be caluclated and sent? |
This class breaks up a CDR message into fragments and sends each fragment with a header (described below) using UDP. The UDP address can be a normal IP address or it can be a multicast group. The UDP address is obtained from a RtecUDPAdmin::AddrServer class.
This class is used by various Gateway (Senders/Receivers) classes responsible for federating Event Channels with UDP/Mcast.
Message header are encapsulated using CDR, with the following format: struct Header { octet byte_order_flags; // bit 0 represents the byte order as in GIOP 1.1 // bit 1 is set if this is the last fragment unsigned long request_id; // The request ID, senders must not send two requests with // the same ID, senders can be distinguished using recvfrom.. unsigned long request_size; // The size of this request, this can be used to pre-allocate // the request buffer. unsgined long fragment_size; // The size of this fragment, excluding the header... unsigned long fragment_offset; // Where does this fragment fit in the complete message... unsigned long fragment_id; // The ID of this fragment... unsigned long fragment_count; // The total number of fragments to expect in this request
//
Definition at line 76 of file ECG_CDR_Message_Sender.h.
|
Definition at line 80 of file ECG_CDR_Message_Sender.h.
00080 { 00081 ECG_HEADER_SIZE = 32, 00082 ECG_MIN_MTU = 32 + 8, 00083 ECG_MAX_MTU = 65536, // Really optimistic... 00084 ECG_DEFAULT_MTU = 1024 00085 }; |
|
Definition at line 10 of file ECG_CDR_Message_Sender.i.
00011 : endpoint_rptr_ () 00012 , mtu_ (TAO_ECG_CDR_Message_Sender::ECG_DEFAULT_MTU) 00013 , checksum_ (crc) 00014 { 00015 } |
|
Count the number of fragments that will be required to send the message blocks in the range [begin,end) The maximum fragment payload (i.e. the size without the header is also required); returns the total message size. Definition at line 287 of file ECG_CDR_Message_Sender.cpp. References ACE_Message_Block::cont(), and ACE_Message_Block::length().
00292 { 00293 CORBA::ULong fragment_count = 0; 00294 total_length = 0; 00295 00296 CORBA::ULong fragment_size = 0; 00297 // Reserve the first iovec for the header... 00298 int iovcnt = 1; 00299 for (const ACE_Message_Block* b = begin; 00300 b != end; 00301 b = b->cont ()) 00302 { 00303 CORBA::ULong l = b->length (); 00304 total_length += l; 00305 fragment_size += l; 00306 ++iovcnt; 00307 while (fragment_size > max_fragment_payload) 00308 { 00309 // Ran out of space, must create a fragment... 00310 ++fragment_count; 00311 00312 // The next iovector will contain what remains of this 00313 // buffer, but also consider 00314 iovcnt = 2; 00315 l -= max_fragment_payload - (fragment_size - l); 00316 fragment_size = l; 00317 } 00318 if (fragment_size == max_fragment_payload) 00319 { 00320 ++fragment_count; 00321 iovcnt = 1; 00322 fragment_size = 0; 00323 } 00324 if (iovcnt >= iov_size) 00325 { 00326 // Ran out of space in the iovector.... 00327 ++fragment_count; 00328 iovcnt = 1; 00329 fragment_size = 0; 00330 } 00331 } 00332 if (iovcnt != 1) 00333 { 00334 // Send the remaining data in another fragment 00335 ++fragment_count; 00336 } 00337 return fragment_count; 00338 } |
|
Return the datagram...
Definition at line 26 of file ECG_CDR_Message_Sender.i. References ACE_ASSERT, and endpoint_rptr_. Referenced by get_local_addr(), and send_fragment().
00027 { 00028 ACE_ASSERT (this->endpoint_rptr_.get ()); 00029 return this->endpoint_rptr_->dgram (); 00030 } |
|
Get the local endpoint used to send the events.
Definition at line 33 of file ECG_CDR_Message_Sender.i. References dgram(), endpoint_rptr_, ACE_Refcounted_Auto_Ptr< X, ACE_LOCK >::get(), and ACE_SOCK::get_local_addr().
00034 { 00035 if (this->endpoint_rptr_.get () == 0) 00036 return -1; 00037 return this->dgram ().get_local_addr (addr); 00038 } |
|
Set the endpoint for sending messages. If init () is successful, shutdown () must be called when the sender is no longer needed. If shutdown () is not called by the user, cleanup activities will be performed by the destructor. Definition at line 18 of file ECG_CDR_Message_Sender.cpp. References ACE_ERROR, ACE_THROW, LM_ERROR, and TAO_ECG_Refcounted_Endpoint. Referenced by TAO_ECG_UDP_Sender::init().
00022 { 00023 if (endpoint_rptr.get () == 0 00024 || endpoint_rptr->dgram ().get_handle () == ACE_INVALID_HANDLE) 00025 { 00026 ACE_ERROR ((LM_ERROR, "TAO_ECG_CDR_Message_Sender::init(): " 00027 "nil or unitialized endpoint argument.\n")); 00028 ACE_THROW (CORBA::INTERNAL ()); 00029 } 00030 00031 this->endpoint_rptr_ = endpoint_rptr; 00032 } |
|
Get the local endpoint used to send the events.
Definition at line 41 of file ECG_CDR_Message_Sender.i. References mtu_.
00042 { 00043 return this->mtu_; 00044 } |
|
The sender may need to fragment the message, otherwise the network may drop the packets. Setting the MTU can fail if the value is too small (at least the header + 8 bytes must fit). Definition at line 47 of file ECG_CDR_Message_Sender.i. References ECG_MAX_MTU, ECG_MIN_MTU, and mtu_.
00048 { 00049 if (new_mtu < TAO_ECG_CDR_Message_Sender::ECG_MIN_MTU 00050 || new_mtu >= TAO_ECG_CDR_Message_Sender::ECG_MAX_MTU) 00051 return -1; 00052 this->mtu_ = new_mtu; 00053 return 0; 00054 } |
|
Send one fragment, the first entry in the iovec is used to send the header, the rest of the iovec array should contain pointers to the actual data. Definition at line 192 of file ECG_CDR_Message_Sender.cpp. References ACE_DEBUG, ACE_THROW, ACE_OutputCDR::begin(), checksum_, ACE::crc32(), dgram(), ECG_HEADER_SIZE, EWOULDBLOCK, iovec::iov_base, iovec::iov_len, ACE_Message_Block::length(), LM_ERROR, LM_WARNING, ACE_Message_Block::rd_ptr(), ACE_SOCK_Dgram::send(), ssize_t, TAO_ENCAP_BYTE_ORDER, ACE_OutputCDR::write_boolean(), ACE_OutputCDR::write_octet(), ACE_OutputCDR::write_octet_array(), and ACE_OutputCDR::write_ulong().
00202 { 00203 CORBA::ULong header[TAO_ECG_CDR_Message_Sender::ECG_HEADER_SIZE 00204 / sizeof(CORBA::ULong) 00205 + ACE_CDR::MAX_ALIGNMENT]; 00206 char* buf = reinterpret_cast<char*> (header); 00207 TAO_OutputCDR cdr (buf, sizeof(header)); 00208 cdr.write_boolean (TAO_ENCAP_BYTE_ORDER); 00209 // Insert some known values in the padding bytes, so we can smoke 00210 // test the message on the receiving end. 00211 cdr.write_octet ('A'); cdr.write_octet ('B'); cdr.write_octet ('C'); 00212 cdr.write_ulong (request_id); 00213 cdr.write_ulong (request_size); 00214 cdr.write_ulong (fragment_size); 00215 cdr.write_ulong (fragment_offset); 00216 cdr.write_ulong (fragment_id); 00217 cdr.write_ulong (fragment_count); 00218 CORBA::Octet padding[4]; 00219 00220 00221 // MRH 00222 if (checksum_) 00223 { 00224 // Compute CRC 00225 iov[0].iov_base = cdr.begin ()->rd_ptr (); 00226 iov[0].iov_len = cdr.begin ()->length (); 00227 unsigned int crc = 0; 00228 unsigned char *crc_parts = (unsigned char *)(&crc); 00229 if (iovcnt > 1) 00230 { 00231 crc = ACE::crc32 (iov, iovcnt); 00232 crc = htonl (crc); 00233 } 00234 for (int cnt=0; cnt<4; ++cnt) 00235 { 00236 padding[cnt] = crc_parts[cnt]; 00237 } 00238 } 00239 else 00240 { 00241 for (int cnt=0; cnt<4; ++cnt) 00242 { 00243 padding[cnt] = 0; 00244 } 00245 } 00246 //End MRH 00247 cdr.write_octet_array (padding, 4); 00248 00249 iov[0].iov_base = cdr.begin ()->rd_ptr (); 00250 iov[0].iov_len = cdr.begin ()->length (); 00251 00252 ssize_t n = this->dgram ().send (iov, 00253 iovcnt, 00254 addr); 00255 size_t expected_n = 0; 00256 for (int i = 0; i < iovcnt; ++i) 00257 expected_n += iov[i].iov_len; 00258 if (n > 0 && size_t(n) != expected_n) 00259 { 00260 ACE_DEBUG ((LM_ERROR, ("Sent only %d out of %d bytes " 00261 "for mcast fragment.\n"), 00262 n, 00263 expected_n)); 00264 } 00265 00266 if (n == -1) 00267 { 00268 if (errno == EWOULDBLOCK) 00269 { 00270 ACE_DEBUG ((LM_ERROR, "Send of mcast fragment failed (%m).\n")); 00271 // @@ TODO Use a Event Channel specific exception 00272 ACE_THROW (CORBA::COMM_FAILURE ()); 00273 } 00274 else 00275 { 00276 ACE_DEBUG ((LM_WARNING, "Send of mcast fragment blocked (%m).\n")); 00277 } 00278 } 00279 else if (n == 0) 00280 { 00281 ACE_DEBUG ((LM_WARNING, "EOF on send of mcast fragment (%m).\n")); 00282 } 00283 } |
|
The main method - send a CDR message.
Definition at line 35 of file ECG_CDR_Message_Sender.cpp. References ACE_CHECK, ACE_ENV_ARG_PARAMETER, ACE_ERROR, ACE_IOV_MAX, ACE_THROW, ACE_Message_Block::cont(), ECG_HEADER_SIZE, ACE_Message_Block::end(), iovec::iov_base, iovec::iov_len, ACE_Message_Block::length(), LM_ERROR, and ACE_Message_Block::rd_ptr().
00039 { 00040 if (this->endpoint_rptr_.get () == 0) 00041 { 00042 ACE_ERROR ((LM_ERROR, "Attempt to invoke send_message() " 00043 "on non-initialized sender object.\n")); 00044 ACE_THROW (CORBA::INTERNAL ()); 00045 } 00046 00047 CORBA::ULong max_fragment_payload = this->mtu () - 00048 TAO_ECG_CDR_Message_Sender::ECG_HEADER_SIZE; 00049 // ACE_ASSERT (max_fragment_payload != 0); 00050 00051 #if defined (ACE_HAS_BROKEN_DGRAM_SENDV) 00052 const int TAO_WRITEV_MAX = ACE_IOV_MAX - 1; 00053 #else 00054 const int TAO_WRITEV_MAX = ACE_IOV_MAX; 00055 #endif /* ACE_HAS_BROKEN_DGRAM_SENDV */ 00056 iovec iov[TAO_WRITEV_MAX]; 00057 00058 CORBA::ULong total_length; 00059 CORBA::ULong fragment_count = 00060 this->compute_fragment_count (cdr.begin (), 00061 cdr.end (), 00062 TAO_WRITEV_MAX, 00063 max_fragment_payload, 00064 total_length); 00065 00066 CORBA::ULong request_id = this->endpoint_rptr_->next_request_id (); 00067 00068 // Reserve the first iovec for the header... 00069 int iovcnt = 1; 00070 CORBA::ULong fragment_id = 0; 00071 CORBA::ULong fragment_offset = 0; 00072 CORBA::ULong fragment_size = 0; 00073 for (const ACE_Message_Block* b = cdr.begin (); 00074 b != cdr.end (); 00075 b = b->cont ()) 00076 { 00077 CORBA::ULong l = b->length (); 00078 00079 char* rd_ptr = b->rd_ptr (); 00080 00081 iov[iovcnt].iov_base = rd_ptr; 00082 iov[iovcnt].iov_len = l; 00083 fragment_size += l; 00084 ++iovcnt; 00085 while (fragment_size > max_fragment_payload) 00086 { 00087 // This fragment is full, we have to send it... 00088 00089 // First adjust the last iov entry: 00090 CORBA::ULong last_mb_length = 00091 max_fragment_payload - (fragment_size - l); 00092 iov[iovcnt - 1].iov_len = last_mb_length; 00093 00094 this->send_fragment (addr, 00095 request_id, 00096 total_length, 00097 max_fragment_payload, 00098 fragment_offset, 00099 fragment_id, 00100 fragment_count, 00101 iov, 00102 iovcnt 00103 ACE_ENV_ARG_PARAMETER); 00104 ACE_CHECK; 00105 ++fragment_id; 00106 fragment_offset += max_fragment_payload; 00107 00108 // Reset, but don't forget that the last Message_Block 00109 // may need to be sent in multiple fragments.. 00110 l -= last_mb_length; 00111 rd_ptr += last_mb_length; 00112 iov[1].iov_base = rd_ptr; 00113 iov[1].iov_len = l; 00114 fragment_size = l; 00115 iovcnt = 2; 00116 } 00117 if (fragment_size == max_fragment_payload) 00118 { 00119 // We filled a fragment, but this time it was filled 00120 // exactly, the treatment is a little different from the 00121 // loop above... 00122 this->send_fragment (addr, 00123 request_id, 00124 total_length, 00125 max_fragment_payload, 00126 fragment_offset, 00127 fragment_id, 00128 fragment_count, 00129 iov, 00130 iovcnt 00131 ACE_ENV_ARG_PARAMETER); 00132 ACE_CHECK; 00133 ++fragment_id; 00134 fragment_offset += max_fragment_payload; 00135 00136 iovcnt = 1; 00137 fragment_size = 0; 00138 } 00139 if (iovcnt == TAO_WRITEV_MAX) 00140 { 00141 // Now we ran out of space in the iovec, we must send a 00142 // fragment to work around that.... 00143 this->send_fragment (addr, 00144 request_id, 00145 total_length, 00146 fragment_size, 00147 fragment_offset, 00148 fragment_id, 00149 fragment_count, 00150 iov, 00151 iovcnt 00152 ACE_ENV_ARG_PARAMETER); 00153 ACE_CHECK; 00154 ++fragment_id; 00155 fragment_offset += fragment_size; 00156 00157 iovcnt = 1; 00158 fragment_size = 0; 00159 } 00160 } 00161 // There is something left in the iovvec that we must send 00162 // also... 00163 if (iovcnt != 1) 00164 { 00165 // Now we ran out of space in the iovec, we must send a 00166 // fragment to work around that.... 00167 this->send_fragment (addr, 00168 request_id, 00169 total_length, 00170 fragment_size, 00171 fragment_offset, 00172 fragment_id, 00173 fragment_count, 00174 iov, 00175 iovcnt 00176 ACE_ENV_ARG_PARAMETER); 00177 ACE_CHECK; 00178 ++fragment_id; 00179 fragment_offset += fragment_size; 00180 00181 // reset, not needed here... 00182 // iovcnt = 1; 00183 // fragment_size = 0; 00184 } 00185 // ACE_ASSERT (total_length == fragment_offset); 00186 // ACE_ASSERT (fragment_id == fragment_count); 00187 00188 } |
|
Referenced by TAO_ECG_UDP_Sender::shutdown(). |
|
Should crc checksum be caluclated and sent?
Definition at line 179 of file ECG_CDR_Message_Sender.h. Referenced by send_fragment(). |
|
The datagram used for sendto ().
Definition at line 173 of file ECG_CDR_Message_Sender.h. Referenced by dgram(), and get_local_addr(). |
|
The MTU for this sender...
Definition at line 176 of file ECG_CDR_Message_Sender.h. Referenced by mtu(). |