#include <sfp.h>
Public Member Functions | |
TAO_SFP_Object (TAO_AV_Callback *callback, TAO_AV_Transport *transport) | |
virtual | ~TAO_SFP_Object (void) |
virtual int | handle_input (void)=0 |
virtual int | send_frame (ACE_Message_Block *frame, TAO_AV_frame_info *frame_info=0) |
Send a data frame, splitting it into fragments when it does not fit in a single packet. | |
virtual int | send_frame (const iovec *iov, int iovcnt, TAO_AV_frame_info *frame_info=0) |
Send a frame given as an array of iovecs (not implemented: logs an error and returns -1). | |
virtual int | send_frame (const char *buf, size_t len) |
virtual int | destroy (void) |
virtual int | set_policies (const TAO_AV_PolicyList &policies) |
Set policies; only the SFP credit policy is applied. | |
Protected Member Functions | |
ACE_Message_Block * | get_fragment (ACE_Message_Block *&frame, size_t initial_len, size_t &last_mb_orig_len, size_t &last_mb_current_len) |
Protected Attributes | |
CORBA::ULong | sequence_num_ |
CORBA::ULong | source_id_ |
CORBA::Long | max_credit_ |
CORBA::Long | current_credit_ |
TAO_SFP_Frame_State | state_ |
Definition at line 218 of file sfp.h.
TAO_SFP_Object::TAO_SFP_Object | ( | TAO_AV_Callback * | callback, | |
TAO_AV_Transport * | transport | |||
) |
Definition at line 836 of file sfp.cpp.
:TAO_AV_Protocol_Object (callback,transport),
 sequence_num_ (0),     // was left uninitialised; send_frame writes it to the wire and increments it
 source_id_ (10),
 max_credit_ (-1),      // -1: no credit policy applied yet (see set_policies)
 current_credit_ (-1)
{
  // Force creation of the SFP base singleton before any frame is built.
  TAO_SFP_BASE::instance ();

  // Size the static frame buffer to twice the transport MTU.  send_frame
  // treats a null transport as a possible state, so do not dereference it
  // blindly here.
  if (this->transport_ != 0)
    this->state_.static_frame_.size (2* this->transport_->mtu ());
}
TAO_SFP_Object::~TAO_SFP_Object | ( | void | ) | [virtual] |
int TAO_SFP_Object::destroy | ( | void | ) | [virtual] |
Implements TAO_AV_Protocol_Object.
Definition at line 853 of file sfp.cpp.
{
  // Send an EndofStream message over the transport and then notify the
  // local callback that the object is being destroyed.
  //
  // Returns 0 on success, a negative value if the message could not be
  // built or sent (in which case the callback is NOT notified).
  TAO_OutputCDR out_stream;

  // start_frame reports success as a boolean: send_frame in this class
  // treats a result of 0 as failure.  The previous "result < 0" test could
  // therefore never detect a failed start_frame.
  if (!TAO_SFP_Base::start_frame (TAO_ENCAP_BYTE_ORDER,
                                  flowProtocol::EndofStream_Msg,
                                  out_stream))
    return -1;

  const int result = TAO_SFP_Base::send_message (this->transport_,
                                                 out_stream);
  if (result < 0)
    return result;

  this->callback_->handle_destroy ();
  return 0;
}
ACE_Message_Block * TAO_SFP_Object::get_fragment | ( | ACE_Message_Block *& | frame, | |
size_t | initial_len, | |||
size_t & | last_mb_orig_len, | |||
size_t & | last_mb_current_len | |||
) | [protected] |
Definition at line 1044 of file sfp.cpp.
{
  // Carve the next packet-sized fragment off the message block chain.
  //
  //   mb                  - (in/out) first parameter of this function; on
  //                         entry the first block not yet sent, on return
  //                         the block at which the fragment stopped, or 0
  //                         if the whole remaining chain was consumed.
  //   initial_len         - bytes already taken up in the packet (header).
  //   last_mb_orig_len    - (out) original length of the block that had to
  //                         be truncated, or 0 if none was truncated.
  //   last_mb_current_len - (out) number of bytes of that block included in
  //                         this fragment, or 0 if none was truncated.
  //
  // Returns a duplicate() of the chain starting at the entry value of mb,
  // or 0 if mb was already 0.  When a block is truncated, both the caller's
  // block and its duplicate have their length shortened; the caller is
  // expected to restore the length and advance the read pointer using the
  // two out-parameters.
  ACE_Message_Block *fragment_mb = 0;
  ACE_Message_Block *temp_mb = 0;
  size_t last_len = 0;
  size_t current_len = 0;
  size_t message_len = initial_len;

  while (mb != 0)
    {
      const size_t prev_len = message_len;
      message_len += mb->length ();

      if (fragment_mb == 0)
        fragment_mb = temp_mb = mb->duplicate ();

      if (message_len > TAO_SFP_MAX_PACKET_SIZE)
        {
          // Take only the part of this block that we can accommodate.
          current_len = TAO_SFP_MAX_PACKET_SIZE - prev_len;
          if (current_len < mb->length ())
            {
              // The above condition is an assertion.
              last_len = mb->length ();
              mb->length (current_len);
              temp_mb->length (current_len);
            }
          break;
        }

      // The whole block fits.  Its length has already been added to
      // message_len above; adding it a second time (as the code used to do)
      // makes prev_len overshoot on the next iteration and can underflow
      // the "TAO_SFP_MAX_PACKET_SIZE - prev_len" computation.
      mb = mb->cont ();
      temp_mb = temp_mb->cont ();
    }

  last_mb_orig_len = last_len;
  last_mb_current_len = current_len;
  return fragment_mb;
}
virtual int TAO_SFP_Object::handle_input | ( | void | ) | [pure virtual] |
Implements TAO_AV_Protocol_Object.
Implemented in TAO_SFP_Producer_Object, and TAO_SFP_Consumer_Object.
int TAO_SFP_Object::send_frame | ( | ACE_Message_Block * | frame, | |
TAO_AV_frame_info * | frame_info = 0 | |||
) | [virtual] |
Send a data frame. Frames that do not fit in a single packet are split into fragments and sent one after another.
Implements TAO_AV_Protocol_Object.
Definition at line 871 of file sfp.cpp.
{
  // Send one data frame over the transport.
  //
  // A frame whose total length is below the maximum packet size minus the
  // frame header length is sent as a single message.  Larger frames are
  // sent as a first message followed by numbered fragment messages.
  //
  //   frame      - chain of message blocks holding the payload.  In the
  //                fragmented case the blocks' length/read pointers are
  //                modified while the frame is walked.
  //   frame_info - optional; when given a Frame_Msg carrying timestamp,
  //                ssrc and sequence number is written, otherwise a
  //                SimpleFrame_Msg.
  //
  // Returns 0 on success, 1 when no credit is left (flow control is being
  // exercised and nothing was sent), -1 on error.
  TAO_OutputCDR out_stream;
  if (TAO_debug_level > 0)
    ACE_DEBUG ((LM_DEBUG,"TAO_SFP_Object::send_frame\n"));
  CORBA::Octet flags = TAO_ENCAP_BYTE_ORDER;
  if (this->transport_ == 0)
    ACE_ERROR_RETURN ((LM_ERROR,"TAO_SFP_Object::send_frame: transport is null\n"),-1);

  if (this->current_credit_ == 0)
    {
      // flow controlled so wait.
      // A greater than 0 value indicates that flow control is being exercised.
      return 1;
    }

  // We have credit, so work out how much payload there is to send.
  size_t total_length = 0;
  for (ACE_Message_Block *temp = frame; temp != 0; temp = temp->cont ())
    total_length += temp->length ();
  if (TAO_debug_level > 0)
    // %d expects an int; total_length is a size_t.
    ACE_DEBUG ((LM_DEBUG,"total_length of frame=%d\n",static_cast<int> (total_length)));

  const bool fragmented =
    !(total_length < (TAO_SFP_MAX_PACKET_SIZE -TAO_SFP_Base::frame_header_len));

  // Bit 2 marks a message that is followed by fragments (it is cleared
  // again for the last fragment below); bit 4 carries the boundary marker.
  if (fragmented)
    flags = flags | 2;

  // Write the message header.  A failed start_frame used to make this
  // function return 0, i.e. report success without sending anything.
  if (frame_info != 0)
    {
      if (frame_info->boundary_marker)
        flags |= 4;
      if (!TAO_SFP_Base::start_frame (flags, flowProtocol::Frame_Msg, out_stream))
        return -1;
      flowProtocol::my_seq_ulong source_ids;
      source_ids.length (1);
      source_ids [0] = 0;
      TAO_SFP_Base::write_frame_message (frame_info->timestamp,
                                         frame_info->ssrc,
                                         source_ids,
                                         this->sequence_num_,
                                         out_stream);
    }
  else if (!TAO_SFP_Base::start_frame (flags, flowProtocol::SimpleFrame_Msg, out_stream))
    return -1;

  if (!fragmented)
    {
      // NOTE(review): sequence_num_ and current_credit_ are only updated on
      // the fragmented path below, exactly as before.  That looks
      // unintended (small frames never consume credit or advance the
      // sequence number) - confirm before changing.
      if (TAO_SFP_Base::send_message (this->transport_, out_stream, frame) < 0)
        return -1;
      return 0;
    }

  // Larger frame: fragment and send it.  Send failures are remembered and
  // reported through the return value, but the remaining fragments are
  // still attempted and the bookkeeping is still done, as before.
  int status = 0;
  size_t last_len = 0;
  size_t current_len = 0;
  ACE_Message_Block *mb = frame;
  ACE_Message_Block *fragment_mb =
    this->get_fragment (mb,
                        static_cast<size_t> (out_stream.total_length ()),
                        last_len,
                        current_len);

  // This can be either a simpleframe or a sequenced frame,other types of frames.
  if (TAO_SFP_Base::send_message (this->transport_, out_stream, fragment_mb) < 0)
    status = -1;
  // NOTE(review): out_stream is reset only here, not between the fragment
  // messages written in the loop below - confirm that this is intended.
  out_stream.reset ();

  // Undo the truncation done by get_fragment and skip the bytes just sent.
  // get_fragment sets mb to 0 when everything fitted, so guard the access
  // the same way the loop below does.
  if (mb != 0)
    {
      mb->length (last_len);
      mb->rd_ptr (current_len);
    }

  // If there is any more data send those as fragments.
  int frag_number = 1;
  while (mb != 0)
    {
      fragment_mb = this->get_fragment (mb,
                                        TAO_SFP_Base::fragment_len,
                                        last_len,
                                        current_len);
      if (mb == 0)
        {
          if (TAO_debug_level > 0)
            ACE_DEBUG ((LM_DEBUG,"sending the last fragment\n"));
          // This is the last fragment so clear the fragments bit.
          flags = TAO_ENCAP_BYTE_ORDER;
        }
      if (fragment_mb == 0)
        break;

      if (frame_info != 0)
        {
          TAO_SFP_Base::write_fragment_message (flags,
                                                frag_number++,
                                                this->sequence_num_,
                                                frame_info->ssrc,
                                                out_stream);
        }
      else
        {
          TAO_SFP_Base::write_fragment_message (flags,
                                                frag_number++,
                                                this->sequence_num_,
                                                0,
                                                out_stream);
        }

      // send the fragment now.
      // without the sleep the fragments gets lost!
      // probably because the UDP buffer queue on the sender side
      // is overflown it drops the packets.
      // XXX: This is a hack.
      ACE_OS::sleep (1);
      if (TAO_SFP_Base::send_message (this->transport_, out_stream, fragment_mb) < 0)
        status = -1;

      if (mb != 0)
        {
          mb->length (last_len);
          mb->rd_ptr (current_len);
        }
    }

  // Increment the sequence_num after sending the message.
  this->sequence_num_++;
  // Also reduce the number of credits.
  if (this->max_credit_ > 0)
    this->current_credit_--;

  return status;
}
int TAO_SFP_Object::send_frame | ( | const char * | buf, | |
size_t | len | |||
) | [virtual] |
int TAO_SFP_Object::send_frame | ( | const iovec * | iov, | |
int | iovcnt, | |||
TAO_AV_frame_info * | frame_info = 0 | |||
) | [virtual] |
Send a frame given as an array of iovecs. Not implemented: this overload only logs an error and returns -1.
Implements TAO_AV_Protocol_Object.
Definition at line 1028 of file sfp.cpp.
{
  // The iovec overload is not implemented for SFP: log an error and
  // report failure to the caller.
  ACE_ERROR ((LM_ERROR,"TAO_AV_SFP_Object::send_frame"));
  return -1;
}
int TAO_SFP_Object::set_policies | ( | const TAO_AV_PolicyList & | policy_list | ) | [virtual] |
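Set policies. Only the SFP credit policy is applied (it sets the maximum credit); all other policy types are ignored.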
Reimplemented from TAO_AV_Protocol_Object.
Definition at line 1087 of file sfp.cpp.
{
  // Apply the policies in the list to this object.
  //
  // Only TAO_AV_SFP_CREDIT_POLICY is acted upon: its value becomes
  // max_credit_.  Every other policy type is ignored.  Always returns 0.
  //
  // NOTE(review): current_credit_ is not touched here; presumably it is
  // updated elsewhere when credit arrives - confirm.
  for (CORBA::ULong i = 0; i < policies.length (); ++i)
    {
      TAO_AV_Policy *policy = policies[i];
      if (policy == 0)
        continue;  // skip empty slots instead of dereferencing them

      switch (policy->type ())
        {
        case TAO_AV_SFP_CREDIT_POLICY:
          {
            // The type tag identifies the concrete policy class, so a
            // static downcast is sufficient (and, unlike reinterpret_cast,
            // is checked by the compiler against the class hierarchy).
            TAO_AV_SFP_Credit_Policy *credit_policy =
              static_cast<TAO_AV_SFP_Credit_Policy*> (policy);
            this->max_credit_ = credit_policy->value ();
          }
          break;  // was an implicit fall-through into default
        default:
          break;
        }
    }
  return 0;
}
CORBA::Long TAO_SFP_Object::current_credit_ [protected] |
CORBA::Long TAO_SFP_Object::max_credit_ [protected] |
CORBA::ULong TAO_SFP_Object::sequence_num_ [protected] |
CORBA::ULong TAO_SFP_Object::source_id_ [protected] |
TAO_SFP_Frame_State TAO_SFP_Object::state_ [protected] |