00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013 #ifndef TAO_AV_SFP_H
00014 #define TAO_AV_SFP_H
00015 #include "ace/pre.h"
00016
00017 #include "orbsvcs/AV/AV_export.h"
00018
00019 #include "orbsvcs/AV/Policy.h"
00020 #include "orbsvcs/AV/MCast.h"
00021 #include "orbsvcs/AV/AVStreams_i.h"
00022 #include "orbsvcs/AV/UDP.h"
00023
00024 #include "orbsvcs/sfpC.h"
00025
00026 #include "tao/CDR.h"
00027
00028 #include "ace/SOCK_Dgram.h"
00029 #include "ace/INET_Addr.h"
00030
00031
00032 #define TAO_SFP_MAGIC_NUMBER_LEN 4
00033 #define TAO_SFP_MESSAGE_TYPE_OFFSET 5
00034 #define TAO_SFP_WRITEV_MAX 128
00035
00036 #define TAO_SFP_MAX_PACKET_SIZE ACE_MAX_DGRAM_SIZE
00037
00038 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00039
00040
00041
00042
00043
00044 class TAO_SFP_Fragment_Node
00045 {
00046 public:
00047 TAO_SFP_Fragment_Node (void) : data_ (0) {}
00048 flowProtocol::fragment fragment_info_;
00049 ACE_Message_Block *data_;
00050 friend bool operator< (const TAO_SFP_Fragment_Node& left,
00051 const TAO_SFP_Fragment_Node& right);
00052 };
00053
00054
00055
00056
00057
00058 class TAO_SFP_Fragment_Table_Entry
00059 {
00060 public:
00061 TAO_SFP_Fragment_Table_Entry (void)
00062 :last_received_ (0),
00063 num_fragments_ (0)
00064 {}
00065 int last_received_;
00066 size_t num_fragments_;
00067 TAO_AV_frame_info frame_info;
00068 ACE_Ordered_MultiSet<TAO_SFP_Fragment_Node> fragment_set_;
00069 };
00070
00071 typedef ACE_Ordered_MultiSet_Iterator<TAO_SFP_Fragment_Node> FRAGMENT_SET_ITERATOR;
00072 typedef ACE_Hash_Map_Manager<CORBA::ULong,TAO_SFP_Fragment_Table_Entry*,ACE_Null_Mutex> TAO_SFP_Fragment_Table;
00073 typedef ACE_Hash_Map_Manager<CORBA::ULong,TAO_SFP_Fragment_Table*,ACE_Null_Mutex> TAO_SFP_Fragment_Table_Map;
00074
00075
00076
00077
00078
00079 class TAO_AV_Export TAO_SFP_Frame_State
00080 {
00081 public:
00082 TAO_SFP_Frame_State (void);
00083 CORBA::Boolean is_complete (void);
00084
00085 int reset (void);
00086
00087 TAO_InputCDR cdr;
00088
00089 flowProtocol::frameHeader frame_header_;
00090 flowProtocol::fragment fragment_;
00091 flowProtocol::frame frame_;
00092 CORBA::Boolean more_fragments_;
00093 ACE_Message_Block *frame_block_;
00094
00095 ACE_Message_Block static_frame_;
00096 TAO_SFP_Fragment_Table_Map fragment_table_map_;
00097 };
00098
00099 class TAO_AV_Transport;
00100 class TAO_AV_Core;
00101
00102
00103
00104
00105
00106 class TAO_AV_Export TAO_SFP_Base
00107 {
00108 public:
00109
00110 static const char TAO_SFP_ORB_ARGUMENTS[];
00111
00112
00113 static const char TAO_SFP_MAGIC_NUMBER[];
00114 static const char TAO_SFP_FRAGMENT_MAGIC_NUMBER[];
00115 static const char TAO_SFP_START_MAGIC_NUMBER[];
00116 static const char TAO_SFP_CREDIT_MAGIC_NUMBER[];
00117 static const char TAO_SFP_STARTREPLY_MAGIC_NUMBER[];
00118
00119
00120 static const unsigned char TAO_SFP_MAJOR_VERSION;
00121 static const unsigned char TAO_SFP_MINOR_VERSION;
00122
00123
00124 static const unsigned char TAO_SFP_FRAME_HEADER_LEN;
00125 static const unsigned char TAO_SFP_MESSAGE_SIZE_OFFSET;
00126 static const unsigned char TAO_SFP_FRAGMENT_SIZE_OFFSET;
00127 static u_int frame_header_len;
00128 static u_int start_reply_len;
00129 static u_int start_len;
00130 static u_int credit_len;
00131 static u_int fragment_len;
00132
00133 enum State
00134 {
00135 ACTIVE_START,
00136 PASSIVE_START,
00137 TIMEDOUT_T1,
00138 TIMEDOUT_T2,
00139 REPLY_RECEIVED,
00140 START_RECEIVED
00141 };
00142
00143 TAO_SFP_Base (void);
00144 static CORBA::Boolean start_frame (CORBA::Octet flags,
00145 flowProtocol::MsgType type,
00146 TAO_OutputCDR &msg);
00147
00148 static CORBA::Boolean write_start_message (TAO_OutputCDR &msg);
00149 static CORBA::Boolean write_start_reply_message (TAO_OutputCDR &msg);
00150 static CORBA::Boolean write_credit_message (CORBA::ULong cred_num,
00151 TAO_OutputCDR &msg);
00152 static CORBA::Boolean write_fragment_message (CORBA::Octet flags,
00153 CORBA::ULong fragment_number,
00154 CORBA::ULong sequence_number,
00155 CORBA::ULong source_id,
00156 TAO_OutputCDR &msg);
00157
00158 static CORBA::Boolean write_frame_message (CORBA::ULong timestamp,
00159 CORBA::ULong synchSource,
00160 flowProtocol::my_seq_ulong source_ids,
00161 CORBA::ULong sequence_num,
00162 TAO_OutputCDR &msg);
00163
00164 static int send_message (TAO_AV_Transport *transport,
00165 TAO_OutputCDR &stream,
00166 ACE_Message_Block *mb = 0);
00167 static int peek_message_type (TAO_AV_Transport *transport,
00168 flowProtocol::MsgType &type);
00169 static int read_start_message (TAO_AV_Transport *transport,
00170 flowProtocol::Start &start,
00171 TAO_InputCDR &cdr);
00172 static int read_start_reply_message (TAO_AV_Transport *transport,
00173 flowProtocol::StartReply &start_reply,
00174 TAO_InputCDR &cdr);
00175 static int read_credit_message (TAO_AV_Transport *transport,
00176 flowProtocol::credit &credit,
00177 TAO_InputCDR &cdr);
00178 static int read_endofstream_message (TAO_AV_Transport *transport,
00179 flowProtocol::frameHeader &endofstream,
00180 TAO_InputCDR &cdr);
00181
00182 static int read_frame (TAO_AV_Transport *transport,
00183 flowProtocol::frameHeader &frame_header,
00184 TAO_SFP_Frame_State &state,
00185 TAO_AV_frame_info *&frame_info);
00186
00187 static int read_fragment (TAO_AV_Transport *transport,
00188 flowProtocol::fragment &fragment,
00189 TAO_SFP_Frame_State &state,
00190 TAO_AV_frame_info *&frame_info);
00191
00192 static int peek_frame_header (TAO_AV_Transport *transport,
00193 flowProtocol::frameHeader &header,
00194 TAO_InputCDR &cdr);
00195
00196 static int peek_fragment_header (TAO_AV_Transport *transport,
00197 flowProtocol::fragment &fragment,
00198 TAO_InputCDR &cdr);
00199
00200 static int handle_input (TAO_AV_Transport *transport,
00201 TAO_SFP_Frame_State &state,
00202 TAO_AV_frame_info *&frame_info);
00203
00204 static ACE_Message_Block* check_all_fragments (TAO_SFP_Fragment_Table_Entry *fragment_entry);
00205
00206 protected:
00207 static void dump_buf (char *buf,int n);
00208
00209 };
00210
00211
00212 typedef ACE_Singleton <TAO_SFP_Base,TAO_SYNCH_MUTEX> TAO_SFP_BASE;
00213
00214
00215
00216
00217
00218 class TAO_AV_Export TAO_SFP_Object : public TAO_AV_Protocol_Object
00219 {
00220 public:
00221 TAO_SFP_Object (TAO_AV_Callback *callback,
00222 TAO_AV_Transport *transport);
00223
00224
00225 virtual ~TAO_SFP_Object (void);
00226
00227
00228 virtual int handle_input (void) = 0;
00229 virtual int send_frame (ACE_Message_Block *frame,
00230 TAO_AV_frame_info *frame_info = 0);
00231
00232 virtual int send_frame (const iovec *iov,
00233 int iovcnt,
00234 TAO_AV_frame_info *frame_info = 0);
00235
00236 virtual int send_frame (const char*buf,
00237 size_t len);
00238
00239 virtual int destroy (void);
00240 virtual int set_policies (const TAO_AV_PolicyList &policies);
00241
00242 protected:
00243 ACE_Message_Block *get_fragment (ACE_Message_Block *&frame,
00244 size_t initial_len,
00245 size_t &last_mb_orig_len,
00246 size_t &last_mb_current_len);
00247 CORBA::ULong sequence_num_;
00248 CORBA::ULong source_id_;
00249 CORBA::Long max_credit_;
00250 CORBA::Long current_credit_;
00251 TAO_SFP_Frame_State state_;
00252 };
00253
00254
00255
00256
00257
00258 class TAO_AV_Export TAO_SFP_Producer_Object : public TAO_SFP_Object
00259 {
00260 public:
00261 TAO_SFP_Producer_Object (TAO_AV_Callback *callback,
00262 TAO_AV_Transport *transport,
00263 const char *flow_options);
00264 virtual int handle_input (void);
00265 protected:
00266 CORBA::ULong credit_sequence_num_;
00267 };
00268
00269
00270
00271
00272
00273 class TAO_AV_Export TAO_SFP_Consumer_Object : public TAO_SFP_Object
00274 {
00275 public:
00276 TAO_SFP_Consumer_Object (TAO_AV_Callback *callback,
00277 TAO_AV_Transport *transport,
00278 ACE_CString& flow_options);
00279 virtual int handle_input (void);
00280 };
00281
00282
00283
00284
00285
00286 class TAO_AV_Export TAO_AV_SFP_Factory : public TAO_AV_Flow_Protocol_Factory
00287 {
00288 public:
00289 TAO_AV_SFP_Factory (void);
00290 virtual ~TAO_AV_SFP_Factory (void);
00291 virtual int init (int argc, char *argv[]);
00292
00293 virtual int match_protocol (const char *flow_string);
00294 virtual TAO_AV_Protocol_Object* make_protocol_object (TAO_FlowSpec_Entry *entry,
00295 TAO_Base_StreamEndPoint *endpoint,
00296 TAO_AV_Flow_Handler *handler,
00297 TAO_AV_Transport *transport);
00298 };
00299
00300 TAO_END_VERSIONED_NAMESPACE_DECL
00301
00302 ACE_STATIC_SVC_DECLARE (TAO_AV_SFP_Flow_Factory)
00303 ACE_FACTORY_DECLARE (TAO_AV, TAO_AV_SFP_Flow_Factory)
00304
00305 #include "ace/post.h"
00306 #endif