Public Types | Public Member Functions | Static Public Member Functions | Static Public Attributes | Static Protected Member Functions

TAO_SFP_Base Class Reference

#include <sfp.h>

List of all members.

Public Types

enum  State {
  ACTIVE_START, PASSIVE_START, TIMEDOUT_T1, TIMEDOUT_T2,
  REPLY_RECEIVED, START_RECEIVED
}

Public Member Functions

 TAO_SFP_Base (void)

Static Public Member Functions

static CORBA::Boolean start_frame (CORBA::Octet flags, flowProtocol::MsgType type, TAO_OutputCDR &msg)
static CORBA::Boolean write_start_message (TAO_OutputCDR &msg)
static CORBA::Boolean write_start_reply_message (TAO_OutputCDR &msg)
static CORBA::Boolean write_credit_message (CORBA::ULong cred_num, TAO_OutputCDR &msg)
static CORBA::Boolean write_fragment_message (CORBA::Octet flags, CORBA::ULong fragment_number, CORBA::ULong sequence_number, CORBA::ULong source_id, TAO_OutputCDR &msg)
static CORBA::Boolean write_frame_message (CORBA::ULong timestamp, CORBA::ULong synchSource, flowProtocol::my_seq_ulong source_ids, CORBA::ULong sequence_num, TAO_OutputCDR &msg)
static int send_message (TAO_AV_Transport *transport, TAO_OutputCDR &stream, ACE_Message_Block *mb=0)
static int peek_message_type (TAO_AV_Transport *transport, flowProtocol::MsgType &type)
static int read_start_message (TAO_AV_Transport *transport, flowProtocol::Start &start, TAO_InputCDR &cdr)
static int read_start_reply_message (TAO_AV_Transport *transport, flowProtocol::StartReply &start_reply, TAO_InputCDR &cdr)
static int read_credit_message (TAO_AV_Transport *transport, flowProtocol::credit &credit, TAO_InputCDR &cdr)
static int read_endofstream_message (TAO_AV_Transport *transport, flowProtocol::frameHeader &endofstream, TAO_InputCDR &cdr)
static int read_frame (TAO_AV_Transport *transport, flowProtocol::frameHeader &frame_header, TAO_SFP_Frame_State &state, TAO_AV_frame_info *&frame_info)
static int read_fragment (TAO_AV_Transport *transport, flowProtocol::fragment &fragment, TAO_SFP_Frame_State &state, TAO_AV_frame_info *&frame_info)
static int peek_frame_header (TAO_AV_Transport *transport, flowProtocol::frameHeader &header, TAO_InputCDR &cdr)
static int peek_fragment_header (TAO_AV_Transport *transport, flowProtocol::fragment &fragment, TAO_InputCDR &cdr)
static int handle_input (TAO_AV_Transport *transport, TAO_SFP_Frame_State &state, TAO_AV_frame_info *&frame_info)
static ACE_Message_Block * check_all_fragments (TAO_SFP_Fragment_Table_Entry *fragment_entry)

Static Public Attributes

static const char TAO_SFP_ORB_ARGUMENTS [] = "-ORBObjRefStyle URL"
static const char TAO_SFP_MAGIC_NUMBER [] = "=SFP"
static const char TAO_SFP_FRAGMENT_MAGIC_NUMBER [] = "FRAG"
static const char TAO_SFP_START_MAGIC_NUMBER [] = "=STA"
static const char TAO_SFP_CREDIT_MAGIC_NUMBER [] = "=CRE"
static const char TAO_SFP_STARTREPLY_MAGIC_NUMBER [] = "=STR"
static const unsigned char TAO_SFP_MAJOR_VERSION = 1
static const unsigned char TAO_SFP_MINOR_VERSION = 0
static const unsigned char TAO_SFP_FRAME_HEADER_LEN = 12
static const unsigned char TAO_SFP_MESSAGE_SIZE_OFFSET = 8
static const unsigned char TAO_SFP_FRAGMENT_SIZE_OFFSET = 16
static u_int frame_header_len
static u_int start_reply_len
static u_int start_len
static u_int credit_len
static u_int fragment_len

Static Protected Member Functions

static void dump_buf (char *buf, int n)

Detailed Description

Definition at line 106 of file sfp.h.


Member Enumeration Documentation

Enumerator:
ACTIVE_START 
PASSIVE_START 
TIMEDOUT_T1 
TIMEDOUT_T2 
REPLY_RECEIVED 
START_RECEIVED 

Definition at line 133 of file sfp.h.

  {
    // Enumerators of TAO_SFP_Base::State, in declaration order.
    // NOTE(review): the names suggest connection start-up and timer
    // states, but nothing on this page shows how each value is used --
    // confirm against sfp.cpp before relying on any particular meaning.
    ACTIVE_START,
    PASSIVE_START,
    TIMEDOUT_T1,
    TIMEDOUT_T2,
    REPLY_RECEIVED,
    START_RECEIVED
  };


Constructor & Destructor Documentation

TAO_SFP_Base::TAO_SFP_Base ( void   ) 

Definition at line 47 of file sfp.cpp.

