00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
#include "orbsvcs/AV/ntp-time.h"
#include "orbsvcs/AV/RTCP.h"
#include "orbsvcs/AV/media_timer.h"
#include "tao/debug.h"
#include "orbsvcs/AV/global.h"
#include "orbsvcs/AV/md5.h"

#include "orbsvcs/AV/RTCP_Packet.h"
#include "ace/OS_NS_time.h"
#include "ace/OS_NS_strings.h"
#include "ace/Truncate.h"

#include <cstdlib>
00047
00048 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00049
00050 int
00051 TAO_AV_RTCP_Callback::receive_control_frame (ACE_Message_Block *data,
00052 const ACE_Addr &peer_address)
00053 {
00054 int length = static_cast<int> (data->length ());
00055 int more = length;
00056 char *buf_ptr = data->rd_ptr ();
00057 char first_rtcp_packet = 1;
00058 RTCP_Channel_In *c;
00059
00060
00061
00062
00063 while (more > 0)
00064 {
00065
00066 switch ((unsigned char)buf_ptr[length - more + 1])
00067 {
00068 case RTCP_PT_SR:
00069 {
00070 RTCP_SR_Packet sr(&buf_ptr[length-more],
00071 &more);
00072
00073 if (!sr.is_valid(first_rtcp_packet))
00074 ACE_DEBUG ((LM_DEBUG,
00075 "TAO_AV_RTCP_Callback::receive_control_frame - "
00076 "warning invalid rtcp packet\n"));
00077
00078 if (this->inputs_.find (sr.ssrc (), c) == -1)
00079 {
00080 ACE_NEW_RETURN (c,
00081 RTCP_Channel_In (sr.ssrc (),
00082 &peer_address),
00083 -1);
00084 this->inputs_.bind (sr.ssrc (), c);
00085 }
00086 c->updateStatistics (&sr);
00087
00088 if (TAO_debug_level > 0)
00089 sr.dump ();
00090 break;
00091 }
00092 case RTCP_PT_RR:
00093 {
00094 RTCP_RR_Packet rr(&buf_ptr[length-more],
00095 &more);
00096
00097 if (!rr.is_valid(first_rtcp_packet))
00098 ACE_DEBUG ((LM_DEBUG,
00099 "TAO_AV_RTCP_Callback::receive_control_frame - "
00100 "warning invalid rtcp packet\n"));
00101
00102 if (this->inputs_.find (rr.ssrc (), c) == -1)
00103 {
00104 ACE_NEW_RETURN (c,
00105 RTCP_Channel_In (rr.ssrc (),
00106 &peer_address),
00107 -1);
00108 this->inputs_.bind (rr.ssrc (), c);
00109 }
00110
00111 c->updateStatistics (&rr);
00112
00113 if (TAO_debug_level > 0)
00114 rr.dump ();
00115 break;
00116 }
00117 case RTCP_PT_SDES:
00118 {
00119 RTCP_SDES_Packet sdes (&buf_ptr[length-more],
00120 &more);
00121
00122 if (!sdes.is_valid(first_rtcp_packet))
00123 ACE_DEBUG ((LM_DEBUG,
00124 "TAO_AV_RTCP_Callback::receive_control_frame - "
00125 "warning invalid rtcp packet\n"));
00126
00127 if (TAO_debug_level > 0)
00128 sdes.dump ();
00129 break;
00130 }
00131 case RTCP_PT_BYE:
00132 {
00133 RTCP_BYE_Packet bye (&buf_ptr[length-more],
00134 &more);
00135
00136 if (!bye.is_valid(first_rtcp_packet))
00137 ACE_DEBUG ((LM_DEBUG,
00138 "TAO_AV_RTCP_Callback::receive_control_frame - "
00139 "warning invalid rtcp packet\n"));
00140
00141
00142 ACE_UINT32 *ssrc_list;
00143 unsigned char length;
00144
00145 bye.ssrc_list(&ssrc_list, length);
00146
00147 for (int i=0; i<length; i++)
00148 {
00149 RTCP_Channel_In *c = 0;
00150
00151
00152 this->inputs_.unbind(ssrc_list[i], c);
00153
00154 if (c != 0)
00155 delete c;
00156 }
00157
00158 if (TAO_debug_level > 0)
00159 bye.dump ();
00160
00161 break;
00162 }
00163 case RTCP_PT_APP:
00164
00165 ACE_DEBUG ((LM_DEBUG,
00166 "TAO_AV_RTCP_Callback::receive_control_frame - "
00167 "APP packet - ignore\n"));
00168 more -= (4 + (ACE_UINT16)buf_ptr[length - more + 2]);
00169 break;
00170 default:
00171 ACE_DEBUG ((LM_DEBUG,
00172 "TAO_AV_RTCP_Callback::receive_control_frame - "
00173 "UNKNOWN packet type %u; ignore the rest\n",
00174 (int)buf_ptr[length - more + 1]));
00175 more = 0;
00176 }
00177
00178 first_rtcp_packet = 0;
00179
00180 }
00181
00182 if (more != 0)
00183 ACE_DEBUG ((LM_DEBUG,
00184 "TAO_AV_RTCP_Callback::receive_control_frame - "
00185 "Error in overall packet length\n"));
00186 return 0;
00187 }
00188
00189 ACE_INT32 random32 (int);
00190
00191 ACE_UINT32
00192 TAO_AV_RTCP::alloc_srcid (ACE_UINT32 addr)
00193 {
00194 md5_string s;
00195
00196 s.type = addr;
00197 s.tv = ACE_OS::gettimeofday ();
00198 s.pid = ACE_OS::getpid();
00199 s.pgid = ACE_OS::getpgid(s.pid);
00200 s.ppid = ACE_OS::getppid();
00201 s.uid = ACE_OS::getuid();
00202 s.gid = ACE_OS::getgid();
00203
00204 unsigned char *string_val = (unsigned char *) &s;
00205 int length = sizeof(s);
00206
00207 MD5_CTX context;
00208 union
00209 {
00210 char c[16];
00211 u_long x[4];
00212 } digest;
00213 ACE_UINT32 r;
00214 int i;
00215
00216 MD5Init (&context);
00217 MD5Update (&context, string_val, length);
00218 MD5Final ((unsigned char*)&digest, &context);
00219 r=0;
00220 for (i=0; i<3; i++)
00221 r ^= digest.x[i];
00222
00223 return r;
00224
00225
00226
00227
00228
00229
00230
00231
00232
00233 }
00234
00235
00236 double
00237 TAO_AV_RTCP::rtcp_interval (int members,
00238 int senders,
00239 double rtcp_bw,
00240 int we_sent,
00241 int packet_size,
00242 int *avg_rtcp_size,
00243 int initial)
00244 {
00245
00246
00247
00248
00249
00250
00251
00252
00253
00254
00255
00256
00257
00258
00259
00260
00261
00262
00263
00264
00265
00266 double t;
00267 double rtcp_min_time = RTCP_MIN_RPT_TIME;
00268 int n;
00269
00270
00271
00272
00273
00274
00275
00276
00277
00278 if (initial)
00279 {
00280
00281 ACE_OS::srand(ACE_Utils::truncate_cast<u_int> (ACE_OS::time(0L)));
00282
00283 rtcp_min_time /= 2;
00284 *avg_rtcp_size = 128;
00285 }
00286
00287
00288
00289
00290 n = members;
00291 if ((senders > 0) && (senders < members*RTCP_SENDER_BW_FRACTION))
00292 {
00293 if (we_sent)
00294 {
00295 rtcp_bw *= RTCP_SENDER_BW_FRACTION;
00296 n = senders;
00297 }
00298 else
00299 {
00300 rtcp_bw *= RTCP_RECEIVER_BW_FRACTION;
00301 n -= senders;
00302 }
00303 }
00304
00305
00306
00307 *avg_rtcp_size += (int)((packet_size - *avg_rtcp_size)*RTCP_SIZE_GAIN);
00308
00309
00310
00311
00312
00313
00314
00315
00316 t = (*avg_rtcp_size) * n / rtcp_bw;
00317 if (t < rtcp_min_time)
00318 t = rtcp_min_time;
00319
00320
00321
00322
00323
00324
00325 int max_rand = 32768;
00326
00327 return t * ((double)ACE_OS::rand()/max_rand + 0.5);
00328
00329 }
00330
00331
00332
00333
00334 TAO_AV_RTCP_Flow_Factory::TAO_AV_RTCP_Flow_Factory (void)
00335 {
00336 }
00337
00338 TAO_AV_RTCP_Flow_Factory::~TAO_AV_RTCP_Flow_Factory (void)
00339 {
00340 }
00341
00342 int
00343 TAO_AV_RTCP_Flow_Factory::match_protocol (const char *flow_string)
00344 {
00345 if (ACE_OS::strncasecmp (flow_string,"RTCP",4) == 0)
00346 return 1;
00347
00348 return 0;
00349 }
00350
00351 int
00352 TAO_AV_RTCP_Flow_Factory::init (int ,
00353 ACE_TCHAR * [])
00354 {
00355 return 0;
00356 }
00357
00358 TAO_AV_Protocol_Object*
00359 TAO_AV_RTCP_Flow_Factory::make_protocol_object (TAO_FlowSpec_Entry * ,
00360 TAO_Base_StreamEndPoint * ,
00361 TAO_AV_Flow_Handler *handler,
00362 TAO_AV_Transport *transport)
00363 {
00364 TAO_AV_Callback *client_cb = 0;
00365 TAO_AV_RTCP_Callback *rtcp_cb = 0;
00366
00367
00368
00369
00370
00371 TAO_AV_Protocol_Object *object = 0;
00372 ACE_NEW_RETURN (object,
00373 TAO_AV_RTCP_Object (client_cb,
00374 rtcp_cb,
00375 transport),
00376 0);
00377
00378 rtcp_cb->open (object, handler);
00379
00380 return object;
00381 }
00382
00383
00384 int
00385 TAO_AV_RTCP_Object::handle_input (void)
00386 {
00387 size_t bufsiz = 2*this->transport_->mtu ();
00388 ACE_Message_Block data (bufsiz);
00389
00390 int n = this->transport_->recv (data.rd_ptr (),bufsiz);
00391 if (n == 0)
00392 {
00393 if (TAO_debug_level > 0)
00394 ACE_DEBUG ((LM_ERROR, "TAO_AV_RTCP::handle_input:connection closed\n"));
00395 return -1;
00396 }
00397 if (n < 0)
00398 {
00399 if (TAO_debug_level > 0)
00400 ACE_DEBUG ((LM_ERROR,"TAO_AV_RTCP::handle_input:recv error\n"));
00401 return -1;
00402 }
00403 data.wr_ptr (n);
00404 ACE_Addr *peer_addr = this->transport_->get_peer_addr ();
00405 this->callback_->receive_control_frame (&data,*peer_addr);
00406 return 0;
00407 }
00408
00409 int
00410 TAO_AV_RTCP_Object::send_frame (ACE_Message_Block *frame,
00411 TAO_AV_frame_info * )
00412 {
00413 return this->transport_->send (frame);
00414 }
00415
00416 int
00417 TAO_AV_RTCP_Object::send_frame (const iovec *iov,
00418 int iovcnt,
00419 TAO_AV_frame_info * )
00420 {
00421 return this->transport_->send (iov,
00422 iovcnt);
00423 }
00424
00425 int
00426 TAO_AV_RTCP_Object::send_frame (const char*,
00427 size_t)
00428 {
00429 return 0;
00430 }
00431
00432 TAO_AV_RTCP_Object::TAO_AV_RTCP_Object (TAO_AV_Callback *client_cb,
00433 TAO_AV_RTCP_Callback *&rtcp_cb,
00434 TAO_AV_Transport *transport)
00435 :TAO_AV_Protocol_Object (&rtcp_cb_, transport)
00436 {
00437 rtcp_cb = &this->rtcp_cb_;
00438 this->client_cb_ = client_cb;
00439
00440 }
00441
00442 TAO_AV_RTCP_Object::~TAO_AV_RTCP_Object (void)
00443 {
00444 }
00445
00446 int
00447 TAO_AV_RTCP_Object::destroy (void)
00448 {
00449 this->callback_->handle_destroy ();
00450 delete this;
00451
00452 return 0;
00453 }
00454
00455 int
00456 TAO_AV_RTCP_Object::set_policies (const TAO_AV_PolicyList &)
00457 {
00458 return -1;
00459 }
00460
00461 int
00462 TAO_AV_RTCP_Object::start (void)
00463 {
00464 return this->callback_->handle_start ();
00465 }
00466
00467 int
00468 TAO_AV_RTCP_Object::stop (void)
00469 {
00470 return this->callback_->handle_stop ();
00471 }
00472
00473 int
00474 TAO_AV_RTCP_Object::handle_control_input (ACE_Message_Block *frame,
00475 const ACE_Addr &peer_address)
00476 {
00477 return this->callback_->receive_frame (frame,
00478 0,
00479 peer_address);
00480 }
00481
00482 int
00483 TAO_AV_RTCP_Object::handle_control_output (ACE_Message_Block *frame)
00484 {
00485 TAO_AV_RTCP_Callback *cb = dynamic_cast<TAO_AV_RTCP_Callback*> (this->callback_);
00486
00487 return cb->send_frame (frame);
00488 }
00489
00490 void
00491 TAO_AV_RTCP_Object::ts_offset (ACE_UINT32 ts_offset)
00492 {
00493 TAO_AV_RTCP_Callback *cb = dynamic_cast<TAO_AV_RTCP_Callback*> (this->callback_);
00494 cb->ts_offset (ts_offset);
00495 }
00496
00497
00498 TAO_AV_RTCP_Callback::TAO_AV_RTCP_Callback (void)
00499 :is_initial_timeout_(1),
00500 packet_size_(0)
00501 {
00502 char cname[256];
00503 char host[256];
00504 ACE_OS::hostname(host, sizeof(host));
00505
00506
00507 ACE_OS::sprintf(cname, "username@%s", host);
00508
00509 this->output_.cname(cname);
00510 }
00511
00512 TAO_AV_RTCP_Callback::~TAO_AV_RTCP_Callback (void)
00513 {
00514 }
00515
00516 void
00517 TAO_AV_RTCP_Callback::schedule (int ms)
00518 {
00519 this->timeout_ = ms;
00520 }
00521
00522 int
00523 TAO_AV_RTCP_Callback::handle_start (void)
00524 {
00525 return 0;
00526 }
00527
00528 int
00529 TAO_AV_RTCP_Callback::handle_stop (void)
00530 {
00531 return this->send_report(1);
00532 }
00533
00534 int
00535 TAO_AV_RTCP_Callback::handle_timeout (void * )
00536 {
00537 return this->send_report(0);
00538 }
00539
00540 int
00541 TAO_AV_RTCP_Callback::send_report (int bye)
00542 {
00543
00544 TAO_AV_RTCP_Object *rtcp_prot_obj = dynamic_cast<TAO_AV_RTCP_Object*> (this->protocol_object_);
00545 ACE_UINT32 my_ssrc = rtcp_prot_obj->ssrc ();
00546
00547 RTCP_Packet *cp;
00548 RTCP_SDES_Packet sdes;
00549 ACE_CString value = "";
00550 ACE_CString note = "";
00551 unsigned char sdes_type = 0;
00552 RTCP_BYE_Packet *bye_packet = 0;
00553 ACE_UINT32 ssrc_list[1];
00554
00555
00556 ACE_Hash_Map_Iterator<ACE_UINT32, RTCP_Channel_In*, ACE_Null_Mutex> iter (this->inputs_);
00557 iter = this->inputs_.begin();
00558
00559
00560 RR_Block *blocks = 0;
00561 RR_Block *b_iter = 0;
00562 RR_Block *b_ptr = 0;
00563
00564 while (iter != this->inputs_.end() )
00565 {
00566 if (!b_iter)
00567 {
00568 b_ptr = (*iter).int_id_->getRRBlock ();
00569 if (b_ptr)
00570 {
00571 blocks = b_ptr;
00572 b_iter = b_ptr;
00573 }
00574 }
00575 else
00576 {
00577 b_ptr = (*iter).int_id_->getRRBlock ();
00578 if (b_ptr)
00579 {
00580 b_iter->next_ = b_ptr;
00581 }
00582 }
00583
00584 iter++;
00585 }
00586
00587 if (b_iter)
00588 b_iter->next_ = 0;
00589
00590 if (this->output_.active ())
00591 {
00592
00593 ACE_Time_Value unix_now = ACE_OS::gettimeofday ();
00594 TAO_AV_RTCP::ntp64 ntp_now = ntp64time (unix_now);
00595 ACE_UINT32 rtp_ts = ACE_Utils::truncate_cast<ACE_UINT32> (
00596 unix_now.sec () * 8000 + unix_now.usec () / 125 +
00597 this->timestamp_offset_);
00598 ACE_NEW_RETURN(cp,
00599 RTCP_SR_Packet (my_ssrc,
00600 ntp_now.upper,
00601 ntp_now.lower,
00602 rtp_ts,
00603 this->output_.packets_sent (),
00604 this->output_.octets_sent (),
00605 blocks),
00606 -1);
00607 }
00608 else
00609 {
00610 ACE_NEW_RETURN(cp,
00611 RTCP_RR_Packet (my_ssrc,
00612 blocks),
00613 -1);
00614 }
00615
00616
00617
00618
00619
00620
00621
00622
00623
00624
00625
00626
00627
00628 switch (this->sdes_count_%8)
00629 {
00630 case 0:
00631 case 4:
00632 value = "tao-users@wustl.edu";
00633 sdes_type = RTCP_SDES_EMAIL;
00634 break;
00635 case 2:
00636 if (note.length () > 0)
00637 {
00638 value = "Joe User";
00639 sdes_type = RTCP_SDES_NAME;
00640 }
00641 else
00642 {
00643 value = "An important note...";
00644 sdes_type = RTCP_SDES_NOTE;
00645 }
00646 break;
00647 case 6:
00648 value = "TAO A/V Service";
00649 sdes_type = RTCP_SDES_TOOL;
00650 break;
00651 case 1:
00652 case 3:
00653 case 5:
00654 case 7:
00655 if (note.length () == 0)
00656 {
00657 value = "Joe User";
00658 sdes_type = RTCP_SDES_NAME;
00659 }
00660 else
00661 {
00662 value = "An important note...";
00663 sdes_type = RTCP_SDES_NOTE;
00664 }
00665 break;
00666 }
00667
00668 ++this->sdes_count_;
00669
00670 sdes.add_item (my_ssrc,
00671 RTCP_SDES_CNAME,
00672 static_cast<unsigned char> (ACE_OS::strlen(this->output_.cname())),
00673 this->output_.cname());
00674 if (bye)
00675 {
00676 ssrc_list[0] = rtcp_prot_obj->ssrc ();
00677
00678 ACE_NEW_RETURN (bye_packet,
00679 RTCP_BYE_Packet(ssrc_list,
00680 sizeof(ssrc_list)/sizeof(ssrc_list[0]),
00681 "Got bored."),
00682 -1);
00683 }
00684 else
00685 {
00686 unsigned char length = (unsigned char)(value.length() & 0xFF);
00687 sdes.add_item (my_ssrc, sdes_type, length, value.c_str ());
00688 }
00689
00690
00691 char *cp_ptr;
00692 char *sdes_ptr;
00693 char *bye_ptr = 0;
00694 ACE_UINT16 cp_length;
00695 ACE_UINT16 sdes_length;
00696 ACE_UINT16 bye_length = 0;
00697 cp->get_packet_data (&cp_ptr, cp_length);
00698 sdes.get_packet_data (&sdes_ptr, sdes_length);
00699 if (bye_packet)
00700 bye_packet->get_packet_data(&bye_ptr, bye_length);
00701
00702 ACE_Message_Block mb (cp_length + sdes_length + bye_length);
00703
00704 ACE_OS::memcpy (mb.wr_ptr (), cp_ptr, cp_length);
00705 mb.wr_ptr (cp_length);
00706 ACE_OS::memcpy (mb.wr_ptr (), sdes_ptr, sdes_length);
00707 mb.wr_ptr (sdes_length);
00708 if (bye_length)
00709 {
00710 ACE_OS::memcpy (mb.wr_ptr (), bye_ptr, bye_length);
00711 mb.wr_ptr (bye_length);
00712 }
00713
00714
00715 this->protocol_object_->send_frame (&mb);
00716
00717 this->packet_size_ = cp_length + sdes_length + bye_length;
00718
00719 delete cp;
00720 if (bye_packet)
00721 delete bye_packet;
00722
00723 return 0;
00724 }
00725
00726 void
00727
00728 TAO_AV_RTCP_Callback::get_timeout (ACE_Time_Value *&tv,
00729 void *& )
00730 {
00731 int senders = 0;
00732 int members = 1;
00733
00734
00735 double rtcp_bw = 1000;
00736 double interval;
00737
00738 ACE_Hash_Map_Iterator<ACE_UINT32, RTCP_Channel_In*, ACE_Null_Mutex> iter (this->inputs_);
00739 iter = this->inputs_.begin();
00740
00741 if (this->output_.active ())
00742 senders++;
00743
00744
00745 while (iter != this->inputs_.end ())
00746 {
00747 if ((*iter).int_id_->active ())
00748 {
00749 if ((*iter).int_id_->sender ())
00750 senders++;
00751 members++;
00752 }
00753 iter++;
00754 }
00755
00756
00757 interval = TAO_AV_RTCP::rtcp_interval (members,
00758 senders,
00759 rtcp_bw,
00760 this->output_.active (),
00761 this->packet_size_,
00762 &this->avg_rtcp_size_,
00763 this->is_initial_timeout_);
00764
00765 this->is_initial_timeout_ = 0;
00766
00767 ACE_NEW (tv,
00768 ACE_Time_Value);
00769
00770 tv->sec ((int)interval);
00771 tv->usec ((int)((interval - (int)interval) * 1000000));
00772 }
00773
00774 int
00775 TAO_AV_RTCP_Callback::handle_destroy (void)
00776 {
00777 return 0;
00778 }
00779
00780 int
00781 TAO_AV_RTCP_Callback::receive_frame (ACE_Message_Block *frame,
00782 TAO_AV_frame_info *,
00783 const ACE_Addr &peer_address)
00784 {
00785 RTCP_Channel_In *c;
00786
00787 RTP_Packet packet (frame->rd_ptr(), static_cast<int> (frame->length()));
00788
00789 if (this->inputs_.find (packet.ssrc(), c) < 0)
00790 {
00791 ACE_NEW_RETURN (c,
00792 RTCP_Channel_In (packet.ssrc(),
00793 &peer_address),
00794 -1);
00795
00796 this->inputs_.bind (packet.ssrc(), c);
00797 }
00798
00799 c->recv_rtp_packet (frame, &peer_address);
00800 return 0;
00801 }
00802
00803 int
00804 TAO_AV_RTCP_Callback::send_frame (ACE_Message_Block *frame)
00805 {
00806 RTP_Packet packet (frame->rd_ptr(), static_cast<int> (frame->length()));
00807 this->output_.updateStatistics (&packet);
00808
00809 return 0;
00810 }
00811
00812 void
00813 TAO_AV_RTCP_Callback::ts_offset (ACE_UINT32 offset)
00814 {
00815 this->timestamp_offset_ = offset;
00816 }
00817
00818 TAO_END_VERSIONED_NAMESPACE_DECL
00819
00820 ACE_FACTORY_DEFINE (TAO_AV, TAO_AV_RTCP_Flow_Factory)
00821 ACE_STATIC_SVC_DEFINE (TAO_AV_RTCP_Flow_Factory,
00822 ACE_TEXT ("RTCP_Flow_Factory"),
00823 ACE_SVC_OBJ_T,
00824 &ACE_SVC_NAME (TAO_AV_RTCP_Flow_Factory),
00825 ACE_Service_Type::DELETE_THIS |
00826 ACE_Service_Type::DELETE_OBJ,
00827 0)
00828