#include <sfp.h>
Definition at line 106 of file sfp.h.
enum TAO_SFP_Base::State
Definition at line 133 of file sfp.h.
00134 { 00135 ACTIVE_START, 00136 PASSIVE_START, 00137 TIMEDOUT_T1, 00138 TIMEDOUT_T2, 00139 REPLY_RECEIVED, 00140 START_RECEIVED 00141 };
TAO_SFP_Base::TAO_SFP_Base (void)
Definition at line 47 of file sfp.cpp.
References ACE_ERROR, credit_len, flowProtocol::StartReply::flags, flowProtocol::Start::flags, flowProtocol::frameHeader::flags, fragment_len, frame_header_len, LM_ERROR, flowProtocol::credit::magic_number, flowProtocol::StartReply::magic_number, flowProtocol::Start::magic_number, flowProtocol::fragment::magic_number, flowProtocol::frameHeader::magic_number, flowProtocol::Start::major_version, flowProtocol::Start::minor_version, ACE_OutputCDR::reset(), start_len, start_reply_len, TAO_ENCAP_BYTE_ORDER, TAO_SFP_MAJOR_VERSION, TAO_SFP_MINOR_VERSION, and ACE_OutputCDR::total_length().
00048 { 00049 TAO_OutputCDR output_cdr; 00050 flowProtocol::frameHeader frame_header; 00051 flowProtocol::fragment fragment; 00052 flowProtocol::credit credit; 00053 flowProtocol::Start start; 00054 flowProtocol::StartReply start_reply; 00055 00056 // fill in the default frameHeader fields. 00057 frame_header.magic_number [0] = '='; 00058 frame_header.magic_number [1] = 'S'; 00059 frame_header.magic_number [2] = 'F'; 00060 frame_header.magic_number [3] = 'P'; 00061 frame_header.flags = TAO_ENCAP_BYTE_ORDER; 00062 output_cdr.reset (); 00063 if (!(output_cdr << frame_header)) 00064 { 00065 ACE_ERROR ((LM_ERROR, "TAO_SFP constructor\n")); 00066 return; 00067 } 00068 00069 frame_header_len = static_cast<u_int> (output_cdr.total_length ()); 00070 // fill in the default fragment message fields. 00071 fragment.magic_number [0] = 'F'; 00072 fragment.magic_number [1] = 'R'; 00073 fragment.magic_number [2] = 'A'; 00074 fragment.magic_number [3] = 'G'; 00075 output_cdr.reset (); 00076 if (!(output_cdr << fragment)) 00077 { 00078 ACE_ERROR ((LM_ERROR, "TAO_SFP constructor\n")); 00079 return; 00080 } 00081 00082 fragment_len = static_cast<u_int> (output_cdr.total_length ()); 00083 // fill in the default Start message fields. 00084 start.magic_number [0] = '='; 00085 start.magic_number [1] = 'S'; 00086 start.magic_number [2] = 'T'; 00087 start.magic_number [3] = 'A'; 00088 start.major_version = TAO_SFP_Base::TAO_SFP_MAJOR_VERSION; 00089 start.minor_version = TAO_SFP_Base::TAO_SFP_MINOR_VERSION; 00090 start.flags = 0; 00091 output_cdr.reset (); 00092 if (!(output_cdr << start)) 00093 { 00094 ACE_ERROR ((LM_ERROR, "TAO_SFP constructor\n")); 00095 return; 00096 } 00097 00098 start_len = static_cast<u_int> (output_cdr.total_length ()); 00099 // fill in the default StartReply message fields. 
00100 start_reply.magic_number [0] = '='; 00101 start_reply.magic_number [1] = 'S'; 00102 start_reply.magic_number [2] = 'T'; 00103 start_reply.magic_number [3] = 'R'; 00104 start_reply.flags = 0; 00105 output_cdr.reset (); 00106 if (!(output_cdr << start_reply)) 00107 { 00108 ACE_ERROR ((LM_ERROR, "TAO_SFP constructor\n")); 00109 return; 00110 } 00111 00112 start_reply_len = static_cast<u_int> (output_cdr.total_length ()); 00113 00114 // fill in the default Credit message fields. 00115 credit.magic_number [0] = '='; 00116 credit.magic_number [1] = 'C'; 00117 credit.magic_number [2] = 'R'; 00118 credit.magic_number [3] = 'E'; 00119 output_cdr.reset (); 00120 00121 if (!(output_cdr << credit)) 00122 { 00123 ACE_ERROR ((LM_ERROR, "TAO_SFP constructor\n")); 00124 return; 00125 } 00126 credit_len = static_cast<u_int> (output_cdr.total_length ()); 00127 }
ACE_Message_Block * TAO_SFP_Base::check_all_fragments (TAO_SFP_Fragment_Table_Entry * fragment_entry) [static]
Definition at line 458 of file sfp.cpp.
References ACE_DEBUG, ACE_Ordered_MultiSet_Iterator< T >::advance(), ACE_Message_Block::cont(), TAO_SFP_Fragment_Node::data_, TAO_SFP_Fragment_Table_Entry::fragment_set_, LM_DEBUG, ACE_Ordered_MultiSet_Iterator< T >::next(), TAO_SFP_Fragment_Table_Entry::num_fragments_, ACE_Ordered_MultiSet< T >::size(), and TAO_debug_level.
Referenced by read_fragment(), and read_frame().
00459 { 00460 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"table size: %d, num_fragments: %d\n",fragment_entry->fragment_set_.size (),fragment_entry->num_fragments_)); 00461 // check to see if all the frames have been received. 00462 if (fragment_entry->fragment_set_.size () == fragment_entry->num_fragments_) 00463 { 00464 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"all fragments have been received\n")); 00465 // all the fragments have been received 00466 // we can now chain the ACE_Message_Blocks in the fragment_set_ and then return them 00467 // back. 00468 ACE_Message_Block *frame = 0,*head = 0; 00469 FRAGMENT_SET_ITERATOR frag_iterator (fragment_entry->fragment_set_); 00470 TAO_SFP_Fragment_Node *node; 00471 for (;frag_iterator.next (node) != 0;frag_iterator.advance ()) 00472 { 00473 if (!head) 00474 head = frame = node->data_; 00475 else 00476 { 00477 frame->cont (node->data_); 00478 frame = node->data_; 00479 } 00480 } 00481 return head; 00482 } 00483 return 0; 00484 }
void TAO_SFP_Base::dump_buf (char * buf,
                             int n) [static, protected]