{
  // Marshal one instance of each SFP message type into a scratch CDR
  // stream and record the encoded length of each in the static *_len
  // members.  If any marshalling step fails the error is logged and the
  // constructor returns, leaving the remaining lengths untouched.
  TAO_OutputCDR scratch;

  // frameHeader: "=SFP" magic, flags set to TAO_ENCAP_BYTE_ORDER.
  flowProtocol::frameHeader header_msg;
  header_msg.magic_number [0] = '=';
  header_msg.magic_number [1] = 'S';
  header_msg.magic_number [2] = 'F';
  header_msg.magic_number [3] = 'P';
  header_msg.flags = TAO_ENCAP_BYTE_ORDER;
  scratch.reset ();
  if (!(scratch << header_msg))
    {
      ACE_ERROR ((LM_ERROR, "TAO_SFP constructor\n"));
      return;
    }
  frame_header_len = static_cast<u_int> (scratch.total_length ());

  // fragment: "FRAG" magic.
  flowProtocol::fragment fragment_msg;
  fragment_msg.magic_number [0] = 'F';
  fragment_msg.magic_number [1] = 'R';
  fragment_msg.magic_number [2] = 'A';
  fragment_msg.magic_number [3] = 'G';
  scratch.reset ();
  if (!(scratch << fragment_msg))
    {
      ACE_ERROR ((LM_ERROR, "TAO_SFP constructor\n"));
      return;
    }
  fragment_len = static_cast<u_int> (scratch.total_length ());

  // Start: "=STA" magic plus the protocol version, no flags.
  flowProtocol::Start start_msg;
  start_msg.magic_number [0] = '=';
  start_msg.magic_number [1] = 'S';
  start_msg.magic_number [2] = 'T';
  start_msg.magic_number [3] = 'A';
  start_msg.major_version = TAO_SFP_Base::TAO_SFP_MAJOR_VERSION;
  start_msg.minor_version = TAO_SFP_Base::TAO_SFP_MINOR_VERSION;
  start_msg.flags = 0;
  scratch.reset ();
  if (!(scratch << start_msg))
    {
      ACE_ERROR ((LM_ERROR, "TAO_SFP constructor\n"));
      return;
    }
  start_len = static_cast<u_int> (scratch.total_length ());

  // StartReply: "=STR" magic, no flags.
  flowProtocol::StartReply reply_msg;
  reply_msg.magic_number [0] = '=';
  reply_msg.magic_number [1] = 'S';
  reply_msg.magic_number [2] = 'T';
  reply_msg.magic_number [3] = 'R';
  reply_msg.flags = 0;
  scratch.reset ();
  if (!(scratch << reply_msg))
    {
      ACE_ERROR ((LM_ERROR, "TAO_SFP constructor\n"));
      return;
    }
  start_reply_len = static_cast<u_int> (scratch.total_length ());

  // credit: "=CRE" magic.
  flowProtocol::credit credit_msg;
  credit_msg.magic_number [0] = '=';
  credit_msg.magic_number [1] = 'C';
  credit_msg.magic_number [2] = 'R';
  credit_msg.magic_number [3] = 'E';
  scratch.reset ();
  if (!(scratch << credit_msg))
    {
      ACE_ERROR ((LM_ERROR, "TAO_SFP constructor\n"));
      return;
    }
  credit_len = static_cast<u_int> (scratch.total_length ());
}


Member Function Documentation

ACE_Message_Block * TAO_SFP_Base::check_all_fragments ( TAO_SFP_Fragment_Table_Entry *  fragment_entry  )  [static]

Definition at line 458 of file sfp.cpp.

{
  // Returns the head of a chain linking every fragment's data block (in
  // the iteration order of fragment_set_) once the number of stored
  // fragments equals num_fragments_; returns 0 while fragments are
  // still outstanding.
  if (TAO_debug_level > 0)
    ACE_DEBUG ((LM_DEBUG,"table size: %d, num_fragments: %d\n",fragment_entry->fragment_set_.size (),fragment_entry->num_fragments_));

  // Not every fragment has arrived yet: nothing to hand back.
  if (fragment_entry->fragment_set_.size () != fragment_entry->num_fragments_)
    return 0;

  if (TAO_debug_level > 0)
    ACE_DEBUG ((LM_DEBUG,"all fragments have been received\n"));

  // Link the per-fragment message blocks together through cont () and
  // return the first one.
  ACE_Message_Block *first = 0;
  ACE_Message_Block *tail = 0;
  TAO_SFP_Fragment_Node *current;
  FRAGMENT_SET_ITERATOR iter (fragment_entry->fragment_set_);
  while (iter.next (current) != 0)
    {
      if (first == 0)
        first = current->data_;
      else
        tail->cont (current->data_);
      tail = current->data_;
      iter.advance ();
    }
  return first;
}

void TAO_SFP_Base::dump_buf ( char *  buf,
int  n 
) [static, protected]

Definition at line 823 of file sfp.cpp.

{
  // Debug aid: when TAO_debug_level is positive, prints the first
  // <size> bytes of <buffer> as decimal integers between two rules.
  if (TAO_debug_level > 0)
    ACE_DEBUG ((LM_DEBUG,"\n========================================\n"));

  int index = 0;
  while (index < size)
    {
      if (TAO_debug_level > 0)
        ACE_DEBUG ((LM_DEBUG,"%d ",buffer[index]));
      ++index;
    }

  if (TAO_debug_level > 0)
    ACE_DEBUG ((LM_DEBUG,"\n========================================\n"));
}

int TAO_SFP_Base::handle_input ( TAO_AV_Transport *  transport,
TAO_SFP_Frame_State &  state,
TAO_AV_frame_info *&  frame_info 
) [static]

Definition at line 130 of file sfp.cpp.

