Public Member Functions | Protected Member Functions | Protected Attributes

TAO_SFP_Object Class Reference

#include <sfp.h>

Inheritance diagram for TAO_SFP_Object:
Inheritance graph
[legend]
Collaboration diagram for TAO_SFP_Object:
Collaboration graph
[legend]

List of all members.

Public Member Functions

 TAO_SFP_Object (TAO_AV_Callback *callback, TAO_AV_Transport *transport)
virtual ~TAO_SFP_Object (void)
virtual int handle_input (void)=0
virtual int send_frame (ACE_Message_Block *frame, TAO_AV_frame_info *frame_info=0)
 send a data frame.
virtual int send_frame (const iovec *iov, int iovcnt, TAO_AV_frame_info *frame_info=0)
 send a frame in iovecs.
virtual int send_frame (const char *buf, size_t len)
virtual int destroy (void)
virtual int set_policies (const TAO_AV_PolicyList &policies)
 set/get policies.

Protected Member Functions

ACE_Message_Blockget_fragment (ACE_Message_Block *&frame, size_t initial_len, size_t &last_mb_orig_len, size_t &last_mb_current_len)

Protected Attributes

CORBA::ULong sequence_num_
CORBA::ULong source_id_
CORBA::Long max_credit_
CORBA::Long current_credit_
TAO_SFP_Frame_State state_

Detailed Description

Definition at line 218 of file sfp.h.


Constructor & Destructor Documentation

TAO_SFP_Object::TAO_SFP_Object ( TAO_AV_Callback callback,
TAO_AV_Transport transport 
)

Definition at line 836 of file sfp.cpp.

  :TAO_AV_Protocol_Object (callback,transport),
   source_id_ (10),
   max_credit_ (-1),
   current_credit_ (-1)
{
  TAO_SFP_BASE::instance ();
  this->state_.static_frame_.size (2* this->transport_->mtu ());
}

TAO_SFP_Object::~TAO_SFP_Object ( void   )  [virtual]

Definition at line 847 of file sfp.cpp.

{
  //no-op
}


Member Function Documentation

int TAO_SFP_Object::destroy ( void   )  [virtual]

Implements TAO_AV_Protocol_Object.

Definition at line 853 of file sfp.cpp.

{
  int result = -1;
  TAO_OutputCDR out_stream;
  result = TAO_SFP_Base::start_frame (TAO_ENCAP_BYTE_ORDER,
                                      flowProtocol::EndofStream_Msg,
                                      out_stream);
  if (result < 0)
    return result;
  result = TAO_SFP_Base::send_message (this->transport_,
                                       out_stream);
  if (result < 0)
    return result;
  this->callback_->handle_destroy ();
  return 0;
}

ACE_Message_Block * TAO_SFP_Object::get_fragment ( ACE_Message_Block *&  frame,
size_t  initial_len,
size_t &  last_mb_orig_len,
size_t &  last_mb_current_len 
) [protected]

Definition at line 1044 of file sfp.cpp.

{
  ACE_Message_Block *fragment_mb = 0,*temp_mb = 0;
  size_t prev_len,last_len = 0;
  size_t current_len = 0;
  size_t message_len = initial_len;
  while (mb != 0)
    {
      prev_len = message_len;
      message_len += mb->length ();
      if (fragment_mb == 0)
        fragment_mb = temp_mb = mb->duplicate ();
      if (message_len > TAO_SFP_MAX_PACKET_SIZE)
        {
          // get only the length that we can accomodate.
          current_len = TAO_SFP_MAX_PACKET_SIZE - prev_len;
          if (current_len < mb->length ())
            {
              // The above condition is an assertion.
              message_len += (current_len-mb->length ());
              last_len = mb->length ();
              mb->length (current_len);
              temp_mb->length (current_len);
            }
          break;
        }
      else
        {
          // we can accomodate this message block
          message_len += mb->length ();
          mb = mb->cont ();
          temp_mb = temp_mb->cont ();
        }
    }
  last_mb_orig_len = last_len;
  last_mb_current_len = current_len;
  return fragment_mb;
}

virtual int TAO_SFP_Object::handle_input ( void   )  [pure virtual]
int TAO_SFP_Object::send_frame ( ACE_Message_Block frame,
TAO_AV_frame_info frame_info = 0 
) [virtual]

send a data frame.

Implements TAO_AV_Protocol_Object.

Definition at line 871 of file sfp.cpp.

{
  TAO_OutputCDR out_stream;
  CORBA::Boolean result = 0;
  if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"TAO_SFP_Object::send_frame\n"));
  CORBA::Octet flags = TAO_ENCAP_BYTE_ORDER;
  if (this->transport_ == 0)
    ACE_ERROR_RETURN ((LM_ERROR,"TAO_SFP_Object::send_frame: transport is null\n"),-1);
   if (this->current_credit_ != 0)
    {
      // if we have enough credit then we send.
      size_t total_length = 0;
      for (ACE_Message_Block *temp = frame;temp != 0;temp = temp->cont ())
        total_length += temp->length ();
      if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"total_length of frame=%d\n",total_length));
      if (total_length < (TAO_SFP_MAX_PACKET_SIZE -TAO_SFP_Base::frame_header_len))
        {
          if (frame_info != 0)
            {
              if (frame_info->boundary_marker)
                flags |= 4;
              CORBA::Boolean result = TAO_SFP_Base::start_frame (flags,
                                                                 flowProtocol::Frame_Msg,
                                                                 out_stream);
              if (result == 0)
                return 0;
              flowProtocol::my_seq_ulong source_ids;
              source_ids.length (1);
              source_ids [0] = 0;
              TAO_SFP_Base::write_frame_message (frame_info->timestamp,
                                                 frame_info->ssrc,
                                                 source_ids,
                                                 this->sequence_num_,
                                                 out_stream);
            }
          else
            {
              CORBA::Boolean result = TAO_SFP_Base::start_frame (flags,
                                                                 flowProtocol::SimpleFrame_Msg,
                                                                 out_stream);
              if (result == 0)
                return 0;
            }
          TAO_SFP_Base::send_message (this->transport_,
                                      out_stream,
                                      frame);
        }
      else // larger frame,fragment and send it.
        {
          flags = flags | 2;
          if (frame_info != 0)
            {
              if (frame_info->boundary_marker)
                flags |= 4;
              result = TAO_SFP_Base::start_frame (flags,
                                                  flowProtocol::Frame_Msg,
                                                  out_stream);
              if (result == 0)
                return result;
              flowProtocol::my_seq_ulong source_ids;
              source_ids.length (1);
              source_ids [0] = 0;
              TAO_SFP_Base::write_frame_message (frame_info->timestamp,
                                                 frame_info->ssrc,
                                                 source_ids,
                                                 this->sequence_num_,
                                                 out_stream);
            }
          else
            {
              CORBA::Boolean result = TAO_SFP_Base::start_frame (flags,
                                                                 flowProtocol::SimpleFrame_Msg,
                                                                 out_stream);
              if (result == 0)
                return 0;
            }
          size_t last_len,current_len;
          int message_len = static_cast<int> (out_stream.total_length ());
          ACE_Message_Block *mb = frame;
          ACE_Message_Block *fragment_mb =
            this->get_fragment (mb,
                                message_len,
                                last_len,
                                current_len);
          //  This can be either a simpleframe or a sequenced frame,other types of frames.
          TAO_SFP_Base::send_message (this->transport_,
                                      out_stream,
                                      fragment_mb);
          out_stream.reset ();
          int frag_number = 1;
          mb->length (last_len);
          mb->rd_ptr (current_len);
          // If there is any more data send those as fragments.
          while (mb != 0)
            {
              message_len = TAO_SFP_Base::fragment_len;
              fragment_mb = this->get_fragment (mb,
                                                message_len,
                                                last_len,
                                                current_len);
              if (mb == 0)
                {
                  if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"sending the last fragment\n"));
                  // This is the last fragment so clear the fragments bit.
                  flags = TAO_ENCAP_BYTE_ORDER;
                }
              if (fragment_mb == 0)
                break;
              if (frame_info != 0)
                {
                  TAO_SFP_Base::write_fragment_message (flags,
                                                        frag_number++,
                                                        this->sequence_num_,
                                                        frame_info->ssrc,
                                                        out_stream);
                }
              else
                {
                  TAO_SFP_Base::write_fragment_message (flags,
                                                        frag_number++,
                                                        this->sequence_num_,
                                                        0,
                                                        out_stream);
                }
              //   send the fragment now.
              // without the sleep the fragments gets lost!
              // probably because the UDP buffer queue on the sender side
              // is overflown it drops the packets.
              // XXX: This is a hack.
              ACE_OS::sleep (1);
              result = TAO_SFP_Base::send_message (this->transport_,
                                                   out_stream,
                                                   fragment_mb);
              if (mb != 0)
                {
                  mb->length (last_len);
                  mb->rd_ptr (current_len);
                }
            }
          // Increment the sequence_num after sending the message.
          this->sequence_num_++;
          // Also reduce the number of credits.
          if (this->max_credit_ > 0)
            this->current_credit_--;
        }
    }
  else
    {
      // flow controlled so wait.
      // A greater than 0 value indicates that flow control is being exercised.
      return 1;
    }
   return 0;
}

int TAO_SFP_Object::send_frame ( const char *  buf,
size_t  len 
) [virtual]

Implements TAO_AV_Protocol_Object.

Definition at line 1036 of file sfp.cpp.

{
  return 0;
}

int TAO_SFP_Object::send_frame ( const iovec *  iov,
int  iovcnt,
TAO_AV_frame_info frame_info = 0 
) [virtual]

send a frame in iovecs.

Implements TAO_AV_Protocol_Object.

Definition at line 1028 of file sfp.cpp.

{
  ACE_ERROR_RETURN ((LM_ERROR,"TAO_AV_SFP_Object::send_frame"),-1);
}

int TAO_SFP_Object::set_policies ( const TAO_AV_PolicyList policy_list  )  [virtual]

set/get policies.

Reimplemented from TAO_AV_Protocol_Object.

Definition at line 1087 of file sfp.cpp.

{
  TAO_AV_Policy *policy = 0;
  for (CORBA::ULong i=0;i<policies.length ();i++)
    {
      policy = policies[i];
      switch (policies[i]->type ())
        {

        case TAO_AV_SFP_CREDIT_POLICY:
          {
            TAO_AV_SFP_Credit_Policy *credit_policy =
              reinterpret_cast<TAO_AV_SFP_Credit_Policy*> (policy);
            this->max_credit_ = credit_policy->value ();
          }
        default:
          break;
        }
    }
  return 0;
}


Member Data Documentation

Definition at line 250 of file sfp.h.

Definition at line 249 of file sfp.h.

Definition at line 247 of file sfp.h.

Definition at line 248 of file sfp.h.

Definition at line 251 of file sfp.h.


The documentation for this class was generated from the following files:
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines