00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011 #ifndef TAO_ECG_CDR_MESSAGE_RECEIVER_H
00012 #define TAO_ECG_CDR_MESSAGE_RECEIVER_H
00013
00014 #include "ace/pre.h"
00015
00016 #include "orbsvcs/Event/ECG_UDP_Out_Endpoint.h"
00017
00018 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00019 # pragma once
00020 #endif
00021
00022 #include "tao/CDR.h"
00023 #include "tao/Environment.h"
00024
00025 #include "ace/Hash_Map_Manager.h"
00026 #include "ace/INET_Addr.h"
00027 #include "ace/Null_Mutex.h"
00028
00029 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00030
00031
00032
00033
00034
00035
00036
00037
00038 class TAO_ECG_CDR_Processor
00039 {
00040 public:
00041 virtual ~TAO_ECG_CDR_Processor (void);
00042
00043
00044 virtual int decode (TAO_InputCDR &cdr) = 0;
00045 };
00046
00047
00048
00049
00050
00051
00052
00053
00054
00055
00056 class TAO_ECG_UDP_Request_Entry
00057 {
00058 public:
00059 enum {
00060 ECG_DEFAULT_FRAGMENT_BUFSIZ = 8
00061 };
00062
00063
00064 TAO_ECG_UDP_Request_Entry (CORBA::Boolean byte_order,
00065 CORBA::ULong request_id,
00066 CORBA::ULong request_size,
00067 CORBA::ULong fragment_count);
00068
00069 ~TAO_ECG_UDP_Request_Entry (void);
00070
00071
00072 int validate_fragment (CORBA::Boolean byte_order,
00073 CORBA::ULong request_size,
00074 CORBA::ULong fragment_size,
00075 CORBA::ULong fragment_offset,
00076 CORBA::ULong fragment_id,
00077 CORBA::ULong fragment_count) const;
00078
00079
00080 int test_received (CORBA::ULong fragment_id) const;
00081
00082
00083 void mark_received (CORBA::ULong fragment_id);
00084
00085
00086 int complete (void) const;
00087
00088
00089 char* fragment_buffer (CORBA::ULong fragment_offset);
00090
00091 private:
00092
00093 TAO_ECG_UDP_Request_Entry (const TAO_ECG_UDP_Request_Entry & rhs);
00094 TAO_ECG_UDP_Request_Entry& operator= (const TAO_ECG_UDP_Request_Entry & rhs);
00095
00096 private:
00097
00098
00099 CORBA::Boolean byte_order_;
00100 CORBA::ULong request_id_;
00101 CORBA::ULong request_size_;
00102 CORBA::ULong fragment_count_;
00103
00104 ACE_Message_Block payload_;
00105
00106
00107 CORBA::ULong* received_fragments_;
00108 int own_received_fragments_;
00109 CORBA::ULong received_fragments_size_;
00110 CORBA::ULong default_received_fragments_[ECG_DEFAULT_FRAGMENT_BUFSIZ];
00111 };
00112
00113
00114
00115
00116
00117
00118
00119
00120
00121
00122
00123
00124
00125
00126
00127
00128
00129
00130
00131
00132
00133
00134
00135
00136
00137
00138
00139
00140
00141
00142
00143
00144 class TAO_RTEvent_Serv_Export TAO_ECG_CDR_Message_Receiver
00145 {
00146 public:
00147
00148
00149 TAO_ECG_CDR_Message_Receiver (CORBA::Boolean check_crc);
00150 ~TAO_ECG_CDR_Message_Receiver (void);
00151
00152
00153
00154
00155
00156 void init (TAO_ECG_Refcounted_Endpoint ignore_from
00157 );
00158
00159
00160 void shutdown (void);
00161
00162
00163
00164
00165
00166
00167
00168
00169
00170
00171
00172
00173
00174 int handle_input (ACE_SOCK_Dgram& dgram,
00175 TAO_ECG_CDR_Processor *cdr_processor);
00176
00177
00178
00179 static TAO_ECG_UDP_Request_Entry Request_Completed_;
00180
00181 private:
00182
00183 enum {
00184 ECG_DEFAULT_MAX_FRAGMENTED_REQUESTS = 1024,
00185 ECG_DEFAULT_FRAGMENTED_REQUESTS_MIN_PURGE_COUNT = 32
00186 };
00187
00188 struct Mcast_Header;
00189 class Requests;
00190
00191 typedef ACE_Hash_Map_Manager<ACE_INET_Addr,
00192 Requests*,
00193 ACE_Null_Mutex> Request_Map;
00194
00195 private:
00196
00197
00198
00199 int mark_received (const ACE_INET_Addr &from,
00200 CORBA::ULong request_id);
00201
00202
00203
00204
00205 int process_fragment (const ACE_INET_Addr &from,
00206 const Mcast_Header &header,
00207 char * data_buf,
00208 TAO_ECG_CDR_Processor *cdr_processor);
00209
00210
00211 Request_Map::ENTRY* get_source_entry (const ACE_INET_Addr &from);
00212
00213 private:
00214
00215
00216 TAO_ECG_Refcounted_Endpoint ignore_from_;
00217
00218
00219
00220 Request_Map request_map_;
00221
00222
00223
00224
00225
00226
00227 size_t max_requests_;
00228
00229
00230
00231
00232 size_t min_purge_count_;
00233
00234
00235 CORBA::Boolean check_crc_;
00236 };
00237
00238
00239
00240
00241 struct TAO_ECG_CDR_Message_Receiver::Mcast_Header
00242 {
00243 int byte_order;
00244 CORBA::ULong request_id;
00245 CORBA::ULong request_size;
00246 CORBA::ULong fragment_size;
00247 CORBA::ULong fragment_offset;
00248 CORBA::ULong fragment_id;
00249 CORBA::ULong fragment_count;
00250 CORBA::ULong crc;
00251 int read (char * header,
00252 size_t bytes_received,
00253 CORBA::Boolean checkcrc = 0);
00254 };
00255
00256
00257
00258
00259
00260 class TAO_ECG_CDR_Message_Receiver::Requests
00261 {
00262 public:
00263
00264 Requests (void);
00265 ~Requests (void);
00266
00267
00268 int init (size_t size, size_t min_purge_count);
00269
00270
00271
00272
00273
00274
00275
00276
00277 TAO_ECG_UDP_Request_Entry ** get_request (CORBA::ULong request_id);
00278
00279 private:
00280
00281
00282
00283
00284 void purge_requests (CORBA::ULong purge_first,
00285 CORBA::ULong purge_last);
00286
00287 Requests & operator= (const Requests &rhs);
00288 Requests (const Requests &rhs);
00289
00290 private:
00291
00292
00293
00294 TAO_ECG_UDP_Request_Entry** fragmented_requests_;
00295
00296
00297 size_t size_;
00298
00299
00300
00301
00302 CORBA::ULong id_range_low_;
00303 CORBA::ULong id_range_high_;
00304
00305
00306
00307 size_t min_purge_count_;
00308 };
00309
00310 TAO_END_VERSIONED_NAMESPACE_DECL
00311
00312 #if defined(__ACE_INLINE__)
00313 #include "orbsvcs/Event/ECG_CDR_Message_Receiver.i"
00314 #endif
00315
00316 #include "ace/post.h"
00317
00318 #endif