{
  // Peeks at the type of the next message on <transport> and dispatches
  // to the matching reader.
  //
  // Returns the (negative) result of the first helper that fails, and 0
  // otherwise -- including for message types this switch does not
  // handle, which fall through to the default case untouched.
  int result;
  flowProtocol::MsgType msg_type;
  result = TAO_SFP_Base::peek_message_type (transport,
                                            msg_type);
  if (result < 0)
    return result;
  //  TAO_InputCDR &input = state.cdr;
  switch (msg_type)
    {
    case flowProtocol::SimpleFrame_Msg:
    case flowProtocol::Frame_Msg:
      {
        // Fill state.frame_header_ from a peek, then read the frame.
        result = TAO_SFP_Base::peek_frame_header (transport,
                                                  state.frame_header_,
                                                  state.cdr);
        if (result < 0)
          return result;
        // Reuse the function-level result; a second local with the
        // same name here would only shadow it.
        result = TAO_SFP_Base::read_frame (transport,
                                           state.frame_header_,
                                           state,
                                           frame_info);
        if (result < 0)
          return result;
        break;
      }
    case flowProtocol::Fragment_Msg:
      {
        // Fill state.fragment_ from a peek, then read the fragment.
        result = TAO_SFP_Base::peek_fragment_header (transport,
                                                     state.fragment_,
                                                     state.cdr);
        if (result < 0)
          return result;
        if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"Fragment received\n"));
        result = TAO_SFP_Base::read_fragment (transport,
                                              state.fragment_,
                                              state,
                                              frame_info);
        if (result < 0)
          return result;
        break;
      }
    case flowProtocol::EndofStream_Msg:
      {
        // The end-of-stream message is read into state.frame_header_.
        result = TAO_SFP_Base::read_endofstream_message (transport,
                                                         state.frame_header_,
                                                         state.cdr);
        if (result < 0)
          return result;
        break;
      }
    default:
      break;
    }
  return 0;
}

int TAO_SFP_Base::peek_fragment_header ( TAO_AV_Transport *  transport,
flowProtocol::fragment &  fragment,
TAO_InputCDR &  cdr 
) [static]

Definition at line 803 of file sfp.cpp.

{
  // Peeks (MSG_PEEK) at fragment_len bytes on <transport> and
  // demarshals them from <input> into <fragment>.
  //
  // Returns -1 if demarshalling fails and 0 otherwise.
  // NOTE(review): a short or failed peek is logged but still returns 0,
  // so a caller testing "result < 0" continues with <fragment>
  // unchanged -- confirm this is intended.
  input.grow (fragment_len);
  char *buf = input.rd_ptr ();
  int n = transport->recv (buf,
                           fragment_len,
                           MSG_PEEK);
  if (n != static_cast<int> (fragment_len))
    // Log under this function's own name (the message used to name
    // read_endofstream_message).
    ACE_ERROR_RETURN ((LM_ERROR,"%p","TAO_SFP_Base::peek_fragment_header"),0);
  else
    {
      if (!(input >> fragment))
        return -1;
    }
  return 0;
}

int TAO_SFP_Base::peek_frame_header ( TAO_AV_Transport *  transport,
flowProtocol::frameHeader &  header,
TAO_InputCDR &  cdr 
) [static]

Definition at line 783 of file sfp.cpp.

{
  // Peeks (MSG_PEEK) at frame_header_len bytes on <transport> and
  // demarshals them from <input> into <header>.
  //
  // Returns -1 if demarshalling fails and 0 otherwise.
  // NOTE(review): a short or failed peek is logged but still returns 0,
  // so a caller testing "result < 0" continues with <header> unchanged
  // -- confirm this is intended.
  input.grow (frame_header_len);
  char *buf = input.rd_ptr ();
  int n = transport->recv (buf,
                           frame_header_len,
                           MSG_PEEK);
  if (n != static_cast<int> (frame_header_len))
    // Log under this function's own name (the message used to name
    // read_endofstream_message).
    ACE_ERROR_RETURN ((LM_ERROR,"%p","TAO_SFP_Base::peek_frame_header"),0);
  else
    {
      if (!(input >> header))
        return -1;
    }
  return 0;
}

int TAO_SFP_Base::peek_message_type ( TAO_AV_Transport *  transport,
flowProtocol::MsgType &  type 
) [static]

Definition at line 655 of file sfp.cpp.

{
  // Peeks at the start of the next message on <transport> and sets
  // <msg_type> from its four-character magic number.  For "=SFP"
  // headers the type is taken from the byte at
  // TAO_SFP_MESSAGE_TYPE_OFFSET in the peeked data.
  //
  // Returns 0 on success and -1 if the peek fails, reports EOF, or the
  // magic number is not recognised.
  char peek_buffer [TAO_SFP_MAGIC_NUMBER_LEN+2];// 2 is for flags + message_type.
  int peek_len = TAO_SFP_MAGIC_NUMBER_LEN +2;
  ssize_t n =transport->recv (peek_buffer,
                              peek_len,
                              MSG_PEEK);
  // Validate the receive result before looking at peek_buffer: on
  // failure its contents are uninitialised.
  if (n == -1)
    ACE_ERROR_RETURN ((LM_ERROR,"SFP::handle_input -peek"),-1);
  else if (n==0)
    ACE_ERROR_RETURN ((LM_ERROR,"SFP::handle_input -peek"),-1);

  // NOTE(review): a peek shorter than peek_len is accepted here, so the
  // bytes past <n> (possibly including the message-type byte) are not
  // guaranteed to have been written -- confirm whether that can occur.
  char magic_number [TAO_SFP_MAGIC_NUMBER_LEN+1];
  ACE_OS::strncpy (magic_number,
                   peek_buffer,
                   TAO_SFP_MAGIC_NUMBER_LEN);
  magic_number [TAO_SFP_MAGIC_NUMBER_LEN] = 0;

  if (ACE_OS::strcmp (magic_number,TAO_SFP_START_MAGIC_NUMBER) == 0)
    {
      if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"(%P|%t)Start message received\n"));
      msg_type = flowProtocol::Start_Msg;
    }
  else if (ACE_OS::strcmp (magic_number,TAO_SFP_STARTREPLY_MAGIC_NUMBER) == 0)
    {
      if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"(%P|%t)StartReply message received\n"));
      msg_type = flowProtocol::StartReply_Msg;
    }
  else if (ACE_OS::strcmp (magic_number,TAO_SFP_MAGIC_NUMBER) == 0)
    {
      if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"(%P|%t) frameHeader received\n"));
      //      msg_type = flowProtocol::SimpleFrame;
      // Frame headers carry their concrete type in the peeked bytes.
      msg_type = static_cast<flowProtocol::MsgType> (peek_buffer [TAO_SFP_MESSAGE_TYPE_OFFSET]);
      if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"Message Type = %d\n",msg_type));
    }
  else if (ACE_OS::strcmp (magic_number,TAO_SFP_FRAGMENT_MAGIC_NUMBER) == 0)
    {
      if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"(%P|%t) fragment Header received\n"));
      msg_type = flowProtocol::Fragment_Msg;
    }
  else if (ACE_OS::strcmp (magic_number,TAO_SFP_CREDIT_MAGIC_NUMBER) == 0)
    {
      if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"(%P|%t) credit message received\n"));
      msg_type = flowProtocol::Credit_Msg;
    }
  else
    ACE_ERROR_RETURN ((LM_ERROR,"TAO_SFP:Invalid magic number\n"),-1);
  return 0;
}

