ECG_CDR_Message_Receiver.h

Go to the documentation of this file.
00001 // -*- C++ -*-
00002 /**
00003  *  @file ECG_CDR_Message_Receiver.h
00004  *
00005  *  $Id: ECG_CDR_Message_Receiver.h 77001 2007-02-12 07:54:49Z johnnyw $
00006  *
00007  *  @author Carlos O'Ryan (coryan@cs.wustl.edu)
00008  *  @author Marina Spivak (marina@atdesk.com)
00009  */
00010 
00011 #ifndef TAO_ECG_CDR_MESSAGE_RECEIVER_H
00012 #define TAO_ECG_CDR_MESSAGE_RECEIVER_H
00013 
00014 #include /**/ "ace/pre.h"
00015 
00016 #include "orbsvcs/Event/ECG_UDP_Out_Endpoint.h"
00017 
00018 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00019 # pragma once
00020 #endif /* ACE_LACKS_PRAGMA_ONCE */
00021 
00022 #include "tao/CDR.h"
00023 
00024 #include "ace/Hash_Map_Manager.h"
00025 #include "ace/INET_Addr.h"
00026 #include "ace/Null_Mutex.h"
00027 
00028 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00029 
00030 /**
00031  * @class TAO_ECG_CDR_Processor
00032  *
00033  * @brief Interface for callback objects used by
00034  *        TAO_ECG_CDR_Message_Receiver to propagate received data to
00035  *        its callers.
00036  */
00037 class TAO_ECG_CDR_Processor
00038 {
00039 public:
00040   virtual ~TAO_ECG_CDR_Processor (void);
00041 
00042   /// Extracts data from @a cdr (pure virtual: supplied by the callback object).  Returns 0 on success, -1 on error.
00043   virtual int decode (TAO_InputCDR &cdr) = 0;
00044 };
00045 
00046 // ****************************************************************
00047 /**
00048  * @class TAO_ECG_UDP_Request_Entry
00049  *
00050  * @brief Keeps information about an incomplete request.
00051  *
00052  * When a request arrives in fragments this object is used to
00053  * keep track of the incoming data.
00054  */
00055 class TAO_ECG_UDP_Request_Entry
00056 {
00057 public:
00058   enum {
00059     ECG_DEFAULT_FRAGMENT_BUFSIZ = 8  ///< Number of elements in default_received_fragments_.
00060   };
00061 
00062   /// Initialize the entry for a request, allocating memory, etc.
00063   TAO_ECG_UDP_Request_Entry (CORBA::Boolean byte_order,
00064                              CORBA::ULong request_id,
00065                              CORBA::ULong request_size,
00066                              CORBA::ULong fragment_count);
00067 
00068   ~TAO_ECG_UDP_Request_Entry (void);
00069 
00070   /// Validate a fragment; it should be rejected if it is invalid.
00071   int validate_fragment (CORBA::Boolean byte_order,
00072                          CORBA::ULong request_size,
00073                          CORBA::ULong fragment_size,
00074                          CORBA::ULong fragment_offset,
00075                          CORBA::ULong fragment_id,
00076                          CORBA::ULong fragment_count) const;
00077 
00078   /// Has @a fragment_id been received?
00079   int test_received (CORBA::ULong fragment_id) const;
00080 
00081   /// Mark @a fragment_id as received, reset timeout counter...
00082   void mark_received (CORBA::ULong fragment_id);
00083 
00084   /// Is the message complete?
00085   int complete (void) const;
00086 
00087   /// Return a buffer for the fragment at offset @a fragment_offset.
00088   char* fragment_buffer (CORBA::ULong fragment_offset);
00089 
00090 private:
00091   // Copy construction and copy assignment are declared private, so they are not part of the public interface.
00092   TAO_ECG_UDP_Request_Entry (const TAO_ECG_UDP_Request_Entry & rhs);
00093   TAO_ECG_UDP_Request_Entry& operator= (const TAO_ECG_UDP_Request_Entry & rhs);
00094 
00095 private:
00096   /// These attributes should remain constant in all the fragments, used
00097   /// for validation.
00098   CORBA::Boolean byte_order_;
00099   CORBA::ULong request_id_;
00100   CORBA::ULong request_size_;
00101   CORBA::ULong fragment_count_;
00102   /// NOTE(review): presumably the storage in which the fragments are reassembled (cf. fragment_buffer()) -- confirm.
00103   ACE_Message_Block payload_;
00104 
00105   /// This is a bit vector, used to keep track of the received fragments.
00106   CORBA::ULong* received_fragments_;
00107   int own_received_fragments_;  // NOTE(review): presumably nonzero when received_fragments_ was allocated instead of pointing at default_received_fragments_ -- confirm.
00108   CORBA::ULong received_fragments_size_;
00109   CORBA::ULong default_received_fragments_[ECG_DEFAULT_FRAGMENT_BUFSIZ];
00110 };
00111 
00112 // ****************************************************************
00113 
00114 /**
00115  * @class TAO_ECG_CDR_Message_Receiver
00116  *
00117  * @brief Receives UDP and Multicast messages.
00118  *
00119  * @todo Update class documentation below.
00120  *
00121  *       5)  Make status array size and purge_count configurable.
00122  *
00123  * This class receives UDP and Multicast message fragments, assembles
00124  * them (described in detail below), and passes complete messages
00125  * in the form of cdr streams to the calling classes.
00126  *
00127  * This class is used by various Gateway classes (Senders/Receivers)
00128  * responsible for federating Event Channels with UDP/Mcast.
00129  *
00130  * = REASSEMBLY
00131  * Fragmentation is described in ECG_CDR_Message_Sender.h
00132  * Whenever an incomplete fragment is received (one with
00133  * fragment_count > 1) we allocate an entry for the message in a
00134  * map indexed by (host,port,request_id).  The entry contains the
00135  * buffer, a bit vector to keep track of the fragments received
00136  * so far, and a timeout counter.  This timeout counter is set to
00137  * 0 on each (new) fragment arrival, and incremented on a regular
00138  * basis.  If the counter reaches a maximum value the message is
00139  * dropped.
00140  * Once all the fragments have been received the message is sent
00141  * up to the calling classes, and the memory reclaimed.
00142  */
00143 class TAO_RTEvent_Serv_Export TAO_ECG_CDR_Message_Receiver
00144 {
00145 public:
00146   /// Initialization and termination methods.
00147   //@{
00148   TAO_ECG_CDR_Message_Receiver (CORBA::Boolean check_crc);  // NOTE(review): @a check_crc is presumably stored in check_crc_ -- confirm.
00149   ~TAO_ECG_CDR_Message_Receiver (void);
00150 
00151   /**
00152    * @param ignore_from Endpoint used to remove events generated by
00153    *        the same process.
00154    */
00155   void init (TAO_ECG_Refcounted_Endpoint ignore_from
00156              /* , ACE_Lock *lock = 0 */);
00157 
00158   /// Shut down the component: close down the request map, etc.
00159   void shutdown (void);
00160   //@}
00161 
00162   /// Main method: read the data from @a dgram and either pass ready data
00163   /// to @a cdr_processor or update the <request_map_> if the request
00164   /// is not yet complete.
00165   /**
00166    * Returns 1 if data was read successfully and accepted by
00167    * @a cdr_processor without errors.
00168    * Returns 0 if there were no errors, but no data has been passed to
00169    * @a cdr_processor, either due to the request being incomplete (not all
00170    * fragments received), or it being a duplicate.
00171    * Returns -1 if there were errors.
00172    */
00173   int handle_input (ACE_SOCK_Dgram& dgram,
00174                     TAO_ECG_CDR_Processor *cdr_processor);
00175 
00176   /// Represents any request that has been fully received and
00177   /// serviced, to simplify the internal logic.
00178   static TAO_ECG_UDP_Request_Entry Request_Completed_;
00179 
00180 private:
00181 
00182   enum {
00183     ECG_DEFAULT_MAX_FRAGMENTED_REQUESTS = 1024,  // NOTE(review): presumably the default for max_requests_ -- confirm in the implementation.
00184     ECG_DEFAULT_FRAGMENTED_REQUESTS_MIN_PURGE_COUNT = 32  // NOTE(review): presumably the default for min_purge_count_ -- confirm in the implementation.
00185   };
00186 
00187   struct Mcast_Header;  // Nested helper types; both are defined after this class, later in this header.
00188   class Requests;
00189   /// Maps a source address (ACE_INET_Addr) to the Requests object kept for it; uses ACE_Null_Mutex as the lock type.
00190   typedef ACE_Hash_Map_Manager<ACE_INET_Addr,
00191                                Requests*,
00192                                ACE_Null_Mutex> Request_Map;
00193 
00194 private:
00195 
00196   /// Returns 1 on success, 0 if @a request_id has already been
00197   /// received or is below current request range, and -1 on error.
00198   int mark_received (const ACE_INET_Addr &from,
00199                      CORBA::ULong request_id);
00200 
00201   /// Returns 1 if a complete request has been received, 0 if the
00202   /// request has only partially been received or is a duplicate, and
00203   /// -1 on error.  Presumably a complete request is handed to @a cdr_processor (TODO confirm against the implementation).
00204   int process_fragment (const ACE_INET_Addr &from,
00205                         const Mcast_Header &header,
00206                         char * data_buf,
00207                         TAO_ECG_CDR_Processor *cdr_processor);
00208 
00209   /// Presumably returns the <request_map_> entry for @a from; whether a missing entry is created is not visible here (TODO confirm).
00210   Request_Map::ENTRY* get_source_entry (const ACE_INET_Addr &from);
00211 
00212 private:
00213 
00214   /// Ignore any events coming from this IP address.
00215   TAO_ECG_Refcounted_Endpoint ignore_from_;
00216 
00217   /// The map containing all the incoming requests which have been
00218   /// partially received.
00219   Request_Map request_map_;
00220 
00221   // Serializes use of <request_map_> (member currently commented out):
00222   //  ACE_Lock* lock_;
00223 
00224   /// Size of a fragmented requests array, i.e., max number of
00225   /// partially received requests kept at any given time per source.
00226   size_t max_requests_;
00227 
00228   /// Minimum number of requests purged from a fragmented requests
00229   /// array when the range of requests represented there needs to be
00230   /// shifted.
00231   size_t min_purge_count_;
00232 
00233   /// Flag to indicate whether CRC should be computed and checked.
00234   CORBA::Boolean check_crc_;
00235 };
00236 
00237 // ****************************************************************
00238 
00239 /// Helper for decoding, validating and storing mcast header.
00240 struct TAO_ECG_CDR_Message_Receiver::Mcast_Header
00241 {
00242   int byte_order;  // Field names below match the arguments of TAO_ECG_UDP_Request_Entry's constructor and validate_fragment(), plus crc.
00243   CORBA::ULong request_id;
00244   CORBA::ULong request_size;
00245   CORBA::ULong fragment_size;
00246   CORBA::ULong fragment_offset;
00247   CORBA::ULong fragment_id;
00248   CORBA::ULong fragment_count;
00249   CORBA::ULong crc;
00250   int read (char * header,  // NOTE(review): presumably decodes and validates the fields above from @a header -- confirm in the implementation.
00251             size_t bytes_received,
00252             CORBA::Boolean checkcrc = 0);  // checkcrc defaults to 0; presumably that skips CRC verification (TODO confirm).
00253 };
00254 
00255 // ****************************************************************
00256 
00257 /// Once init() has been called:
00258 /// Invariant: id_range_high_ - id_range_low_ == size_ - 1
00259 class TAO_ECG_CDR_Message_Receiver::Requests
00260 {
00261 public:
00262 
00263   Requests (void);
00264   ~Requests (void);
00265 
00266   /// Allocates and initializes <fragmented_requests_>.  NOTE(review): meaning of the int result is not visible here -- confirm.
00267   int init (size_t size, size_t min_purge_count);
00268 
00269   /// Returns pointer to a <fragmented_requests_> element
00270   /// representing @a request_id.
00271   /**
00272    * If @a request_id < <id_range_low_> return 0.
00273    * If @a request_id > <id_range_high_>, shift the range so it
00274    * includes @a request_id, purging incomplete requests as needed.
00275    */
00276   TAO_ECG_UDP_Request_Entry ** get_request (CORBA::ULong request_id);
00277 
00278 private:
00279 
00280   /// Delete any outstanding requests with ids in the range
00281   /// [@a purge_first, @a purge_last] from <fragmented_requests_>
00282   /// and reset their slots.
00283   void purge_requests (CORBA::ULong purge_first,
00284                        CORBA::ULong purge_last);
00285   // Copy assignment and copy construction are declared private, so they are not part of the public interface.
00286   Requests & operator= (const Requests &rhs);
00287   Requests (const Requests &rhs);
00288 
00289 private:
00290   /// Array, used in a circular fashion, that stores partially received
00291   /// requests (and info on which requests have been fully received
00292   /// and processed) for a range of request ids.
00293   TAO_ECG_UDP_Request_Entry** fragmented_requests_;
00294 
00295   /// Size of <fragmented_requests_> array.
00296   size_t size_;
00297 
00298   /// The range of request ids, currently represented in
00299   /// <fragmented_requests_>.
00300   //@{
00301   CORBA::ULong id_range_low_;
00302   CORBA::ULong id_range_high_;
00303   //@}
00304 
00305   /// Minimum range shifting amount.
00306   size_t min_purge_count_;
00307 };
00308 
00309 TAO_END_VERSIONED_NAMESPACE_DECL
00310 
00311 #if defined(__ACE_INLINE__)
00312 #include "orbsvcs/Event/ECG_CDR_Message_Receiver.inl"
00313 #endif /* __ACE_INLINE__ */
00314 
00315 #include /**/ "ace/post.h"
00316 
00317 #endif /* TAO_ECG_CDR_MESSAGE_RECEIVER_H */

Generated on Tue Feb 2 17:44:06 2010 for TAO_RTEvent by  doxygen 1.4.7