ECG_CDR_Message_Receiver.h

Go to the documentation of this file.
00001 // -*- C++ -*-
00002 /**
00003  *  @file ECG_CDR_Message_Receiver.h
00004  *
00005  *  ECG_CDR_Message_Receiver.h,v 1.9 2006/03/14 06:14:25 jtc Exp
00006  *
00007  *  @author Carlos O'Ryan (coryan@cs.wustl.edu)
00008  *  @author Marina Spivak (marina@atdesk.com)
00009  */
00010 
00011 #ifndef TAO_ECG_CDR_MESSAGE_RECEIVER_H
00012 #define TAO_ECG_CDR_MESSAGE_RECEIVER_H
00013 
00014 #include /**/ "ace/pre.h"
00015 
00016 #include "orbsvcs/Event/ECG_UDP_Out_Endpoint.h"
00017 
00018 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00019 # pragma once
00020 #endif /* ACE_LACKS_PRAGMA_ONCE */
00021 
00022 #include "tao/CDR.h"
00023 #include "tao/Environment.h"
00024 
00025 #include "ace/Hash_Map_Manager.h"
00026 #include "ace/INET_Addr.h"
00027 #include "ace/Null_Mutex.h"
00028 
00029 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00030 
00031 /**
00032  * @class TAO_ECG_CDR_Processor
00033  *
00034  * @brief Interface for callback objects used by
00035  *        TAO_ECG_CDR_Message_Receiver to propagate received data to
00036  *        its callers.
       *
       * TAO_ECG_CDR_Message_Receiver::handle_input() accepts a pointer to
       * an implementation of this interface; decode() is the only
       * operation the interface declares.
00037  */
00038 class TAO_ECG_CDR_Processor
00039 {
00040 public:
        /// Virtual, so implementations can be destroyed through a pointer
        /// to this interface.
00041   virtual ~TAO_ECG_CDR_Processor (void);
00042
00043   /// Extracts data from @a cdr.  Returns 0 on success, -1 on error.
        /// Pure virtual: every concrete processor must provide it.
00044   virtual int decode (TAO_InputCDR &cdr) = 0;
00045 };
00046 
00047 // ****************************************************************
00048 /**
00049  * @class TAO_ECG_UDP_Request_Entry
00050  *
00051  * @brief Keeps information about an incomplete request.
00052  *
00053  * When a request arrives in fragments this object is used to
00054  * keep track of the incoming data.
00055  */
00056 class TAO_ECG_UDP_Request_Entry
00057 {
00058 public:
        /// Number of elements in the built-in
        /// <default_received_fragments_> array.
00059   enum {
00060     ECG_DEFAULT_FRAGMENT_BUFSIZ = 8
00061   };
00062
00063   /// Initialize the fragment, allocating memory, etc.
00064   TAO_ECG_UDP_Request_Entry (CORBA::Boolean byte_order,
00065                              CORBA::ULong request_id,
00066                              CORBA::ULong request_size,
00067                              CORBA::ULong fragment_count);
00068
00069   ~TAO_ECG_UDP_Request_Entry (void);
00070
00071   /// Validate a fragment; it should be rejected if it is invalid.
        /// NOTE(review): the return convention is not stated in this
        /// header -- confirm against the implementation.
00072   int validate_fragment (CORBA::Boolean byte_order,
00073                          CORBA::ULong request_size,
00074                          CORBA::ULong fragment_size,
00075                          CORBA::ULong fragment_offset,
00076                          CORBA::ULong fragment_id,
00077                          CORBA::ULong fragment_count) const;
00078
00079   /// Has @a fragment_id been received?
00080   int test_received (CORBA::ULong fragment_id) const;
00081
00082   /// Mark @a fragment_id as received, reset timeout counter...
00083   void mark_received (CORBA::ULong fragment_id);
00084
00085   /// Is the message complete?
00086   int complete (void) const;
00087
00088   /// Return a buffer for the fragment at offset @a fragment_offset
00089   char* fragment_buffer (CORBA::ULong fragment_offset);
00090
00091 private:
00092
        // Copy construction and assignment are private, so code outside
        // the class cannot copy an entry.
00093   TAO_ECG_UDP_Request_Entry (const TAO_ECG_UDP_Request_Entry & rhs);
00094   TAO_ECG_UDP_Request_Entry& operator= (const TAO_ECG_UDP_Request_Entry & rhs);
00095
00096 private:
00097   /// These attributes should remain constant in all the fragments;
00098   /// they are used for validation.
00099   CORBA::Boolean byte_order_;
00100   CORBA::ULong request_id_;
00101   CORBA::ULong request_size_;
00102   CORBA::ULong fragment_count_;
00103
        /// NOTE(review): presumably the storage for the request being
        /// reassembled, i.e. what fragment_buffer() points into -- confirm.
00104   ACE_Message_Block payload_;
00105
00106   /// This is a bit vector, used to keep track of the received buffers.
00107   CORBA::ULong* received_fragments_;
        /// NOTE(review): presumably non-zero when <received_fragments_>
        /// was allocated separately instead of pointing at
        /// <default_received_fragments_> -- confirm.
00108   int own_received_fragments_;
00109   CORBA::ULong received_fragments_size_;
        /// Built-in bit-vector storage of ECG_DEFAULT_FRAGMENT_BUFSIZ
        /// elements.
00110   CORBA::ULong default_received_fragments_[ECG_DEFAULT_FRAGMENT_BUFSIZ];
00111 };
00112 
00113 // ****************************************************************
00114 
00115 /**
00116  * @class TAO_ECG_CDR_Message_Receiver
00117  *
00118  * @brief Receives UDP and Multicast messages.
00119  *
00120  * @todo Update class documentation below.
00121  *
00122  *       5)  Make status array size and purge_count configurable.
00123  *
00124  * This class receives UDP and Multicast message fragments, assembles
00125  * them (described in detail below), and passes complete messages
00126  * in the form of cdr streams to the calling classes.
00127  *
00128  * This class is used by various Gateway classes (Senders/Receivers)
00129  * responsible for federating Event Channels with UDP/Mcast.
00130  *
00131  * = REASSEMBLY
00132  * Fragmentation is described in ECG_CDR_Message_Sender.h
00133  * Whenever an incomplete fragment is received (one with
00134  * fragment_count > 1) we allocate an entry for the message in a
00135  * map indexed by (host,port,request_id).  The entry contains the
00136  * buffer, a bit vector to keep track of the fragments received
00137  * so far, and a timeout counter.  This timeout counter is set to
00138  * 0 on each (new) fragment arrival, and incremented on a regular
00139  * basis.  If the counter reaches a maximum value the message is
00140  * dropped.
00141  * Once all the fragments have been received the message is sent
00142  * up to the calling classes, and the memory reclaimed.
00143  */
00144 class TAO_RTEvent_Serv_Export TAO_ECG_CDR_Message_Receiver
00145 {
00146 public:
00147   /// Initialization and termination methods.
00148   //@{
        /// @param check_crc Whether CRC should be computed and checked
        ///        (see <check_crc_>).
00149   TAO_ECG_CDR_Message_Receiver (CORBA::Boolean check_crc);
00150   ~TAO_ECG_CDR_Message_Receiver (void);
00151
00152   /**
00153    * @param ignore_from Endpoint used to remove events generated by
00154    *        the same process.
00155    */
00156   void init (TAO_ECG_Refcounted_Endpoint ignore_from
00157              /* , ACE_Lock *lock = 0 */);
00158
00159   /// Shutdown the component: close down the request map, etc.
00160   void shutdown (void);
00161   //@}
00162
00163   /// Main method: read the data from @a dgram and either pass ready data
00164   /// to @a cdr_processor or update the <request_map_> if the request
00165   /// is not yet complete.
00166   /**
00167    * Returns 1 if data was read successfully and accepted by
00168    * <cdr_processor> without errors.
00169    * Returns 0 if there were no errors, but no data has been passed to
00170    * <cdr_processor>, either due to request being incomplete (not all
00171    * fragments received), or it being a duplicate.
00172    * Returns -1 if there were errors.
00173    */
00174   int handle_input (ACE_SOCK_Dgram& dgram,
00175                     TAO_ECG_CDR_Processor *cdr_processor);
00176
00177   /// Represents any request that has been fully received and
00178   /// serviced, to simplify the internal logic.
00179   static TAO_ECG_UDP_Request_Entry Request_Completed_;
00180
00181 private:
00182
        /// NOTE(review): presumably the default values for
        /// <max_requests_> and <min_purge_count_> -- confirm against the
        /// constructor.
00183   enum {
00184     ECG_DEFAULT_MAX_FRAGMENTED_REQUESTS = 1024,
00185     ECG_DEFAULT_FRAGMENTED_REQUESTS_MIN_PURGE_COUNT = 32
00186   };
00187
00188   struct Mcast_Header;
00189   class Requests;
00190
        /// Maps a source address to a Requests object.  The map is
        /// instantiated with ACE_Null_Mutex as its lock type.
00191   typedef ACE_Hash_Map_Manager<ACE_INET_Addr,
00192                                Requests*,
00193                                ACE_Null_Mutex> Request_Map;
00194
00195 private:
00196
00197   /// Returns 1 on success, 0 if <request_id> has already been
00198   /// received or is below current request range, and -1 on error.
00199   int mark_received (const ACE_INET_Addr &from,
00200                      CORBA::ULong request_id);
00201
00202   /// Returns 1 if complete request is received and <event> is
00203   /// populated, 0 if request has only partially been received or is a
00204   /// duplicate, and -1 on error.
        /// NOTE(review): <event> is not a parameter of this method;
        /// presumably the complete request is handed to @a cdr_processor
        /// instead -- confirm and update the sentence above.
00205   int process_fragment (const ACE_INET_Addr &from,
00206                         const Mcast_Header &header,
00207                         char * data_buf,
00208                         TAO_ECG_CDR_Processor *cdr_processor);
00209
00210
        /// Returns a pointer to the <request_map_> entry for @a from.
        /// NOTE(review): whether a missing entry is created, and what is
        /// returned on failure, is not visible in this header -- confirm.
00211   Request_Map::ENTRY* get_source_entry (const ACE_INET_Addr &from);
00212
00213 private:
00214
00215   /// Ignore any events coming from this IP address.
00216   TAO_ECG_Refcounted_Endpoint ignore_from_;
00217
00218   /// The map containing all the incoming requests which have been
00219   /// partially received.
00220   Request_Map request_map_;
00221
00222   //  Serializes use of <request_map_> (member currently commented out).
00223   //  ACE_Lock* lock_;
00224
00225   /// Size of a fragmented requests array, i.e., max number of
00226   /// partially received requests kept at any given time per source.
00227   size_t max_requests_;
00228
00229   /// Minimum number of requests purged from a fragmented requests
00230   /// array when the range of requests represented there needs to be
00231   /// shifted.
00232   size_t min_purge_count_;
00233
00234   /// Flag to indicate whether CRC should be computed and checked.
00235   CORBA::Boolean check_crc_;
00236 };
00237 
00238 // ****************************************************************
00239 
00240 /// Helper for decoding, validating and storing mcast header.
00241 struct TAO_ECG_CDR_Message_Receiver::Mcast_Header
00242 {
00243   int byte_order;
00244   CORBA::ULong request_id;
00245   CORBA::ULong request_size;
00246   CORBA::ULong fragment_size;
00247   CORBA::ULong fragment_offset;
00248   CORBA::ULong fragment_id;
00249   CORBA::ULong fragment_count;
00250   CORBA::ULong crc;
        /// Reads the header out of the buffer @a header, of which
        /// @a bytes_received bytes were received.  @a checkcrc defaults
        /// to 0.
        /// NOTE(review): presumably fills in the fields above and returns
        /// 0 on success / -1 on error; the return convention is not
        /// stated in this header -- confirm.
00251   int read (char * header,
00252             size_t bytes_received,
00253             CORBA::Boolean checkcrc = 0);
00254 };
00255 
00256 // ****************************************************************
00257 
00258 /// Once init() has been called:
00259 /// Invariant: id_range_high_ - id_range_low_ == size_ - 1
      ///
      /// Keeps, for a range of request ids, the partially received
      /// requests and the information on which requests have been fully
      /// received and processed (see <fragmented_requests_>).