int TAO_SFP_Base::read_credit_message ( TAO_AV_Transport *  transport,
flowProtocol::credit &  credit,
TAO_InputCDR &  cdr 
) [static]

Definition at line 745 of file sfp.cpp.

{
  // Receives credit_len bytes from <transport> into <input> and
  // demarshals them into <credit>.
  //
  // Returns -1 if demarshalling fails and 0 otherwise.
  // NOTE(review): a short or failed receive is logged but still
  // returns 0 -- confirm this is intended.

  // Size the CDR buffer for the message actually being read; it used to
  // be grown by start_len, which belongs to a different message.
  input.grow (credit_len);
  char *buf = input.rd_ptr ();
  int n = transport->recv (buf,
                           credit_len);
  if (n != static_cast<int> (credit_len))
    ACE_ERROR_RETURN ((LM_ERROR,"%p","TAO_SFP_Base::read_credit_message"),0);
  else
    {
      if (!(input >> credit))
        return -1;
    }
  return 0;
}

int TAO_SFP_Base::read_endofstream_message ( TAO_AV_Transport *  transport,
flowProtocol::frameHeader &  endofstream,
TAO_InputCDR &  cdr 
) [static]

Definition at line 764 of file sfp.cpp.

{
  // Receives frame_header_len bytes from <transport> into <input> and
  // demarshals them into <endofstream>.
  //
  // Returns -1 if demarshalling fails and 0 otherwise.
  // NOTE(review): a short or failed receive is logged but still
  // returns 0 -- confirm this is intended.

  // Size the CDR buffer for the message actually being read; it used to
  // be grown by start_len, which belongs to a different message.
  input.grow (frame_header_len);
  char *buf = input.rd_ptr ();
  int n = transport->recv (buf,
                           frame_header_len);
  if (n != static_cast<int> (frame_header_len))
    ACE_ERROR_RETURN ((LM_ERROR,"%p","TAO_SFP_Base::read_endofstream_message"),0);
  else
    {
      if (!(input >> endofstream))
        return -1;
    }
  return 0;
}

int TAO_SFP_Base::read_fragment ( TAO_AV_Transport *  transport,
flowProtocol::fragment &  fragment,
TAO_SFP_Frame_State &  state,
TAO_AV_frame_info *&  frame_info 
) [static]

Definition at line 369 of file sfp.cpp.

{
  // Receives one fragment (header plus payload, fragment.frag_sz bytes)
  // from <transport>, files it in the fragment table kept in <state>
  // under (source_id, sequence_num), and checks whether the frame is
  // now complete.
  //
  // When it is, state.frame_block_ receives the chained data blocks,
  // state.more_fragments_ is cleared and <frame_info> is set to a newly
  // allocated copy of the entry's frame_info.
  //
  // Returns 0 on success and -1 on allocation, receive, insert or bind
  // failure.
  TAO_SFP_Fragment_Table_Entry *fragment_entry = 0;
  int result = -1;

  if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"frag_number = %d, frag_size = %d,source_id  = %d sequnce_num = %d\n",
              fragment.frag_number,fragment.frag_sz,fragment.source_id,fragment.sequence_num));

  ACE_Message_Block *data;
  ACE_NEW_RETURN (data,
                  ACE_Message_Block(fragment.frag_sz),
                  -1);

  // Read the fragment.
  int n = transport->recv (data->wr_ptr (),fragment.frag_sz);
  if ((n == -1) || (n==0))
    {
      // Log first (the %p directive reports errno), then free the
      // block: nothing else owns it yet, so returning without
      // releasing it would leak it.
      ACE_ERROR ((LM_ERROR,"TAO_SFP::read_fragment:%p",""));
      data->release ();
      return -1;
    }
  // move past the fragment header.
  data->rd_ptr (fragment_len);
  data->wr_ptr (n);
  //  TAO_SFP_Base::dump_buf (data->rd_ptr (),data->length ());
  if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"length of %dth fragment is: %d\n",
              fragment.frag_number,
              data->length ()));

  // NOTE(review): the node is heap-allocated and then passed to
  // insert () by reference below; if the set stores a copy, the
  // original allocation is never freed -- confirm.
  TAO_SFP_Fragment_Node *new_node;
  ACE_NEW_RETURN (new_node,
                  TAO_SFP_Fragment_Node,
                  -1);
  new_node->fragment_info_ = fragment;
  new_node->data_ = data;

  // Find, or create and register, the fragment table for this source.
  TAO_SFP_Fragment_Table *fragment_table = 0;
  result = state.fragment_table_map_.find (fragment.source_id,fragment_table);
  if (result != 0)
    {
      ACE_NEW_RETURN (fragment_table,
                      TAO_SFP_Fragment_Table,
                      -1);
      result = state.fragment_table_map_.bind (fragment.source_id,fragment_table);
      if (result < 0)
        ACE_ERROR_RETURN ((LM_ERROR,"TAO_SFP_Base::read_fragment:fragment_table_map:bind failed\n"),-1);
    }
  if (fragment_table->find (fragment.sequence_num,fragment_entry) == 0)
    {
      // Already an entry exists. Traverse the list and insert it at the right place.
      result = fragment_entry->fragment_set_.insert (*new_node);
      if (result != 0)
        ACE_ERROR_RETURN ((LM_ERROR,"insert for %dth node failed\n",fragment.frag_number),-1);
      // check if all the fragments have been received.
    }
  else
    {
      ACE_NEW_RETURN (fragment_entry,
                      TAO_SFP_Fragment_Table_Entry,
                      -1);
      fragment_entry->fragment_set_.insert (*new_node);
      // bind a new entry for this sequence number.
      result = fragment_table->bind (fragment.sequence_num,fragment_entry);
      if (result != 0)
        ACE_ERROR_RETURN ((LM_ERROR,"bind for %dth fragment failed\n",
                           fragment.frag_number),-1);
    }
  if (!(fragment.flags & 0x2))
    {
      if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"Last fragment received\n"));
      // if bit 1 is not set then there are
      // no more fragments.
      fragment_entry->last_received_ = 1;
      // since fragment number starts from 0 to n-1 we add 1.
      fragment_entry->num_fragments_ = fragment.frag_number + 1;
    }


  // A non-null result means every fragment of the frame is present.
  state.frame_block_ = check_all_fragments (fragment_entry);
  if (state.frame_block_ != 0)
    {
      state.more_fragments_ = 0;
      ACE_NEW_RETURN (frame_info,
                      TAO_AV_frame_info,
                      -1);
      *frame_info = fragment_entry->frame_info;
    }
  return 0;
}

