#include <sfp.h>
Inheritance diagram for TAO_SFP_Object:


Public Member Functions | |
| TAO_SFP_Object (TAO_AV_Callback *callback, TAO_AV_Transport *transport) | |
| virtual | ~TAO_SFP_Object (void) |
| virtual int | handle_input (void)=0 |
| virtual int | send_frame (ACE_Message_Block *frame, TAO_AV_frame_info *frame_info=0) |
| send a data frame. | |
| virtual int | send_frame (const iovec *iov, int iovcnt, TAO_AV_frame_info *frame_info=0) |
| send a frame in iovecs. | |
| virtual int | send_frame (const char *buf, size_t len) |
| virtual int | destroy (void) |
| virtual int | set_policies (const TAO_AV_PolicyList &policies) |
| set/get policies. | |
Protected Member Functions | |
| ACE_Message_Block * | get_fragment (ACE_Message_Block *&frame, size_t initial_len, size_t &last_mb_orig_len, size_t &last_mb_current_len) |
Protected Attributes | |
| CORBA::ULong | sequence_num_ |
| CORBA::ULong | source_id_ |
| CORBA::Long | max_credit_ |
| CORBA::Long | current_credit_ |
| TAO_SFP_Frame_State | state_ |
|
| TAO_SFP_Object::TAO_SFP_Object (TAO_AV_Callback *callback, TAO_AV_Transport *transport) |
|
Definition at line 836 of file sfp.cpp. References ACE_Singleton< TYPE, ACE_LOCK >::instance(), ACE_Message_Block::size(), and TAO_SFP_Frame_State::static_frame_.
00838 :TAO_AV_Protocol_Object (callback,transport), 00839 source_id_ (10), 00840 max_credit_ (-1), 00841 current_credit_ (-1) 00842 { 00843 TAO_SFP_BASE::instance (); 00844 this->state_.static_frame_.size (2* this->transport_->mtu ()); 00845 } |
| TAO_SFP_Object::~TAO_SFP_Object (void) [virtual] |
|
Definition at line 847 of file sfp.cpp.
00848 {
00849 //no-op
00850 }
|
| int TAO_SFP_Object::destroy (void) [virtual] |
|
Implements TAO_AV_Protocol_Object. Definition at line 853 of file sfp.cpp. References TAO_AV_Callback::handle_destroy(), TAO_SFP_Base::send_message(), TAO_SFP_Base::start_frame(), and TAO_ENCAP_BYTE_ORDER.
00854 {
00855 int result = -1;
00856 TAO_OutputCDR out_stream;
00857 result = TAO_SFP_Base::start_frame (TAO_ENCAP_BYTE_ORDER,
00858 flowProtocol::EndofStream_Msg,
00859 out_stream);
00860 if (result < 0)
00861 return result;
00862 result = TAO_SFP_Base::send_message (this->transport_,
00863 out_stream);
00864 if (result < 0)
00865 return result;
00866 this->callback_->handle_destroy ();
00867 return 0;
00868 }
|
|
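| ACE_Message_Block * TAO_SFP_Object::get_fragment (ACE_Message_Block *&frame, size_t initial_len, size_t &last_mb_orig_len, size_t &last_mb_current_len) [protected] |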
|
Definition at line 1044 of file sfp.cpp. References ACE_Message_Block::cont(), ACE_Message_Block::duplicate(), ACE_Message_Block::length(), and TAO_SFP_MAX_PACKET_SIZE. Referenced by send_frame().
01048 {
01049 ACE_Message_Block *fragment_mb = 0,*temp_mb = 0;
01050 size_t prev_len,last_len = 0;
01051 size_t current_len = 0;
01052 size_t message_len = initial_len;
01053 while (mb != 0)
01054 {
01055 prev_len = message_len;
01056 message_len += mb->length ();
01057 if (fragment_mb == 0)
01058 fragment_mb = temp_mb = mb->duplicate ();
01059 if (message_len > TAO_SFP_MAX_PACKET_SIZE)
01060 {
01061 // get only the length that we can accommodate.
01062 current_len = TAO_SFP_MAX_PACKET_SIZE - prev_len;
01063 if (current_len < mb->length ())
01064 {
01065 // The above condition is an assertion.
01066 message_len += (current_len-mb->length ());
01067 last_len = mb->length ();
01068 mb->length (current_len);
01069 temp_mb->length (current_len);
01070 }
01071 break;
01072 }
01073 else
01074 {
01075 // we can accommodate this message block
01076 message_len += mb->length ();
01077 mb = mb->cont ();
01078 temp_mb = temp_mb->cont ();
01079 }
01080 }
01081 last_mb_orig_len = last_len;
01082 last_mb_current_len = current_len;
01083 return fragment_mb;
01084 }
|
| int TAO_SFP_Object::handle_input (void) [pure virtual] |
|
Implements TAO_AV_Protocol_Object. Implemented in TAO_SFP_Producer_Object, and TAO_SFP_Consumer_Object. |
|
| int TAO_SFP_Object::send_frame (const char *buf, size_t len) [virtual] |
|
Implements TAO_AV_Protocol_Object. Definition at line 1036 of file sfp.cpp.
01038 {
01039 return 0;
01040 }
|
|
| int TAO_SFP_Object::send_frame (const iovec *iov, int iovcnt, TAO_AV_frame_info *frame_info=0) [virtual] |
|
send a frame in iovecs.
Implements TAO_AV_Protocol_Object. Definition at line 1028 of file sfp.cpp. References ACE_ERROR_RETURN, and LM_ERROR.
01031 {
01032 ACE_ERROR_RETURN ((LM_ERROR,"TAO_AV_SFP_Object::send_frame"),-1);
01033 }
|
|
| int TAO_SFP_Object::send_frame (ACE_Message_Block *frame, TAO_AV_frame_info *frame_info=0) [virtual] |
|
send a data frame.
Implements TAO_AV_Protocol_Object. Definition at line 871 of file sfp.cpp. References ACE_DEBUG, ACE_ERROR_RETURN, TAO_AV_frame_info::boundary_marker, ACE_Message_Block::cont(), current_credit_, get_fragment(), ACE_Message_Block::length(), LM_DEBUG, LM_ERROR, max_credit_, flowProtocol::my_seq_ulong, ACE_Message_Block::rd_ptr(), ACE_OutputCDR::reset(), TAO_SFP_Base::send_message(), ACE_OS::sleep(), TAO_AV_frame_info::ssrc, TAO_SFP_Base::start_frame(), TAO_debug_level, TAO_ENCAP_BYTE_ORDER, TAO_SFP_MAX_PACKET_SIZE, TAO_AV_frame_info::timestamp, ACE_OutputCDR::total_length(), TAO_SFP_Base::write_fragment_message(), and TAO_SFP_Base::write_frame_message().
00873 {
00874 TAO_OutputCDR out_stream;
00875 CORBA::Boolean result = 0;
00876 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"TAO_SFP_Object::send_frame\n"));
00877 CORBA::Octet flags = TAO_ENCAP_BYTE_ORDER;
00878 if (this->transport_ == 0)
00879 ACE_ERROR_RETURN ((LM_ERROR,"TAO_SFP_Object::send_frame: transport is null\n"),-1);
00880 if (this->current_credit_ != 0)
00881 {
00882 // if we have enough credit then we send.
00883 size_t total_length = 0;
00884 for (ACE_Message_Block *temp = frame;temp != 0;temp = temp->cont ())
00885 total_length += temp->length ();
00886 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"total_length of frame=%d\n",total_length));
00887 if (total_length < (TAO_SFP_MAX_PACKET_SIZE -TAO_SFP_Base::frame_header_len))
00888 {
00889 if (frame_info != 0)
00890 {
00891 if (frame_info->boundary_marker)
00892 flags |= 4;
00893 CORBA::Boolean result = TAO_SFP_Base::start_frame (flags,
00894 flowProtocol::Frame_Msg,
00895 out_stream);
00896 if (result == 0)
00897 return 0;
00898 flowProtocol::my_seq_ulong source_ids;
00899 source_ids.length (1);
00900 source_ids [0] = 0;
00901 TAO_SFP_Base::write_frame_message (frame_info->timestamp,
00902 frame_info->ssrc,
00903 source_ids,
00904 this->sequence_num_,
00905 out_stream);
00906 }
00907 else
00908 {
00909 CORBA::Boolean result = TAO_SFP_Base::start_frame (flags,
00910 flowProtocol::SimpleFrame_Msg,
00911 out_stream);
00912 if (result == 0)
00913 return 0;
00914 }
00915 TAO_SFP_Base::send_message (this->transport_,
00916 out_stream,
00917 frame);
00918 }
00919 else // larger frame,fragment and send it.
00920 {
00921 flags = flags | 2;
00922 if (frame_info != 0)
00923 {
00924 if (frame_info->boundary_marker)
00925 flags |= 4;
00926 result = TAO_SFP_Base::start_frame (flags,
00927 flowProtocol::Frame_Msg,
00928 out_stream);
00929 if (result == 0)
00930 return result;
00931 flowProtocol::my_seq_ulong source_ids;
00932 source_ids.length (1);
00933 source_ids [0] = 0;
00934 TAO_SFP_Base::write_frame_message (frame_info->timestamp,
00935 frame_info->ssrc,
00936 source_ids,
00937 this->sequence_num_,
00938 out_stream);
00939 }
00940 else
00941 {
00942 CORBA::Boolean result = TAO_SFP_Base::start_frame (flags,
00943 flowProtocol::SimpleFrame_Msg,
00944 out_stream);
00945 if (result == 0)
00946 return 0;
00947 }
00948 size_t last_len,current_len;
00949 int message_len = static_cast<int> (out_stream.total_length ());
00950 ACE_Message_Block *mb = frame;
00951 ACE_Message_Block *fragment_mb =
00952 this->get_fragment (mb,
00953 message_len,
00954 last_len,
00955 current_len);
00956 // This can be either a simpleframe or a sequenced frame,other types of frames.
00957 TAO_SFP_Base::send_message (this->transport_,
00958 out_stream,
00959 fragment_mb);
00960 out_stream.reset ();
00961 int frag_number = 1;
00962 mb->length (last_len);
00963 mb->rd_ptr (current_len);
00964 // If there is any more data send those as fragments.
00965 while (mb != 0)
00966 {
00967 message_len = TAO_SFP_Base::fragment_len;
00968 fragment_mb = this->get_fragment (mb,
00969 message_len,
00970 last_len,
00971 current_len);
00972 if (mb == 0)
00973 {
00974 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"sending the last fragment\n"));
00975 // This is the last fragment so clear the fragments bit.
00976 flags = TAO_ENCAP_BYTE_ORDER;
00977 }
00978 if (fragment_mb == 0)
00979 break;
00980 if (frame_info != 0)
00981 {
00982 TAO_SFP_Base::write_fragment_message (flags,
00983 frag_number++,
00984 this->sequence_num_,
00985 frame_info->ssrc,
00986 out_stream);
00987 }
00988 else
00989 {
00990 TAO_SFP_Base::write_fragment_message (flags,
00991 frag_number++,
00992 this->sequence_num_,
00993 0,
00994 out_stream);
00995 }
00996 // send the fragment now.
00997 // without the sleep the fragments get lost!
00998 // probably because the UDP buffer queue on the sender side
00999 // overflows and drops the packets.
01000 // XXX: This is a hack.
01001 ACE_OS::sleep (1);
01002 result = TAO_SFP_Base::send_message (this->transport_,
01003 out_stream,
01004 fragment_mb);
01005 if (mb != 0)
01006 {
01007 mb->length (last_len);
01008 mb->rd_ptr (current_len);
01009 }
01010 }
01011 // Increment the sequence_num after sending the message.
01012 this->sequence_num_++;
01013 // Also reduce the number of credits.
01014 if (this->max_credit_ > 0)
01015 this->current_credit_--;
01016 }
01017 }
01018 else
01019 {
01020 // flow controlled so wait.
01021 // A greater than 0 value indicates that flow control is being exercised.
01022 return 1;
01023 }
01024 return 0;
01025 }
|
| int TAO_SFP_Object::set_policies (const TAO_AV_PolicyList &policies) [virtual] |
|
set/get policies.
Reimplemented from TAO_AV_Protocol_Object. Definition at line 1087 of file sfp.cpp. References TAO::unbounded_value_sequence< T >::length(), max_credit_, TAO_AV_PolicyList, TAO_AV_SFP_CREDIT_POLICY, and TAO_AV_SFP_Credit_Policy::value(). Referenced by TAO_SFP_Consumer_Object::TAO_SFP_Consumer_Object().
01088 {
01089 TAO_AV_Policy *policy = 0;
01090 for (CORBA::ULong i=0;i<policies.length ();i++)
01091 {
01092 policy = policies[i];
01093 switch (policies[i]->type ())
01094 {
01095
01096 case TAO_AV_SFP_CREDIT_POLICY:
01097 {
01098 TAO_AV_SFP_Credit_Policy *credit_policy =
01099 reinterpret_cast<TAO_AV_SFP_Credit_Policy*> (policy);
01100 this->max_credit_ = credit_policy->value ();
01101 }
01102 default:
01103 break;
01104 }
01105 }
01106 return 0;
01107 }
|
| CORBA::Long TAO_SFP_Object::current_credit_ [protected] |
|
Definition at line 250 of file sfp.h. Referenced by send_frame(). |
| CORBA::Long TAO_SFP_Object::max_credit_ [protected] |
|
Definition at line 249 of file sfp.h. Referenced by send_frame(), and set_policies(). |
|
|
|
|
|
|
|
|
|
1.3.6