sfp.cpp

Go to the documentation of this file.
00001 // $Id: sfp.cpp 80173 2007-12-03 16:25:40Z sowayaa $
00002 
00003 #include "orbsvcs/AV/sfp.h"
00004 #include "tao/debug.h"
00005 #include "ace/ARGV.h"
00006 #include "ace/OS_NS_strings.h"
00007 
00008 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00009 
00010 // default arguments to pass to use for the ORB
00011 const char TAO_SFP_Base::TAO_SFP_ORB_ARGUMENTS[] = "-ORBObjRefStyle URL";
00012 
00013 // SFP magic numbers
00014 const char TAO_SFP_Base::TAO_SFP_MAGIC_NUMBER[] = "=SFP";
00015 const char TAO_SFP_Base::TAO_SFP_FRAGMENT_MAGIC_NUMBER[] = "FRAG";
00016 const char TAO_SFP_Base::TAO_SFP_START_MAGIC_NUMBER[] = "=STA";
00017 const char TAO_SFP_Base::TAO_SFP_CREDIT_MAGIC_NUMBER[] = "=CRE";
00018 const char TAO_SFP_Base::TAO_SFP_STARTREPLY_MAGIC_NUMBER[] = "=STR";
00019 
00020 // SFP version 1.0
00021 const unsigned char TAO_SFP_Base::TAO_SFP_MAJOR_VERSION = 1;
00022 const unsigned char TAO_SFP_Base::TAO_SFP_MINOR_VERSION = 0;
00023 
00024 // lengths of various SFP headers
00025 const unsigned char TAO_SFP_Base::TAO_SFP_FRAME_HEADER_LEN = 12;
00026 const unsigned char TAO_SFP_Base::TAO_SFP_MESSAGE_SIZE_OFFSET = 8;
00027 const unsigned char TAO_SFP_Base::TAO_SFP_FRAGMENT_SIZE_OFFSET = 16;
00028 
00029 u_int TAO_SFP_Base::frame_header_len;
00030 u_int TAO_SFP_Base::start_reply_len;
00031 u_int TAO_SFP_Base::start_len;
00032 u_int TAO_SFP_Base::credit_len;
00033 u_int TAO_SFP_Base::fragment_len;
00034 
00035 bool
00036 operator< (const TAO_SFP_Fragment_Node& left,
00037            const TAO_SFP_Fragment_Node& right)
00038 {
00039   return left.fragment_info_.frag_number < right.fragment_info_.frag_number;
00040 }
00041 
00042 
00043 //------------------------------------------------------------
00044 // TAO_SFP_Base
00045 //------------------------------------------------------------
00046 
00047 TAO_SFP_Base::TAO_SFP_Base (void)
00048 {
00049   TAO_OutputCDR output_cdr;
00050   flowProtocol::frameHeader frame_header;
00051   flowProtocol::fragment fragment;
00052   flowProtocol::credit credit;
00053   flowProtocol::Start start;
00054   flowProtocol::StartReply start_reply;
00055 
00056   // fill in the default frameHeader fields.
00057   frame_header.magic_number [0] = '=';
00058   frame_header.magic_number [1] = 'S';
00059   frame_header.magic_number [2] = 'F';
00060   frame_header.magic_number [3] = 'P';
00061   frame_header.flags = TAO_ENCAP_BYTE_ORDER;
00062   output_cdr.reset ();
00063   if (!(output_cdr << frame_header))
00064     {
00065       ACE_ERROR ((LM_ERROR, "TAO_SFP constructor\n"));
00066       return;
00067     }
00068 
00069   frame_header_len = static_cast<u_int> (output_cdr.total_length ());
00070   // fill in the default fragment message fields.
00071   fragment.magic_number [0] = 'F';
00072   fragment.magic_number [1] = 'R';
00073   fragment.magic_number [2] = 'A';
00074   fragment.magic_number [3] = 'G';
00075   output_cdr.reset ();
00076   if (!(output_cdr << fragment))
00077     {
00078       ACE_ERROR ((LM_ERROR, "TAO_SFP constructor\n"));
00079       return;
00080     }
00081 
00082   fragment_len = static_cast<u_int> (output_cdr.total_length ());
00083   // fill in the default Start message fields.
00084   start.magic_number [0] = '=';
00085   start.magic_number [1] = 'S';
00086   start.magic_number [2] = 'T';
00087   start.magic_number [3] = 'A';
00088   start.major_version = TAO_SFP_Base::TAO_SFP_MAJOR_VERSION;
00089   start.minor_version = TAO_SFP_Base::TAO_SFP_MINOR_VERSION;
00090   start.flags = 0;
00091   output_cdr.reset ();
00092   if (!(output_cdr << start))
00093     {
00094       ACE_ERROR ((LM_ERROR, "TAO_SFP constructor\n"));
00095       return;
00096     }
00097 
00098   start_len = static_cast<u_int> (output_cdr.total_length ());
00099   // fill in the default StartReply message fields.
00100   start_reply.magic_number [0] = '=';
00101   start_reply.magic_number [1] = 'S';
00102   start_reply.magic_number [2] = 'T';
00103   start_reply.magic_number [3] = 'R';
00104   start_reply.flags = 0;
00105   output_cdr.reset ();
00106   if (!(output_cdr << start_reply))
00107     {
00108       ACE_ERROR ((LM_ERROR, "TAO_SFP constructor\n"));
00109       return;
00110     }
00111 
00112   start_reply_len = static_cast<u_int> (output_cdr.total_length ());
00113 
00114   // fill in the default Credit message fields.
00115   credit.magic_number [0] = '=';
00116   credit.magic_number [1] = 'C';
00117   credit.magic_number [2] = 'R';
00118   credit.magic_number [3] = 'E';
00119   output_cdr.reset ();
00120 
00121   if (!(output_cdr << credit))
00122     {
00123       ACE_ERROR ((LM_ERROR, "TAO_SFP constructor\n"));
00124       return;
00125     }
00126   credit_len = static_cast<u_int> (output_cdr.total_length ());
00127 }
00128 
00129 int
00130 TAO_SFP_Base::handle_input (TAO_AV_Transport *transport,
00131                             TAO_SFP_Frame_State &state,
00132                             TAO_AV_frame_info *&frame_info)
00133 {
00134   int result;
00135   flowProtocol::MsgType msg_type;
00136   result = TAO_SFP_Base::peek_message_type (transport,
00137                                             msg_type);
00138   if (result < 0)
00139     return result;
00140   //  TAO_InputCDR &input = state.cdr;
00141   switch (msg_type)
00142     {
00143     case flowProtocol::SimpleFrame_Msg:
00144     case flowProtocol::Frame_Msg:
00145       {
00146         result = TAO_SFP_Base::peek_frame_header (transport,
00147                                                   state.frame_header_,
00148                                                   state.cdr);
00149         if (result < 0)
00150           return result;
00151         int result =TAO_SFP_Base::read_frame (transport,
00152                                               state.frame_header_,
00153                                               state,
00154                                               frame_info);
00155         if (result < 0)
00156           return result;
00157         break;
00158       }
00159     case flowProtocol::Fragment_Msg:
00160       {
00161         result = TAO_SFP_Base::peek_fragment_header (transport,
00162                                                      state.fragment_,
00163                                                      state.cdr);
00164         if (result < 0)
00165           return result;
00166         if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"Fragment received\n"));
00167         result = TAO_SFP_Base::read_fragment (transport,
00168                                               state.fragment_,
00169                                               state,
00170                                               frame_info);
00171         if (result < 0)
00172           return result;
00173         break;
00174       }
00175     case flowProtocol::EndofStream_Msg:
00176       {
00177         result = TAO_SFP_Base::read_endofstream_message (transport,
00178                                                          state.frame_header_,
00179                                                          state.cdr);
00180         if (result < 0)
00181           return result;
00182         break;
00183       }
00184     default:
00185       break;
00186     }
00187   return 0;
00188 }
00189 
00190 int
00191 TAO_SFP_Base::read_frame (TAO_AV_Transport *transport,
00192                           flowProtocol::frameHeader &frame_header,
00193                           TAO_SFP_Frame_State &state,
00194                           TAO_AV_frame_info *&frame_info)
00195 {
00196   ACE_Message_Block *message_block = 0;
00197   int result = -1;
00198 
00199   if (TAO_debug_level > 0)
00200     ACE_DEBUG ((LM_DEBUG,"Reading simple frame\n"));
00201   // Check to see what the length of the message is.
00202   int byte_order = frame_header.flags & 0x1;
00203   int message_len = frame_header.message_size;
00204 
00205 //       ACE_NEW_RETURN (message_block,
00206 //                       ACE_Message_Block (message_len),
00207 //                       0);
00208   state.static_frame_.rd_ptr (state.static_frame_.base ());
00209   state.static_frame_.wr_ptr (state.static_frame_.base ());
00210   int n = transport->recv (state.static_frame_.rd_ptr (),message_len);
00211   if (n == -1)
00212     ACE_ERROR_RETURN ((LM_ERROR,"SFP::handle_input -peek"),0);
00213   else if (n==0)
00214     ACE_ERROR_RETURN ((LM_ERROR,"SFP::handle_input -peek"),0);
00215   else if (n != message_len)
00216     ACE_ERROR_RETURN ((LM_ERROR,"SFP::read_simple_frame:message truncated\n"),0);
00217   message_block = &state.static_frame_;
00218   // print the buffer.
00219   //      this->dump_buf (message,n);
00220   // skip over the frame header.
00221   message_block->rd_ptr (frame_header_len);
00222   message_block->wr_ptr (n);
00223   CORBA::ULong ssrc = 0;
00224   TAO_SFP_Fragment_Table_Entry *fragment_entry = 0;
00225   if (frame_header.flags & 0x2)
00226     {
00227       if (TAO_debug_level > 0)
00228         ACE_DEBUG ((LM_DEBUG,"fragmented frame:0th fragment\n"));
00229       state.more_fragments_ = 1;
00230       ACE_Message_Block *data = 0;
00231       switch (frame_header.message_type)
00232         {
00233         case flowProtocol::Frame_Msg:
00234           {
00235             // read the frame info.
00236             ACE_Message_Block frame_info_mb (message_len-frame_header_len+ACE_CDR::MAX_ALIGNMENT);
00237             ACE_CDR::mb_align (&frame_info_mb);
00238             frame_info_mb.copy (message_block->rd_ptr (),
00239                                 message_block->length ());
00240             // print the buffer.
00241             //          this->dump_buf (message_block->rd_ptr (),16);
00242             TAO_InputCDR frame_info_cdr (&frame_info_mb,byte_order);
00243             frame_info_cdr >> state.frame_;
00244             if (TAO_debug_level > 0)
00245               ACE_DEBUG ((LM_DEBUG,
00246                           "frame.timestamp = %d, "
00247                           "frame.synchsource = %d, "
00248                           "frame.sequence_num = %d\n",
00249                           state.frame_.timestamp,
00250                           state.frame_.synchSource,
00251                           state.frame_.sequence_num));
00252             ssrc = state.frame_.synchSource;
00253             // The remaining message in the CDR stream is the fragment
00254             // data for frag.0
00255             data = frame_info_cdr.start ()->clone ();
00256             break;
00257           }
00258         case flowProtocol::SimpleFrame_Msg:
00259           {
00260             data = message_block->clone ();
00261             break;
00262           }
00263         case flowProtocol::SequencedFrame_Msg:
00264           break;
00265         case flowProtocol::SpecialFrame_Msg:
00266           break;
00267         }
00268       if (TAO_debug_level > 0)
00269         ACE_DEBUG ((LM_DEBUG,"Length of 0th fragment= %d\n",data->length ()));
00270       TAO_SFP_Fragment_Table *fragment_table = 0;
00271       result = state.fragment_table_map_.find (ssrc,fragment_table);
00272       if (result != 0)
00273         {
00274           ACE_NEW_RETURN (fragment_table,
00275                           TAO_SFP_Fragment_Table,
00276                           -1);
00277           result = state.fragment_table_map_.bind (ssrc,fragment_table);
00278           if (result < 0)
00279             ACE_ERROR_RETURN ((LM_ERROR,
00280                                "TAO_SFP_Base::read_frame: "
00281                                "fragment_table_map:bind failed\n"),-1);
00282         }
00283 
00284       TAO_SFP_Fragment_Node *new_node;
00285       ACE_NEW_RETURN (new_node,
00286                       TAO_SFP_Fragment_Node,
00287                       0);
00288       new_node->fragment_info_.frag_sz = static_cast<CORBA::ULong> (data->length ());
00289       new_node->fragment_info_.frag_number = 0;
00290       if (state.frame_.source_ids.length () > 0)
00291         new_node->fragment_info_.source_id = state.frame_.source_ids [0];
00292       else
00293         new_node->fragment_info_.source_id = 0;
00294       new_node->data_ = data;
00295       //          TAO_SFP_Base::dump_buf (data->rd_ptr (),data->length ());
00296       if (fragment_table->find (state.frame_.sequence_num,fragment_entry) == 0)
00297         {
00298           // This case can happen where a nth (n > 0)fragment is
00299           // received before the 0th fragment.
00300           if (TAO_debug_level > 0)
00301             ACE_DEBUG ((LM_DEBUG,
00302                         "fragment table entry found for 0th fragment:\n"));
00303           result = fragment_entry->fragment_set_.insert (*new_node);
00304           if (result != 0)
00305             ACE_ERROR_RETURN ((LM_ERROR,
00306                                "insert for 0th fragment failed\n"),0);
00307           //  enter the frame info.
00308 
00309           // check if all the fragments have been received.
00310           state.frame_block_ =
00311             TAO_SFP_Base::check_all_fragments (fragment_entry);
00312           if (state.frame_block_ != 0)
00313             state.more_fragments_ = 0;
00314         }
00315       else
00316         {
00317           if (TAO_debug_level > 0)
00318             ACE_DEBUG ((LM_DEBUG,
00319                         "fragment table entry not found for 0th fragment\n"));
00320           TAO_SFP_Fragment_Table_Entry *new_entry;
00321           ACE_NEW_RETURN (new_entry,
00322                           TAO_SFP_Fragment_Table_Entry,
00323                           0);
00324           result = new_entry->fragment_set_.insert (*new_node);
00325           if (result != 0)
00326             ACE_ERROR_RETURN ((LM_ERROR,"insert for 0th fragment failed\n"),0);
00327           fragment_entry = new_entry;
00328           // not found. so bind a new entry.
00329           result = fragment_table->bind (state.frame_.sequence_num,new_entry);
00330           if (result != 0)
00331             ACE_ERROR_RETURN ((LM_ERROR,"fragment table bind failed\n"),0);
00332           if (frame_header.message_type & 4 )
00333             fragment_entry->frame_info.boundary_marker = 1;
00334           switch (frame_header.message_type)
00335             {
00336             case flowProtocol::Frame_Msg:
00337               fragment_entry->frame_info.ssrc = state.frame_.synchSource;
00338               fragment_entry->frame_info.timestamp = state.frame_.timestamp;
00339               fragment_entry->frame_info.sequence_num = state.frame_.sequence_num;
00340               break;
00341             case flowProtocol::SimpleFrame_Msg:
00342               fragment_entry->frame_info.ssrc =
00343                 fragment_entry->frame_info.timestamp =
00344                 fragment_entry->frame_info.sequence_num = 0;
00345               break;
00346             }
00347           return 0;
00348         }
00349     }
00350   else
00351     {
00352       state.more_fragments_ = 0;
00353       state.frame_block_ = message_block;
00354     }
00355   if (state.more_fragments_ == 0)
00356     {
00357       if (fragment_entry != 0)
00358         {
00359           ACE_NEW_RETURN (frame_info,
00360                           TAO_AV_frame_info,
00361                           -1);
00362           *frame_info = fragment_entry->frame_info;
00363         }
00364     }
00365   return 0;
00366 }
00367 
00368 int
00369 TAO_SFP_Base::read_fragment (TAO_AV_Transport *transport,
00370                              flowProtocol::fragment &fragment,
00371                              TAO_SFP_Frame_State &state,
00372                              TAO_AV_frame_info *&frame_info)
00373 {
00374   TAO_SFP_Fragment_Table_Entry *fragment_entry = 0;
00375   int result = -1;
00376 
00377   if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"frag_number = %d, frag_size = %d,source_id  = %d sequnce_num = %d\n",
00378               fragment.frag_number,fragment.frag_sz,fragment.source_id,fragment.sequence_num));
00379 
00380   ACE_Message_Block *data;
00381   ACE_NEW_RETURN (data,
00382                   ACE_Message_Block(fragment.frag_sz),
00383                   -1);
00384 
00385   // Read the fragment.
00386   int n = transport->recv (data->wr_ptr (),fragment.frag_sz);
00387   if ((n == -1) || (n==0))
00388     ACE_ERROR_RETURN ((LM_ERROR,"TAO_SFP::read_fragment:%p",""),-1);
00389   // move past the fragment header.
00390   data->rd_ptr (fragment_len);
00391   data->wr_ptr (n);
00392   //  TAO_SFP_Base::dump_buf (data->rd_ptr (),data->length ());
00393   if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"length of %dth fragment is: %d\n",
00394               fragment.frag_number,
00395               data->length ()));
00396 
00397   TAO_SFP_Fragment_Node *new_node;
00398   ACE_NEW_RETURN (new_node,
00399                   TAO_SFP_Fragment_Node,
00400                   -1);
00401   new_node->fragment_info_ = fragment;
00402   new_node->data_ = data;
00403   TAO_SFP_Fragment_Table *fragment_table = 0;
00404   result = state.fragment_table_map_.find (fragment.source_id,fragment_table);
00405   if (result != 0)
00406     {
00407       ACE_NEW_RETURN (fragment_table,
00408                       TAO_SFP_Fragment_Table,
00409                       -1);
00410       result = state.fragment_table_map_.bind (fragment.source_id,fragment_table);
00411       if (result < 0)
00412         ACE_ERROR_RETURN ((LM_ERROR,"TAO_SFP_Base::read_fragment:fragment_table_map:bind failed\n"),-1);
00413     }
00414   if (fragment_table->find (fragment.sequence_num,fragment_entry) == 0)
00415     {
00416       // Already an entry exists. Traverse the list and insert it at the right place.
00417       result = fragment_entry->fragment_set_.insert (*new_node);
00418       if (result != 0)
00419         ACE_ERROR_RETURN ((LM_ERROR,"insert for %dth node failed\n",fragment.frag_number),-1);
00420       // check if all the fragments have been received.
00421     }
00422   else
00423     {
00424       ACE_NEW_RETURN (fragment_entry,
00425                       TAO_SFP_Fragment_Table_Entry,
00426                       -1);
00427       fragment_entry->fragment_set_.insert (*new_node);
00428       // bind a new entry for this sequence number.
00429       result = fragment_table->bind (fragment.sequence_num,fragment_entry);
00430       if (result != 0)
00431         ACE_ERROR_RETURN ((LM_ERROR,"bind for %dth fragment failed\n",
00432                            fragment.frag_number),-1);
00433     }
00434   if (!(fragment.flags & 0x2))
00435     {
00436       if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"Last fragment received\n"));
00437       // if bit 1 is not set then there are
00438       // no more fragments.
00439       fragment_entry->last_received_ = 1;
00440       // since fragment number starts from 0 to n-1 we add 1.
00441       fragment_entry->num_fragments_ = fragment.frag_number + 1;
00442     }
00443 
00444 
00445   state.frame_block_ = check_all_fragments (fragment_entry);
00446   if (state.frame_block_ != 0)
00447     {
00448       state.more_fragments_ = 0;
00449       ACE_NEW_RETURN (frame_info,
00450                       TAO_AV_frame_info,
00451                       -1);
00452       *frame_info = fragment_entry->frame_info;
00453     }
00454   return 0;
00455 }
00456 
00457 ACE_Message_Block*
00458 TAO_SFP_Base::check_all_fragments (TAO_SFP_Fragment_Table_Entry *fragment_entry)
00459 {
00460   if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"table size: %d, num_fragments: %d\n",fragment_entry->fragment_set_.size (),fragment_entry->num_fragments_));
00461   // check to see if all the frames have been received.
00462   if (fragment_entry->fragment_set_.size () == fragment_entry->num_fragments_)
00463     {
00464       if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"all fragments have been received\n"));
00465       // all the fragments have been received
00466       // we can now chain the ACE_Message_Blocks in the fragment_set_ and then return them
00467       // back.
00468       ACE_Message_Block *frame = 0,*head = 0;
00469       FRAGMENT_SET_ITERATOR frag_iterator (fragment_entry->fragment_set_);
00470       TAO_SFP_Fragment_Node *node;
00471       for (;frag_iterator.next (node) != 0;frag_iterator.advance ())
00472         {
00473           if (!head)
00474             head = frame = node->data_;
00475           else
00476             {
00477               frame->cont (node->data_);
00478               frame = node->data_;
00479             }
00480         }
00481       return head;
00482     }
00483   return 0;
00484 }
00485 
00486 CORBA::Boolean
00487 TAO_SFP_Base::start_frame (CORBA::Octet flags,
00488                            flowProtocol::MsgType type,
00489                            TAO_OutputCDR &msg)
00490 {
00491   msg.reset ();
00492   flowProtocol::frameHeader frame_header;
00493 
00494   frame_header.magic_number [0] = '=';
00495   frame_header.magic_number [1] = 'S';
00496   frame_header.magic_number [2] = 'F';
00497   frame_header.magic_number [3] = 'P';
00498   frame_header.flags = flags;
00499   frame_header.message_type = static_cast<CORBA::Octet> (type);
00500   frame_header.message_size = 0;
00501   if (!(msg << frame_header))
00502     return 0;
00503   return 1;
00504 }
00505 
00506 CORBA::Boolean
00507 TAO_SFP_Base::write_start_message (TAO_OutputCDR &msg)
00508 {
00509   flowProtocol::Start start;
00510 
00511   start.magic_number [0] = '=';
00512   start.magic_number [1] = 'S';
00513   start.magic_number [2] = 'T';
00514   start.magic_number [3] = 'A';
00515   start.major_version = TAO_SFP_MAJOR_VERSION;
00516   start.minor_version = TAO_SFP_MINOR_VERSION;
00517   start.flags = 0;
00518   if (!(msg << start))
00519     return 0;
00520   return 1;
00521 }
00522 
00523 CORBA::Boolean
00524 TAO_SFP_Base::write_start_reply_message (TAO_OutputCDR &msg)
00525 {
00526   flowProtocol::StartReply start_reply;
00527 
00528   start_reply.magic_number [0] = '=';
00529   start_reply.magic_number [1] = 'S';
00530   start_reply.magic_number [2] = 'T';
00531   start_reply.magic_number [3] = 'R';
00532   start_reply.flags = 0;
00533   if (!(msg << start_reply))
00534     return 0;
00535   return 1;
00536 }
00537 
00538 CORBA::Boolean
00539 TAO_SFP_Base::write_credit_message (CORBA::ULong cred_num,
00540                                     TAO_OutputCDR &msg)
00541 {
00542   flowProtocol::credit credit;
00543 
00544   credit.magic_number [0] = '=';
00545   credit.magic_number [1] = 'C';
00546   credit.magic_number [2] = 'R';
00547   credit.magic_number [3] = 'E';
00548   credit.cred_num = cred_num;
00549   if (!(msg << credit))
00550     return 0;
00551   return 1;
00552 }
00553 
00554 CORBA::Boolean
00555 TAO_SFP_Base::write_fragment_message (CORBA::Octet flags,
00556                                       CORBA::ULong fragment_number,
00557                                       CORBA::ULong sequence_number,
00558                                       CORBA::ULong source_id,
00559                                       TAO_OutputCDR &msg)
00560 {
00561   msg.reset ();
00562   flowProtocol::fragment fragment;
00563 
00564   fragment.magic_number [0] = 'F';
00565   fragment.magic_number [1] = 'R';
00566   fragment.magic_number [2] = 'A';
00567   fragment.magic_number [3] = 'G';
00568   fragment.flags = flags;
00569   fragment.frag_number = fragment_number;
00570   fragment.sequence_num = sequence_number;
00571   fragment.source_id = source_id;
00572   if (!(msg << fragment))
00573     return 0;
00574   return 1;
00575 }
00576 
00577 CORBA::Boolean
00578 TAO_SFP_Base::write_frame_message (CORBA::ULong timestamp,
00579                                    CORBA::ULong synchSource,
00580                                    flowProtocol::my_seq_ulong source_ids,
00581                                    CORBA::ULong sequence_num,
00582                                    TAO_OutputCDR &msg)
00583 {
00584   flowProtocol::frame frame;
00585 
00586   frame.timestamp = timestamp;
00587   frame.synchSource = synchSource;
00588   frame.source_ids = source_ids;
00589   frame.sequence_num = sequence_num;
00590   if (!(msg << frame))
00591     return 0;
00592   return 1;
00593 }
00594 
00595 int
00596 TAO_SFP_Base::send_message (TAO_AV_Transport *transport,
00597                             TAO_OutputCDR &stream,
00598                             ACE_Message_Block *mb)
00599 {
00600   CORBA::ULong total_len = static_cast<CORBA::ULong> (stream.total_length ());
00601   if (mb != 0)
00602     {
00603       for (ACE_Message_Block *temp = mb;temp != 0;temp = temp->cont ())
00604         total_len += static_cast<CORBA::ULong> (temp->length ());
00605 
00606       char *buf = (char *) stream.buffer ();
00607       size_t offset = TAO_SFP_MESSAGE_SIZE_OFFSET;
00608       // second character distinguished =SFP and FRAG.
00609       if (*(buf) == 'F')
00610         {
00611           // Fragment message.
00612           offset = TAO_SFP_FRAGMENT_SIZE_OFFSET;
00613         }
00614 #if !defined (ACE_ENABLE_SWAP_ON_WRITE)
00615       *reinterpret_cast<CORBA::ULong *> (buf + offset) = total_len;
00616 #else
00617       if (!stream.do_byte_swap ())
00618         *reinterpret_cast<CORBA::ULong *> (buf + offset) = total_len;
00619       else
00620         ACE_CDR::swap_4 (reinterpret_cast<char *> (&total_len),
00621                          buf + offset);
00622 #endif /* ACE_ENABLE_SWAP_ON_WRITE */
00623     }
00624   // we join the data block with the cdr block.
00625   ACE_Message_Block *end = (ACE_Message_Block *)stream.end ();
00626   if (end == 0)
00627     {
00628       // There is only one message block.
00629       end = (ACE_Message_Block *)stream.begin ();
00630       //      TAO_SFP_Base::dump_buf (end->rd_ptr (),end->length ());
00631     }
00632   end->cont (mb);
00633   ssize_t n = transport->send (stream.begin ());
00634   if (n == -1)
00635     {
00636       if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,
00637                   "TAO: (%P|%t) closing conn after fault %p\n",
00638                   "GIOP::send_request ()"));
00639       return -1;
00640     }
00641   // EOF.
00642   if (n == 0)
00643     {
00644       if (TAO_debug_level > 0)
00645         ACE_DEBUG ((LM_DEBUG,
00646                     "TAO: (%P|%t) GIOP::send_request () "
00647                     "EOF, closing conn:\n"));
00648       return -1;
00649     }
00650   return 1;
00651 
00652 }
00653 
00654 int
00655 TAO_SFP_Base::peek_message_type (TAO_AV_Transport *transport,
00656                                  flowProtocol::MsgType &msg_type)
00657 {
00658   char peek_buffer [TAO_SFP_MAGIC_NUMBER_LEN+2];// 2 is for flags + message_type.
00659   int peek_len = TAO_SFP_MAGIC_NUMBER_LEN +2;
00660   char magic_number [TAO_SFP_MAGIC_NUMBER_LEN+1];
00661   ssize_t n =transport->recv (peek_buffer,
00662                               peek_len,
00663                               MSG_PEEK);
00664   ACE_OS::strncpy (magic_number,
00665                    peek_buffer,
00666                    TAO_SFP_MAGIC_NUMBER_LEN);
00667   magic_number [TAO_SFP_MAGIC_NUMBER_LEN] = 0;
00668   if (n == -1)
00669     ACE_ERROR_RETURN ((LM_ERROR,"SFP::handle_input -peek"),-1);
00670   else if (n==0)
00671     ACE_ERROR_RETURN ((LM_ERROR,"SFP::handle_input -peek"),-1);
00672 
00673   if (ACE_OS::strcmp (magic_number,TAO_SFP_START_MAGIC_NUMBER) == 0)
00674     {
00675       if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"(%P|%t)Start message received\n"));
00676       msg_type = flowProtocol::Start_Msg;
00677     }
00678   else if (ACE_OS::strcmp (magic_number,TAO_SFP_STARTREPLY_MAGIC_NUMBER) == 0)
00679     {
00680       if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"(%P|%t)StartReply message received\n"));
00681       msg_type = flowProtocol::StartReply_Msg;
00682     }
00683   else if (ACE_OS::strcmp (magic_number,TAO_SFP_MAGIC_NUMBER) == 0)
00684     {
00685       if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"(%P|%t) frameHeader received\n"));
00686       //      msg_type = flowProtocol::SimpleFrame;
00687       msg_type = (flowProtocol::MsgType)peek_buffer [TAO_SFP_MESSAGE_TYPE_OFFSET];
00688       if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"Message Type = %d\n",msg_type));
00689     }
00690   else if (ACE_OS::strcmp (magic_number,TAO_SFP_FRAGMENT_MAGIC_NUMBER) == 0)
00691     {
00692       if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"(%P|%t) fragment Header received\n"));
00693       msg_type = flowProtocol::Fragment_Msg;
00694     }
00695   else if (ACE_OS::strcmp (magic_number,TAO_SFP_CREDIT_MAGIC_NUMBER) == 0)
00696     {
00697       if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"(%P|%t) credit message received\n"));
00698       msg_type = flowProtocol::Credit_Msg;
00699     }
00700   else
00701     ACE_ERROR_RETURN ((LM_ERROR,"TAO_SFP:Invalid magic number\n"),-1);
00702   return 0;
00703 }
00704 
00705 int
00706 TAO_SFP_Base::read_start_message (TAO_AV_Transport *transport,
00707                                   flowProtocol::Start &start,
00708                                   TAO_InputCDR &input)
00709 {
00710   input.grow (start_len);
00711   char *buf = input.rd_ptr ();
00712   int n = transport->recv (buf,
00713                            start_len);
00714   if (n != static_cast<int> (start_len))
00715     ACE_ERROR_RETURN ((LM_ERROR,"%p","TAO_SFP_Base::read_start\n"),0);
00716   else
00717     {
00718       if (!(input >> start))
00719         return -1;
00720     }
00721   return 0;
00722 }
00723 
00724 
00725 int
00726 TAO_SFP_Base::read_start_reply_message (TAO_AV_Transport *transport,
00727                                         flowProtocol::StartReply &start_reply,
00728                                         TAO_InputCDR &input)
00729 {
00730   input.grow (start_len);
00731   char *buf = input.rd_ptr ();
00732   int n = transport->recv (buf,
00733                            start_reply_len);
00734   if (n != static_cast<int> (start_len))
00735     ACE_ERROR_RETURN ((LM_ERROR,"%p","TAO_SFP_Base::read_start_reply_message"),0);
00736   else
00737     {
00738       if (!(input >> start_reply))
00739         return -1;
00740     }
00741   return 0;
00742 }
00743 
00744 int
00745 TAO_SFP_Base::read_credit_message (TAO_AV_Transport *transport,
00746                                    flowProtocol::credit &credit,
00747                                    TAO_InputCDR &input)
00748 {
00749   input.grow (start_len);
00750   char *buf = input.rd_ptr ();
00751   int n = transport->recv (buf,
00752                            credit_len);
00753   if (n != static_cast<int> (credit_len))
00754     ACE_ERROR_RETURN ((LM_ERROR,"%p","TAO_SFP_Base::read_credit_message"),0);
00755   else
00756     {
00757       if (!(input >> credit))
00758         return -1;
00759     }
00760   return 0;
00761 }
00762 
00763 int
00764 TAO_SFP_Base::read_endofstream_message (TAO_AV_Transport *transport,
00765                                         flowProtocol::frameHeader &endofstream,
00766                                         TAO_InputCDR &input)
00767 {
00768   input.grow (start_len);
00769   char *buf = input.rd_ptr ();
00770   int n = transport->recv (buf,
00771                            frame_header_len);
00772   if (n != static_cast<int> (frame_header_len))
00773     ACE_ERROR_RETURN ((LM_ERROR,"%p","TAO_SFP_Base::read_endofstream_message"),0);
00774   else
00775     {
00776       if (!(input >> endofstream))
00777         return -1;
00778     }
00779   return 0;
00780 }
00781 
00782 int
00783 TAO_SFP_Base::peek_frame_header (TAO_AV_Transport *transport,
00784                                  flowProtocol::frameHeader &header,
00785                                  TAO_InputCDR &input)
00786 {
00787   input.grow (frame_header_len);
00788   char *buf = input.rd_ptr ();
00789   int n = transport->recv (buf,
00790                            frame_header_len,
00791                            MSG_PEEK);
00792   if (n != static_cast<int> (frame_header_len))
00793     ACE_ERROR_RETURN ((LM_ERROR,"%p","TAO_SFP_Base::read_endofstream_message"),0);
00794   else
00795     {
00796       if (!(input >> header))
00797         return -1;
00798     }
00799   return 0;
00800 }
00801 
00802 int
00803 TAO_SFP_Base::peek_fragment_header (TAO_AV_Transport *transport,
00804                                     flowProtocol::fragment &fragment,
00805                                     TAO_InputCDR &input)
00806 {
00807   input.grow (fragment_len);
00808   char *buf = input.rd_ptr ();
00809   int n = transport->recv (buf,
00810                            fragment_len,
00811                            MSG_PEEK);
00812   if (n != static_cast<int> (fragment_len))
00813     ACE_ERROR_RETURN ((LM_ERROR,"%p","TAO_SFP_Base::read_endofstream_message"),0);
00814   else
00815     {
00816       if (!(input >> fragment))
00817         return -1;
00818     }
00819   return 0;
00820 }
00821 
00822 void
00823 TAO_SFP_Base::dump_buf (char *buffer,int size)
00824 {
00825   char *buf = buffer;
00826   if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"\n========================================\n"));
00827   for (int i=0;i<size;i++)
00828     if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"%d ",buf[i]));
00829   if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"\n========================================\n"));
00830 }
00831 
00832 //------------------------------------------------------------
00833 // TAO_SFP_Object
00834 //------------------------------------------------------------
00835 
00836 TAO_SFP_Object::TAO_SFP_Object (TAO_AV_Callback *callback,
00837                                 TAO_AV_Transport *transport)
00838   :TAO_AV_Protocol_Object (callback,transport),
00839    source_id_ (10),
00840    max_credit_ (-1),
00841    current_credit_ (-1)
00842 {
00843   TAO_SFP_BASE::instance ();
00844   this->state_.static_frame_.size (2* this->transport_->mtu ());
00845 }
00846 
00847 TAO_SFP_Object::~TAO_SFP_Object (void)
00848 {
00849   //no-op
00850 }
00851 
00852 int
00853 TAO_SFP_Object::destroy (void)
00854 {
00855   int result = -1;
00856   TAO_OutputCDR out_stream;
00857   result = TAO_SFP_Base::start_frame (TAO_ENCAP_BYTE_ORDER,
00858                                       flowProtocol::EndofStream_Msg,
00859                                       out_stream);
00860   if (result < 0)
00861     return result;
00862   result = TAO_SFP_Base::send_message (this->transport_,
00863                                        out_stream);
00864   if (result < 0)
00865     return result;
00866   this->callback_->handle_destroy ();
00867   return 0;
00868 }
00869 
00870 int
00871 TAO_SFP_Object::send_frame (ACE_Message_Block *frame,
00872                             TAO_AV_frame_info *frame_info)
00873 {
00874   TAO_OutputCDR out_stream;
00875   CORBA::Boolean result = 0;
00876   if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"TAO_SFP_Object::send_frame\n"));
00877   CORBA::Octet flags = TAO_ENCAP_BYTE_ORDER;
00878   if (this->transport_ == 0)
00879     ACE_ERROR_RETURN ((LM_ERROR,"TAO_SFP_Object::send_frame: transport is null\n"),-1);
00880    if (this->current_credit_ != 0)
00881     {
00882       // if we have enough credit then we send.
00883       size_t total_length = 0;
00884       for (ACE_Message_Block *temp = frame;temp != 0;temp = temp->cont ())
00885         total_length += temp->length ();
00886       if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"total_length of frame=%d\n",total_length));
00887       if (total_length < (TAO_SFP_MAX_PACKET_SIZE -TAO_SFP_Base::frame_header_len))
00888         {
00889           if (frame_info != 0)
00890             {
00891               if (frame_info->boundary_marker)
00892                 flags |= 4;
00893               CORBA::Boolean result = TAO_SFP_Base::start_frame (flags,
00894                                                                  flowProtocol::Frame_Msg,
00895                                                                  out_stream);
00896               if (result == 0)
00897                 return 0;
00898               flowProtocol::my_seq_ulong source_ids;
00899               source_ids.length (1);
00900               source_ids [0] = 0;
00901               TAO_SFP_Base::write_frame_message (frame_info->timestamp,
00902                                                  frame_info->ssrc,
00903                                                  source_ids,
00904                                                  this->sequence_num_,
00905                                                  out_stream);
00906             }
00907           else
00908             {
00909               CORBA::Boolean result = TAO_SFP_Base::start_frame (flags,
00910                                                                  flowProtocol::SimpleFrame_Msg,
00911                                                                  out_stream);
00912               if (result == 0)
00913                 return 0;
00914             }
00915           TAO_SFP_Base::send_message (this->transport_,
00916                                       out_stream,
00917                                       frame);
00918         }
00919       else // larger frame,fragment and send it.
00920         {
00921           flags = flags | 2;
00922           if (frame_info != 0)
00923             {
00924               if (frame_info->boundary_marker)
00925                 flags |= 4;
00926               result = TAO_SFP_Base::start_frame (flags,
00927                                                   flowProtocol::Frame_Msg,
00928                                                   out_stream);
00929               if (result == 0)
00930                 return result;
00931               flowProtocol::my_seq_ulong source_ids;
00932               source_ids.length (1);
00933               source_ids [0] = 0;
00934               TAO_SFP_Base::write_frame_message (frame_info->timestamp,
00935                                                  frame_info->ssrc,
00936                                                  source_ids,
00937                                                  this->sequence_num_,
00938                                                  out_stream);
00939             }
00940           else
00941             {
00942               CORBA::Boolean result = TAO_SFP_Base::start_frame (flags,
00943                                                                  flowProtocol::SimpleFrame_Msg,
00944                                                                  out_stream);
00945               if (result == 0)
00946                 return 0;
00947             }
00948           size_t last_len,current_len;
00949           int message_len = static_cast<int> (out_stream.total_length ());
00950           ACE_Message_Block *mb = frame;
00951           ACE_Message_Block *fragment_mb =
00952             this->get_fragment (mb,
00953                                 message_len,
00954                                 last_len,
00955                                 current_len);
00956           //  This can be either a simpleframe or a sequenced frame,other types of frames.
00957           TAO_SFP_Base::send_message (this->transport_,
00958                                       out_stream,
00959                                       fragment_mb);
00960           out_stream.reset ();
00961           int frag_number = 1;
00962           mb->length (last_len);
00963           mb->rd_ptr (current_len);
00964           // If there is any more data send those as fragments.
00965           while (mb != 0)
00966             {
00967               message_len = TAO_SFP_Base::fragment_len;
00968               fragment_mb = this->get_fragment (mb,
00969                                                 message_len,
00970                                                 last_len,
00971                                                 current_len);
00972               if (mb == 0)
00973                 {
00974                   if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"sending the last fragment\n"));
00975                   // This is the last fragment so clear the fragments bit.
00976                   flags = TAO_ENCAP_BYTE_ORDER;
00977                 }
00978               if (fragment_mb == 0)
00979                 break;
00980               if (frame_info != 0)
00981                 {
00982                   TAO_SFP_Base::write_fragment_message (flags,
00983                                                         frag_number++,
00984                                                         this->sequence_num_,
00985                                                         frame_info->ssrc,
00986                                                         out_stream);
00987                 }
00988               else
00989                 {
00990                   TAO_SFP_Base::write_fragment_message (flags,
00991                                                         frag_number++,
00992                                                         this->sequence_num_,
00993                                                         0,
00994                                                         out_stream);
00995                 }
00996               //   send the fragment now.
00997               // without the sleep the fragments gets lost!
00998               // probably because the UDP buffer queue on the sender side
00999               // is overflown it drops the packets.
01000               // XXX: This is a hack.
01001               ACE_OS::sleep (1);
01002               result = TAO_SFP_Base::send_message (this->transport_,
01003                                                    out_stream,
01004                                                    fragment_mb);
01005               if (mb != 0)
01006                 {
01007                   mb->length (last_len);
01008                   mb->rd_ptr (current_len);
01009                 }
01010             }
01011           // Increment the sequence_num after sending the message.
01012           this->sequence_num_++;
01013           // Also reduce the number of credits.
01014           if (this->max_credit_ > 0)
01015             this->current_credit_--;
01016         }
01017     }
01018   else
01019     {
01020       // flow controlled so wait.
01021       // A greater than 0 value indicates that flow control is being exercised.
01022       return 1;
01023     }
01024    return 0;
01025 }
01026 
01027 int
01028 TAO_SFP_Object::send_frame (const iovec * /*iov*/,
01029                             int /*iovcnt*/,
01030                             TAO_AV_frame_info * /*frame_info*/)
01031 {
01032   ACE_ERROR_RETURN ((LM_ERROR,"TAO_AV_SFP_Object::send_frame"),-1);
01033 }
01034 
01035 int
01036 TAO_SFP_Object::send_frame (const char* /*buf*/,
01037                                size_t /*len*/)
01038 {
01039   return 0;
01040 }
01041 
01042 
01043 ACE_Message_Block*
01044 TAO_SFP_Object::get_fragment (ACE_Message_Block *&mb,
01045                               size_t initial_len,
01046                               size_t &last_mb_orig_len,
01047                               size_t &last_mb_current_len)
01048 {
01049   ACE_Message_Block *fragment_mb = 0,*temp_mb = 0;
01050   size_t prev_len,last_len = 0;
01051   size_t current_len = 0;
01052   size_t message_len = initial_len;
01053   while (mb != 0)
01054     {
01055       prev_len = message_len;
01056       message_len += mb->length ();
01057       if (fragment_mb == 0)
01058         fragment_mb = temp_mb = mb->duplicate ();
01059       if (message_len > TAO_SFP_MAX_PACKET_SIZE)
01060         {
01061           // get only the length that we can accomodate.
01062           current_len = TAO_SFP_MAX_PACKET_SIZE - prev_len;
01063           if (current_len < mb->length ())
01064             {
01065               // The above condition is an assertion.
01066               message_len += (current_len-mb->length ());
01067               last_len = mb->length ();
01068               mb->length (current_len);
01069               temp_mb->length (current_len);
01070             }
01071           break;
01072         }
01073       else
01074         {
01075           // we can accomodate this message block
01076           message_len += mb->length ();
01077           mb = mb->cont ();
01078           temp_mb = temp_mb->cont ();
01079         }
01080     }
01081   last_mb_orig_len = last_len;
01082   last_mb_current_len = current_len;
01083   return fragment_mb;
01084 }
01085 
01086 int
01087 TAO_SFP_Object::set_policies (const TAO_AV_PolicyList& policies)
01088 {
01089   TAO_AV_Policy *policy = 0;
01090   for (CORBA::ULong i=0;i<policies.length ();i++)
01091     {
01092       policy = policies[i];
01093       switch (policies[i]->type ())
01094         {
01095 
01096         case TAO_AV_SFP_CREDIT_POLICY:
01097           {
01098             TAO_AV_SFP_Credit_Policy *credit_policy =
01099               reinterpret_cast<TAO_AV_SFP_Credit_Policy*> (policy);
01100             this->max_credit_ = credit_policy->value ();
01101           }
01102         default:
01103           break;
01104         }
01105     }
01106   return 0;
01107 }
01108 
01109 // TAO_SFP_Consumer_Object
01110 TAO_SFP_Consumer_Object::TAO_SFP_Consumer_Object (TAO_AV_Callback *callback,
01111                                                   TAO_AV_Transport *transport,
01112                                                   ACE_CString& sfp_options)
01113   :TAO_SFP_Object (callback,transport)
01114 {
01115   TAO_AV_PolicyList policies = callback->get_policies ();
01116   if (policies.length () == 0)
01117     return;
01118   this->set_policies (policies);
01119   if (this->max_credit_ > 0)
01120     {
01121       sfp_options = "sfp:1.0:credit=";
01122       char buf[10];
01123       ACE_OS::sprintf(buf, "%d", this->max_credit_);
01124       sfp_options += buf;
01125     }
01126 }
01127 
01128 int
01129 TAO_SFP_Consumer_Object::handle_input (void)
01130 {
01131   if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"TAO_SFP_Consumer_Object::handle_input\n"));
01132   // This is the entry point for receiving data.
01133   TAO_AV_frame_info *frame_info = 0;
01134   int result = TAO_SFP_Base::handle_input (this->transport_,
01135                                            this->state_,
01136                                            frame_info);
01137   if (result < 0)
01138     ACE_ERROR_RETURN ((LM_ERROR,"ERROR in TAO_SFP_Consumer_Object::handle_input"),result);
01139   if (this->state_.frame_header_.message_type == flowProtocol::EndofStream_Msg)
01140     this->callback_->handle_destroy ();
01141   if (this->state_.is_complete ())
01142     {
01143       this->callback_->receive_frame (this->state_.frame_block_,
01144                                       frame_info);
01145       // Now release the memory for the frame.
01146       if (this->state_.frame_block_ != &this->state_.static_frame_)
01147         {
01148           ACE_Message_Block *temp = 0;
01149           for (temp = this->state_.frame_block_;
01150                temp != 0;
01151                temp = temp->cont ())
01152             {
01153               temp->release ();
01154               delete temp;
01155             }
01156         }
01157       this->state_.reset ();
01158     }
01159   return 0;
01160 }
01161 
01162 TAO_SFP_Producer_Object::TAO_SFP_Producer_Object (TAO_AV_Callback *callback,
01163                                                   TAO_AV_Transport *transport,
01164                                                   const char *sfp_options)
01165   :TAO_SFP_Object (callback,transport),
01166    credit_sequence_num_ (0)
01167 
01168 {
01169   TAO_Tokenizer flow_string (sfp_options,':');
01170   if (flow_string [2] != 0)
01171     {
01172       TAO_Tokenizer options (flow_string[2],'=');
01173       if (options [1] != 0)
01174         this->max_credit_ = ACE_OS::atoi (options[1]);
01175     }
01176 }
01177 
01178 int
01179 TAO_SFP_Producer_Object::handle_input (void)
01180 {
01181   // A producer can only receive credit messages.
01182   int result;
01183   flowProtocol::MsgType msg_type = flowProtocol::Start_Msg;
01184   result = TAO_SFP_Base::peek_message_type (this->transport_,
01185                                             msg_type);
01186   if (result < 0)
01187     return result;
01188   switch (msg_type)
01189     {
01190     case flowProtocol::Credit_Msg:
01191       {
01192         flowProtocol::credit credit;
01193         result = TAO_SFP_Base::read_credit_message (this->transport_,
01194                                                     credit,
01195                                                     this->state_.cdr);
01196         if (result < 0)
01197           return result;
01198       if (!this->credit_sequence_num_)
01199         this->credit_sequence_num_ = credit.cred_num;
01200       else
01201         {
01202           // check that the sequence number is above the last sequence number
01203           // else its a duplicate credit message so we can ignore it.
01204           if (credit.cred_num <= this->credit_sequence_num_)
01205             return 0;
01206           else      // Update our credit now.
01207             this->current_credit_ = this->max_credit_;
01208         }
01209       }
01210       break;
01211     default:
01212       {
01213         ACE_Message_Block mb (2*this->transport_->mtu ());
01214 
01215       // Ignore the rest of the message by just reading.
01216         this->transport_->recv (mb.rd_ptr (),
01217                                 mb.size ());
01218         break;
01219       }
01220     }
01221   return 0;
01222 }
01223 
01224 // TAO_AV_SFP_Factory
01225 TAO_AV_SFP_Factory::TAO_AV_SFP_Factory (void)
01226 {
01227 }
01228 
01229 TAO_AV_SFP_Factory::~TAO_AV_SFP_Factory (void)
01230 {
01231 }
01232 
01233 // Initialization hook from service configurator.
01234 int
01235 TAO_AV_SFP_Factory::init (int /*argc*/, char ** /*argv*/)
01236 {
01237   return 0;
01238 }
01239 
01240 int
01241 TAO_AV_SFP_Factory::match_protocol (const char *flow_string)
01242 {
01243   if (ACE_OS::strncasecmp (flow_string,"sfp",3) == 0)
01244     return 1;
01245   return 0;
01246 }
01247 
01248 TAO_AV_Protocol_Object*
01249 TAO_AV_SFP_Factory::make_protocol_object (TAO_FlowSpec_Entry *entry,
01250                                           TAO_Base_StreamEndPoint *endpoint,
01251                                           TAO_AV_Flow_Handler *handler,
01252                                           TAO_AV_Transport *transport)
01253 {
01254   TAO_AV_Protocol_Object *object = 0;
01255   TAO_AV_Callback *callback = 0;
01256   endpoint->get_callback (entry->flowname (),
01257                        callback);
01258   ACE_CString flow_string( entry->flow_protocol_str () );
01259   switch (entry->role ())
01260     {
01261     case TAO_FlowSpec_Entry::TAO_AV_PRODUCER:
01262       {
01263         ACE_NEW_RETURN (object,
01264                         TAO_SFP_Producer_Object (callback,
01265                                                  transport,
01266                                                  flow_string.c_str() ),
01267                         0);
01268       }
01269       break;
01270     case TAO_FlowSpec_Entry::TAO_AV_CONSUMER:
01271       {
01272 
01273         ACE_NEW_RETURN (object,
01274                         TAO_SFP_Consumer_Object (callback,
01275                                                  transport,
01276                                                  flow_string),
01277                         0);
01278         entry->flow_protocol_str( flow_string.c_str() );
01279       }
01280       break;
01281     case TAO_FlowSpec_Entry::TAO_AV_INVALID_ROLE:
01282       return 0;
01283     }
01284   callback->open (object,
01285                   handler);
01286   endpoint->set_protocol_object (entry->flowname (),
01287                                  object);
01288   return object;
01289 }
01290 
01291 //------------------------------------------------------------
01292 // TAO_SFP_Frame_State
01293 //------------------------------------------------------------
01294 
01295 TAO_SFP_Frame_State::TAO_SFP_Frame_State (void)
01296   :cdr (new ACE_Data_Block (ACE_CDR::DEFAULT_BUFSIZE,
01297                             ACE_Message_Block::MB_DATA,
01298                             0,
01299                             0,
01300                             0,
01301                             0,
01302                             0),
01303         0,
01304         TAO_ENCAP_BYTE_ORDER),
01305    more_fragments_ (0),
01306    frame_block_ (0)
01307 {
01308 }
01309 
01310 CORBA::Boolean
01311 TAO_SFP_Frame_State::is_complete (void)
01312 {
01313   return (!this->more_fragments_) &&  (this->frame_block_);
01314 }
01315 
01316 int
01317 TAO_SFP_Frame_State::reset (void)
01318 {
01319   this->frame_block_ = 0;
01320   return 0;
01321 }
01322 
// Explicit instantiation of the singleton's static pointer member for
// builds that define ACE_HAS_EXPLICIT_STATIC_TEMPLATE_MEMBER_INSTANTIATION.
#if defined (ACE_HAS_EXPLICIT_STATIC_TEMPLATE_MEMBER_INSTANTIATION)
template ACE_Singleton<TAO_SFP_Base, TAO_SYNCH_MUTEX> *
  ACE_Singleton<TAO_SFP_Base, TAO_SYNCH_MUTEX>::singleton_;
#endif /* ACE_HAS_EXPLICIT_STATIC_TEMPLATE_MEMBER_INSTANTIATION */
01326 
01327 TAO_END_VERSIONED_NAMESPACE_DECL
01328 
01329 ACE_FACTORY_DEFINE (TAO_AV, TAO_AV_SFP_Factory)
01330 ACE_STATIC_SVC_DEFINE (TAO_AV_SFP_Factory,
01331                        ACE_TEXT ("SFP_Factory"),
01332                        ACE_SVC_OBJ_T,
01333                        &ACE_SVC_NAME (TAO_AV_SFP_Factory),
01334                        ACE_Service_Type::DELETE_THIS |
01335                        ACE_Service_Type::DELETE_OBJ,
01336                        0)

Generated on Tue Feb 2 17:47:49 2010 for TAO_AV by  doxygen 1.4.7