Definition at line 823 of file sfp.cpp.
References ACE_DEBUG, LM_DEBUG, and TAO_debug_level.
00824 { 00825 char *buf = buffer; 00826 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"\n========================================\n")); 00827 for (int i=0;i<size;i++) 00828 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"%d ",buf[i])); 00829 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"\n========================================\n")); 00830 }
int TAO_SFP_Base::handle_input (TAO_AV_Transport * transport,
                                TAO_SFP_Frame_State & state,
                                TAO_AV_frame_info *& frame_info) [static]
Definition at line 130 of file sfp.cpp.
References ACE_DEBUG, TAO_SFP_Frame_State::cdr, flowProtocol::EndofStream_Msg, TAO_SFP_Frame_State::fragment_, flowProtocol::Fragment_Msg, TAO_SFP_Frame_State::frame_header_, flowProtocol::Frame_Msg, LM_DEBUG, peek_fragment_header(), peek_frame_header(), peek_message_type(), read_endofstream_message(), read_fragment(), read_frame(), flowProtocol::SimpleFrame_Msg, and TAO_debug_level.
Referenced by TAO_SFP_Consumer_Object::handle_input().
00133 { 00134 int result; 00135 flowProtocol::MsgType msg_type; 00136 result = TAO_SFP_Base::peek_message_type (transport, 00137 msg_type); 00138 if (result < 0) 00139 return result; 00140 // TAO_InputCDR &input = state.cdr; 00141 switch (msg_type) 00142 { 00143 case flowProtocol::SimpleFrame_Msg: 00144 case flowProtocol::Frame_Msg: 00145 { 00146 result = TAO_SFP_Base::peek_frame_header (transport, 00147 state.frame_header_, 00148 state.cdr); 00149 if (result < 0) 00150 return result; 00151 int result =TAO_SFP_Base::read_frame (transport, 00152 state.frame_header_, 00153 state, 00154 frame_info); 00155 if (result < 0) 00156 return result; 00157 break; 00158 } 00159 case flowProtocol::Fragment_Msg: 00160 { 00161 result = TAO_SFP_Base::peek_fragment_header (transport, 00162 state.fragment_, 00163 state.cdr); 00164 if (result < 0) 00165 return result; 00166 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"Fragment received\n")); 00167 result = TAO_SFP_Base::read_fragment (transport, 00168 state.fragment_, 00169 state, 00170 frame_info); 00171 if (result < 0) 00172 return result; 00173 break; 00174 } 00175 case flowProtocol::EndofStream_Msg: 00176 { 00177 result = TAO_SFP_Base::read_endofstream_message (transport, 00178 state.frame_header_, 00179 state.cdr); 00180 if (result < 0) 00181 return result; 00182 break; 00183 } 00184 default: 00185 break; 00186 } 00187 return 0; 00188 }
int TAO_SFP_Base::peek_fragment_header (TAO_AV_Transport * transport,
                                        flowProtocol::fragment & fragment,
                                        TAO_InputCDR & cdr) [static]
Definition at line 803 of file sfp.cpp.
References ACE_ERROR_RETURN, fragment_len, ACE_InputCDR::grow(), LM_ERROR, MSG_PEEK, ACE_InputCDR::rd_ptr(), and TAO_AV_Transport::recv().
Referenced by handle_input().
00806 { 00807 input.grow (fragment_len); 00808 char *buf = input.rd_ptr (); 00809 int n = transport->recv (buf, 00810 fragment_len, 00811 MSG_PEEK); 00812 if (n != static_cast<int> (fragment_len)) 00813 ACE_ERROR_RETURN ((LM_ERROR,"%p","TAO_SFP_Base::read_endofstream_message"),0); 00814 else 00815 { 00816 if (!(input >> fragment)) 00817 return -1; 00818 } 00819 return 0; 00820 }
int TAO_SFP_Base::peek_frame_header (TAO_AV_Transport * transport,
                                     flowProtocol::frameHeader & header,
                                     TAO_InputCDR & cdr) [static]
Definition at line 783 of file sfp.cpp.
References ACE_ERROR_RETURN, frame_header_len, ACE_InputCDR::grow(), LM_ERROR, MSG_PEEK, ACE_InputCDR::rd_ptr(), and TAO_AV_Transport::recv().
Referenced by handle_input().
00786 { 00787 input.grow (frame_header_len); 00788 char *buf = input.rd_ptr (); 00789 int n = transport->recv (buf, 00790 frame_header_len, 00791 MSG_PEEK); 00792 if (n != static_cast<int> (frame_header_len)) 00793 ACE_ERROR_RETURN ((LM_ERROR,"%p","TAO_SFP_Base::read_endofstream_message"),0); 00794 else 00795 { 00796 if (!(input >> header)) 00797 return -1; 00798 } 00799 return 0; 00800 }
int TAO_SFP_Base::peek_message_type (TAO_AV_Transport * transport,
                                     flowProtocol::MsgType & type) [static]
Definition at line 655 of file sfp.cpp.
References ACE_DEBUG, ACE_ERROR_RETURN, flowProtocol::Credit_Msg, flowProtocol::Fragment_Msg, LM_DEBUG, LM_ERROR, MSG_PEEK, TAO_AV_Transport::recv(), flowProtocol::Start_Msg, flowProtocol::StartReply_Msg, ACE_OS::strcmp(), ACE_OS::strncpy(), TAO_debug_level, TAO_SFP_CREDIT_MAGIC_NUMBER, TAO_SFP_FRAGMENT_MAGIC_NUMBER, TAO_SFP_MAGIC_NUMBER, TAO_SFP_MAGIC_NUMBER_LEN, TAO_SFP_MESSAGE_TYPE_OFFSET, TAO_SFP_START_MAGIC_NUMBER, and TAO_SFP_STARTREPLY_MAGIC_NUMBER.
Referenced by TAO_SFP_Producer_Object::handle_input(), and handle_input().
00657 { 00658 char peek_buffer [TAO_SFP_MAGIC_NUMBER_LEN+2];// 2 is for flags + message_type. 00659 int peek_len = TAO_SFP_MAGIC_NUMBER_LEN +2; 00660 char magic_number [TAO_SFP_MAGIC_NUMBER_LEN+1]; 00661 ssize_t n =transport->recv (peek_buffer, 00662 peek_len, 00663 MSG_PEEK); 00664 ACE_OS::strncpy (magic_number, 00665 peek_buffer, 00666 TAO_SFP_MAGIC_NUMBER_LEN); 00667 magic_number [TAO_SFP_MAGIC_NUMBER_LEN] = 0; 00668 if (n == -1) 00669 ACE_ERROR_RETURN ((LM_ERROR,"SFP::handle_input -peek"),-1); 00670 else if (n==0) 00671 ACE_ERROR_RETURN ((LM_ERROR,"SFP::handle_input -peek"),-1); 00672 00673 if (ACE_OS::strcmp (magic_number,TAO_SFP_START_MAGIC_NUMBER) == 0) 00674 { 00675 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"(%P|%t)Start message received\n")); 00676 msg_type = flowProtocol::Start_Msg; 00677 } 00678 else if (ACE_OS::strcmp (magic_number,TAO_SFP_STARTREPLY_MAGIC_NUMBER) == 0) 00679 { 00680 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"(%P|%t)StartReply message received\n")); 00681 msg_type = flowProtocol::StartReply_Msg; 00682 } 00683 else if (ACE_OS::strcmp (magic_number,TAO_SFP_MAGIC_NUMBER) == 0) 00684 { 00685 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"(%P|%t) frameHeader received\n")); 00686 // msg_type = flowProtocol::SimpleFrame; 00687 msg_type = (flowProtocol::MsgType)peek_buffer [TAO_SFP_MESSAGE_TYPE_OFFSET]; 00688 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"Message Type = %d\n",msg_type)); 00689 } 00690 else if (ACE_OS::strcmp (magic_number,TAO_SFP_FRAGMENT_MAGIC_NUMBER) == 0) 00691 { 00692 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"(%P|%t) fragment Header received\n")); 00693 msg_type = flowProtocol::Fragment_Msg; 00694 } 00695 else if (ACE_OS::strcmp (magic_number,TAO_SFP_CREDIT_MAGIC_NUMBER) == 0) 00696 { 00697 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"(%P|%t) credit message received\n")); 00698 msg_type = flowProtocol::Credit_Msg; 00699 } 00700 else 00701 ACE_ERROR_RETURN ((LM_ERROR,"TAO_SFP:Invalid magic number\n"),-1); 
00702 return 0; 00703 }
int TAO_SFP_Base::read_credit_message (TAO_AV_Transport * transport,
                                       flowProtocol::credit & credit,
                                       TAO_InputCDR & cdr) [static]
Definition at line 745 of file sfp.cpp.
References ACE_ERROR_RETURN, credit_len, ACE_InputCDR::grow(), LM_ERROR, ACE_InputCDR::rd_ptr(), TAO_AV_Transport::recv(), and start_len.
Referenced by TAO_SFP_Producer_Object::handle_input().
00748 { 00749 input.grow (start_len); 00750 char *buf = input.rd_ptr (); 00751 int n = transport->recv (buf, 00752 credit_len); 00753 if (n != static_cast<int> (credit_len)) 00754 ACE_ERROR_RETURN ((LM_ERROR,"%p","TAO_SFP_Base::read_credit_message"),0); 00755 else 00756 { 00757 if (!(input >> credit)) 00758 return -1; 00759 } 00760 return 0; 00761 }
int TAO_SFP_Base::read_endofstream_message (TAO_AV_Transport * transport,
                                            flowProtocol::frameHeader & endofstream,
                                            TAO_InputCDR & cdr) [static]
Definition at line 764 of file sfp.cpp.
References ACE_ERROR_RETURN, frame_header_len, ACE_InputCDR::grow(), LM_ERROR, ACE_InputCDR::rd_ptr(), TAO_AV_Transport::recv(), and start_len.
Referenced by handle_input().
00767 { 00768 input.grow (start_len); 00769 char *buf = input.rd_ptr (); 00770 int n = transport->recv (buf, 00771 frame_header_len); 00772 if (n != static_cast<int> (frame_header_len)) 00773 ACE_ERROR_RETURN ((LM_ERROR,"%p","TAO_SFP_Base::read_endofstream_message"),0); 00774 else 00775 { 00776 if (!(input >> endofstream)) 00777 return -1; 00778 } 00779 return 0; 00780 }
int TAO_SFP_Base::read_fragment (TAO_AV_Transport * transport,
                                 flowProtocol::fragment & fragment,
                                 TAO_SFP_Frame_State & state,
                                 TAO_AV_frame_info *& frame_info) [static]
Definition at line 369 of file sfp.cpp.
References ACE_DEBUG, ACE_ERROR_RETURN, ACE_NEW_RETURN, ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::bind(), check_all_fragments(), TAO_SFP_Fragment_Node::data_, ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::find(), flowProtocol::fragment::flags, flowProtocol::fragment::frag_number, flowProtocol::fragment::frag_sz, TAO_SFP_Fragment_Node::fragment_info_, fragment_len, TAO_SFP_Fragment_Table_Entry::fragment_set_, TAO_SFP_Frame_State::fragment_table_map_, TAO_SFP_Frame_State::frame_block_, TAO_SFP_Fragment_Table_Entry::frame_info, ACE_Ordered_MultiSet< T >::insert(), TAO_SFP_Fragment_Table_Entry::last_received_, ACE_Message_Block::length(), LM_DEBUG, LM_ERROR, TAO_SFP_Frame_State::more_fragments_, TAO_SFP_Fragment_Table_Entry::num_fragments_, ACE_Message_Block::rd_ptr(), TAO_AV_Transport::recv(), flowProtocol::fragment::sequence_num, flowProtocol::fragment::source_id, TAO_debug_level, and ACE_Message_Block::wr_ptr().
Referenced by handle_input().
00373 { 00374 TAO_SFP_Fragment_Table_Entry *fragment_entry = 0; 00375 int result = -1; 00376 00377 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"frag_number = %d, frag_size = %d,source_id = %d sequnce_num = %d\n", 00378 fragment.frag_number,fragment.frag_sz,fragment.source_id,fragment.sequence_num)); 00379 00380 ACE_Message_Block *data; 00381 ACE_NEW_RETURN (data, 00382 ACE_Message_Block(fragment.frag_sz), 00383 -1); 00384 00385 // Read the fragment. 00386 int n = transport->recv (data->wr_ptr (),fragment.frag_sz); 00387 if ((n == -1) || (n==0)) 00388 ACE_ERROR_RETURN ((LM_ERROR,"TAO_SFP::read_fragment:%p",""),-1); 00389 // move past the fragment header. 00390 data->rd_ptr (fragment_len); 00391 data->wr_ptr (n); 00392 // TAO_SFP_Base::dump_buf (data->rd_ptr (),data->length ()); 00393 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"length of %dth fragment is: %d\n", 00394 fragment.frag_number, 00395 data->length ())); 00396 00397 TAO_SFP_Fragment_Node *new_node; 00398 ACE_NEW_RETURN (new_node, 00399 TAO_SFP_Fragment_Node, 00400 -1); 00401 new_node->fragment_info_ = fragment; 00402 new_node->data_ = data; 00403 TAO_SFP_Fragment_Table *fragment_table = 0; 00404 result = state.fragment_table_map_.find (fragment.source_id,fragment_table); 00405 if (result != 0) 00406 { 00407 ACE_NEW_RETURN (fragment_table, 00408 TAO_SFP_Fragment_Table, 00409 -1); 00410 result = state.fragment_table_map_.bind (fragment.source_id,fragment_table); 00411 if (result < 0) 00412 ACE_ERROR_RETURN ((LM_ERROR,"TAO_SFP_Base::read_fragment:fragment_table_map:bind failed\n"),-1); 00413 } 00414 if (fragment_table->find (fragment.sequence_num,fragment_entry) == 0) 00415 { 00416 // Already an entry exists. Traverse the list and insert it at the right place. 00417 result = fragment_entry->fragment_set_.insert (*new_node); 00418 if (result != 0) 00419 ACE_ERROR_RETURN ((LM_ERROR,"insert for %dth node failed\n",fragment.frag_number),-1); 00420 // check if all the fragments have been received. 
00421 } 00422 else 00423 { 00424 ACE_NEW_RETURN (fragment_entry, 00425 TAO_SFP_Fragment_Table_Entry, 00426 -1); 00427 fragment_entry->fragment_set_.insert (*new_node); 00428 // bind a new entry for this sequence number. 00429 result = fragment_table->bind (fragment.sequence_num,fragment_entry); 00430 if (result != 0) 00431 ACE_ERROR_RETURN ((LM_ERROR,"bind for %dth fragment failed\n", 00432 fragment.frag_number),-1); 00433 } 00434 if (!(fragment.flags & 0x2)) 00435 { 00436 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"Last fragment received\n")); 00437 // if bit 1 is not set then there are 00438 // no more fragments. 00439 fragment_entry->last_received_ = 1; 00440 // since fragment number starts from 0 to n-1 we add 1. 00441 fragment_entry->num_fragments_ = fragment.frag_number + 1; 00442 } 00443 00444 00445 state.frame_block_ = check_all_fragments (fragment_entry); 00446 if (state.frame_block_ != 0) 00447 { 00448 state.more_fragments_ = 0; 00449 ACE_NEW_RETURN (frame_info, 00450 TAO_AV_frame_info, 00451 -1); 00452 *frame_info = fragment_entry->frame_info; 00453 } 00454 return 0; 00455 }
int TAO_SFP_Base::read_frame (TAO_AV_Transport * transport,
                              flowProtocol::frameHeader & frame_header,
                              TAO_SFP_Frame_State & state,
                              TAO_AV_frame_info *& frame_info) [static]
Definition at line 191 of file sfp.cpp.
References ACE_DEBUG, ACE_ERROR_RETURN, ACE_NEW_RETURN, ACE_Message_Block::base(), ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::bind(), TAO_AV_frame_info::boundary_marker, check_all_fragments(), ACE_Message_Block::clone(), ACE_Message_Block::copy(), TAO_SFP_Fragment_Node::data_, ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::find(), flowProtocol::frameHeader::flags, flowProtocol::fragment::frag_number, flowProtocol::fragment::frag_sz, TAO_SFP_Fragment_Node::fragment_info_, TAO_SFP_Fragment_Table_Entry::fragment_set_, TAO_SFP_Frame_State::fragment_table_map_, TAO_SFP_Frame_State::frame_, TAO_SFP_Frame_State::frame_block_, frame_header_len, TAO_SFP_Fragment_Table_Entry::frame_info, flowProtocol::Frame_Msg, ACE_Ordered_MultiSet< T >::insert(), ACE_Message_Block::length(), LM_DEBUG, LM_ERROR, ACE_CDR::MAX_ALIGNMENT, ACE_CDR::mb_align(), flowProtocol::frameHeader::message_size, flowProtocol::frameHeader::message_type, TAO_SFP_Frame_State::more_fragments_, ACE_Message_Block::rd_ptr(), TAO_AV_Transport::recv(), TAO_AV_frame_info::sequence_num, flowProtocol::frame::sequence_num, flowProtocol::SequencedFrame_Msg, flowProtocol::SimpleFrame_Msg, flowProtocol::fragment::source_id, flowProtocol::frame::source_ids, flowProtocol::SpecialFrame_Msg, TAO_AV_frame_info::ssrc, ACE_InputCDR::start(), TAO_SFP_Frame_State::static_frame_, flowProtocol::frame::synchSource, TAO_debug_level, TAO_AV_frame_info::timestamp, flowProtocol::frame::timestamp, and ACE_Message_Block::wr_ptr().
Referenced by handle_input().
00195 { 00196 ACE_Message_Block *message_block = 0; 00197 int result = -1; 00198 00199 if (TAO_debug_level > 0) 00200 ACE_DEBUG ((LM_DEBUG,"Reading simple frame\n")); 00201 // Check to see what the length of the message is. 00202 int byte_order = frame_header.flags & 0x1; 00203 int message_len = frame_header.message_size; 00204 00205 // ACE_NEW_RETURN (message_block, 00206 // ACE_Message_Block (message_len), 00207 // 0); 00208 state.static_frame_.rd_ptr (state.static_frame_.base ()); 00209 state.static_frame_.wr_ptr (state.static_frame_.base ()); 00210 int n = transport->recv (state.static_frame_.rd_ptr (),message_len); 00211 if (n == -1) 00212 ACE_ERROR_RETURN ((LM_ERROR,"SFP::handle_input -peek"),0); 00213 else if (n==0) 00214 ACE_ERROR_RETURN ((LM_ERROR,"SFP::handle_input -peek"),0); 00215 else if (n != message_len) 00216 ACE_ERROR_RETURN ((LM_ERROR,"SFP::read_simple_frame:message truncated\n"),0); 00217 message_block = &state.static_frame_; 00218 // print the buffer. 00219 // this->dump_buf (message,n); 00220 // skip over the frame header. 00221 message_block->rd_ptr (frame_header_len); 00222 message_block->wr_ptr (n); 00223 CORBA::ULong ssrc = 0; 00224 TAO_SFP_Fragment_Table_Entry *fragment_entry = 0; 00225 if (frame_header.flags & 0x2) 00226 { 00227 if (TAO_debug_level > 0) 00228 ACE_DEBUG ((LM_DEBUG,"fragmented frame:0th fragment\n")); 00229 state.more_fragments_ = 1; 00230 ACE_Message_Block *data = 0; 00231 switch (frame_header.message_type) 00232 { 00233 case flowProtocol::Frame_Msg: 00234 { 00235 // read the frame info. 00236 ACE_Message_Block frame_info_mb (message_len-frame_header_len+ACE_CDR::MAX_ALIGNMENT); 00237 ACE_CDR::mb_align (&frame_info_mb); 00238 frame_info_mb.copy (message_block->rd_ptr (), 00239 message_block->length ()); 00240 // print the buffer. 
00241 // this->dump_buf (message_block->rd_ptr (),16); 00242 TAO_InputCDR frame_info_cdr (&frame_info_mb,byte_order); 00243 frame_info_cdr >> state.frame_; 00244 if (TAO_debug_level > 0) 00245 ACE_DEBUG ((LM_DEBUG, 00246 "frame.timestamp = %d, " 00247 "frame.synchsource = %d, " 00248 "frame.sequence_num = %d\n", 00249 state.frame_.timestamp, 00250 state.frame_.synchSource, 00251 state.frame_.sequence_num)); 00252 ssrc = state.frame_.synchSource; 00253 // The remaining message in the CDR stream is the fragment 00254 // data for frag.0 00255 data = frame_info_cdr.start ()->clone (); 00256 break; 00257 } 00258 case flowProtocol::SimpleFrame_Msg: 00259 { 00260 data = message_block->clone (); 00261 break; 00262 } 00263 case flowProtocol::SequencedFrame_Msg: 00264 break; 00265 case flowProtocol::SpecialFrame_Msg: 00266 break; 00267 } 00268 if (TAO_debug_level > 0) 00269 ACE_DEBUG ((LM_DEBUG,"Length of 0th fragment= %d\n",data->length ())); 00270 TAO_SFP_Fragment_Table *fragment_table = 0; 00271 result = state.fragment_table_map_.find (ssrc,fragment_table); 00272 if (result != 0) 00273 { 00274 ACE_NEW_RETURN (fragment_table, 00275 TAO_SFP_Fragment_Table, 00276 -1); 00277 result = state.fragment_table_map_.bind (ssrc,fragment_table); 00278 if (result < 0) 00279 ACE_ERROR_RETURN ((LM_ERROR, 00280 "TAO_SFP_Base::read_frame: " 00281 "fragment_table_map:bind failed\n"),-1); 00282 } 00283 00284 TAO_SFP_Fragment_Node *new_node; 00285 ACE_NEW_RETURN (new_node, 00286 TAO_SFP_Fragment_Node, 00287 0); 00288 new_node->fragment_info_.frag_sz = static_cast<CORBA::ULong> (data->length ()); 00289 new_node->fragment_info_.frag_number = 0; 00290 if (state.frame_.source_ids.length () > 0) 00291 new_node->fragment_info_.source_id = state.frame_.source_ids [0]; 00292 else 00293 new_node->fragment_info_.source_id = 0; 00294 new_node->data_ = data; 00295 // TAO_SFP_Base::dump_buf (data->rd_ptr (),data->length ()); 00296 if (fragment_table->find (state.frame_.sequence_num,fragment_entry) == 0) 
00297 { 00298 // This case can happen where a nth (n > 0)fragment is 00299 // received before the 0th fragment. 00300 if (TAO_debug_level > 0) 00301 ACE_DEBUG ((LM_DEBUG, 00302 "fragment table entry found for 0th fragment:\n")); 00303 result = fragment_entry->fragment_set_.insert (*new_node); 00304 if (result != 0) 00305 ACE_ERROR_RETURN ((LM_ERROR, 00306 "insert for 0th fragment failed\n"),0); 00307 // enter the frame info. 00308 00309 // check if all the fragments have been received. 00310 state.frame_block_ = 00311 TAO_SFP_Base::check_all_fragments (fragment_entry); 00312 if (state.frame_block_ != 0) 00313 state.more_fragments_ = 0; 00314 } 00315 else 00316 { 00317 if (TAO_debug_level > 0) 00318 ACE_DEBUG ((LM_DEBUG, 00319 "fragment table entry not found for 0th fragment\n")); 00320 TAO_SFP_Fragment_Table_Entry *new_entry; 00321 ACE_NEW_RETURN (new_entry, 00322 TAO_SFP_Fragment_Table_Entry, 00323 0); 00324 result = new_entry->fragment_set_.insert (*new_node); 00325 if (result != 0) 00326 ACE_ERROR_RETURN ((LM_ERROR,"insert for 0th fragment failed\n"),0); 00327 fragment_entry = new_entry; 00328 // not found. so bind a new entry. 
00329 result = fragment_table->bind (state.frame_.sequence_num,new_entry); 00330 if (result != 0) 00331 ACE_ERROR_RETURN ((LM_ERROR,"fragment table bind failed\n"),0); 00332 if (frame_header.message_type & 4 ) 00333 fragment_entry->frame_info.boundary_marker = 1; 00334 switch (frame_header.message_type) 00335 { 00336 case flowProtocol::Frame_Msg: 00337 fragment_entry->frame_info.ssrc = state.frame_.synchSource; 00338 fragment_entry->frame_info.timestamp = state.frame_.timestamp; 00339 fragment_entry->frame_info.sequence_num = state.frame_.sequence_num; 00340 break; 00341 case flowProtocol::SimpleFrame_Msg: 00342 fragment_entry->frame_info.ssrc = 00343 fragment_entry->frame_info.timestamp = 00344 fragment_entry->frame_info.sequence_num = 0; 00345 break; 00346 } 00347 return 0; 00348 } 00349 } 00350 else 00351 { 00352 state.more_fragments_ = 0; 00353 state.frame_block_ = message_block; 00354 } 00355 if (state.more_fragments_ == 0) 00356 { 00357 if (fragment_entry != 0) 00358 { 00359 ACE_NEW_RETURN (frame_info, 00360 TAO_AV_frame_info, 00361 -1); 00362 *frame_info = fragment_entry->frame_info; 00363 } 00364 } 00365 return 0; 00366 }
int TAO_SFP_Base::read_start_message (TAO_AV_Transport * transport,
                                      flowProtocol::Start & start,
                                      TAO_InputCDR & cdr) [static]
