TAO_SFP_Object Class Reference

#include <sfp.h>

Inheritance diagram for TAO_SFP_Object:

Inheritance graph
[legend]
Collaboration diagram for TAO_SFP_Object:

Collaboration graph
[legend]
List of all members.

Public Member Functions

 TAO_SFP_Object (TAO_AV_Callback *callback, TAO_AV_Transport *transport)
virtual ~TAO_SFP_Object (void)
virtual int handle_input (void)=0
virtual int send_frame (ACE_Message_Block *frame, TAO_AV_frame_info *frame_info=0)
 send a data frame.
virtual int send_frame (const iovec *iov, int iovcnt, TAO_AV_frame_info *frame_info=0)
 send a frame in iovecs.
virtual int send_frame (const char *buf, size_t len)
virtual int destroy (void)
virtual int set_policies (const TAO_AV_PolicyList &policies)
 set/get policies.

Protected Member Functions

ACE_Message_Block * get_fragment (ACE_Message_Block *&frame, size_t initial_len, size_t &last_mb_orig_len, size_t &last_mb_current_len)

Protected Attributes

CORBA::ULong sequence_num_
CORBA::ULong source_id_
CORBA::Long max_credit_
CORBA::Long current_credit_
TAO_SFP_Frame_State state_

Detailed Description

Definition at line 218 of file sfp.h.


Constructor & Destructor Documentation

TAO_SFP_Object::TAO_SFP_Object ( TAO_AV_Callback *  callback,
TAO_AV_Transport *  transport 
)

Definition at line 836 of file sfp.cpp.

References ACE_Singleton< TYPE, ACE_LOCK >::instance(), ACE_Message_Block::size(), state_, and TAO_SFP_Frame_State::static_frame_.

00838   :TAO_AV_Protocol_Object (callback,transport),
00839    source_id_ (10),
00840    max_credit_ (-1),
00841    current_credit_ (-1)
00842 {
00843   TAO_SFP_BASE::instance ();
00844   this->state_.static_frame_.size (2* this->transport_->mtu ());
00845 }

TAO_SFP_Object::~TAO_SFP_Object ( void   )  [virtual]

Definition at line 847 of file sfp.cpp.

00848 {
00849   //no-op
00850 }


Member Function Documentation

int TAO_SFP_Object::destroy ( void   )  [virtual]

Implements TAO_AV_Protocol_Object.

Definition at line 853 of file sfp.cpp.

References TAO_AV_Protocol_Object::callback_, flowProtocol::EndofStream_Msg, TAO_AV_Callback::handle_destroy(), TAO_SFP_Base::send_message(), TAO_SFP_Base::start_frame(), and TAO_ENCAP_BYTE_ORDER.

00854 {
00855   int result = -1;
00856   TAO_OutputCDR out_stream;
00857   result = TAO_SFP_Base::start_frame (TAO_ENCAP_BYTE_ORDER,
00858                                       flowProtocol::EndofStream_Msg,
00859                                       out_stream);
00860   if (result < 0)
00861     return result;
00862   result = TAO_SFP_Base::send_message (this->transport_,
00863                                        out_stream);
00864   if (result < 0)
00865     return result;
00866   this->callback_->handle_destroy ();
00867   return 0;
00868 }

ACE_Message_Block * TAO_SFP_Object::get_fragment ( ACE_Message_Block *&  frame,
size_t  initial_len,
size_t &  last_mb_orig_len,
size_t &  last_mb_current_len 
) [protected]

Definition at line 1044 of file sfp.cpp.

References ACE_Message_Block::cont(), ACE_Message_Block::duplicate(), ACE_Message_Block::length(), and TAO_SFP_MAX_PACKET_SIZE.

Referenced by send_frame().

01048 {
01049   ACE_Message_Block *fragment_mb = 0,*temp_mb = 0;
01050   size_t prev_len,last_len = 0;
01051   size_t current_len = 0;
01052   size_t message_len = initial_len;
01053   while (mb != 0)
01054     {
01055       prev_len = message_len;
01056       message_len += mb->length ();
01057       if (fragment_mb == 0)
01058         fragment_mb = temp_mb = mb->duplicate ();
01059       if (message_len > TAO_SFP_MAX_PACKET_SIZE)
01060         {
01061           // get only the length that we can accomodate.
01062           current_len = TAO_SFP_MAX_PACKET_SIZE - prev_len;
01063           if (current_len < mb->length ())
01064             {
01065               // The above condition is an assertion.
01066               message_len += (current_len-mb->length ());
01067               last_len = mb->length ();
01068               mb->length (current_len);
01069               temp_mb->length (current_len);
01070             }
01071           break;
01072         }
01073       else
01074         {
01075           // we can accomodate this message block
01076           message_len += mb->length ();
01077           mb = mb->cont ();
01078           temp_mb = temp_mb->cont ();
01079         }
01080     }
01081   last_mb_orig_len = last_len;
01082   last_mb_current_len = current_len;
01083   return fragment_mb;
01084 }

virtual int TAO_SFP_Object::handle_input ( void   )  [pure virtual]