int TAO_SFP_Base::read_frame ( TAO_AV_Transport *  transport,
flowProtocol::frameHeader &  frame_header,
TAO_SFP_Frame_State &  state,
TAO_AV_frame_info *&  frame_info 
) [static]

Definition at line 191 of file sfp.cpp.

{
  // Receives one frame message (frame_header.message_size bytes) from
  // <transport> into state.static_frame_.
  //
  // Unfragmented frame (bit 1 of frame_header.flags clear): the
  // received block, with its read pointer advanced past the header, is
  // published in state.frame_block_.
  //
  // Fragmented frame (bit 1 set): this message is fragment 0.  Its data
  // is cloned and filed in the fragment table kept in <state>; if that
  // completes the frame, state.frame_block_ receives the chained blocks
  // and <frame_info> a newly allocated copy of the entry's frame_info.
  //
  // Returns 0 on success.  Receive errors and several insert/bind/
  // allocation failures are logged and also return 0; the remaining
  // failures return -1.
  ACE_Message_Block *message_block = 0;
  int result = -1;

  if (TAO_debug_level > 0)
    ACE_DEBUG ((LM_DEBUG,"Reading simple frame\n"));
  // Check to see what the length of the message is.
  int byte_order = frame_header.flags & 0x1;
  int message_len = frame_header.message_size;

//       ACE_NEW_RETURN (message_block,
//                       ACE_Message_Block (message_len),
//                       0);
  state.static_frame_.rd_ptr (state.static_frame_.base ());
  state.static_frame_.wr_ptr (state.static_frame_.base ());
  // message_len comes straight from the wire.  Refuse a length that is
  // negative or larger than the room left in the static frame buffer,
  // since the receive below would otherwise write past its end.
  if (message_len < 0
      || static_cast<size_t> (message_len) > state.static_frame_.space ())
    ACE_ERROR_RETURN ((LM_ERROR,"SFP::read_simple_frame:message too large\n"),-1);
  int n = transport->recv (state.static_frame_.rd_ptr (),message_len);
  if (n == -1)
    ACE_ERROR_RETURN ((LM_ERROR,"SFP::handle_input -peek"),0);
  else if (n==0)
    ACE_ERROR_RETURN ((LM_ERROR,"SFP::handle_input -peek"),0);
  else if (n != message_len)
    ACE_ERROR_RETURN ((LM_ERROR,"SFP::read_simple_frame:message truncated\n"),0);
  message_block = &state.static_frame_;
  // print the buffer.
  //      this->dump_buf (message,n);
  // skip over the frame header.
  message_block->rd_ptr (frame_header_len);
  message_block->wr_ptr (n);
  CORBA::ULong ssrc = 0;
  TAO_SFP_Fragment_Table_Entry *fragment_entry = 0;
  if (frame_header.flags & 0x2)
    {
      if (TAO_debug_level > 0)
        ACE_DEBUG ((LM_DEBUG,"fragmented frame:0th fragment\n"));
      state.more_fragments_ = 1;
      ACE_Message_Block *data = 0;
      switch (frame_header.message_type)
        {
        case flowProtocol::Frame_Msg:
          {
            // read the frame info.
            // NOTE(review): the size below assumes message_len is at
            // least frame_header_len -- confirm.
            ACE_Message_Block frame_info_mb (message_len-frame_header_len+ACE_CDR::MAX_ALIGNMENT);
            ACE_CDR::mb_align (&frame_info_mb);
            frame_info_mb.copy (message_block->rd_ptr (),
                                message_block->length ());
            // print the buffer.
            //          this->dump_buf (message_block->rd_ptr (),16);
            TAO_InputCDR frame_info_cdr (&frame_info_mb,byte_order);
            // NOTE(review): the result of this extraction is not checked.
            frame_info_cdr >> state.frame_;
            if (TAO_debug_level > 0)
              ACE_DEBUG ((LM_DEBUG,
                          "frame.timestamp = %d, "
                          "frame.synchsource = %d, "
                          "frame.sequence_num = %d\n",
                          state.frame_.timestamp,
                          state.frame_.synchSource,
                          state.frame_.sequence_num));
            ssrc = state.frame_.synchSource;
            // The remaining message in the CDR stream is the fragment
            // data for frag.0
            data = frame_info_cdr.start ()->clone ();
            break;
          }
        case flowProtocol::SimpleFrame_Msg:
          {
            data = message_block->clone ();
            break;
          }
        case flowProtocol::SequencedFrame_Msg:
          break;
        case flowProtocol::SpecialFrame_Msg:
          break;
        }
      // <data> is still null for message types the switch does not
      // populate (and if clone () failed).  Everything below
      // dereferences it, so stop here instead.
      if (data == 0)
        ACE_ERROR_RETURN ((LM_ERROR,
                           "TAO_SFP_Base::read_frame: "
                           "no data for 0th fragment\n"),-1);
      if (TAO_debug_level > 0)
        ACE_DEBUG ((LM_DEBUG,"Length of 0th fragment= %d\n",data->length ()));
      // Find, or create and register, the fragment table for this source.
      TAO_SFP_Fragment_Table *fragment_table = 0;
      result = state.fragment_table_map_.find (ssrc,fragment_table);
      if (result != 0)
        {
          ACE_NEW_RETURN (fragment_table,
                          TAO_SFP_Fragment_Table,
                          -1);
          result = state.fragment_table_map_.bind (ssrc,fragment_table);
          if (result < 0)
            ACE_ERROR_RETURN ((LM_ERROR,
                               "TAO_SFP_Base::read_frame: "
                               "fragment_table_map:bind failed\n"),-1);
        }

      // Describe fragment 0 so it can be filed with the later fragments.
      TAO_SFP_Fragment_Node *new_node;
      ACE_NEW_RETURN (new_node,
                      TAO_SFP_Fragment_Node,
                      0);
      new_node->fragment_info_.frag_sz = static_cast<CORBA::ULong> (data->length ());
      new_node->fragment_info_.frag_number = 0;
      if (state.frame_.source_ids.length () > 0)
        new_node->fragment_info_.source_id = state.frame_.source_ids [0];
      else
        new_node->fragment_info_.source_id = 0;
      new_node->data_ = data;
      //          TAO_SFP_Base::dump_buf (data->rd_ptr (),data->length ());
      if (fragment_table->find (state.frame_.sequence_num,fragment_entry) == 0)
        {
          // This case can happen where a nth (n > 0)fragment is
          // received before the 0th fragment.
          if (TAO_debug_level > 0)
            ACE_DEBUG ((LM_DEBUG,
                        "fragment table entry found for 0th fragment:\n"));
          result = fragment_entry->fragment_set_.insert (*new_node);
          if (result != 0)
            ACE_ERROR_RETURN ((LM_ERROR,
                               "insert for 0th fragment failed\n"),0);
          //  enter the frame info.

          // check if all the fragments have been received.
          state.frame_block_ =
            TAO_SFP_Base::check_all_fragments (fragment_entry);
          if (state.frame_block_ != 0)
            state.more_fragments_ = 0;
        }
      else
        {
          if (TAO_debug_level > 0)
            ACE_DEBUG ((LM_DEBUG,
                        "fragment table entry not found for 0th fragment\n"));
          TAO_SFP_Fragment_Table_Entry *new_entry;
          ACE_NEW_RETURN (new_entry,
                          TAO_SFP_Fragment_Table_Entry,
                          0);
          result = new_entry->fragment_set_.insert (*new_node);
          if (result != 0)
            ACE_ERROR_RETURN ((LM_ERROR,"insert for 0th fragment failed\n"),0);
          fragment_entry = new_entry;
          // not found. so bind a new entry.
          result = fragment_table->bind (state.frame_.sequence_num,new_entry);
          if (result != 0)
            ACE_ERROR_RETURN ((LM_ERROR,"fragment table bind failed\n"),0);
          if (frame_header.message_type & 4 )
            fragment_entry->frame_info.boundary_marker = 1;
          // Record the frame info now so it is available when the last
          // fragment completes the frame.
          switch (frame_header.message_type)
            {
            case flowProtocol::Frame_Msg:
              fragment_entry->frame_info.ssrc = state.frame_.synchSource;
              fragment_entry->frame_info.timestamp = state.frame_.timestamp;
              fragment_entry->frame_info.sequence_num = state.frame_.sequence_num;
              break;
            case flowProtocol::SimpleFrame_Msg:
              fragment_entry->frame_info.ssrc =
                fragment_entry->frame_info.timestamp =
                fragment_entry->frame_info.sequence_num = 0;
              break;
            }
          // First fragment of a new frame: more fragments are expected.
          return 0;
        }
    }
  else
    {
      // Unfragmented: hand the received block over as the whole frame.
      state.more_fragments_ = 0;
      state.frame_block_ = message_block;
    }
  if (state.more_fragments_ == 0)
    {
      // Only a frame reassembled from fragments has an entry to copy
      // frame info from.
      if (fragment_entry != 0)
        {
          ACE_NEW_RETURN (frame_info,
                          TAO_AV_frame_info,
                          -1);
          *frame_info = fragment_entry->frame_info;
        }
    }
  return 0;
}