Definition at line 706 of file sfp.cpp.
References ACE_ERROR_RETURN, ACE_InputCDR::grow(), LM_ERROR, ACE_InputCDR::rd_ptr(), TAO_AV_Transport::recv(), and start_len.
00709 { 00710 input.grow (start_len); 00711 char *buf = input.rd_ptr (); 00712 int n = transport->recv (buf, 00713 start_len); 00714 if (n != static_cast<int> (start_len)) 00715 ACE_ERROR_RETURN ((LM_ERROR,"%p","TAO_SFP_Base::read_start\n"),0); 00716 else 00717 { 00718 if (!(input >> start)) 00719 return -1; 00720 } 00721 return 0; 00722 }
int TAO_SFP_Base::read_start_reply_message (TAO_AV_Transport * transport,
                                            flowProtocol::StartReply & start_reply,
                                            TAO_InputCDR & cdr) [static]
Definition at line 726 of file sfp.cpp.
References ACE_ERROR_RETURN, ACE_InputCDR::grow(), LM_ERROR, ACE_InputCDR::rd_ptr(), TAO_AV_Transport::recv(), start_len, and start_reply_len.
00729 { 00730 input.grow (start_len); 00731 char *buf = input.rd_ptr (); 00732 int n = transport->recv (buf, 00733 start_reply_len); 00734 if (n != static_cast<int> (start_len)) 00735 ACE_ERROR_RETURN ((LM_ERROR,"%p","TAO_SFP_Base::read_start_reply_message"),0); 00736 else 00737 { 00738 if (!(input >> start_reply)) 00739 return -1; 00740 } 00741 return 0; 00742 }
int TAO_SFP_Base::send_message (TAO_AV_Transport * transport,
                                TAO_OutputCDR & stream,
                                ACE_Message_Block * mb = 0) [static]