Implements TAO_AV_Protocol_Object.

Implemented in TAO_SFP_Producer_Object, and TAO_SFP_Consumer_Object.

int TAO_SFP_Object::send_frame ( const char *  buf,
size_t  len 
) [virtual]

Implements TAO_AV_Protocol_Object.

Definition at line 1036 of file sfp.cpp.

01038 {
01039   return 0;
01040 }

int TAO_SFP_Object::send_frame ( const iovec *  iov,
int  iovcnt,
TAO_AV_frame_info *  frame_info = 0 
) [virtual]

send a frame in iovecs.

Implements TAO_AV_Protocol_Object.

Definition at line 1028 of file sfp.cpp.

References ACE_ERROR_RETURN, and LM_ERROR.

01031 {
01032   ACE_ERROR_RETURN ((LM_ERROR,"TAO_AV_SFP_Object::send_frame"),-1);
01033 }

int TAO_SFP_Object::send_frame ( ACE_Message_Block *  frame,
TAO_AV_frame_info *  frame_info = 0 
) [virtual]

send a data frame.

Implements TAO_AV_Protocol_Object.

Definition at line 871 of file sfp.cpp.

References ACE_DEBUG, ACE_ERROR_RETURN, TAO_AV_frame_info::boundary_marker, ACE_Message_Block::cont(), current_credit_, TAO_SFP_Base::fragment_len, TAO_SFP_Base::frame_header_len, flowProtocol::Frame_Msg, get_fragment(), LM_DEBUG, LM_ERROR, ACE_OutputCDR::reset(), TAO_SFP_Base::send_message(), sequence_num_, flowProtocol::SimpleFrame_Msg, ACE_OS::sleep(), TAO_AV_frame_info::ssrc, TAO_SFP_Base::start_frame(), TAO_debug_level, TAO_ENCAP_BYTE_ORDER, TAO_SFP_MAX_PACKET_SIZE, TAO_AV_frame_info::timestamp, ACE_OutputCDR::total_length(), TAO_SFP_Base::write_fragment_message(), and TAO_SFP_Base::write_frame_message().

00873 {
00874   TAO_OutputCDR out_stream;
00875   CORBA::Boolean result = 0;
00876   if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"TAO_SFP_Object::send_frame\n"));
00877   CORBA::Octet flags = TAO_ENCAP_BYTE_ORDER;
00878   if (this->transport_ == 0)
00879     ACE_ERROR_RETURN ((LM_ERROR,"TAO_SFP_Object::send_frame: transport is null\n"),-1);
00880    if (this->current_credit_ != 0)
00881     {
00882       // if we have enough credit then we send.
00883       size_t total_length = 0;
00884       for (ACE_Message_Block *temp = frame;temp != 0;temp = temp->cont ())
00885         total_length += temp->length ();
00886       if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"total_length of frame=%d\n",total_length));
00887       if (total_length < (TAO_SFP_MAX_PACKET_SIZE -TAO_SFP_Base::frame_header_len))
00888         {
00889           if (frame_info != 0)
00890             {
00891               if (frame_info->boundary_marker)
00892                 flags |= 4;
00893               CORBA::Boolean result = TAO_SFP_Base::start_frame (flags,
00894                                                                  flowProtocol::Frame_Msg,
00895                                                                  out_stream);
00896               if (result == 0)
00897                 return 0;
00898               flowProtocol::my_seq_ulong source_ids;
00899               source_ids.length (1);
00900               source_ids [0] = 0;
00901               TAO_SFP_Base::write_frame_message (frame_info->timestamp,
00902                                                  frame_info->ssrc,
00903                                                  source_ids,
00904                                                  this->sequence_num_,
00905                                                  out_stream);
00906             }
00907           else
00908             {
00909               CORBA::Boolean result = TAO_SFP_Base::start_frame (flags,
00910                                                                  flowProtocol::SimpleFrame_Msg,
00911                                                                  out_stream);
00912               if (result == 0)
00913                 return 0;
00914             }
00915           TAO_SFP_Base::send_message (this->transport_,
00916                                       out_stream,
00917                                       frame);
00918         }
00919       else // larger frame,fragment and send it.
00920         {
00921           flags = flags | 2;
00922           if (frame_info != 0)
00923             {
00924               if (frame_info->boundary_marker)
00925                 flags |= 4;
00926               result = TAO_SFP_Base::start_frame (flags,
00927                                                   flowProtocol::Frame_Msg,
00928                                                   out_stream);
00929               if (result == 0)
00930                 return result;
00931               flowProtocol::my_seq_ulong source_ids;
00932               source_ids.length (1);
00933               source_ids [0] = 0;
00934               TAO_SFP_Base::write_frame_message (frame_info->timestamp,
00935                                                  frame_info->ssrc,
00936                                                  source_ids,
00937                                                  this->sequence_num_,
00938                                                  out_stream);
00939             }
00940           else
00941             {
00942               CORBA::Boolean result = TAO_SFP_Base::start_frame (flags,
00943                                                                  flowProtocol::SimpleFrame_Msg,
00944                                                                  out_stream);
00945               if (result == 0)
00946                 return 0;
00947             }
00948           size_t last_len,current_len;
00949           int message_len = static_cast<int> (out_stream.total_length ());
00950           ACE_Message_Block *mb = frame;
00951           ACE_Message_Block *fragment_mb =
00952             this->get_fragment (mb,
00953                                 message_len,
00954                                 last_len,
00955                                 current_len);
00956           //  This can be either a simpleframe or a sequenced frame,other types of frames.
00957           TAO_SFP_Base::send_message (this->transport_,
00958                                       out_stream,
00959                                       fragment_mb);
00960           out_stream.reset ();
00961           int frag_number = 1;
00962           mb->length (last_len);
00963           mb->rd_ptr (current_len);
00964           // If there is any more data send those as fragments.
00965           while (mb != 0)
00966             {
00967               message_len = TAO_SFP_Base::fragment_len;
00968               fragment_mb = this->get_fragment (mb,
00969                                                 message_len,
00970                                                 last_len,
00971                                                 current_len);
00972               if (mb == 0)
00973                 {
00974                   if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"sending the last fragment\n"));
00975                   // This is the last fragment so clear the fragments bit.
00976                   flags = TAO_ENCAP_BYTE_ORDER;
00977                 }
00978               if (fragment_mb == 0)
00979                 break;
00980               if (frame_info != 0)
00981                 {
00982                   TAO_SFP_Base::write_fragment_message (flags,
00983                                                         frag_number++,
00984                                                         this->sequence_num_,
00985                                                         frame_info->ssrc,
00986                                                         out_stream);
00987                 }
00988               else
00989                 {
00990                   TAO_SFP_Base::write_fragment_message (flags,
00991                                                         frag_number++,
00992                                                         this->sequence_num_,
00993                                                         0,
00994                                                         out_stream);
00995                 }
00996               //   send the fragment now.
00997               // without the sleep the fragments gets lost!
00998               // probably because the UDP buffer queue on the sender side
00999               // is overflown it drops the packets.
01000               // XXX: This is a hack.
01001               ACE_OS::sleep (1);
01002               result = TAO_SFP_Base::send_message (this->transport_,
01003                                                    out_stream,
01004                                                    fragment_mb);
01005               if (mb != 0)
01006                 {
01007                   mb->length (last_len);
01008                   mb->rd_ptr (current_len);
01009                 }
01010             }
01011           // Increment the sequence_num after sending the message.
01012           this->sequence_num_++;
01013           // Also reduce the number of credits.
01014           if (this->max_credit_ > 0)
01015             this->current_credit_--;
01016         }
01017     }
01018   else
01019     {
01020       // flow controlled so wait.
01021       // A greater than 0 value indicates that flow control is being exercised.
01022       return 1;
01023     }
01024    return 0;
01025 }

