#include <sfp.h>
|
|
Definition at line 133 of file sfp.h.
00134 {
00135 ACTIVE_START,
00136 PASSIVE_START,
00137 TIMEDOUT_T1,
00138 TIMEDOUT_T2,
00139 REPLY_RECEIVED,
00140 START_RECEIVED
00141 };
|
|
|
Definition at line 47 of file sfp.cpp. References ACE_ERROR, credit_len, flowProtocol::StartReply::flags, flowProtocol::Start::flags, flowProtocol::frameHeader::flags, fragment_len, frame_header_len, LM_ERROR, flowProtocol::credit::magic_number, flowProtocol::StartReply::magic_number, flowProtocol::Start::magic_number, flowProtocol::fragment::magic_number, flowProtocol::frameHeader::magic_number, flowProtocol::Start::major_version, flowProtocol::Start::minor_version, ACE_OutputCDR::reset(), start_len, start_reply_len, TAO_ENCAP_BYTE_ORDER, TAO_SFP_MAJOR_VERSION, TAO_SFP_MINOR_VERSION, and ACE_OutputCDR::total_length().
00048 {
00049 TAO_OutputCDR output_cdr;
00050 flowProtocol::frameHeader frame_header;
00051 flowProtocol::fragment fragment;
00052 flowProtocol::credit credit;
00053 flowProtocol::Start start;
00054 flowProtocol::StartReply start_reply;
00055
00056 // fill in the default frameHeader fields.
00057 frame_header.magic_number [0] = '=';
00058 frame_header.magic_number [1] = 'S';
00059 frame_header.magic_number [2] = 'F';
00060 frame_header.magic_number [3] = 'P';
00061 frame_header.flags = TAO_ENCAP_BYTE_ORDER;
00062 output_cdr.reset ();
00063 if (!(output_cdr << frame_header))
00064 {
00065 ACE_ERROR ((LM_ERROR, "TAO_SFP constructor\n"));
00066 return;
00067 }
00068
00069 frame_header_len = static_cast<u_int> (output_cdr.total_length ());
00070 // fill in the default fragment message fields.
00071 fragment.magic_number [0] = 'F';
00072 fragment.magic_number [1] = 'R';
00073 fragment.magic_number [2] = 'A';
00074 fragment.magic_number [3] = 'G';
00075 output_cdr.reset ();
00076 if (!(output_cdr << fragment))
00077 {
00078 ACE_ERROR ((LM_ERROR, "TAO_SFP constructor\n"));
00079 return;
00080 }
00081
00082 fragment_len = static_cast<u_int> (output_cdr.total_length ());
00083 // fill in the default Start message fields.
00084 start.magic_number [0] = '=';
00085 start.magic_number [1] = 'S';
00086 start.magic_number [2] = 'T';
00087 start.magic_number [3] = 'A';
00088 start.major_version = TAO_SFP_Base::TAO_SFP_MAJOR_VERSION;
00089 start.minor_version = TAO_SFP_Base::TAO_SFP_MINOR_VERSION;
00090 start.flags = 0;
00091 output_cdr.reset ();
00092 if (!(output_cdr << start))
00093 {
00094 ACE_ERROR ((LM_ERROR, "TAO_SFP constructor\n"));
00095 return;
00096 }
00097
00098 start_len = static_cast<u_int> (output_cdr.total_length ());
00099 // fill in the default StartReply message fields.
00100 start_reply.magic_number [0] = '=';
00101 start_reply.magic_number [1] = 'S';
00102 start_reply.magic_number [2] = 'T';
00103 start_reply.magic_number [3] = 'R';
00104 start_reply.flags = 0;
00105 output_cdr.reset ();
00106 if (!(output_cdr << start_reply))
00107 {
00108 ACE_ERROR ((LM_ERROR, "TAO_SFP constructor\n"));
00109 return;
00110 }
00111
00112 start_reply_len = static_cast<u_int> (output_cdr.total_length ());
00113
00114 // fill in the default Credit message fields.
00115 credit.magic_number [0] = '=';
00116 credit.magic_number [1] = 'C';
00117 credit.magic_number [2] = 'R';
00118 credit.magic_number [3] = 'E';
00119 output_cdr.reset ();
00120
00121 if (!(output_cdr << credit))
00122 {
00123 ACE_ERROR ((LM_ERROR, "TAO_SFP constructor\n"));
00124 return;
00125 }
00126 credit_len = static_cast<u_int> (output_cdr.total_length ());
00127 }
|
|
|
Definition at line 458 of file sfp.cpp. References ACE_DEBUG, ACE_Ordered_MultiSet_Iterator< TAO_SFP_Fragment_Node >::advance(), ACE_Message_Block::cont(), TAO_SFP_Fragment_Node::data_, TAO_SFP_Fragment_Table_Entry::fragment_set_, FRAGMENT_SET_ITERATOR, LM_DEBUG, ACE_Ordered_MultiSet_Iterator< TAO_SFP_Fragment_Node >::next(), TAO_SFP_Fragment_Table_Entry::num_fragments_, ACE_Ordered_MultiSet< TAO_SFP_Fragment_Node >::size(), and TAO_debug_level. Referenced by read_fragment(), and read_frame().
00459 {
00460 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"table size: %d, num_fragments: %d\n",fragment_entry->fragment_set_.size (),fragment_entry->num_fragments_));
00461 // check to see if all the frames have been received.
00462 if (fragment_entry->fragment_set_.size () == fragment_entry->num_fragments_)
00463 {
00464 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"all fragments have been received\n"));
00465 // all the fragments have been received
00466 // we can now chain the ACE_Message_Blocks in the fragment_set_ and then return them
00467 // back.
00468 ACE_Message_Block *frame = 0,*head = 0;
00469 FRAGMENT_SET_ITERATOR frag_iterator (fragment_entry->fragment_set_);
00470 TAO_SFP_Fragment_Node *node;
00471 for (;frag_iterator.next (node) != 0;frag_iterator.advance ())
00472 {
00473 if (!head)
00474 head = frame = node->data_;
00475 else
00476 {
00477 frame->cont (node->data_);
00478 frame = node->data_;
00479 }
00480 }
00481 return head;
00482 }
00483 return 0;
00484 }
|
|
||||||||||||
|
Definition at line 823 of file sfp.cpp. References ACE_DEBUG, LM_DEBUG, and TAO_debug_level.
00824 {
00825 char *buf = buffer;
00826 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"\n========================================\n"));
00827 for (int i=0;i<size;i++)
00828 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"%d ",buf[i]));
00829 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"\n========================================\n"));
00830 }
|
|
||||||||||||||||
|
Definition at line 130 of file sfp.cpp. References ACE_DEBUG, TAO_SFP_Frame_State::cdr, TAO_SFP_Frame_State::fragment_, TAO_SFP_Frame_State::frame_header_, LM_DEBUG, peek_fragment_header(), peek_frame_header(), peek_message_type(), read_endofstream_message(), read_fragment(), read_frame(), and TAO_debug_level. Referenced by TAO_SFP_Consumer_Object::handle_input().
00133 { // Peek at the type of the next message on transport and dispatch to the matching reader; a negative helper result is returned as-is, otherwise 0.
00134 int result;
00135 flowProtocol::MsgType msg_type;
00136 result = TAO_SFP_Base::peek_message_type (transport,
00137 msg_type);
00138 if (result < 0)
00139 return result;
00140 // TAO_InputCDR &input = state.cdr;
00141 switch (msg_type)
00142 {
00143 case flowProtocol::SimpleFrame_Msg:
00144 case flowProtocol::Frame_Msg:
00145 {
00146 result = TAO_SFP_Base::peek_frame_header (transport,
00147 state.frame_header_,
00148 state.cdr);
00149 if (result < 0)
00150 return result;
00151 result = TAO_SFP_Base::read_frame (transport, // reuse the outer result instead of declaring a shadowing local
00152 state.frame_header_,
00153 state,
00154 frame_info);
00155 if (result < 0)
00156 return result;
00157 break;
00158 }
00159 case flowProtocol::Fragment_Msg:
00160 {
00161 result = TAO_SFP_Base::peek_fragment_header (transport,
00162 state.fragment_,
00163 state.cdr);
00164 if (result < 0)
00165 return result;
00166 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"Fragment received\n"));
00167 result = TAO_SFP_Base::read_fragment (transport,
00168 state.fragment_,
00169 state,
00170 frame_info);
00171 if (result < 0)
00172 return result;
00173 break;
00174 }
00175 case flowProtocol::EndofStream_Msg:
00176 {
00177 result = TAO_SFP_Base::read_endofstream_message (transport,
00178 state.frame_header_,
00179 state.cdr);
00180 if (result < 0)
00181 return result;
00182 break;
00183 }
00184 default: // any other message type is ignored here and reported as success
00185 break;
00186 }
00187 return 0;
00188 }
|
|
||||||||||||||||
|
Definition at line 803 of file sfp.cpp. References ACE_ERROR_RETURN, fragment_len, ACE_InputCDR::grow(), LM_ERROR, MSG_PEEK, ACE_InputCDR::rd_ptr(), and TAO_AV_Transport::recv(). Referenced by handle_input().
00806 { // Peek (MSG_PEEK, nothing is consumed) fragment_len bytes from transport into input and demarshal them into fragment.
00807 input.grow (fragment_len);
00808 char *buf = input.rd_ptr ();
00809 int n = transport->recv (buf,
00810 fragment_len,
00811 MSG_PEEK);
00812 if (n != static_cast<int> (fragment_len)) // a short or failed peek is logged but still returns 0
00813 ACE_ERROR_RETURN ((LM_ERROR,"%p","TAO_SFP_Base::peek_fragment_header"),0); // label names this function; it previously named read_endofstream_message
00814 else
00815 {
00816 if (!(input >> fragment))
00817 return -1; // demarshaling failed
00818 }
00819 return 0;
00820 }
|
|
||||||||||||||||
|
Definition at line 783 of file sfp.cpp. References ACE_ERROR_RETURN, frame_header_len, ACE_InputCDR::grow(), LM_ERROR, MSG_PEEK, ACE_InputCDR::rd_ptr(), and TAO_AV_Transport::recv(). Referenced by handle_input().
00786 { // Peek (MSG_PEEK, nothing is consumed) frame_header_len bytes from transport into input and demarshal them into header.
00787 input.grow (frame_header_len);
00788 char *buf = input.rd_ptr ();
00789 int n = transport->recv (buf,
00790 frame_header_len,
00791 MSG_PEEK);
00792 if (n != static_cast<int> (frame_header_len)) // a short or failed peek is logged but still returns 0
00793 ACE_ERROR_RETURN ((LM_ERROR,"%p","TAO_SFP_Base::peek_frame_header"),0); // label names this function; it previously named read_endofstream_message
00794 else
00795 {
00796 if (!(input >> header))
00797 return -1; // demarshaling failed
00798 }
00799 return 0;
00800 }
|
|
||||||||||||
|
Definition at line 655 of file sfp.cpp. References ACE_DEBUG, ACE_ERROR_RETURN, LM_DEBUG, LM_ERROR, MSG_PEEK, TAO_AV_Transport::recv(), ssize_t, ACE_OS::strcmp(), ACE_OS::strncpy(), TAO_debug_level, TAO_SFP_CREDIT_MAGIC_NUMBER, TAO_SFP_FRAGMENT_MAGIC_NUMBER, TAO_SFP_MAGIC_NUMBER, TAO_SFP_MAGIC_NUMBER_LEN, TAO_SFP_MESSAGE_TYPE_OFFSET, TAO_SFP_START_MAGIC_NUMBER, and TAO_SFP_STARTREPLY_MAGIC_NUMBER. Referenced by TAO_SFP_Producer_Object::handle_input(), and handle_input().
00657 { // Peek at the leading bytes of the next message and classify it by magic number into msg_type; returns 0 on success, -1 on recv failure/EOF or an unknown magic number.
00658 char peek_buffer [TAO_SFP_MAGIC_NUMBER_LEN+2] = {0};// 2 is for flags + message_type; zero-filled because it is copied from before the recv() result is checked.
00659 int peek_len = TAO_SFP_MAGIC_NUMBER_LEN +2;
00660 char magic_number [TAO_SFP_MAGIC_NUMBER_LEN+1];
00661 ssize_t n =transport->recv (peek_buffer,
00662 peek_len,
00663 MSG_PEEK);
00664 ACE_OS::strncpy (magic_number,
00665 peek_buffer,
00666 TAO_SFP_MAGIC_NUMBER_LEN);
00667 magic_number [TAO_SFP_MAGIC_NUMBER_LEN] = 0; // terminate so the strcmp() calls below are bounded
00668 if (n == -1)
00669 ACE_ERROR_RETURN ((LM_ERROR,"SFP::handle_input -peek"),-1);
00670 else if (n==0)
00671 ACE_ERROR_RETURN ((LM_ERROR,"SFP::handle_input -peek"),-1);
00672
00673 if (ACE_OS::strcmp (magic_number,TAO_SFP_START_MAGIC_NUMBER) == 0)
00674 {
00675 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"(%P|%t)Start message received\n"));
00676 msg_type = flowProtocol::Start_Msg;
00677 }
00678 else if (ACE_OS::strcmp (magic_number,TAO_SFP_STARTREPLY_MAGIC_NUMBER) == 0)
00679 {
00680 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"(%P|%t)StartReply message received\n"));
00681 msg_type = flowProtocol::StartReply_Msg;
00682 }
00683 else if (ACE_OS::strcmp (magic_number,TAO_SFP_MAGIC_NUMBER) == 0)
00684 {
00685 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"(%P|%t) frameHeader received\n"));
00686 // msg_type = flowProtocol::SimpleFrame;
00687 msg_type = (flowProtocol::MsgType)peek_buffer [TAO_SFP_MESSAGE_TYPE_OFFSET]; // frame headers carry their concrete type in the peeked bytes
00688 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"Message Type = %d\n",msg_type));
00689 }
00690 else if (ACE_OS::strcmp (magic_number,TAO_SFP_FRAGMENT_MAGIC_NUMBER) == 0)
00691 {
00692 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"(%P|%t) fragment Header received\n"));
00693 msg_type = flowProtocol::Fragment_Msg;
00694 }
00695 else if (ACE_OS::strcmp (magic_number,TAO_SFP_CREDIT_MAGIC_NUMBER) == 0)
00696 {
00697 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"(%P|%t) credit message received\n"));
00698 msg_type = flowProtocol::Credit_Msg;
00699 }
00700 else
00701 ACE_ERROR_RETURN ((LM_ERROR,"TAO_SFP:Invalid magic number\n"),-1);
00702 return 0;
00703 }
|
|
||||||||||||||||
|
Definition at line 745 of file sfp.cpp. References ACE_ERROR_RETURN, credit_len, ACE_InputCDR::grow(), LM_ERROR, ACE_InputCDR::rd_ptr(), TAO_AV_Transport::recv(), and start_len. Referenced by TAO_SFP_Producer_Object::handle_input().
00748 { // Read one Credit message (credit_len bytes) from transport into input and demarshal it into credit.
00749 input.grow (credit_len); // size the buffer for the credit_len bytes received below (was start_len)
00750 char *buf = input.rd_ptr ();
00751 int n = transport->recv (buf,
00752 credit_len);
00753 if (n != static_cast<int> (credit_len)) // a short or failed read is logged but still returns 0
00754 ACE_ERROR_RETURN ((LM_ERROR,"%p","TAO_SFP_Base::read_credit_message"),0);
00755 else
00756 {
00757 if (!(input >> credit))
00758 return -1; // demarshaling failed
00759 }
00760 return 0;
00761 }
|
|
||||||||||||||||
|
Definition at line 764 of file sfp.cpp. References ACE_ERROR_RETURN, frame_header_len, ACE_InputCDR::grow(), LM_ERROR, ACE_InputCDR::rd_ptr(), TAO_AV_Transport::recv(), and start_len. Referenced by handle_input().
00767 { // Read an end-of-stream message (frame_header_len bytes) from transport into input and demarshal it into endofstream.
00768 input.grow (frame_header_len); // size the buffer for the frame_header_len bytes received below (was start_len)
00769 char *buf = input.rd_ptr ();
00770 int n = transport->recv (buf,
00771 frame_header_len);
00772 if (n != static_cast<int> (frame_header_len)) // a short or failed read is logged but still returns 0
00773 ACE_ERROR_RETURN ((LM_ERROR,"%p","TAO_SFP_Base::read_endofstream_message"),0);
00774 else
00775 {
00776 if (!(input >> endofstream))
00777 return -1; // demarshaling failed
00778 }
00779 return 0;
00780 }
|
|
||||||||||||||||||||
|
Definition at line 369 of file sfp.cpp. References ACE_DEBUG, ACE_ERROR_RETURN, ACE_Message_Block, ACE_NEW_RETURN, ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, ACE_Hash< EXT_ID >, ACE_Equal_To< EXT_ID >, ACE_LOCK >::bind(), check_all_fragments(), TAO_SFP_Fragment_Node::data_, ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, ACE_Hash< EXT_ID >, ACE_Equal_To< EXT_ID >, ACE_LOCK >::find(), flowProtocol::fragment::flags, flowProtocol::fragment::frag_number, flowProtocol::fragment::frag_sz, TAO_SFP_Fragment_Node::fragment_info_, fragment_len, TAO_SFP_Fragment_Table_Entry::fragment_set_, TAO_SFP_Frame_State::fragment_table_map_, TAO_SFP_Frame_State::frame_block_, TAO_SFP_Fragment_Table_Entry::frame_info, ACE_Ordered_MultiSet< TAO_SFP_Fragment_Node >::insert(), TAO_SFP_Fragment_Table_Entry::last_received_, ACE_Message_Block::length(), LM_DEBUG, LM_ERROR, TAO_SFP_Frame_State::more_fragments_, TAO_SFP_Fragment_Table_Entry::num_fragments_, ACE_Message_Block::rd_ptr(), TAO_AV_Transport::recv(), flowProtocol::fragment::sequence_num, flowProtocol::fragment::source_id, TAO_debug_level, TAO_SFP_Fragment_Table, and ACE_Message_Block::wr_ptr(). Referenced by handle_input().
00373 {
00374 TAO_SFP_Fragment_Table_Entry *fragment_entry = 0;
00375 int result = -1;
00376
00377 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"frag_number = %d, frag_size = %d,source_id = %d sequnce_num = %d\n",
00378 fragment.frag_number,fragment.frag_sz,fragment.source_id,fragment.sequence_num));
00379
00380 ACE_Message_Block *data;
00381 ACE_NEW_RETURN (data,
00382 ACE_Message_Block(fragment.frag_sz),
00383 -1);
00384
00385 // Read the fragment.
00386 int n = transport->recv (data->wr_ptr (),fragment.frag_sz);
00387 if ((n == -1) || (n==0))
00388 ACE_ERROR_RETURN ((LM_ERROR,"TAO_SFP::read_fragment:%p",""),-1);
00389 // move past the fragment header.
00390 data->rd_ptr (fragment_len);
00391 data->wr_ptr (n);
00392 // TAO_SFP_Base::dump_buf (data->rd_ptr (),data->length ());
00393 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"length of %dth fragment is: %d\n",
00394 fragment.frag_number,
00395 data->length ()));
00396
00397 TAO_SFP_Fragment_Node *new_node;
00398 ACE_NEW_RETURN (new_node,
00399 TAO_SFP_Fragment_Node,
00400 -1);
00401 new_node->fragment_info_ = fragment;
00402 new_node->data_ = data;
00403 TAO_SFP_Fragment_Table *fragment_table = 0;
00404 result = state.fragment_table_map_.find (fragment.source_id,fragment_table);
00405 if (result != 0)
00406 {
00407 ACE_NEW_RETURN (fragment_table,
00408 TAO_SFP_Fragment_Table,
00409 -1);
00410 result = state.fragment_table_map_.bind (fragment.source_id,fragment_table);
00411 if (result < 0)
00412 ACE_ERROR_RETURN ((LM_ERROR,"TAO_SFP_Base::read_fragment:fragment_table_map:bind failed\n"),-1);
00413 }
00414 if (fragment_table->find (fragment.sequence_num,fragment_entry) == 0)
00415 {
00416 // Already an entry exists. Traverse the list and insert it at the right place.
00417 result = fragment_entry->fragment_set_.insert (*new_node);
00418 if (result != 0)
00419 ACE_ERROR_RETURN ((LM_ERROR,"insert for %dth node failed\n",fragment.frag_number),-1);
00420 // check if all the fragments have been received.
00421 }
00422 else
00423 {
00424 ACE_NEW_RETURN (fragment_entry,
00425 TAO_SFP_Fragment_Table_Entry,
00426 -1);
00427 fragment_entry->fragment_set_.insert (*new_node);
00428 // bind a new entry for this sequence number.
00429 result = fragment_table->bind (fragment.sequence_num,fragment_entry);
00430 if (result != 0)
00431 ACE_ERROR_RETURN ((LM_ERROR,"bind for %dth fragment failed\n",
00432 fragment.frag_number),-1);
00433 }
00434 if (!(fragment.flags & 0x2))
00435 {
00436 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"Last fragment received\n"));
00437 // if bit 1 is not set then there are
00438 // no more fragments.
00439 fragment_entry->last_received_ = 1;
00440 // since fragment number starts from 0 to n-1 we add 1.
00441 fragment_entry->num_fragments_ = fragment.frag_number + 1;
00442 }
00443
00444
00445 state.frame_block_ = check_all_fragments (fragment_entry);
00446 if (state.frame_block_ != 0)
00447 {
00448 state.more_fragments_ = 0;
00449 ACE_NEW_RETURN (frame_info,
00450 TAO_AV_frame_info,
00451 -1);
00452 *frame_info = fragment_entry->frame_info;
00453 }
00454 return 0;
00455 }
|
|
||||||||||||||||||||
|
Definition at line 191 of file sfp.cpp. References ACE_DEBUG, ACE_ERROR_RETURN, ACE_NEW_RETURN, ACE_Message_Block::base(), ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, ACE_Hash< EXT_ID >, ACE_Equal_To< EXT_ID >, ACE_LOCK >::bind(), TAO_AV_frame_info::boundary_marker, check_all_fragments(), ACE_Message_Block::clone(), ACE_Message_Block::copy(), TAO_SFP_Fragment_Node::data_, ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, ACE_Hash< EXT_ID >, ACE_Equal_To< EXT_ID >, ACE_LOCK >::find(), flowProtocol::frameHeader::flags, flowProtocol::fragment::frag_number, flowProtocol::fragment::frag_sz, TAO_SFP_Fragment_Node::fragment_info_, TAO_SFP_Fragment_Table_Entry::fragment_set_, TAO_SFP_Frame_State::fragment_table_map_, TAO_SFP_Frame_State::frame_, TAO_SFP_Frame_State::frame_block_, frame_header_len, TAO_SFP_Fragment_Table_Entry::frame_info, ACE_Ordered_MultiSet< TAO_SFP_Fragment_Node >::insert(), ACE_Message_Block::length(), LM_DEBUG, LM_ERROR, ACE_CDR::mb_align(), flowProtocol::frameHeader::message_size, flowProtocol::frameHeader::message_type, TAO_SFP_Frame_State::more_fragments_, ACE_Message_Block::rd_ptr(), TAO_AV_Transport::recv(), TAO_AV_frame_info::sequence_num, flowProtocol::frame::sequence_num, flowProtocol::fragment::source_id, flowProtocol::frame::source_ids, TAO_AV_frame_info::ssrc, ACE_InputCDR::start(), TAO_SFP_Frame_State::static_frame_, flowProtocol::frame::synchSource, TAO_debug_level, TAO_SFP_Fragment_Table, TAO_AV_frame_info::timestamp, flowProtocol::frame::timestamp, and ACE_Message_Block::wr_ptr(). Referenced by handle_input().
00195 {
00196 ACE_Message_Block *message_block = 0;
00197 int result = -1;
00198
00199 if (TAO_debug_level > 0)
00200 ACE_DEBUG ((LM_DEBUG,"Reading simple frame\n"));
00201 // Check to see what the length of the message is.
00202 int byte_order = frame_header.flags & 0x1;
00203 int message_len = frame_header.message_size;
00204
00205 // ACE_NEW_RETURN (message_block,
00206 // ACE_Message_Block (message_len),
00207 // 0);
00208 state.static_frame_.rd_ptr (state.static_frame_.base ());
00209 state.static_frame_.wr_ptr (state.static_frame_.base ());
00210 int n = transport->recv (state.static_frame_.rd_ptr (),message_len);
00211 if (n == -1)
00212 ACE_ERROR_RETURN ((LM_ERROR,"SFP::handle_input -peek"),0);
00213 else if (n==0)
00214 ACE_ERROR_RETURN ((LM_ERROR,"SFP::handle_input -peek"),0);
00215 else if (n != message_len)
00216 ACE_ERROR_RETURN ((LM_ERROR,"SFP::read_simple_frame:message truncated\n"),0);
00217 message_block = &state.static_frame_;
00218 // print the buffer.
00219 // this->dump_buf (message,n);
00220 // skip over the frame header.
00221 message_block->rd_ptr (frame_header_len);
00222 message_block->wr_ptr (n);
00223 CORBA::ULong ssrc = 0;
00224 TAO_SFP_Fragment_Table_Entry *fragment_entry = 0;
00225 if (frame_header.flags & 0x2)
00226 {
00227 if (TAO_debug_level > 0)
00228 ACE_DEBUG ((LM_DEBUG,"fragmented frame:0th fragment\n"));
00229 state.more_fragments_ = 1;
00230 ACE_Message_Block *data = 0;
00231 switch (frame_header.message_type)
00232 {
00233 case flowProtocol::Frame_Msg:
00234 {
00235 // read the frame info.
00236 ACE_Message_Block frame_info_mb (message_len-frame_header_len+ACE_CDR::MAX_ALIGNMENT);
00237 ACE_CDR::mb_align (&frame_info_mb);
00238 frame_info_mb.copy (message_block->rd_ptr (),
00239 message_block->length ());
00240 // print the buffer.
00241 // this->dump_buf (message_block->rd_ptr (),16);
00242 TAO_InputCDR frame_info_cdr (&frame_info_mb,byte_order);
00243 frame_info_cdr >> state.frame_;
00244 if (TAO_debug_level > 0)
00245 ACE_DEBUG ((LM_DEBUG,
00246 "frame.timestamp = %d, "
00247 "frame.synchsource = %d, "
00248 "frame.sequence_num = %d\n",
00249 state.frame_.timestamp,
00250 state.frame_.synchSource,
00251 state.frame_.sequence_num));
00252 ssrc = state.frame_.synchSource;
00253 // The remaining message in the CDR stream is the fragment
00254 // data for frag.0
00255 data = frame_info_cdr.start ()->clone ();
00256 break;
00257 }
00258 case flowProtocol::SimpleFrame_Msg:
00259 {
00260 data = message_block->clone ();
00261 break;
00262 }
00263 case flowProtocol::SequencedFrame_Msg:
00264 break;
00265 case flowProtocol::SpecialFrame_Msg:
00266 break;
00267 }
00268 if (TAO_debug_level > 0)
00269 ACE_DEBUG ((LM_DEBUG,"Length of 0th fragment= %d\n",data->length ()));
00270 TAO_SFP_Fragment_Table *fragment_table = 0;
00271 result = state.fragment_table_map_.find (ssrc,fragment_table);
00272 if (result != 0)
00273 {
00274 ACE_NEW_RETURN (fragment_table,
00275 TAO_SFP_Fragment_Table,
00276 -1);
00277 result = state.fragment_table_map_.bind (ssrc,fragment_table);
00278 if (result < 0)
00279 ACE_ERROR_RETURN ((LM_ERROR,
00280 "TAO_SFP_Base::read_frame: "
00281 "fragment_table_map:bind failed\n"),-1);
00282 }
00283
00284 TAO_SFP_Fragment_Node *new_node;
00285 ACE_NEW_RETURN (new_node,
00286 TAO_SFP_Fragment_Node,
00287 0);
00288 new_node->fragment_info_.frag_sz = static_cast<CORBA::ULong> (data->length ());
00289 new_node->fragment_info_.frag_number = 0;
00290 if (state.frame_.source_ids.length () > 0)
00291 new_node->fragment_info_.source_id = state.frame_.source_ids [0];
00292 else
00293 new_node->fragment_info_.source_id = 0;
00294 new_node->data_ = data;
00295 // TAO_SFP_Base::dump_buf (data->rd_ptr (),data->length ());
00296 if (fragment_table->find (state.frame_.sequence_num,fragment_entry) == 0)
00297 {
00298 // This case can happen where a nth (n > 0)fragment is
00299 // received before the 0th fragment.
00300 if (TAO_debug_level > 0)
00301 ACE_DEBUG ((LM_DEBUG,
00302 "fragment table entry found for 0th fragment:\n"));
00303 result = fragment_entry->fragment_set_.insert (*new_node);
00304 if (result != 0)
00305 ACE_ERROR_RETURN ((LM_ERROR,
00306 "insert for 0th fragment failed\n"),0);
00307 // enter the frame info.
00308
00309 // check if all the fragments have been received.
00310 state.frame_block_ =
00311 TAO_SFP_Base::check_all_fragments (fragment_entry);
00312 if (state.frame_block_ != 0)
00313 state.more_fragments_ = 0;
00314 }
00315 else
00316 {
00317 if (TAO_debug_level > 0)
00318 ACE_DEBUG ((LM_DEBUG,
00319 "fragment table entry not found for 0th fragment\n"));
00320 TAO_SFP_Fragment_Table_Entry *new_entry;
00321 ACE_NEW_RETURN (new_entry,
00322 TAO_SFP_Fragment_Table_Entry,
00323 0);
00324 result = new_entry->fragment_set_.insert (*new_node);
00325 if (result != 0)
00326 ACE_ERROR_RETURN ((LM_ERROR,"insert for 0th fragment failed\n"),0);
00327 fragment_entry = new_entry;
00328 // not found. so bind a new entry.
00329 result = fragment_table->bind (state.frame_.sequence_num,new_entry);
00330 if (result != 0)
00331 ACE_ERROR_RETURN ((LM_ERROR,"fragment table bind failed\n"),0);
00332 if (frame_header.message_type & 4 )
00333 fragment_entry->frame_info.boundary_marker = 1;
00334 switch (frame_header.message_type)
00335 {
00336 case flowProtocol::Frame_Msg:
00337 fragment_entry->frame_info.ssrc = state.frame_.synchSource;
00338 fragment_entry->frame_info.timestamp = state.frame_.timestamp;
00339 fragment_entry->frame_info.sequence_num = state.frame_.sequence_num;
00340 break;
00341 case flowProtocol::SimpleFrame_Msg:
00342 fragment_entry->frame_info.ssrc =
00343 fragment_entry->frame_info.timestamp =
00344 fragment_entry->frame_info.sequence_num = 0;
00345 break;
00346 }
00347 return 0;
00348 }
00349 }
00350 else
00351 {
00352 state.more_fragments_ = 0;
00353 state.frame_block_ = message_block;
00354 }
00355 if (state.more_fragments_ == 0)
00356 {
00357 if (fragment_entry != 0)
00358 {
00359 ACE_NEW_RETURN (frame_info,
00360 TAO_AV_frame_info,
00361 -1);
00362 *frame_info = fragment_entry->frame_info;
00363 }
00364 }
00365 return 0;
00366 }
|
|
||||||||||||||||
|
Definition at line 706 of file sfp.cpp. References ACE_ERROR_RETURN, ACE_InputCDR::grow(), LM_ERROR, ACE_InputCDR::rd_ptr(), TAO_AV_Transport::recv(), and start_len.
00709 {
00710 input.grow (start_len);
00711 char *buf = input.rd_ptr ();
00712 int n = transport->recv (buf,
00713 start_len);
00714 if (n != static_cast<int> (start_len))
00715 ACE_ERROR_RETURN ((LM_ERROR,"%p","TAO_SFP_Base::read_start\n"),0);
00716 else
00717 {
00718 if (!(input >> start))
00719 return -1;
00720 }
00721 return 0;
00722 }
|
|
||||||||||||||||
|
Definition at line 726 of file sfp.cpp. References ACE_ERROR_RETURN, ACE_InputCDR::grow(), LM_ERROR, ACE_InputCDR::rd_ptr(), TAO_AV_Transport::recv(), start_len, and start_reply_len.
00729 { // Read one StartReply message (start_reply_len bytes) from transport into input and demarshal it into start_reply.
00730 input.grow (start_reply_len); // size the buffer for the message actually received below (was start_len)
00731 char *buf = input.rd_ptr ();
00732 int n = transport->recv (buf,
00733 start_reply_len);
00734 if (n != static_cast<int> (start_reply_len)) // compare with the length requested from recv(); comparing with start_len rejected complete reads
00735 ACE_ERROR_RETURN ((LM_ERROR,"%p","TAO_SFP_Base::read_start_reply_message"),0);
00736 else
00737 {
00738 if (!(input >> start_reply))
00739 return -1; // demarshaling failed
00740 }
00741 return 0;
00742 }
|
|
||||||||||||||||
|
Definition at line 596 of file sfp.cpp. References ACE_DEBUG, ACE_OutputCDR::begin(), ACE_OutputCDR::buffer(), ACE_Message_Block::cont(), ACE_OutputCDR::do_byte_swap(), ACE_OutputCDR::end(), ACE_Message_Block::length(), LM_DEBUG, TAO_AV_Transport::send(), ssize_t, ACE_CDR::swap_4(), TAO_debug_level, TAO_SFP_FRAGMENT_SIZE_OFFSET, TAO_SFP_MESSAGE_SIZE_OFFSET, and ACE_OutputCDR::total_length(). Referenced by TAO_SFP_Object::destroy(), and TAO_SFP_Object::send_frame().
00599 {
00600 CORBA::ULong total_len = static_cast<CORBA::ULong> (stream.total_length ());
00601 if (mb != 0)
00602 {
00603 for (ACE_Message_Block *temp = mb;temp != 0;temp = temp->cont ())
00604 total_len += static_cast<CORBA::ULong> (temp->length ());
00605
00606 char *buf = (char *) stream.buffer ();
00607 size_t offset = TAO_SFP_MESSAGE_SIZE_OFFSET;
00608 // the first character distinguishes =SFP ('=') from FRAG ('F').
00609 if (*(buf) == 'F')
00610 {
00611 // Fragment message.
00612 offset = TAO_SFP_FRAGMENT_SIZE_OFFSET;
00613 }
00614 #if !defined (ACE_ENABLE_SWAP_ON_WRITE)
00615 *reinterpret_cast<CORBA::ULong *> (buf + offset) = total_len;
00616 #else
00617 if (!stream.do_byte_swap ())
00618 *reinterpret_cast<CORBA::ULong *> (buf + offset) = total_len;
00619 else
00620 ACE_CDR::swap_4 (reinterpret_cast<char *> (&total_len),
00621 buf + offset);
00622 #endif /* ACE_ENABLE_SWAP_ON_WRITE */
00623 }
00624 // we join the data block with the cdr block.
00625 ACE_Message_Block *end = (ACE_Message_Block *)stream.end ();
00626 if (end == 0)
00627 {
00628 // There is only one message block.
00629 end = (ACE_Message_Block *)stream.begin ();
00630 // TAO_SFP_Base::dump_buf (end->rd_ptr (),end->length ());
00631 }
00632 end->cont (mb);
00633 ssize_t n = transport->send (stream.begin ());
00634 if (n == -1)
00635 {
00636 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,
00637 "TAO: (%P|%t) closing conn after fault %p\n",
00638 "GIOP::send_request ()"));
00639 return -1;
00640 }
00641 // EOF.
00642 if (n == 0)
00643 {
00644 if (TAO_debug_level > 0)
00645 ACE_DEBUG ((LM_DEBUG,
00646 "TAO: (%P|%t) GIOP::send_request () "
00647 "EOF, closing conn:\n"));
00648 return -1;
00649 }
00650 return 1;
00651
00652 }
|
|
||||||||||||||||
|
Definition at line 487 of file sfp.cpp. References flowProtocol::frameHeader::flags, flowProtocol::frameHeader::magic_number, flowProtocol::frameHeader::message_size, flowProtocol::frameHeader::message_type, and ACE_OutputCDR::reset(). Referenced by TAO_SFP_Object::destroy(), and TAO_SFP_Object::send_frame().
00490 {
00491 msg.reset ();
00492 flowProtocol::frameHeader frame_header;
00493
00494 frame_header.magic_number [0] = '=';
00495 frame_header.magic_number [1] = 'S';
00496 frame_header.magic_number [2] = 'F';
00497 frame_header.magic_number [3] = 'P';
00498 frame_header.flags = flags;
00499 frame_header.message_type = static_cast<CORBA::Octet> (type);
00500 frame_header.message_size = 0;
00501 if (!(msg << frame_header))
00502 return 0;
00503 return 1;
00504 }
|
|
||||||||||||
|
Definition at line 539 of file sfp.cpp. References flowProtocol::credit::cred_num, and flowProtocol::credit::magic_number.
00541 {
00542 flowProtocol::credit credit;
00543
00544 credit.magic_number [0] = '=';
00545 credit.magic_number [1] = 'C';
00546 credit.magic_number [2] = 'R';
00547 credit.magic_number [3] = 'E';
00548 credit.cred_num = cred_num;
00549 if (!(msg << credit))
00550 return 0;
00551 return 1;
00552 }
|
|
||||||||||||||||||||||||
|
Definition at line 555 of file sfp.cpp. References flowProtocol::fragment::flags, flowProtocol::fragment::frag_number, flowProtocol::fragment::magic_number, ACE_OutputCDR::reset(), flowProtocol::fragment::sequence_num, and flowProtocol::fragment::source_id. Referenced by TAO_SFP_Object::send_frame().
00560 {
00561 msg.reset ();
00562 flowProtocol::fragment fragment;
00563
00564 fragment.magic_number [0] = 'F';
00565 fragment.magic_number [1] = 'R';
00566 fragment.magic_number [2] = 'A';
00567 fragment.magic_number [3] = 'G';
00568 fragment.flags = flags;
00569 fragment.frag_number = fragment_number;
00570 fragment.sequence_num = sequence_number;
00571 fragment.source_id = source_id;
00572 if (!(msg << fragment))
00573 return 0;
00574 return 1;
00575 }
|
|
||||||||||||||||||||||||
|
Definition at line 578 of file sfp.cpp. References flowProtocol::my_seq_ulong, flowProtocol::frame::sequence_num, flowProtocol::frame::source_ids, flowProtocol::frame::synchSource, and flowProtocol::frame::timestamp. Referenced by TAO_SFP_Object::send_frame().
00583 {
00584 flowProtocol::frame frame;
00585
00586 frame.timestamp = timestamp;
00587 frame.synchSource = synchSource;
00588 frame.source_ids = source_ids;
00589 frame.sequence_num = sequence_num;
00590 if (!(msg << frame))
00591 return 0;
00592 return 1;
00593 }
|
|
|
Definition at line 507 of file sfp.cpp. References flowProtocol::Start::flags, flowProtocol::Start::magic_number, flowProtocol::Start::major_version, flowProtocol::Start::minor_version, TAO_SFP_MAJOR_VERSION, and TAO_SFP_MINOR_VERSION.
00508 {
00509 flowProtocol::Start start;
00510
00511 start.magic_number [0] = '=';
00512 start.magic_number [1] = 'S';
00513 start.magic_number [2] = 'T';
00514 start.magic_number [3] = 'A';
00515 start.major_version = TAO_SFP_MAJOR_VERSION;
00516 start.minor_version = TAO_SFP_MINOR_VERSION;
00517 start.flags = 0;
00518 if (!(msg << start))
00519 return 0;
00520 return 1;
00521 }
|
|
|
Definition at line 524 of file sfp.cpp. References flowProtocol::StartReply::flags, and flowProtocol::StartReply::magic_number.
00525 {
00526 flowProtocol::StartReply start_reply;
00527
00528 start_reply.magic_number [0] = '=';
00529 start_reply.magic_number [1] = 'S';
00530 start_reply.magic_number [2] = 'T';
00531 start_reply.magic_number [3] = 'R';
00532 start_reply.flags = 0;
00533 if (!(msg << start_reply))
00534 return 0;
00535 return 1;
00536 }
|
|
|
Definition at line 32 of file sfp.cpp. Referenced by read_credit_message(), and TAO_SFP_Base(). |
|
|
Definition at line 33 of file sfp.cpp. Referenced by peek_fragment_header(), read_fragment(), and TAO_SFP_Base(). |
|
|
Definition at line 29 of file sfp.cpp. Referenced by peek_frame_header(), read_endofstream_message(), read_frame(), and TAO_SFP_Base(). |
|
|
Definition at line 31 of file sfp.cpp. Referenced by read_credit_message(), read_endofstream_message(), read_start_message(), read_start_reply_message(), and TAO_SFP_Base(). |
|
|
Definition at line 30 of file sfp.cpp. Referenced by read_start_reply_message(), and TAO_SFP_Base(). |
|
|
Definition at line 17 of file sfp.cpp. Referenced by peek_message_type(). |
|
|
Definition at line 15 of file sfp.cpp. Referenced by peek_message_type(). |
|
|
Definition at line 27 of file sfp.cpp. Referenced by send_message(). |
|
|
|
|
|
Definition at line 14 of file sfp.cpp. Referenced by peek_message_type(). |
|
|
Definition at line 21 of file sfp.cpp. Referenced by TAO_SFP_Base(), and write_start_message(). |
|
|
Definition at line 26 of file sfp.cpp. Referenced by send_message(). |
|
|
Definition at line 22 of file sfp.cpp. Referenced by TAO_SFP_Base(), and write_start_message(). |
|
|
|
|
|
Definition at line 16 of file sfp.cpp. Referenced by peek_message_type(). |
|
|
Definition at line 18 of file sfp.cpp. Referenced by peek_message_type(). |
1.3.6