Definition at line 596 of file sfp.cpp.
References ACE_DEBUG, ACE_OutputCDR::begin(), ACE_OutputCDR::buffer(), ACE_Message_Block::cont(), ACE_OutputCDR::do_byte_swap(), ACE_OutputCDR::end(), LM_DEBUG, TAO_AV_Transport::send(), ACE_CDR::swap_4(), TAO_debug_level, TAO_SFP_FRAGMENT_SIZE_OFFSET, TAO_SFP_MESSAGE_SIZE_OFFSET, and ACE_OutputCDR::total_length().
Referenced by TAO_SFP_Object::destroy(), and TAO_SFP_Object::send_frame().
00599 { 00600 CORBA::ULong total_len = static_cast<CORBA::ULong> (stream.total_length ()); 00601 if (mb != 0) 00602 { 00603 for (ACE_Message_Block *temp = mb;temp != 0;temp = temp->cont ()) 00604 total_len += static_cast<CORBA::ULong> (temp->length ()); 00605 00606 char *buf = (char *) stream.buffer (); 00607 size_t offset = TAO_SFP_MESSAGE_SIZE_OFFSET; 00608 // second character distinguished =SFP and FRAG. 00609 if (*(buf) == 'F') 00610 { 00611 // Fragment message. 00612 offset = TAO_SFP_FRAGMENT_SIZE_OFFSET; 00613 } 00614 #if !defined (ACE_ENABLE_SWAP_ON_WRITE) 00615 *reinterpret_cast<CORBA::ULong *> (buf + offset) = total_len; 00616 #else 00617 if (!stream.do_byte_swap ()) 00618 *reinterpret_cast<CORBA::ULong *> (buf + offset) = total_len; 00619 else 00620 ACE_CDR::swap_4 (reinterpret_cast<char *> (&total_len), 00621 buf + offset); 00622 #endif /* ACE_ENABLE_SWAP_ON_WRITE */ 00623 } 00624 // we join the data block with the cdr block. 00625 ACE_Message_Block *end = (ACE_Message_Block *)stream.end (); 00626 if (end == 0) 00627 { 00628 // There is only one message block. 00629 end = (ACE_Message_Block *)stream.begin (); 00630 // TAO_SFP_Base::dump_buf (end->rd_ptr (),end->length ()); 00631 } 00632 end->cont (mb); 00633 ssize_t n = transport->send (stream.begin ()); 00634 if (n == -1) 00635 { 00636 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, 00637 "TAO: (%P|%t) closing conn after fault %p\n", 00638 "GIOP::send_request ()")); 00639 return -1; 00640 } 00641 // EOF. 00642 if (n == 0) 00643 { 00644 if (TAO_debug_level > 0) 00645 ACE_DEBUG ((LM_DEBUG, 00646 "TAO: (%P|%t) GIOP::send_request () " 00647 "EOF, closing conn:\n")); 00648 return -1; 00649 } 00650 return 1; 00651 00652 }
CORBA::Boolean TAO_SFP_Base::start_frame (CORBA::Octet flags,
                                          flowProtocol::MsgType type,
                                          TAO_OutputCDR & msg) [static]
Definition at line 487 of file sfp.cpp.
References flowProtocol::frameHeader::flags, flowProtocol::frameHeader::magic_number, flowProtocol::frameHeader::message_size, flowProtocol::frameHeader::message_type, and ACE_OutputCDR::reset().
Referenced by TAO_SFP_Object::destroy(), and TAO_SFP_Object::send_frame().
00490 { 00491 msg.reset (); 00492 flowProtocol::frameHeader frame_header; 00493 00494 frame_header.magic_number [0] = '='; 00495 frame_header.magic_number [1] = 'S'; 00496 frame_header.magic_number [2] = 'F'; 00497 frame_header.magic_number [3] = 'P'; 00498 frame_header.flags = flags; 00499 frame_header.message_type = static_cast<CORBA::Octet> (type); 00500 frame_header.message_size = 0; 00501 if (!(msg << frame_header)) 00502 return 0; 00503 return 1; 00504 }
CORBA::Boolean TAO_SFP_Base::write_credit_message (CORBA::ULong cred_num,
                                                   TAO_OutputCDR & msg) [static]
Definition at line 539 of file sfp.cpp.
References flowProtocol::credit::cred_num, and flowProtocol::credit::magic_number.
00541 { 00542 flowProtocol::credit credit; 00543 00544 credit.magic_number [0] = '='; 00545 credit.magic_number [1] = 'C'; 00546 credit.magic_number [2] = 'R'; 00547 credit.magic_number [3] = 'E'; 00548 credit.cred_num = cred_num; 00549 if (!(msg << credit)) 00550 return 0; 00551 return 1; 00552 }
CORBA::Boolean TAO_SFP_Base::write_fragment_message (CORBA::Octet flags,
                                                     CORBA::ULong fragment_number,
                                                     CORBA::ULong sequence_number,
                                                     CORBA::ULong source_id,
                                                     TAO_OutputCDR & msg) [static]
Definition at line 555 of file sfp.cpp.
References flowProtocol::fragment::flags, flowProtocol::fragment::frag_number, flowProtocol::fragment::magic_number, ACE_OutputCDR::reset(), flowProtocol::fragment::sequence_num, and flowProtocol::fragment::source_id.
Referenced by TAO_SFP_Object::send_frame().
00560 { 00561 msg.reset (); 00562 flowProtocol::fragment fragment; 00563 00564 fragment.magic_number [0] = 'F'; 00565 fragment.magic_number [1] = 'R'; 00566 fragment.magic_number [2] = 'A'; 00567 fragment.magic_number [3] = 'G'; 00568 fragment.flags = flags; 00569 fragment.frag_number = fragment_number; 00570 fragment.sequence_num = sequence_number; 00571 fragment.source_id = source_id; 00572 if (!(msg << fragment)) 00573 return 0; 00574 return 1; 00575 }
CORBA::Boolean TAO_SFP_Base::write_frame_message (CORBA::ULong timestamp,
                                                  CORBA::ULong synchSource,
                                                  flowProtocol::my_seq_ulong source_ids,
                                                  CORBA::ULong sequence_num,
                                                  TAO_OutputCDR & msg) [static]
Definition at line 578 of file sfp.cpp.
References flowProtocol::frame::sequence_num, flowProtocol::frame::source_ids, flowProtocol::frame::synchSource, and flowProtocol::frame::timestamp.
Referenced by TAO_SFP_Object::send_frame().
00583 { 00584 flowProtocol::frame frame; 00585 00586 frame.timestamp = timestamp; 00587 frame.synchSource = synchSource; 00588 frame.source_ids = source_ids; 00589 frame.sequence_num = sequence_num; 00590 if (!(msg << frame)) 00591 return 0; 00592 return 1; 00593 }
CORBA::Boolean TAO_SFP_Base::write_start_message (TAO_OutputCDR & msg) [static]
Definition at line 507 of file sfp.cpp.
References flowProtocol::Start::flags, flowProtocol::Start::magic_number, flowProtocol::Start::major_version, flowProtocol::Start::minor_version, TAO_SFP_MAJOR_VERSION, and TAO_SFP_MINOR_VERSION.
00508 { 00509 flowProtocol::Start start; 00510 00511 start.magic_number [0] = '='; 00512 start.magic_number [1] = 'S'; 00513 start.magic_number [2] = 'T'; 00514 start.magic_number [3] = 'A'; 00515 start.major_version = TAO_SFP_MAJOR_VERSION; 00516 start.minor_version = TAO_SFP_MINOR_VERSION; 00517 start.flags = 0; 00518 if (!(msg << start)) 00519 return 0; 00520 return 1; 00521 }
CORBA::Boolean TAO_SFP_Base::write_start_reply_message (TAO_OutputCDR & msg) [static]
Definition at line 524 of file sfp.cpp.
References flowProtocol::StartReply::flags, and flowProtocol::StartReply::magic_number.
00525 { 00526 flowProtocol::StartReply start_reply; 00527 00528 start_reply.magic_number [0] = '='; 00529 start_reply.magic_number [1] = 'S'; 00530 start_reply.magic_number [2] = 'T'; 00531 start_reply.magic_number [3] = 'R'; 00532 start_reply.flags = 0; 00533 if (!(msg << start_reply)) 00534 return 0; 00535 return 1; 00536 }
u_int TAO_SFP_Base::credit_len [static]
u_int TAO_SFP_Base::fragment_len [static]
Definition at line 131 of file sfp.h.
Referenced by peek_fragment_header(), read_fragment(), TAO_SFP_Object::send_frame(), and TAO_SFP_Base().
u_int TAO_SFP_Base::frame_header_len [static]
Definition at line 127 of file sfp.h.
Referenced by peek_frame_header(), read_endofstream_message(), read_frame(), TAO_SFP_Object::send_frame(), and TAO_SFP_Base().
u_int TAO_SFP_Base::start_len [static]
Definition at line 129 of file sfp.h.
Referenced by read_credit_message(), read_endofstream_message(), read_start_message(), read_start_reply_message(), and TAO_SFP_Base().
u_int TAO_SFP_Base::start_reply_len [static]
const char TAO_SFP_Base::TAO_SFP_CREDIT_MAGIC_NUMBER[] = "=CRE" [static]
const char TAO_SFP_Base::TAO_SFP_FRAGMENT_MAGIC_NUMBER[] = "FRAG" [static]
const unsigned char TAO_SFP_Base::TAO_SFP_FRAGMENT_SIZE_OFFSET = 16 [static]
const unsigned char TAO_SFP_Base::TAO_SFP_FRAME_HEADER_LEN = 12 [static]
const char TAO_SFP_Base::TAO_SFP_MAGIC_NUMBER[] = "=SFP" [static]
const unsigned char TAO_SFP_Base::TAO_SFP_MAJOR_VERSION = 1 [static]
const unsigned char TAO_SFP_Base::TAO_SFP_MESSAGE_SIZE_OFFSET = 8 [static]
const unsigned char TAO_SFP_Base::TAO_SFP_MINOR_VERSION = 0 [static]
const char TAO_SFP_Base::TAO_SFP_ORB_ARGUMENTS[] = "-ORBObjRefStyle URL" [static]
const char TAO_SFP_Base::TAO_SFP_START_MAGIC_NUMBER[] = "=STA" [static]
const char TAO_SFP_Base::TAO_SFP_STARTREPLY_MAGIC_NUMBER[] = "=STR" [static]