00001 // -*- C++ -*- 00002 00003 /** 00004 * @file ECG_CDR_Message_Sender.h 00005 * 00006 * ECG_CDR_Message_Sender.h,v 1.10 2006/03/15 07:52:21 jtc Exp 00007 * 00008 * @author Carlos O'Ryan (coryan@cs.wustl.edu) 00009 * @author Marina Spivak (marina@atdesk.com) 00010 */ 00011 00012 #ifndef TAO_ECG_CDR_MESSAGE_SENDER_H 00013 #define TAO_ECG_CDR_MESSAGE_SENDER_H 00014 00015 #include /**/ "ace/pre.h" 00016 00017 #include "orbsvcs/Event/ECG_UDP_Out_Endpoint.h" 00018 00019 #if !defined (ACE_LACKS_PRAGMA_ONCE) 00020 # pragma once 00021 #endif /* ACE_LACKS_PRAGMA_ONCE */ 00022 00023 #include /**/ "orbsvcs/Event/event_serv_export.h" 00024 00025 #include "tao/SystemException.h" 00026 #include "tao/Environment.h" 00027 00028 #include "ace/INET_Addr.h" 00029 00030 TAO_BEGIN_VERSIONED_NAMESPACE_DECL 00031 00032 /** 00033 * @class TAO_ECG_CDR_Message_Sender 00034 * 00035 * @brief Sends CDR messages using UDP. 00036 * NOT THREAD-SAFE. 00037 * 00038 * This class breaks up a CDR message into fragments and sends each 00039 * fragment with a header (described below) using UDP. 00040 * The UDP address can be a normal IP address or it can be a multicast 00041 * group. The UDP address is obtained from a RtecUDPAdmin::AddrServer 00042 * class. 00043 * 00044 * This class is used by various Gateway (Senders/Receivers) classes 00045 * responsible for federating Event Channels with UDP/Mcast. 00046 * 00047 * <H2>MESSAGE FORMAT</H2> 00048 * Message header are encapsulated using CDR, with the 00049 * following format: 00050 * struct Header { 00051 * octet byte_order_flags; 00052 * // bit 0 represents the byte order as in GIOP 1.1 00053 * // bit 1 is set if this is the last fragment 00054 * unsigned long request_id; 00055 * // The request ID, senders must not send two requests with 00056 * // the same ID, senders can be distinguished using recvfrom.. 00057 * unsigned long request_size; 00058 * // The size of this request, this can be used to pre-allocate 00059 * // the request buffer. 00060 * unsgined long fragment_size; 00061 * // The size of this fragment, excluding the header... 00062 * unsigned long fragment_offset; 00063 * // Where does this fragment fit in the complete message... 00064 * unsigned long fragment_id; 00065 * // The ID of this fragment... 00066 * unsigned long fragment_count; 00067 * // The total number of fragments to expect in this request 00068 * 00069 * // @todo This could be eliminated if efficient reassembly 00070 * // could be implemented without it. 00071 * octet padding[4]; 00072 * 00073 * // Ensures the header ends at an 8-byte boundary. 00074 * }; // size (in CDR stream) = 32 00075 */ 00076 class TAO_RTEvent_Serv_Export TAO_ECG_CDR_Message_Sender 00077 { 00078 public: 00079 00080 enum { 00081 ECG_HEADER_SIZE = 32, 00082 ECG_MIN_MTU = 32 + 8, 00083 ECG_MAX_MTU = 65536, // Really optimistic... 00084 ECG_DEFAULT_MTU = 1024 00085 }; 00086 00087 /// Initialization and termination methods. 00088 //@{ 00089 TAO_ECG_CDR_Message_Sender (CORBA::Boolean crc = 0); 00090 00091 /// Set the endpoint for sending messages. 00092 /** 00093 * If init () is successful, shutdown () must be called when the 00094 * sender is no longer needed. If shutdown () is not called by the 00095 * user, cleanup activities will be performed by the destructor. 00096 */ 00097 void init (TAO_ECG_Refcounted_Endpoint endpoint_rptr 00098 ACE_ENV_ARG_DECL) 00099 ACE_THROW_SPEC ((CORBA::SystemException)); 00100 00101 // Shutdown this component. Frees up the endpoint. 00102 void shutdown (ACE_ENV_SINGLE_ARG_DECL); 00103 //@} 00104 00105 /// Setters/getters. 00106 //@{ 00107 /// Get the local endpoint used to send the events. 00108 int get_local_addr (ACE_INET_Addr& addr); 00109 00110 /** 00111 * The sender may need to fragment the message, otherwise the 00112 * network may drop the packets. 00113 * Setting the MTU can fail if the value is too small (at least the 00114 * header + 8 bytes must fit). 00115 */ 00116 int mtu (CORBA::ULong mtu); 00117 CORBA::ULong mtu (void) const; 00118 //@} 00119 00120 /// The main method - send a CDR message. 00121 /** 00122 * @todo Under some platforms, notably Linux, the fragmentation code 00123 * in this method is woefully naive. The fragments are sent it a 00124 * big burst, unfortunately, that can fill up the local kernel 00125 * buffer before all the data is sent. In those circumstances some 00126 * of the fragments are silently (gulp!) dropped by the kernel, 00127 * check the documentation for sendto(2) specially the ENOBUFS 00128 * error condition. 00129 * There is no easy solution that I know off, except "pacing" the 00130 * fragments, i.e. never sending more than a prescribed number of 00131 * bytes per-second, sleeping before sending more or queueing them 00132 * to send later via the reactor. 00133 */ 00134 void send_message (const TAO_OutputCDR &cdr, 00135 const ACE_INET_Addr &addr 00136 ACE_ENV_ARG_DECL) 00137 ACE_THROW_SPEC ((CORBA::SystemException)); 00138 00139 private: 00140 /// Return the datagram... 00141 ACE_SOCK_Dgram& dgram (void); 00142 00143 /** 00144 * Send one fragment, the first entry in the iovec is used to send 00145 * the header, the rest of the iovec array should contain pointers 00146 * to the actual data. 00147 */ 00148 void send_fragment (const ACE_INET_Addr &addr, 00149 CORBA::ULong request_id, 00150 CORBA::ULong request_size, 00151 CORBA::ULong fragment_size, 00152 CORBA::ULong fragment_offset, 00153 CORBA::ULong fragment_id, 00154 CORBA::ULong fragment_count, 00155 iovec iov[], 00156 int iovcnt 00157 ACE_ENV_ARG_DECL); 00158 00159 /** 00160 * Count the number of fragments that will be required to send the 00161 * message blocks in the range [begin,end) 00162 * The maximum fragment payload (i.e. the size without the header is 00163 * also required); <total_length> returns the total message size. 00164 */ 00165 CORBA::ULong compute_fragment_count (const ACE_Message_Block* begin, 00166 const ACE_Message_Block* end, 00167 int iov_size, 00168 CORBA::ULong max_fragment_payload, 00169 CORBA::ULong& total_length); 00170 00171 private: 00172 /// The datagram used for sendto (). 00173 TAO_ECG_Refcounted_Endpoint endpoint_rptr_; 00174 00175 /// The MTU for this sender... 00176 CORBA::ULong mtu_; 00177 00178 /// Should crc checksum be caluclated and sent? 00179 CORBA::Boolean checksum_; 00180 }; 00181 00182 TAO_END_VERSIONED_NAMESPACE_DECL 00183 00184 #if defined(__ACE_INLINE__) 00185 #include "orbsvcs/Event/ECG_CDR_Message_Sender.i" 00186 #endif /* __ACE_INLINE__ */ 00187 00188 #include /**/ "ace/post.h" 00189 00190 #endif /* TAO_ECG_CDR_MESSAGE_SENDER_H */