00260 class TAO_ECG_CDR_Message_Receiver::Requests
00261 {
00262 public:
00263
00264   Requests (void);
00265   ~Requests (void);
00266
00267   /// Allocates and initializes <fragmented_requests_>.
        /// NOTE(review): the return convention is not stated in this
        /// header -- confirm against the implementation.
00268   int init (size_t size, size_t min_purge_count);
00269
00270   /// Returns pointer to a <fragmented_requests_> element
00271   /// representing <request_id>.
00272   /**
00273    * If <request_id> < <id_range_low_> return 0.
00274    * If <request_id> > <id_range_high_>, shift the range so it
00275    * includes <request_id>, purging incomplete requests as needed.
00276    */
00277   TAO_ECG_UDP_Request_Entry ** get_request (CORBA::ULong request_id);
00278
00279 private:
00280
00281   /// Delete any outstanding requests with ids in the range
00282   /// [<purge_first>, <purge_last>] from <fragmented_requests_>
00283   /// and reset their slots.
00284   void purge_requests (CORBA::ULong purge_first,
00285                        CORBA::ULong purge_last);
00286
        // Copy assignment and copy construction are private, so code
        // outside the class cannot copy a Requests object.
00287   Requests & operator= (const Requests &rhs);
00288   Requests (const Requests &rhs);
00289
00290 private:
00291   /// Array, used in a circular fashion, that stores partially received
00292   /// requests (and info on which requests have been fully received
00293   /// and processed) for a range of request ids.
00294   TAO_ECG_UDP_Request_Entry** fragmented_requests_;
00295
00296   /// Size of <fragmented_requests_> array.
00297   size_t size_;
00298
00299   /// The range of request ids, currently represented in
00300   /// <fragmented_requests_>.
00301   //@{
00302   CORBA::ULong id_range_low_;
00303   CORBA::ULong id_range_high_;
00304   //@}
00305
00306   /// Minimum range shifting amount.
00307   size_t min_purge_count_;
00308 };
00309 
00310 TAO_END_VERSIONED_NAMESPACE_DECL
00311 
00312 #if defined(__ACE_INLINE__)
00313 #include "orbsvcs/Event/ECG_CDR_Message_Receiver.i"
00314 #endif /* __ACE_INLINE__ */
00315 
00316 #include /**/ "ace/post.h"
00317 
00318 #endif /* TAO_ECG_CDR_MESSAGE_RECEIVER_H */

Generated on Thu Nov 9 13:11:10 2006 for TAO_RTEvent by doxygen 1.3.6