int TAO_SFP_Base::read_start_message ( TAO_AV_Transport *  transport,
flowProtocol::Start &  start,
TAO_InputCDR &  cdr 
) [static]

Definition at line 706 of file sfp.cpp.

{
  // Receives start_len bytes from <transport> into <input> and
  // demarshals them into <start>.  A receive of any other length is
  // logged and yields 0; a demarshalling failure yields -1; success
  // yields 0.
  input.grow (start_len);
  const int received = transport->recv (input.rd_ptr (),
                                        start_len);
  if (received != static_cast<int> (start_len))
    ACE_ERROR_RETURN ((LM_ERROR,"%p","TAO_SFP_Base::read_start\n"),0);

  return (input >> start) ? 0 : -1;
}

int TAO_SFP_Base::read_start_reply_message ( TAO_AV_Transport *  transport,
flowProtocol::StartReply &  start_reply,
TAO_InputCDR &  cdr 
) [static]

Definition at line 726 of file sfp.cpp.

{
  // Receives start_reply_len bytes from <transport> into <input> and
  // demarshals them into <start_reply>.
  //
  // Returns -1 if demarshalling fails and 0 otherwise.
  // NOTE(review): a short or failed receive is logged but still
  // returns 0 -- confirm this is intended.

  // Size the buffer and validate the receive against the StartReply
  // length.  Both used start_len before, so a receive of exactly
  // start_reply_len bytes was reported as an error whenever the two
  // lengths differ.
  input.grow (start_reply_len);
  char *buf = input.rd_ptr ();
  int n = transport->recv (buf,
                           start_reply_len);
  if (n != static_cast<int> (start_reply_len))
    ACE_ERROR_RETURN ((LM_ERROR,"%p","TAO_SFP_Base::read_start_reply_message"),0);
  else
    {
      if (!(input >> start_reply))
        return -1;
    }
  return 0;
}