int TAO_SFP_Object::set_policies ( const TAO_AV_PolicyList &  policies  )  [virtual]

set/get policies.

Reimplemented from TAO_AV_Protocol_Object.

Definition at line 1087 of file sfp.cpp.

References max_credit_, TAO_AV_SFP_CREDIT_POLICY, and TAO_AV_SFP_Credit_Policy::value().

Referenced by TAO_SFP_Consumer_Object::TAO_SFP_Consumer_Object().

01088 {
01089   TAO_AV_Policy *policy = 0;
01090   for (CORBA::ULong i=0;i<policies.length ();i++)
01091     {
01092       policy = policies[i];
01093       switch (policies[i]->type ())
01094         {
01095 
01096         case TAO_AV_SFP_CREDIT_POLICY:
01097           {
01098             TAO_AV_SFP_Credit_Policy *credit_policy =
01099               reinterpret_cast<TAO_AV_SFP_Credit_Policy*> (policy);
01100             this->max_credit_ = credit_policy->value ();
01101           }
01102         default:
01103           break;
01104         }
01105     }
01106   return 0;
01107 }


Member Data Documentation

CORBA::Long TAO_SFP_Object::current_credit_ [protected]

Definition at line 250 of file sfp.h.

Referenced by TAO_SFP_Producer_Object::handle_input(), and send_frame().

CORBA::Long TAO_SFP_Object::max_credit_ [protected]

Definition at line 249 of file sfp.h.

Referenced by TAO_SFP_Producer_Object::handle_input(), set_policies(), and TAO_SFP_Producer_Object::TAO_SFP_Producer_Object().

CORBA::ULong TAO_SFP_Object::sequence_num_ [protected]

Definition at line 247 of file sfp.h.

Referenced by send_frame().

CORBA::ULong TAO_SFP_Object::source_id_ [protected]

Definition at line 248 of file sfp.h.

TAO_SFP_Frame_State TAO_SFP_Object::state_ [protected]

Definition at line 251 of file sfp.h.

Referenced by TAO_SFP_Consumer_Object::handle_input(), and TAO_SFP_Object().


The documentation for this class was generated from the following files:

sfp.h
sfp.cpp
Generated on Tue Feb 2 17:48:11 2010 for TAO_AV by  doxygen 1.4.7