00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011 #ifndef TAO_ECG_CDR_MESSAGE_RECEIVER_H
00012 #define TAO_ECG_CDR_MESSAGE_RECEIVER_H
00013
00014 #include "ace/pre.h"
00015
00016 #include "orbsvcs/Event/ECG_UDP_Out_Endpoint.h"
00017
00018 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00019 # pragma once
00020 #endif
00021
00022 #include "tao/CDR.h"
00023
00024 #include "ace/Hash_Map_Manager.h"
00025 #include "ace/INET_Addr.h"
00026 #include "ace/Null_Mutex.h"
00027
00028 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00029 /// @class TAO_ECG_CDR_Processor
00030 ///
00031 /// @brief Abstract callback interface with a single decode() hook that
00032 ///        is handed a TAO_InputCDR stream.
00033 ///
00034 /// TAO_ECG_CDR_Message_Receiver::handle_input() in this header takes a
00035 /// pointer to this type; presumably that is where decode() is invoked
00036 /// (the implementation is not visible here -- confirm in the .cpp).
00037 class TAO_ECG_CDR_Processor
00038 {
00039 public:
00040 virtual ~TAO_ECG_CDR_Processor (void);  ///< Virtual, so implementations can be destroyed through this interface.
00041
00042 /// Pure virtual: every concrete processor must implement it.  The meaning of the int result is not defined in this header (NOTE(review): confirm the success/failure convention with the callers).
00043 virtual int decode (TAO_InputCDR &cdr) = 0;
00044 };
00045
00046 /// @class TAO_ECG_UDP_Request_Entry
00047 ///
00048 /// @brief Bookkeeping record for one request that is delivered as a
00049 ///        series of fragments.
00050 ///
00051 /// Holds the request's identifying fields, an ACE_Message_Block payload
00052 /// and a record of which fragments have been received.  The copy
00053 /// constructor and assignment operator are declared private, so outside
00054 /// code cannot copy an entry.
00055 class TAO_ECG_UDP_Request_Entry
00056 {
00057 public:
00058 enum {
00059 ECG_DEFAULT_FRAGMENT_BUFSIZ = 8  ///< Element count of the in-object default_received_fragments_ array.
00060 };
00061
00062 /// Build an entry from a request's byte order, id, total size and fragment count (presumably stored in the same-named members below).
00063 TAO_ECG_UDP_Request_Entry (CORBA::Boolean byte_order,
00064 CORBA::ULong request_id,
00065 CORBA::ULong request_size,
00066 CORBA::ULong fragment_count);
00067
00068 ~TAO_ECG_UDP_Request_Entry (void);
00069 /// Const check that takes the header fields of one fragment; presumably compares them with the values
00070 /// stored in this entry.  The return convention is not visible in this header -- confirm in the .cpp.
00071 int validate_fragment (CORBA::Boolean byte_order,
00072 CORBA::ULong request_size,
00073 CORBA::ULong fragment_size,
00074 CORBA::ULong fragment_offset,
00075 CORBA::ULong fragment_id,
00076 CORBA::ULong fragment_count) const;
00077
00078 /// Const query for a single fragment id; presumably reports whether mark_received() was already called for it.
00079 int test_received (CORBA::ULong fragment_id) const;
00080
00081 /// Non-const counterpart of test_received(); presumably records that @a fragment_id has arrived.
00082 void mark_received (CORBA::ULong fragment_id);
00083
00084 /// Const query; presumably non-zero once every fragment has been marked as received -- confirm.
00085 int complete (void) const;
00086
00087 /// Returns a writable char pointer for @a fragment_offset; presumably a position inside payload_ -- confirm.
00088 char* fragment_buffer (CORBA::ULong fragment_offset);
00089
00090 private:
00091 // Copying is disabled: both operations are declared private.
00092 TAO_ECG_UDP_Request_Entry (const TAO_ECG_UDP_Request_Entry & rhs);
00093 TAO_ECG_UDP_Request_Entry& operator= (const TAO_ECG_UDP_Request_Entry & rhs);
00094
00095 private:
00096 // Request attributes; the names match the constructor parameters, so they are presumably
00097 // initialised from them.
00098 CORBA::Boolean byte_order_;
00099 CORBA::ULong request_id_;
00100 CORBA::ULong request_size_;
00101 CORBA::ULong fragment_count_;
00102 // Held by value, so the entry owns this message block object.
00103 ACE_Message_Block payload_;
00104 // Received-fragment bookkeeping.  NOTE(review): presumably received_fragments_ points at default_received_fragments_
00105 // for small requests and at a heap array otherwise, with own_received_fragments_ recording which -- confirm in the .cpp.
00106 CORBA::ULong* received_fragments_;
00107 int own_received_fragments_;
00108 CORBA::ULong received_fragments_size_;
00109 CORBA::ULong default_received_fragments_[ECG_DEFAULT_FRAGMENT_BUFSIZ];
00110 };
00111
00112
00113
00114
00115
00116
00117
00118
00119
00120
00121
00122
00123
00124
00125
00126
00127
00128
00129
00130
00131
00132
00133 /// @class TAO_ECG_CDR_Message_Receiver
00134 ///
00135 /// @brief Exported class whose handle_input() takes an ACE_SOCK_Dgram and
00136 ///        a TAO_ECG_CDR_Processor.
00137 ///
00138 /// Per-sender state is kept in a hash map keyed by ACE_INET_Addr whose
00139 /// values are pointers to the nested Requests type.  The private helpers
00140 /// and the nested Mcast_Header type suggest that fragmented messages are
00141 /// tracked per sender before being passed on; the implementation is not
00142 /// visible in this header -- confirm in the .cpp.
00143 class TAO_RTEvent_Serv_Export TAO_ECG_CDR_Message_Receiver
00144 {
00145 public:
00146
00147 /// @param check_crc presumably stored in check_crc_; note the constructor is not explicit, so a CORBA::Boolean converts implicitly.
00148 TAO_ECG_CDR_Message_Receiver (CORBA::Boolean check_crc);
00149 ~TAO_ECG_CDR_Message_Receiver (void);
00150
00151
00152
00153 /// Takes a TAO_ECG_Refcounted_Endpoint by value; presumably saved in ignore_from_.
00154 /// NOTE(review): the name suggests traffic from that endpoint is skipped -- confirm.
00155 void init (TAO_ECG_Refcounted_Endpoint ignore_from
00156 );
00157
00158 /// Counterpart of init(); what it releases is not visible in this header.
00159 void shutdown (void);
00160
00161
00162
00163
00164
00165
00166
00167
00168 /// Entry point that is given the datagram socket and the processor.
00169 /// @param dgram          socket passed by non-const reference.
00170 /// @param cdr_processor  callback object; whether a null pointer is tolerated is not visible here.
00171 /// @return int whose convention is not defined in this header
00172 ///         (NOTE(review): confirm the success/failure values in the .cpp).
00173 int handle_input (ACE_SOCK_Dgram& dgram,
00174 TAO_ECG_CDR_Processor *cdr_processor);
00175
00176 /// Public static entry shared by all receivers.
00177 /// NOTE(review): presumably a sentinel marking finished requests -- confirm.
00178 static TAO_ECG_UDP_Request_Entry Request_Completed_;
00179
00180 private:
00181 // Default limits; presumably the initial values of max_requests_ and min_purge_count_ (constructor not visible).
00182 enum {
00183 ECG_DEFAULT_MAX_FRAGMENTED_REQUESTS = 1024,
00184 ECG_DEFAULT_FRAGMENTED_REQUESTS_MIN_PURGE_COUNT = 32
00185 };
00186 // Nested types, only forward-declared here; both are defined later in this header.
00187 struct Mcast_Header;
00188 class Requests;
00189 // Sender address -> Requests*.  ACE_Null_Mutex is the lock parameter, so the map presumably does no real locking.
00190 typedef ACE_Hash_Map_Manager<ACE_INET_Addr,
00191 Requests*,
00192 ACE_Null_Mutex> Request_Map;
00193
00194 private:
00195
00196 /// Takes a sender address and a request id; presumably records that request as handled
00197 /// for that sender.  The return convention is not visible here.
00198 int mark_received (const ACE_INET_Addr &from,
00199 CORBA::ULong request_id);
00200
00201
00202 /// Takes the sender, a parsed header, the raw data buffer and the processor; presumably
00203 /// handles one fragment of a multi-fragment request -- confirm in the .cpp.
00204 int process_fragment (const ACE_INET_Addr &from,
00205 const Mcast_Header &header,
00206 char * data_buf,
00207 TAO_ECG_CDR_Processor *cdr_processor);
00208
00209 /// Returns a Request_Map entry pointer for @a from; whether it creates a missing entry or can return null is not visible here.
00210 Request_Map::ENTRY* get_source_entry (const ACE_INET_Addr &from);
00211
00212 private:
00213
00214 // Endpoint handed to init() (assumed from the matching name).
00215 TAO_ECG_Refcounted_Endpoint ignore_from_;
00216
00217
00218 // Per-sender request state; the values are raw Requests pointers, so ownership is decided in the .cpp.
00219 Request_Map request_map_;
00220
00221
00222
00223
00224
00225 // NOTE(review): presumably the size passed to Requests::init() -- confirm.
00226 size_t max_requests_;
00227
00228
00229
00230 // NOTE(review): presumably the min_purge_count passed to Requests::init() -- confirm.
00231 size_t min_purge_count_;
00232
00233 // Flag supplied to the constructor (assumed from the matching name).
00234 CORBA::Boolean check_crc_;
00235 };
00236 /// @struct TAO_ECG_CDR_Message_Receiver::Mcast_Header
00237 ///
00238 /// @brief Plain aggregate of per-fragment header fields plus a read() member.  Apart from request_id and crc, the
00239 ///        field names mirror the parameters of TAO_ECG_UDP_Request_Entry::validate_fragment().
00240 struct TAO_ECG_CDR_Message_Receiver::Mcast_Header
00241 {
00242 int byte_order;  ///< Declared int here, whereas TAO_ECG_UDP_Request_Entry takes a CORBA::Boolean byte_order.
00243 CORBA::ULong request_id;
00244 CORBA::ULong request_size;
00245 CORBA::ULong fragment_size;
00246 CORBA::ULong fragment_offset;
00247 CORBA::ULong fragment_id;
00248 CORBA::ULong fragment_count;
00249 CORBA::ULong crc;
00250 int read (char * header,  ///< Takes a raw header buffer and the byte count received; presumably fills the fields above (return convention not visible here).
00251 size_t bytes_received,
00252 CORBA::Boolean checkcrc = 0);  ///< checkcrc defaults to 0.
00253 };
00254 /// @class TAO_ECG_CDR_Message_Receiver::Requests
00255 ///
00256 /// @brief Container of TAO_ECG_UDP_Request_Entry pointers together with an id range; it is the mapped
00257 ///        type (by pointer) of TAO_ECG_CDR_Message_Receiver::Request_Map.  Copying is disabled: the copy
00258 ///        constructor and assignment operator are declared private.
00259 class TAO_ECG_CDR_Message_Receiver::Requests
00260 {
00261 public:
00262
00263 Requests (void);
00264 ~Requests (void);
00265
00266 /// Separate initialisation step; the parameter names match size_ and min_purge_count_, so presumably it sets them.  Return convention not visible here.
00267 int init (size_t size, size_t min_purge_count);
00268
00269
00270
00271
00272
00273 /// Looks up @a request_id and returns a pointer to an entry pointer.
00274 /// NOTE(review): presumably a slot inside fragmented_requests_, possibly after purging
00275 /// older ids; what is returned for an out-of-range id is not visible here -- confirm.
00276 TAO_ECG_UDP_Request_Entry ** get_request (CORBA::ULong request_id);
00277
00278 private:
00279
00280
00281 /// Private helper taking the first and last id of a range; presumably discards the
00282 /// entries in that range (whether the bounds are inclusive is not visible here).
00283 void purge_requests (CORBA::ULong purge_first,
00284 CORBA::ULong purge_last);
00285 // Copying is disabled: both operations are declared private.
00286 Requests & operator= (const Requests &rhs);
00287 Requests (const Requests &rhs);
00288
00289 private:
00290
00291
00292 // Array of entry pointers; allocation and ownership are decided in the .cpp.
00293 TAO_ECG_UDP_Request_Entry** fragmented_requests_;
00294
00295 // Presumably the element count of fragmented_requests_ -- confirm.
00296 size_t size_;
00297
00298
00299
00300 // Bounds of the request-id range currently tracked (assumed from the names).
00301 CORBA::ULong id_range_low_;
00302 CORBA::ULong id_range_high_;
00303
00304
00305 // NOTE(review): presumably the minimum number of entries purge_requests() is asked to drop at once -- confirm.
00306 size_t min_purge_count_;
00307 };
00308
00309 TAO_END_VERSIONED_NAMESPACE_DECL
00310
00311 #if defined(__ACE_INLINE__)
00312 #include "orbsvcs/Event/ECG_CDR_Message_Receiver.inl"
00313 #endif
00314
00315 #include "ace/post.h"
00316
00317 #endif