int TAO_SFP_Base::send_message ( TAO_AV_Transport *  transport,
TAO_OutputCDR &  stream,
ACE_Message_Block *  mb = 0 
) [static]

Definition at line 596 of file sfp.cpp.

{
  // Sends the header already marshalled in <stream>, followed by the
  // optional payload chain <mb>, over <transport>.
  //
  // When a payload is given, the total length (header plus every block
  // in the <mb> chain) is written into the size field of the marshalled
  // header: at TAO_SFP_FRAGMENT_SIZE_OFFSET for fragment messages and
  // at TAO_SFP_MESSAGE_SIZE_OFFSET otherwise.
  //
  // Returns 1 on success and -1 if the send fails or reports EOF.
  CORBA::ULong total_len = static_cast<CORBA::ULong> (stream.total_length ());
  if (mb != 0)
    {
      for (ACE_Message_Block *temp = mb;temp != 0;temp = temp->cont ())
        total_len += static_cast<CORBA::ULong> (temp->length ());

      char *buf = (char *) stream.buffer ();
      size_t offset = TAO_SFP_MESSAGE_SIZE_OFFSET;
      // The first character distinguishes "=SFP" from "FRAG".
      if (*(buf) == 'F')
        {
          // Fragment message.
          offset = TAO_SFP_FRAGMENT_SIZE_OFFSET;
        }
#if !defined (ACE_ENABLE_SWAP_ON_WRITE)
      *reinterpret_cast<CORBA::ULong *> (buf + offset) = total_len;
#else
      if (!stream.do_byte_swap ())
        *reinterpret_cast<CORBA::ULong *> (buf + offset) = total_len;
      else
        ACE_CDR::swap_4 (reinterpret_cast<char *> (&total_len),
                         buf + offset);
#endif /* ACE_ENABLE_SWAP_ON_WRITE */
    }
  // we join the data block with the cdr block.
  ACE_Message_Block *end = (ACE_Message_Block *)stream.end ();
  if (end == 0)
    {
      // There is only one message block.
      end = (ACE_Message_Block *)stream.begin ();
      //      TAO_SFP_Base::dump_buf (end->rd_ptr (),end->length ());
    }
  // NOTE(review): <mb> stays chained to the stream's last block after
  // this call returns -- confirm who owns it afterwards.
  end->cont (mb);
  ssize_t n = transport->send (stream.begin ());
  if (n == -1)
    {
      // Diagnostics name this function (they used to name
      // GIOP::send_request).
      if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,
                  "TAO: (%P|%t) closing conn after fault %p\n",
                  "TAO_SFP_Base::send_message ()"));
      return -1;
    }
  // EOF.
  if (n == 0)
    {
      if (TAO_debug_level > 0)
        ACE_DEBUG ((LM_DEBUG,
                    "TAO: (%P|%t) TAO_SFP_Base::send_message () "
                    "EOF, closing conn:\n"));
      return -1;
    }
  return 1;

}

CORBA::Boolean TAO_SFP_Base::start_frame ( CORBA::Octet  flags,
flowProtocol::MsgType  type,
TAO_OutputCDR &  msg 
) [static]

Definition at line 487 of file sfp.cpp.

{
  // Resets <msg> and marshals into it a frame header carrying the
  // "=SFP" magic, the given <flags> and <type>, and a message_size of 0.
  // Returns 1 if marshalling succeeded, 0 otherwise.
  flowProtocol::frameHeader header;
  header.magic_number [0] = '=';
  header.magic_number [1] = 'S';
  header.magic_number [2] = 'F';
  header.magic_number [3] = 'P';
  header.flags = flags;
  header.message_type = static_cast<CORBA::Octet> (type);
  header.message_size = 0;

  msg.reset ();
  return (msg << header) ? 1 : 0;
}

CORBA::Boolean TAO_SFP_Base::write_credit_message ( CORBA::ULong  cred_num,
TAO_OutputCDR &  msg 
) [static]

Definition at line 539 of file sfp.cpp.

{
  // Marshals into <msg> a credit message carrying the "=CRE" magic and
  // <cred_num>.  Returns 1 if marshalling succeeded, 0 otherwise.
  flowProtocol::credit credit_msg;
  credit_msg.cred_num = cred_num;
  credit_msg.magic_number [0] = '=';
  credit_msg.magic_number [1] = 'C';
  credit_msg.magic_number [2] = 'R';
  credit_msg.magic_number [3] = 'E';

  return (msg << credit_msg) ? 1 : 0;
}

