sfp.h

Go to the documentation of this file.
00001 // -*- C++ -*-
00002 
00003 // ============================================================================
00004 /**
00005  *  @file  sfp.h
00006  *
00007  *  $Id: sfp.h 71526 2006-03-14 06:14:35Z jtc $
00008  *
00009  *  @author  Nagarajan Surendran <naga@cs.wustl.edu>
00010  */
00011 // ============================================================================
00012 
00013 #ifndef TAO_AV_SFP_H
00014 #define TAO_AV_SFP_H
00015 #include /**/ "ace/pre.h"
00016 
00017 #include "orbsvcs/AV/AV_export.h"
00018 
00019 #include "orbsvcs/AV/Policy.h"
00020 #include "orbsvcs/AV/MCast.h"
00021 #include "orbsvcs/AV/AVStreams_i.h"
00022 #include "orbsvcs/AV/UDP.h"
00023 
00024 #include "orbsvcs/sfpC.h"
00025 
00026 #include "tao/CDR.h"
00027 
00028 #include "ace/SOCK_Dgram.h"
00029 #include "ace/INET_Addr.h"
00030 
00031 
00032 #define TAO_SFP_MAGIC_NUMBER_LEN 4 // presumably the length of the SFP magic-number strings -- TODO confirm against their definitions
00033 #define TAO_SFP_MESSAGE_TYPE_OFFSET 5 // presumably the offset of the message type within a message header -- TODO confirm
00034 #define TAO_SFP_WRITEV_MAX 128 // presumably an upper bound on buffers per vectored write -- TODO confirm
00035 
00036 #define TAO_SFP_MAX_PACKET_SIZE ACE_MAX_DGRAM_SIZE // Alias for ACE_MAX_DGRAM_SIZE.
00037 
00038 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00039 
00040 /**
00041  * @class TAO_SFP_Fragment_Node
00042  * @brief Couples a flowProtocol::fragment record (fragment_info_) with a pointer to an ACE_Message_Block (data_, initially null); comparable through operator< so nodes can be stored in an ordered multiset.
00043  */
00044 class TAO_SFP_Fragment_Node
00045 {
00046 public:
00047   TAO_SFP_Fragment_Node (void) : data_ (0) {}
00048   flowProtocol::fragment fragment_info_;
00049   ACE_Message_Block *data_;
00050   friend bool operator< (const TAO_SFP_Fragment_Node& left,
00051                          const TAO_SFP_Fragment_Node& right);
00052 };
00053 
00054 /**
00055  * @class TAO_SFP_Fragment_Table_Entry
00056  * @brief Table entry grouping an ordered multiset of TAO_SFP_Fragment_Node with a TAO_AV_frame_info and two counters (last_received_, num_fragments_) that both start at zero.
00057  */
00058 class TAO_SFP_Fragment_Table_Entry
00059 {
00060 public:
00061   TAO_SFP_Fragment_Table_Entry (void)
00062     :last_received_ (0),
00063      num_fragments_ (0)
00064     {}
00065   int last_received_;
00066   size_t num_fragments_;
00067   TAO_AV_frame_info frame_info;
00068   ACE_Ordered_MultiSet<TAO_SFP_Fragment_Node> fragment_set_;
00069 };
00070 
00071 typedef ACE_Ordered_MultiSet_Iterator<TAO_SFP_Fragment_Node> FRAGMENT_SET_ITERATOR; // Iterator over a multiset of fragment nodes.
00072 typedef ACE_Hash_Map_Manager<CORBA::ULong,TAO_SFP_Fragment_Table_Entry*,ACE_Null_Mutex> TAO_SFP_Fragment_Table; // CORBA::ULong key -> fragment table entry pointer (no locking: ACE_Null_Mutex).
00073 typedef ACE_Hash_Map_Manager<CORBA::ULong,TAO_SFP_Fragment_Table*,ACE_Null_Mutex> TAO_SFP_Fragment_Table_Map; // CORBA::ULong key -> fragment table pointer (no locking: ACE_Null_Mutex).
00074 
00075 /**
00076  * @class TAO_SFP_Frame_State
00077  * @brief Bundles the TAO_InputCDR used for decoding with the current frame header, fragment and frame structures, the frame message blocks and the fragment table map.
00078  */
00079 class TAO_AV_Export TAO_SFP_Frame_State
00080 {
00081 public:
00082   TAO_SFP_Frame_State (void);
00083   CORBA::Boolean is_complete (void);
00084 
00085   int reset (void);
00086 
00087   TAO_InputCDR cdr;
00088   // This is the InputCDR that will be used to decode the message.
00089   flowProtocol::frameHeader frame_header_;
00090   flowProtocol::fragment fragment_;
00091   flowProtocol::frame frame_;
00092   CORBA::Boolean more_fragments_;
00093   ACE_Message_Block *frame_block_;
00094   // NOTE(review): this comment appears to describe more_fragments_ above: boolean flag indicating that there are more fragments.
00095   ACE_Message_Block static_frame_;
00096   TAO_SFP_Fragment_Table_Map fragment_table_map_;
00097 };
00098 
00099 class TAO_AV_Transport; // Forward declaration; used only by pointer in the declarations below.
00100 class TAO_AV_Core; // Forward declaration; not referenced by the declarations visible in this header.
00101 
00102 /**
00103  * @class TAO_SFP_Base
00104  * @brief Holds the SFP constants (magic numbers, version, header lengths) and static helpers: the write_* functions take a TAO_OutputCDR, while send_message, the peek_* and read_* functions and handle_input take a TAO_AV_Transport.
00105  */
00106 class TAO_AV_Export TAO_SFP_Base
00107 {
00108 public:
00109   // default arguments to pass to the ORB
00110   static const char TAO_SFP_ORB_ARGUMENTS[];
00111 
00112   // SFP magic numbers
00113   static const char TAO_SFP_MAGIC_NUMBER[];
00114   static const char TAO_SFP_FRAGMENT_MAGIC_NUMBER[];
00115   static const char TAO_SFP_START_MAGIC_NUMBER[];
00116   static const char TAO_SFP_CREDIT_MAGIC_NUMBER[];
00117   static const char TAO_SFP_STARTREPLY_MAGIC_NUMBER[];
00118 
00119   // SFP version 1.0
00120   static const unsigned char TAO_SFP_MAJOR_VERSION;
00121   static const unsigned char TAO_SFP_MINOR_VERSION;
00122 
00123   // lengths of various SFP headers
00124   static const unsigned char TAO_SFP_FRAME_HEADER_LEN;
00125   static const unsigned char TAO_SFP_MESSAGE_SIZE_OFFSET;
00126   static const unsigned char TAO_SFP_FRAGMENT_SIZE_OFFSET;
00127   static u_int frame_header_len;
00128   static u_int start_reply_len;
00129   static u_int start_len;
00130   static u_int credit_len;
00131   static u_int fragment_len;
00132 
00133   enum State
00134   {
00135     ACTIVE_START,
00136     PASSIVE_START,
00137     TIMEDOUT_T1,
00138     TIMEDOUT_T2,
00139     REPLY_RECEIVED,
00140     START_RECEIVED
00141   };
00142 
00143   TAO_SFP_Base (void);
00144   static CORBA::Boolean start_frame (CORBA::Octet flags,
00145                                      flowProtocol::MsgType type,
00146                                      TAO_OutputCDR &msg);
00147 
00148   static CORBA::Boolean write_start_message (TAO_OutputCDR &msg);
00149   static CORBA::Boolean write_start_reply_message (TAO_OutputCDR &msg);
00150   static CORBA::Boolean write_credit_message (CORBA::ULong cred_num,
00151                                               TAO_OutputCDR &msg);
00152   static CORBA::Boolean write_fragment_message (CORBA::Octet flags,
00153                                                 CORBA::ULong fragment_number,
00154                                                 CORBA::ULong sequence_number,
00155                                                 CORBA::ULong source_id,
00156                                                 TAO_OutputCDR &msg);
00157 
00158   static CORBA::Boolean write_frame_message (CORBA::ULong timestamp,
00159                                              CORBA::ULong synchSource,
00160                                              flowProtocol::my_seq_ulong source_ids,
00161                                              CORBA::ULong sequence_num,
00162                                              TAO_OutputCDR &msg);
00163 
00164   static int send_message (TAO_AV_Transport *transport,
00165                            TAO_OutputCDR &stream,
00166                            ACE_Message_Block *mb = 0);
00167   static int peek_message_type (TAO_AV_Transport *transport,
00168                                 flowProtocol::MsgType &type);
00169   static int read_start_message (TAO_AV_Transport *transport,
00170                                  flowProtocol::Start &start,
00171                                  TAO_InputCDR &cdr);
00172   static int read_start_reply_message (TAO_AV_Transport *transport,
00173                                        flowProtocol::StartReply &start_reply,
00174                                        TAO_InputCDR &cdr);
00175   static int read_credit_message (TAO_AV_Transport *transport,
00176                                   flowProtocol::credit &credit,
00177                                   TAO_InputCDR &cdr);
00178   static int read_endofstream_message (TAO_AV_Transport *transport,
00179                                        flowProtocol::frameHeader &endofstream,
00180                                        TAO_InputCDR &cdr);
00181 
00182   static int read_frame (TAO_AV_Transport *transport,
00183                          flowProtocol::frameHeader &frame_header,
00184                          TAO_SFP_Frame_State &state,
00185                          TAO_AV_frame_info *&frame_info);
00186 
00187   static int read_fragment (TAO_AV_Transport *transport,
00188                             flowProtocol::fragment &fragment,
00189                             TAO_SFP_Frame_State &state,
00190                             TAO_AV_frame_info *&frame_info);
00191 
00192   static int peek_frame_header (TAO_AV_Transport *transport,
00193                                 flowProtocol::frameHeader &header,
00194                                 TAO_InputCDR &cdr);
00195 
00196   static int peek_fragment_header (TAO_AV_Transport *transport,
00197                                    flowProtocol::fragment &fragment,
00198                                    TAO_InputCDR &cdr);
00199 
00200   static int handle_input (TAO_AV_Transport *transport,
00201                            TAO_SFP_Frame_State &state,
00202                            TAO_AV_frame_info *&frame_info);
00203 
00204   static ACE_Message_Block* check_all_fragments (TAO_SFP_Fragment_Table_Entry *fragment_entry);
00205 
00206 protected:
00207   static void dump_buf (char *buf,int n);
00208   // Dumps the first n bytes of buf to the screen (n is presumably a byte count -- TODO confirm).
00209 };
00210 
00211 // Beware: the TAO_SFP_Base code relies on this singleton having been initialized.
00212 typedef ACE_Singleton <TAO_SFP_Base,TAO_SYNCH_MUTEX> TAO_SFP_BASE;
00213 
00214 /**
00215  * @class TAO_SFP_Object
00216  * @brief Abstract SFP protocol object (handle_input is pure virtual) derived from TAO_AV_Protocol_Object; declares the send_frame overloads and keeps sequence/source ids, credit counters and a TAO_SFP_Frame_State.
00217  */
00218 class TAO_AV_Export TAO_SFP_Object  : public TAO_AV_Protocol_Object
00219 {
00220 public:
00221   TAO_SFP_Object (TAO_AV_Callback *callback,
00222                   TAO_AV_Transport *transport);
00223   // TODO: we should add an SFP options parameter.
00224 
00225   virtual ~TAO_SFP_Object (void);
00226   // Destructor.
00227 
00228   virtual int handle_input (void) = 0;
00229   virtual int send_frame (ACE_Message_Block *frame,
00230                           TAO_AV_frame_info *frame_info = 0);
00231 
00232   virtual int send_frame (const iovec *iov,
00233                           int iovcnt,
00234                           TAO_AV_frame_info *frame_info = 0);
00235 
00236   virtual int send_frame (const char*buf,
00237                           size_t len);
00238 
00239   virtual int destroy (void);
00240   virtual int set_policies (const TAO_AV_PolicyList &policies);
00241 
00242 protected:
00243   ACE_Message_Block *get_fragment (ACE_Message_Block *&frame,
00244                                    size_t initial_len,
00245                                    size_t &last_mb_orig_len,
00246                                    size_t &last_mb_current_len);
00247   CORBA::ULong sequence_num_;
00248   CORBA::ULong source_id_;
00249   CORBA::Long max_credit_;
00250   CORBA::Long current_credit_;
00251   TAO_SFP_Frame_State state_;
00252 };
00253 
00254 /**
00255  * @class TAO_SFP_Producer_Object
00256  * @brief TAO_SFP_Object subclass that overrides handle_input and adds credit_sequence_num_; its constructor takes the flow options as a C string.
00257  */
00258 class TAO_AV_Export TAO_SFP_Producer_Object : public TAO_SFP_Object
00259 {
00260 public:
00261   TAO_SFP_Producer_Object (TAO_AV_Callback *callback,
00262                            TAO_AV_Transport *transport,
00263                            const char *flow_options);
00264   virtual int handle_input (void);
00265 protected:
00266   CORBA::ULong credit_sequence_num_;
00267 };
00268 
00269 /**
00270  * @class TAO_SFP_Consumer_Object
00271  * @brief TAO_SFP_Object subclass that overrides handle_input; its constructor takes the flow options as a non-const ACE_CString reference.
00272  */
00273 class TAO_AV_Export TAO_SFP_Consumer_Object : public TAO_SFP_Object
00274 {
00275 public:
00276   TAO_SFP_Consumer_Object (TAO_AV_Callback *callback,
00277                            TAO_AV_Transport *transport,
00278                            ACE_CString& flow_options);
00279   virtual int handle_input (void);
00280 };
00281 
00282 /**
00283  * @class TAO_AV_SFP_Factory
00284  * @brief TAO_AV_Flow_Protocol_Factory subclass declaring init, match_protocol and make_protocol_object, the last of which returns a TAO_AV_Protocol_Object pointer.
00285  */
00286 class TAO_AV_Export TAO_AV_SFP_Factory : public TAO_AV_Flow_Protocol_Factory
00287 {
00288 public:
00289   TAO_AV_SFP_Factory (void);
00290   virtual ~TAO_AV_SFP_Factory (void);
00291   virtual int init (int argc, char *argv[]);
00292   // Initialization hook.
00293   virtual int match_protocol (const char *flow_string);
00294   virtual TAO_AV_Protocol_Object* make_protocol_object (TAO_FlowSpec_Entry *entry,
00295                                                         TAO_Base_StreamEndPoint *endpoint,
00296                                                         TAO_AV_Flow_Handler *handler,
00297                                                         TAO_AV_Transport *transport);
00298 };
00299 
00300 TAO_END_VERSIONED_NAMESPACE_DECL
00301 
00302 ACE_STATIC_SVC_DECLARE (TAO_AV_SFP_Flow_Factory)
00303 ACE_FACTORY_DECLARE (TAO_AV, TAO_AV_SFP_Flow_Factory)
00304 
00305 #include /**/ "ace/post.h"
00306 #endif /* TAO_AV_SFP_H */

Generated on Tue Feb 2 17:47:49 2010 for TAO_AV by  doxygen 1.4.7