#include <sfp.h>
Inheritance diagram for TAO_SFP_Object:
Public Member Functions | |
TAO_SFP_Object (TAO_AV_Callback *callback, TAO_AV_Transport *transport) | |
virtual | ~TAO_SFP_Object (void) |
virtual int | handle_input (void)=0 |
virtual int | send_frame (ACE_Message_Block *frame, TAO_AV_frame_info *frame_info=0) |
send a data frame. | |
virtual int | send_frame (const iovec *iov, int iovcnt, TAO_AV_frame_info *frame_info=0) |
send a frame in iovecs. | |
virtual int | send_frame (const char *buf, size_t len) |
virtual int | destroy (void) |
virtual int | set_policies (const TAO_AV_PolicyList &policies) |
set/get policies. | |
Protected Member Functions | |
ACE_Message_Block * | get_fragment (ACE_Message_Block *&frame, size_t initial_len, size_t &last_mb_orig_len, size_t &last_mb_current_len) |
Protected Attributes | |
CORBA::ULong | sequence_num_ |
CORBA::ULong | source_id_ |
CORBA::Long | max_credit_ |
CORBA::Long | current_credit_ |
TAO_SFP_Frame_State | state_ |
Definition at line 218 of file sfp.h.
TAO_SFP_Object::TAO_SFP_Object | ( | TAO_AV_Callback * | callback, | |
TAO_AV_Transport * | transport | |||
) |
Definition at line 836 of file sfp.cpp.
References ACE_Singleton< TYPE, ACE_LOCK >::instance(), ACE_Message_Block::size(), state_, and TAO_SFP_Frame_State::static_frame_.
00838 :TAO_AV_Protocol_Object (callback,transport), 00839 source_id_ (10), 00840 max_credit_ (-1), 00841 current_credit_ (-1) 00842 { 00843 TAO_SFP_BASE::instance (); 00844 this->state_.static_frame_.size (2* this->transport_->mtu ()); 00845 }
TAO_SFP_Object::~TAO_SFP_Object | ( | void | ) | [virtual] |
int TAO_SFP_Object::destroy | ( | void | ) | [virtual] |
Implements TAO_AV_Protocol_Object.
Definition at line 853 of file sfp.cpp.
References TAO_AV_Protocol_Object::callback_, flowProtocol::EndofStream_Msg, TAO_AV_Callback::handle_destroy(), TAO_SFP_Base::send_message(), TAO_SFP_Base::start_frame(), and TAO_ENCAP_BYTE_ORDER.
00854 { 00855 int result = -1; 00856 TAO_OutputCDR out_stream; 00857 result = TAO_SFP_Base::start_frame (TAO_ENCAP_BYTE_ORDER, 00858 flowProtocol::EndofStream_Msg, 00859 out_stream); 00860 if (result < 0) 00861 return result; 00862 result = TAO_SFP_Base::send_message (this->transport_, 00863 out_stream); 00864 if (result < 0) 00865 return result; 00866 this->callback_->handle_destroy (); 00867 return 0; 00868 }
ACE_Message_Block * TAO_SFP_Object::get_fragment | ( | ACE_Message_Block *& | frame, | |
size_t | initial_len, | |||
size_t & | last_mb_orig_len, | |||
size_t & | last_mb_current_len | |||
) | [protected] |
Definition at line 1044 of file sfp.cpp.
References ACE_Message_Block::cont(), ACE_Message_Block::duplicate(), ACE_Message_Block::length(), and TAO_SFP_MAX_PACKET_SIZE.
Referenced by send_frame().
01048 { 01049 ACE_Message_Block *fragment_mb = 0,*temp_mb = 0; 01050 size_t prev_len,last_len = 0; 01051 size_t current_len = 0; 01052 size_t message_len = initial_len; 01053 while (mb != 0) 01054 { 01055 prev_len = message_len; 01056 message_len += mb->length (); 01057 if (fragment_mb == 0) 01058 fragment_mb = temp_mb = mb->duplicate (); 01059 if (message_len > TAO_SFP_MAX_PACKET_SIZE) 01060 { 01061 // get only the length that we can accomodate. 01062 current_len = TAO_SFP_MAX_PACKET_SIZE - prev_len; 01063 if (current_len < mb->length ()) 01064 { 01065 // The above condition is an assertion. 01066 message_len += (current_len-mb->length ()); 01067 last_len = mb->length (); 01068 mb->length (current_len); 01069 temp_mb->length (current_len); 01070 } 01071 break; 01072 } 01073 else 01074 { 01075 // we can accomodate this message block 01076 message_len += mb->length (); 01077 mb = mb->cont (); 01078 temp_mb = temp_mb->cont (); 01079 } 01080 } 01081 last_mb_orig_len = last_len; 01082 last_mb_current_len = current_len; 01083 return fragment_mb; 01084 }
virtual int TAO_SFP_Object::handle_input | ( | void | ) | [pure virtual] |
Implements TAO_AV_Protocol_Object.
Implemented in TAO_SFP_Producer_Object, and TAO_SFP_Consumer_Object.
int TAO_SFP_Object::send_frame | ( | const char * | buf, | |
size_t | len | |||
) | [virtual] |
int TAO_SFP_Object::send_frame | ( | const iovec * | iov, | |
int | iovcnt, | |||
TAO_AV_frame_info * | frame_info = 0 | |||
) | [virtual] |
send a frame in iovecs.
Implements TAO_AV_Protocol_Object.
Definition at line 1028 of file sfp.cpp.
References ACE_ERROR_RETURN, and LM_ERROR.
01031 { 01032 ACE_ERROR_RETURN ((LM_ERROR,"TAO_AV_SFP_Object::send_frame"),-1); 01033 }
int TAO_SFP_Object::send_frame | ( | ACE_Message_Block * | frame, | |
TAO_AV_frame_info * | frame_info = 0 | |||
) | [virtual] |
send a data frame.
Implements TAO_AV_Protocol_Object.
Definition at line 871 of file sfp.cpp.
References ACE_DEBUG, ACE_ERROR_RETURN, TAO_AV_frame_info::boundary_marker, ACE_Message_Block::cont(), current_credit_, TAO_SFP_Base::fragment_len, TAO_SFP_Base::frame_header_len, flowProtocol::Frame_Msg, get_fragment(), LM_DEBUG, LM_ERROR, ACE_OutputCDR::reset(), TAO_SFP_Base::send_message(), sequence_num_, flowProtocol::SimpleFrame_Msg, ACE_OS::sleep(), TAO_AV_frame_info::ssrc, TAO_SFP_Base::start_frame(), TAO_debug_level, TAO_ENCAP_BYTE_ORDER, TAO_SFP_MAX_PACKET_SIZE, TAO_AV_frame_info::timestamp, ACE_OutputCDR::total_length(), TAO_SFP_Base::write_fragment_message(), and TAO_SFP_Base::write_frame_message().
00873 { 00874 TAO_OutputCDR out_stream; 00875 CORBA::Boolean result = 0; 00876 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"TAO_SFP_Object::send_frame\n")); 00877 CORBA::Octet flags = TAO_ENCAP_BYTE_ORDER; 00878 if (this->transport_ == 0) 00879 ACE_ERROR_RETURN ((LM_ERROR,"TAO_SFP_Object::send_frame: transport is null\n"),-1); 00880 if (this->current_credit_ != 0) 00881 { 00882 // if we have enough credit then we send. 00883 size_t total_length = 0; 00884 for (ACE_Message_Block *temp = frame;temp != 0;temp = temp->cont ()) 00885 total_length += temp->length (); 00886 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"total_length of frame=%d\n",total_length)); 00887 if (total_length < (TAO_SFP_MAX_PACKET_SIZE -TAO_SFP_Base::frame_header_len)) 00888 { 00889 if (frame_info != 0) 00890 { 00891 if (frame_info->boundary_marker) 00892 flags |= 4; 00893 CORBA::Boolean result = TAO_SFP_Base::start_frame (flags, 00894 flowProtocol::Frame_Msg, 00895 out_stream); 00896 if (result == 0) 00897 return 0; 00898 flowProtocol::my_seq_ulong source_ids; 00899 source_ids.length (1); 00900 source_ids [0] = 0; 00901 TAO_SFP_Base::write_frame_message (frame_info->timestamp, 00902 frame_info->ssrc, 00903 source_ids, 00904 this->sequence_num_, 00905 out_stream); 00906 } 00907 else 00908 { 00909 CORBA::Boolean result = TAO_SFP_Base::start_frame (flags, 00910 flowProtocol::SimpleFrame_Msg, 00911 out_stream); 00912 if (result == 0) 00913 return 0; 00914 } 00915 TAO_SFP_Base::send_message (this->transport_, 00916 out_stream, 00917 frame); 00918 } 00919 else // larger frame,fragment and send it. 00920 { 00921 flags = flags | 2; 00922 if (frame_info != 0) 00923 { 00924 if (frame_info->boundary_marker) 00925 flags |= 4; 00926 result = TAO_SFP_Base::start_frame (flags, 00927 flowProtocol::Frame_Msg, 00928 out_stream); 00929 if (result == 0) 00930 return result; 00931 flowProtocol::my_seq_ulong source_ids; 00932 source_ids.length (1); 00933 source_ids [0] = 0; 00934 TAO_SFP_Base::write_frame_message (frame_info->timestamp, 00935 frame_info->ssrc, 00936 source_ids, 00937 this->sequence_num_, 00938 out_stream); 00939 } 00940 else 00941 { 00942 CORBA::Boolean result = TAO_SFP_Base::start_frame (flags, 00943 flowProtocol::SimpleFrame_Msg, 00944 out_stream); 00945 if (result == 0) 00946 return 0; 00947 } 00948 size_t last_len,current_len; 00949 int message_len = static_cast<int> (out_stream.total_length ()); 00950 ACE_Message_Block *mb = frame; 00951 ACE_Message_Block *fragment_mb = 00952 this->get_fragment (mb, 00953 message_len, 00954 last_len, 00955 current_len); 00956 // This can be either a simpleframe or a sequenced frame,other types of frames. 00957 TAO_SFP_Base::send_message (this->transport_, 00958 out_stream, 00959 fragment_mb); 00960 out_stream.reset (); 00961 int frag_number = 1; 00962 mb->length (last_len); 00963 mb->rd_ptr (current_len); 00964 // If there is any more data send those as fragments. 00965 while (mb != 0) 00966 { 00967 message_len = TAO_SFP_Base::fragment_len; 00968 fragment_mb = this->get_fragment (mb, 00969 message_len, 00970 last_len, 00971 current_len); 00972 if (mb == 0) 00973 { 00974 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"sending the last fragment\n")); 00975 // This is the last fragment so clear the fragments bit. 00976 flags = TAO_ENCAP_BYTE_ORDER; 00977 } 00978 if (fragment_mb == 0) 00979 break; 00980 if (frame_info != 0) 00981 { 00982 TAO_SFP_Base::write_fragment_message (flags, 00983 frag_number++, 00984 this->sequence_num_, 00985 frame_info->ssrc, 00986 out_stream); 00987 } 00988 else 00989 { 00990 TAO_SFP_Base::write_fragment_message (flags, 00991 frag_number++, 00992 this->sequence_num_, 00993 0, 00994 out_stream); 00995 } 00996 // send the fragment now. 00997 // without the sleep the fragments gets lost! 00998 // probably because the UDP buffer queue on the sender side 00999 // is overflown it drops the packets. 01000 // XXX: This is a hack. 01001 ACE_OS::sleep (1); 01002 result = TAO_SFP_Base::send_message (this->transport_, 01003 out_stream, 01004 fragment_mb); 01005 if (mb != 0) 01006 { 01007 mb->length (last_len); 01008 mb->rd_ptr (current_len); 01009 } 01010 } 01011 // Increment the sequence_num after sending the message. 01012 this->sequence_num_++; 01013 // Also reduce the number of credits. 01014 if (this->max_credit_ > 0) 01015 this->current_credit_--; 01016 } 01017 } 01018 else 01019 { 01020 // flow controlled so wait. 01021 // A greater than 0 value indicates that flow control is being exercised. 01022 return 1; 01023 } 01024 return 0; 01025 }
int TAO_SFP_Object::set_policies | ( | const TAO_AV_PolicyList & | policies | ) | [virtual] |
set/get policies.
Reimplemented from TAO_AV_Protocol_Object.
Definition at line 1087 of file sfp.cpp.
References max_credit_, TAO_AV_SFP_CREDIT_POLICY, and TAO_AV_SFP_Credit_Policy::value().
Referenced by TAO_SFP_Consumer_Object::TAO_SFP_Consumer_Object().
01088 { 01089 TAO_AV_Policy *policy = 0; 01090 for (CORBA::ULong i=0;i<policies.length ();i++) 01091 { 01092 policy = policies[i]; 01093 switch (policies[i]->type ()) 01094 { 01095 01096 case TAO_AV_SFP_CREDIT_POLICY: 01097 { 01098 TAO_AV_SFP_Credit_Policy *credit_policy = 01099 reinterpret_cast<TAO_AV_SFP_Credit_Policy*> (policy); 01100 this->max_credit_ = credit_policy->value (); 01101 } 01102 default: 01103 break; 01104 } 01105 } 01106 return 0; 01107 }
CORBA::Long TAO_SFP_Object::current_credit_ [protected] |
Definition at line 250 of file sfp.h.
Referenced by TAO_SFP_Producer_Object::handle_input(), and send_frame().
CORBA::Long TAO_SFP_Object::max_credit_ [protected] |
Definition at line 249 of file sfp.h.
Referenced by TAO_SFP_Producer_Object::handle_input(), set_policies(), and TAO_SFP_Producer_Object::TAO_SFP_Producer_Object().
CORBA::ULong TAO_SFP_Object::sequence_num_ [protected] |
CORBA::ULong TAO_SFP_Object::source_id_ [protected] |
TAO_SFP_Frame_State TAO_SFP_Object::state_ [protected] |
Definition at line 251 of file sfp.h.
Referenced by TAO_SFP_Consumer_Object::handle_input(), and TAO_SFP_Object().