CORBA::Boolean TAO_SFP_Base::write_fragment_message ( CORBA::Octet  flags,
CORBA::ULong  fragment_number,
CORBA::ULong  sequence_number,
CORBA::ULong  source_id,
TAO_OutputCDR &  msg 
) [static]

Definition at line 555 of file sfp.cpp.

{
  // Resets <msg> and marshals into it a fragment header carrying the
  // "FRAG" magic and the supplied flags, fragment number, sequence
  // number and source id.  Returns 1 if marshalling succeeded, 0
  // otherwise.
  flowProtocol::fragment header;
  header.magic_number [0] = 'F';
  header.magic_number [1] = 'R';
  header.magic_number [2] = 'A';
  header.magic_number [3] = 'G';
  header.flags = flags;
  header.frag_number = fragment_number;
  header.sequence_num = sequence_number;
  header.source_id = source_id;

  msg.reset ();
  return (msg << header) ? 1 : 0;
}

CORBA::Boolean TAO_SFP_Base::write_frame_message ( CORBA::ULong  timestamp,
CORBA::ULong  synchSource,
flowProtocol::my_seq_ulong  source_ids,
CORBA::ULong  sequence_num,
TAO_OutputCDR &  msg 
) [static]

Definition at line 578 of file sfp.cpp.

{
  // Marshals into <msg> a frame record built from the supplied
  // timestamp, synchronisation source, source ids and sequence number.
  // Returns 1 if marshalling succeeded, 0 otherwise.
  flowProtocol::frame frame_msg;
  frame_msg.timestamp = timestamp;
  frame_msg.synchSource = synchSource;
  frame_msg.source_ids = source_ids;
  frame_msg.sequence_num = sequence_num;

  return (msg << frame_msg) ? 1 : 0;
}

CORBA::Boolean TAO_SFP_Base::write_start_message ( TAO_OutputCDR &  msg  )  [static]

Definition at line 507 of file sfp.cpp.

{
  // Marshals into <msg> a Start message carrying the "=STA" magic, the
  // class's major/minor version constants and zero flags.  Returns 1 if
  // marshalling succeeded, 0 otherwise.
  flowProtocol::Start start_msg;
  start_msg.magic_number [0] = '=';
  start_msg.magic_number [1] = 'S';
  start_msg.magic_number [2] = 'T';
  start_msg.magic_number [3] = 'A';
  start_msg.major_version = TAO_SFP_MAJOR_VERSION;
  start_msg.minor_version = TAO_SFP_MINOR_VERSION;
  start_msg.flags = 0;

  return (msg << start_msg) ? 1 : 0;
}

CORBA::Boolean TAO_SFP_Base::write_start_reply_message ( TAO_OutputCDR &  msg  )  [static]

Definition at line 524 of file sfp.cpp.

{
  // Marshals into <msg> a StartReply message carrying the "=STR" magic
  // and zero flags.  Returns 1 if marshalling succeeded, 0 otherwise.
  flowProtocol::StartReply reply_msg;
  reply_msg.magic_number [0] = '=';
  reply_msg.magic_number [1] = 'S';
  reply_msg.magic_number [2] = 'T';
  reply_msg.magic_number [3] = 'R';
  reply_msg.flags = 0;

  return (msg << reply_msg) ? 1 : 0;
}


Member Data Documentation

u_int TAO_SFP_Base::credit_len [static]

Definition at line 130 of file sfp.h.

u_int TAO_SFP_Base::fragment_len [static]

Definition at line 131 of file sfp.h.

u_int TAO_SFP_Base::frame_header_len [static]

Definition at line 127 of file sfp.h.

u_int TAO_SFP_Base::start_len [static]

Definition at line 129 of file sfp.h.

u_int TAO_SFP_Base::start_reply_len [static]

Definition at line 128 of file sfp.h.

const char TAO_SFP_Base::TAO_SFP_CREDIT_MAGIC_NUMBER[] = "=CRE" [static]

Definition at line 116 of file sfp.h.

const char TAO_SFP_Base::TAO_SFP_FRAGMENT_MAGIC_NUMBER[] = "FRAG" [static]

Definition at line 114 of file sfp.h.

const unsigned char TAO_SFP_Base::TAO_SFP_FRAGMENT_SIZE_OFFSET = 16 [static]

Definition at line 126 of file sfp.h.

const unsigned char TAO_SFP_Base::TAO_SFP_FRAME_HEADER_LEN = 12 [static]

Definition at line 124 of file sfp.h.

const char TAO_SFP_Base::TAO_SFP_MAGIC_NUMBER[] = "=SFP" [static]

Definition at line 113 of file sfp.h.

const unsigned char TAO_SFP_Base::TAO_SFP_MAJOR_VERSION = 1 [static]

Definition at line 120 of file sfp.h.

const unsigned char TAO_SFP_Base::TAO_SFP_MESSAGE_SIZE_OFFSET = 8 [static]

Definition at line 125 of file sfp.h.

const unsigned char TAO_SFP_Base::TAO_SFP_MINOR_VERSION = 0 [static]

Definition at line 121 of file sfp.h.

const char TAO_SFP_Base::TAO_SFP_ORB_ARGUMENTS[] = "-ORBObjRefStyle URL" [static]

Definition at line 110 of file sfp.h.

const char TAO_SFP_Base::TAO_SFP_START_MAGIC_NUMBER[] = "=STA" [static]

Definition at line 115 of file sfp.h.

const char TAO_SFP_Base::TAO_SFP_STARTREPLY_MAGIC_NUMBER[] = "=STR" [static]

Definition at line 117 of file sfp.h.


The documentation for this class was generated from the following files:
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines