00001 // -*- C++ -*- 00002 00003 /** 00004 * @file ECG_CDR_Message_Sender.h 00005 * 00006 * $Id: ECG_CDR_Message_Sender.h 77001 2007-02-12 07:54:49Z johnnyw $ 00007 * 00008 * @author Carlos O'Ryan (coryan@cs.wustl.edu) 00009 * @author Marina Spivak (marina@atdesk.com) 00010 */ 00011 00012 #ifndef TAO_ECG_CDR_MESSAGE_SENDER_H 00013 #define TAO_ECG_CDR_MESSAGE_SENDER_H 00014 00015 #include /**/ "ace/pre.h" 00016 00017 #include "orbsvcs/Event/ECG_UDP_Out_Endpoint.h" 00018 00019 #if !defined (ACE_LACKS_PRAGMA_ONCE) 00020 # pragma once 00021 #endif /* ACE_LACKS_PRAGMA_ONCE */ 00022 00023 #include /**/ "orbsvcs/Event/event_serv_export.h" 00024 00025 #include "tao/SystemException.h" 00026 00027 #include "ace/INET_Addr.h" 00028 00029 TAO_BEGIN_VERSIONED_NAMESPACE_DECL 00030 00031 /** 00032 * @class TAO_ECG_CDR_Message_Sender 00033 * 00034 * @brief Sends CDR messages using UDP. 00035 * NOT THREAD-SAFE. 00036 * 00037 * This class breaks up a CDR message into fragments and sends each 00038 * fragment with a header (described below) using UDP. 00039 * The UDP address can be a normal IP address or it can be a multicast 00040 * group. The UDP address is obtained from a RtecUDPAdmin::AddrServer 00041 * class. 00042 * 00043 * This class is used by various Gateway (Senders/Receivers) classes 00044 * responsible for federating Event Channels with UDP/Mcast. 00045 * 00046 * <H2>MESSAGE FORMAT</H2> 00047 * Message header are encapsulated using CDR, with the 00048 * following format: 00049 * struct Header { 00050 * octet byte_order_flags; 00051 * // bit 0 represents the byte order as in GIOP 1.1 00052 * // bit 1 is set if this is the last fragment 00053 * unsigned long request_id; 00054 * // The request ID, senders must not send two requests with 00055 * // the same ID, senders can be distinguished using recvfrom.. 00056 * unsigned long request_size; 00057 * // The size of this request, this can be used to pre-allocate 00058 * // the request buffer. 00059 * unsgined long fragment_size; 00060 * // The size of this fragment, excluding the header... 00061 * unsigned long fragment_offset; 00062 * // Where does this fragment fit in the complete message... 00063 * unsigned long fragment_id; 00064 * // The ID of this fragment... 00065 * unsigned long fragment_count; 00066 * // The total number of fragments to expect in this request 00067 * 00068 * // @todo This could be eliminated if efficient reassembly 00069 * // could be implemented without it. 00070 * octet padding[4]; 00071 * 00072 * // Ensures the header ends at an 8-byte boundary. 00073 * }; // size (in CDR stream) = 32 00074 */ 00075 class TAO_RTEvent_Serv_Export TAO_ECG_CDR_Message_Sender 00076 { 00077 public: 00078 00079 enum { 00080 ECG_HEADER_SIZE = 32, 00081 ECG_MIN_MTU = 32 + 8, 00082 ECG_MAX_MTU = 65536, // Really optimistic... 00083 ECG_DEFAULT_MTU = 1024 00084 }; 00085 00086 /// Initialization and termination methods. 00087 //@{ 00088 TAO_ECG_CDR_Message_Sender (CORBA::Boolean crc = 0); 00089 00090 /// Set the endpoint for sending messages. 00091 /** 00092 * If init () is successful, shutdown () must be called when the 00093 * sender is no longer needed. If shutdown () is not called by the 00094 * user, cleanup activities will be performed by the destructor. 00095 */ 00096 void init (TAO_ECG_Refcounted_Endpoint endpoint_rptr); 00097 00098 // Shutdown this component. Frees up the endpoint. 00099 void shutdown (void); 00100 //@} 00101 00102 /// Setters/getters. 00103 //@{ 00104 /// Get the local endpoint used to send the events. 00105 int get_local_addr (ACE_INET_Addr& addr); 00106 00107 /** 00108 * The sender may need to fragment the message, otherwise the 00109 * network may drop the packets. 00110 * Setting the MTU can fail if the value is too small (at least the 00111 * header + 8 bytes must fit). 00112 */ 00113 int mtu (CORBA::ULong mtu); 00114 CORBA::ULong mtu (void) const; 00115 //@} 00116 00117 /// The main method - send a CDR message. 00118 /** 00119 * @todo Under some platforms, notably Linux, the fragmentation code 00120 * in this method is woefully naive. The fragments are sent it a 00121 * big burst, unfortunately, that can fill up the local kernel 00122 * buffer before all the data is sent. In those circumstances some 00123 * of the fragments are silently (gulp!) dropped by the kernel, 00124 * check the documentation for sendto(2) specially the ENOBUFS 00125 * error condition. 00126 * There is no easy solution that I know off, except "pacing" the 00127 * fragments, i.e. never sending more than a prescribed number of 00128 * bytes per-second, sleeping before sending more or queueing them 00129 * to send later via the reactor. 00130 */ 00131 void send_message (const TAO_OutputCDR &cdr, 00132 const ACE_INET_Addr &addr); 00133 00134 private: 00135 /// Return the datagram... 00136 ACE_SOCK_Dgram& dgram (void); 00137 00138 /** 00139 * Send one fragment, the first entry in the iovec is used to send 00140 * the header, the rest of the iovec array should contain pointers 00141 * to the actual data. 00142 */ 00143 void send_fragment (const ACE_INET_Addr &addr, 00144 CORBA::ULong request_id, 00145 CORBA::ULong request_size, 00146 CORBA::ULong fragment_size, 00147 CORBA::ULong fragment_offset, 00148 CORBA::ULong fragment_id, 00149 CORBA::ULong fragment_count, 00150 iovec iov[], 00151 int iovcnt); 00152 00153 /** 00154 * Count the number of fragments that will be required to send the 00155 * message blocks in the range [begin,end) 00156 * The maximum fragment payload (i.e. the size without the header is 00157 * also required); <total_length> returns the total message size. 00158 */ 00159 CORBA::ULong compute_fragment_count (const ACE_Message_Block* begin, 00160 const ACE_Message_Block* end, 00161 int iov_size, 00162 CORBA::ULong max_fragment_payload, 00163 CORBA::ULong& total_length); 00164 00165 private: 00166 /// The datagram used for sendto (). 00167 TAO_ECG_Refcounted_Endpoint endpoint_rptr_; 00168 00169 /// The MTU for this sender... 00170 CORBA::ULong mtu_; 00171 00172 /// Should crc checksum be caluclated and sent? 00173 CORBA::Boolean checksum_; 00174 }; 00175 00176 TAO_END_VERSIONED_NAMESPACE_DECL 00177 00178 #if defined(__ACE_INLINE__) 00179 #include "orbsvcs/Event/ECG_CDR_Message_Sender.inl" 00180 #endif /* __ACE_INLINE__ */ 00181 00182 #include /**/ "ace/post.h" 00183 00184 #endif /* TAO_ECG_CDR_MESSAGE_SENDER_H */