00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
#include "orbsvcs/AV/ntp-time.h"
#include "orbsvcs/AV/RTCP.h"
#include "orbsvcs/AV/media_timer.h"
#include "tao/debug.h"
#include "orbsvcs/AV/global.h"
#include "orbsvcs/AV/md5.h"

#include "orbsvcs/AV/RTCP_Packet.h"
#include "ace/OS_NS_time.h"
#include "ace/OS_NS_strings.h"

#include <cstdlib>
00046
00047 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00048
00049 int
00050 TAO_AV_RTCP_Callback::receive_control_frame (ACE_Message_Block *data,
00051 const ACE_Addr &peer_address)
00052 {
00053 int length = static_cast<int> (data->length ());
00054 int more = length;
00055 char *buf_ptr = data->rd_ptr ();
00056 char first_rtcp_packet = 1;
00057 RTCP_Channel_In *c;
00058
00059
00060
00061
00062 while (more > 0)
00063 {
00064
00065 switch ((unsigned char)buf_ptr[length - more + 1])
00066 {
00067 case RTCP_PT_SR:
00068 {
00069 RTCP_SR_Packet sr(&buf_ptr[length-more],
00070 &more);
00071
00072 if (!sr.is_valid(first_rtcp_packet))
00073 ACE_DEBUG ((LM_DEBUG,
00074 "TAO_AV_RTCP_Callback::receive_control_frame - "
00075 "warning invalid rtcp packet\n"));
00076
00077 if (this->inputs_.find (sr.ssrc (), c) == -1)
00078 {
00079 ACE_NEW_RETURN (c,
00080 RTCP_Channel_In (sr.ssrc (),
00081 &peer_address),
00082 -1);
00083 this->inputs_.bind (sr.ssrc (), c);
00084 }
00085 c->updateStatistics (&sr);
00086
00087 if (TAO_debug_level > 0)
00088 sr.dump ();
00089 break;
00090 }
00091 case RTCP_PT_RR:
00092 {
00093 RTCP_RR_Packet rr(&buf_ptr[length-more],
00094 &more);
00095
00096 if (!rr.is_valid(first_rtcp_packet))
00097 ACE_DEBUG ((LM_DEBUG,
00098 "TAO_AV_RTCP_Callback::receive_control_frame - "
00099 "warning invalid rtcp packet\n"));
00100
00101 if (this->inputs_.find (rr.ssrc (), c) == -1)
00102 {
00103 ACE_NEW_RETURN (c,
00104 RTCP_Channel_In (rr.ssrc (),
00105 &peer_address),
00106 -1);
00107 this->inputs_.bind (rr.ssrc (), c);
00108 }
00109
00110 c->updateStatistics (&rr);
00111
00112 if (TAO_debug_level > 0)
00113 rr.dump ();
00114 break;
00115 }
00116 case RTCP_PT_SDES:
00117 {
00118 RTCP_SDES_Packet sdes (&buf_ptr[length-more],
00119 &more);
00120
00121 if (!sdes.is_valid(first_rtcp_packet))
00122 ACE_DEBUG ((LM_DEBUG,
00123 "TAO_AV_RTCP_Callback::receive_control_frame - "
00124 "warning invalid rtcp packet\n"));
00125
00126 if (TAO_debug_level > 0)
00127 sdes.dump ();
00128 break;
00129 }
00130 case RTCP_PT_BYE:
00131 {
00132 RTCP_BYE_Packet bye (&buf_ptr[length-more],
00133 &more);
00134
00135 if (!bye.is_valid(first_rtcp_packet))
00136 ACE_DEBUG ((LM_DEBUG,
00137 "TAO_AV_RTCP_Callback::receive_control_frame - "
00138 "warning invalid rtcp packet\n"));
00139
00140
00141 ACE_UINT32 *ssrc_list;
00142 unsigned char length;
00143
00144 bye.ssrc_list(&ssrc_list, length);
00145
00146 for (int i=0; i<length; i++)
00147 {
00148 RTCP_Channel_In *c = 0;
00149
00150
00151 this->inputs_.unbind(ssrc_list[i], c);
00152
00153 if (c != 0)
00154 delete c;
00155 }
00156
00157 if (TAO_debug_level > 0)
00158 bye.dump ();
00159
00160 break;
00161 }
00162 case RTCP_PT_APP:
00163
00164 ACE_DEBUG ((LM_DEBUG,
00165 "TAO_AV_RTCP_Callback::receive_control_frame - "
00166 "APP packet - ignore\n"));
00167 more -= (4 + (ACE_UINT16)buf_ptr[length - more + 2]);
00168 break;
00169 default:
00170 ACE_DEBUG ((LM_DEBUG,
00171 "TAO_AV_RTCP_Callback::receive_control_frame - "
00172 "UNKNOWN packet type %u; ignore the rest\n",
00173 (int)buf_ptr[length - more + 1]));
00174 more = 0;
00175 }
00176
00177 first_rtcp_packet = 0;
00178
00179 }
00180
00181 if (more != 0)
00182 ACE_DEBUG ((LM_DEBUG,
00183 "TAO_AV_RTCP_Callback::receive_control_frame - "
00184 "Error in overall packet length\n"));
00185 return 0;
00186 }
00187
00188 ACE_INT32 random32 (int);
00189
00190 ACE_UINT32
00191 TAO_AV_RTCP::alloc_srcid (ACE_UINT32 addr)
00192 {
00193 md5_string s;
00194
00195 s.type = addr;
00196 s.tv = ACE_OS::gettimeofday ();
00197 s.pid = ACE_OS::getpid();
00198 s.pgid = ACE_OS::getpgid(s.pid);
00199 s.ppid = ACE_OS::getppid();
00200 s.uid = ACE_OS::getuid();
00201 s.gid = ACE_OS::getgid();
00202
00203 unsigned char *string_val = (unsigned char *) &s;
00204 int length = sizeof(s);
00205
00206 MD5_CTX context;
00207 union
00208 {
00209 char c[16];
00210 u_long x[4];
00211 } digest;
00212 ACE_UINT32 r;
00213 int i;
00214
00215 MD5Init (&context);
00216 MD5Update (&context, string_val, length);
00217 MD5Final ((unsigned char*)&digest, &context);
00218 r=0;
00219 for (i=0; i<3; i++)
00220 r ^= digest.x[i];
00221
00222 return r;
00223
00224
00225
00226
00227
00228
00229
00230
00231
00232 }
00233
00234
00235 double
00236 TAO_AV_RTCP::rtcp_interval (int members,
00237 int senders,
00238 double rtcp_bw,
00239 int we_sent,
00240 int packet_size,
00241 int *avg_rtcp_size,
00242 int initial)
00243 {
00244
00245
00246
00247
00248
00249
00250
00251
00252
00253
00254
00255
00256
00257
00258
00259
00260
00261
00262
00263
00264
00265 double t;
00266 double rtcp_min_time = RTCP_MIN_RPT_TIME;
00267 int n;
00268
00269
00270
00271
00272
00273
00274
00275
00276
00277 if (initial)
00278 {
00279
00280 ACE_OS::srand(ACE_OS::time(0L));
00281
00282 rtcp_min_time /= 2;
00283 *avg_rtcp_size = 128;
00284 }
00285
00286
00287
00288
00289 n = members;
00290 if ((senders > 0) && (senders < members*RTCP_SENDER_BW_FRACTION))
00291 {
00292 if (we_sent)
00293 {
00294 rtcp_bw *= RTCP_SENDER_BW_FRACTION;
00295 n = senders;
00296 }
00297 else
00298 {
00299 rtcp_bw *= RTCP_RECEIVER_BW_FRACTION;
00300 n -= senders;
00301 }
00302 }
00303
00304
00305
00306 *avg_rtcp_size += (int)((packet_size - *avg_rtcp_size)*RTCP_SIZE_GAIN);
00307
00308
00309
00310
00311
00312
00313
00314
00315 t = (*avg_rtcp_size) * n / rtcp_bw;
00316 if (t < rtcp_min_time)
00317 t = rtcp_min_time;
00318
00319
00320
00321
00322
00323
00324 int max_rand = 32768;
00325
00326 return t * ((double)ACE_OS::rand()/max_rand + 0.5);
00327
00328 }
00329
00330
00331
00332
00333 TAO_AV_RTCP_Flow_Factory::TAO_AV_RTCP_Flow_Factory (void)
00334 {
00335 }
00336
00337 TAO_AV_RTCP_Flow_Factory::~TAO_AV_RTCP_Flow_Factory (void)
00338 {
00339 }
00340
00341 int
00342 TAO_AV_RTCP_Flow_Factory::match_protocol (const char *flow_string)
00343 {
00344 if (ACE_OS::strncasecmp (flow_string,"RTCP",4) == 0)
00345 return 1;
00346
00347 return 0;
00348 }
00349
00350 int
00351 TAO_AV_RTCP_Flow_Factory::init (int ,
00352 char * [])
00353 {
00354 return 0;
00355 }
00356
00357 TAO_AV_Protocol_Object*
00358 TAO_AV_RTCP_Flow_Factory::make_protocol_object (TAO_FlowSpec_Entry * ,
00359 TAO_Base_StreamEndPoint * ,
00360 TAO_AV_Flow_Handler *handler,
00361 TAO_AV_Transport *transport)
00362 {
00363 TAO_AV_Callback *client_cb = 0;
00364 TAO_AV_RTCP_Callback *rtcp_cb = 0;
00365
00366
00367
00368
00369
00370 TAO_AV_Protocol_Object *object = 0;
00371 ACE_NEW_RETURN (object,
00372 TAO_AV_RTCP_Object (client_cb,
00373 rtcp_cb,
00374 transport),
00375 0);
00376
00377 rtcp_cb->open (object, handler);
00378
00379 return object;
00380 }
00381
00382
00383 int
00384 TAO_AV_RTCP_Object::handle_input (void)
00385 {
00386 size_t bufsiz = 2*this->transport_->mtu ();
00387 ACE_Message_Block data (bufsiz);
00388
00389 int n = this->transport_->recv (data.rd_ptr (),bufsiz);
00390 if (n == 0)
00391 {
00392 if (TAO_debug_level > 0)
00393 ACE_DEBUG ((LM_ERROR, "TAO_AV_RTCP::handle_input:connection closed\n"));
00394 return -1;
00395 }
00396 if (n < 0)
00397 {
00398 if (TAO_debug_level > 0)
00399 ACE_DEBUG ((LM_ERROR,"TAO_AV_RTCP::handle_input:recv error\n"));
00400 return -1;
00401 }
00402 data.wr_ptr (n);
00403 ACE_Addr *peer_addr = this->transport_->get_peer_addr ();
00404 this->callback_->receive_control_frame (&data,*peer_addr);
00405 return 0;
00406 }
00407
00408 int
00409 TAO_AV_RTCP_Object::send_frame (ACE_Message_Block *frame,
00410 TAO_AV_frame_info * )
00411 {
00412 return this->transport_->send (frame);
00413 }
00414
00415 int
00416 TAO_AV_RTCP_Object::send_frame (const iovec *iov,
00417 int iovcnt,
00418 TAO_AV_frame_info * )
00419 {
00420 return this->transport_->send (iov,
00421 iovcnt);
00422 }
00423
00424 int
00425 TAO_AV_RTCP_Object::send_frame (const char*,
00426 size_t)
00427 {
00428 return 0;
00429 }
00430
00431 TAO_AV_RTCP_Object::TAO_AV_RTCP_Object (TAO_AV_Callback *client_cb,
00432 TAO_AV_RTCP_Callback *&rtcp_cb,
00433 TAO_AV_Transport *transport)
00434 :TAO_AV_Protocol_Object (&rtcp_cb_, transport)
00435 {
00436 rtcp_cb = &this->rtcp_cb_;
00437 this->client_cb_ = client_cb;
00438
00439 }
00440
00441 TAO_AV_RTCP_Object::~TAO_AV_RTCP_Object (void)
00442 {
00443 }
00444
00445 int
00446 TAO_AV_RTCP_Object::destroy (void)
00447 {
00448 this->callback_->handle_destroy ();
00449 delete this;
00450
00451 return 0;
00452 }
00453
00454 int
00455 TAO_AV_RTCP_Object::set_policies (const TAO_AV_PolicyList &)
00456 {
00457 return -1;
00458 }
00459
00460 int
00461 TAO_AV_RTCP_Object::start (void)
00462 {
00463 return this->callback_->handle_start ();
00464 }
00465
00466 int
00467 TAO_AV_RTCP_Object::stop (void)
00468 {
00469 return this->callback_->handle_stop ();
00470 }
00471
00472 int
00473 TAO_AV_RTCP_Object::handle_control_input (ACE_Message_Block *frame,
00474 const ACE_Addr &peer_address)
00475 {
00476 return this->callback_->receive_frame (frame,
00477 0,
00478 peer_address);
00479 }
00480
00481 int
00482 TAO_AV_RTCP_Object::handle_control_output (ACE_Message_Block *frame)
00483 {
00484 TAO_AV_RTCP_Callback *cb = dynamic_cast<TAO_AV_RTCP_Callback*> (this->callback_);
00485
00486 return cb->send_frame (frame);
00487 }
00488
00489 void
00490 TAO_AV_RTCP_Object::ts_offset (ACE_UINT32 ts_offset)
00491 {
00492 TAO_AV_RTCP_Callback *cb = dynamic_cast<TAO_AV_RTCP_Callback*> (this->callback_);
00493 cb->ts_offset (ts_offset);
00494 }
00495
00496
00497 TAO_AV_RTCP_Callback::TAO_AV_RTCP_Callback (void)
00498 :is_initial_timeout_(1),
00499 packet_size_(0)
00500 {
00501 char cname[256];
00502 char host[256];
00503 ACE_OS::hostname(host, sizeof(host));
00504
00505
00506 ACE_OS::sprintf(cname, "username@%s", host);
00507
00508 this->output_.cname(cname);
00509 }
00510
00511 TAO_AV_RTCP_Callback::~TAO_AV_RTCP_Callback (void)
00512 {
00513 }
00514
00515 void
00516 TAO_AV_RTCP_Callback::schedule (int ms)
00517 {
00518 this->timeout_ = ms;
00519 }
00520
00521 int
00522 TAO_AV_RTCP_Callback::handle_start (void)
00523 {
00524 return 0;
00525 }
00526
00527 int
00528 TAO_AV_RTCP_Callback::handle_stop (void)
00529 {
00530 return this->send_report(1);
00531 }
00532
00533 int
00534 TAO_AV_RTCP_Callback::handle_timeout (void * )
00535 {
00536 return this->send_report(0);
00537 }
00538
00539 int
00540 TAO_AV_RTCP_Callback::send_report (int bye)
00541 {
00542
00543 TAO_AV_RTCP_Object *rtcp_prot_obj = dynamic_cast<TAO_AV_RTCP_Object*> (this->protocol_object_);
00544 ACE_UINT32 my_ssrc = rtcp_prot_obj->ssrc ();
00545
00546 RTCP_Packet *cp;
00547 RTCP_SDES_Packet sdes;
00548 ACE_CString value = "";
00549 ACE_CString note = "";
00550 unsigned char sdes_type = 0;
00551 RTCP_BYE_Packet *bye_packet = 0;
00552 ACE_UINT32 ssrc_list[1];
00553
00554
00555 ACE_Hash_Map_Iterator<ACE_UINT32, RTCP_Channel_In*, ACE_Null_Mutex> iter (this->inputs_);
00556 iter = this->inputs_.begin();
00557
00558
00559 RR_Block *blocks = 0;
00560 RR_Block *b_iter = 0;
00561 RR_Block *b_ptr = 0;
00562
00563 while (iter != this->inputs_.end() )
00564 {
00565 if (!b_iter)
00566 {
00567 b_ptr = (*iter).int_id_->getRRBlock ();
00568 if (b_ptr)
00569 {
00570 blocks = b_ptr;
00571 b_iter = b_ptr;
00572 }
00573 }
00574 else
00575 {
00576 b_ptr = (*iter).int_id_->getRRBlock ();
00577 if (b_ptr)
00578 {
00579 b_iter->next_ = b_ptr;
00580 }
00581 }
00582
00583 iter++;
00584 }
00585
00586 if (b_iter)
00587 b_iter->next_ = 0;
00588
00589 if (this->output_.active ())
00590 {
00591
00592 ACE_Time_Value unix_now = ACE_OS::gettimeofday ();
00593 TAO_AV_RTCP::ntp64 ntp_now = ntp64time (unix_now);
00594 ACE_UINT32 rtp_ts = unix_now.sec () * 8000 + unix_now.usec () / 125 +
00595 this->timestamp_offset_;
00596 ACE_NEW_RETURN(cp,
00597 RTCP_SR_Packet (my_ssrc,
00598 ntp_now.upper,
00599 ntp_now.lower,
00600 rtp_ts,
00601 this->output_.packets_sent (),
00602 this->output_.octets_sent (),
00603 blocks),
00604 -1);
00605 }
00606 else
00607 {
00608 ACE_NEW_RETURN(cp,
00609 RTCP_RR_Packet (my_ssrc,
00610 blocks),
00611 -1);
00612 }
00613
00614
00615
00616
00617
00618
00619
00620
00621
00622
00623
00624
00625
00626 switch (this->sdes_count_%8)
00627 {
00628 case 0:
00629 case 4:
00630 value = "tao-users@wustl.edu";
00631 sdes_type = RTCP_SDES_EMAIL;
00632 break;
00633 case 2:
00634 if (note.length () > 0)
00635 {
00636 value = "Joe User";
00637 sdes_type = RTCP_SDES_NAME;
00638 }
00639 else
00640 {
00641 value = "An important note...";
00642 sdes_type = RTCP_SDES_NOTE;
00643 }
00644 break;
00645 case 6:
00646 value = "TAO A/V Service";
00647 sdes_type = RTCP_SDES_TOOL;
00648 break;
00649 case 1:
00650 case 3:
00651 case 5:
00652 case 7:
00653 if (note.length () == 0)
00654 {
00655 value = "Joe User";
00656 sdes_type = RTCP_SDES_NAME;
00657 }
00658 else
00659 {
00660 value = "An important note...";
00661 sdes_type = RTCP_SDES_NOTE;
00662 }
00663 break;
00664 }
00665
00666 ++this->sdes_count_;
00667
00668 sdes.add_item (my_ssrc,
00669 RTCP_SDES_CNAME,
00670 static_cast<unsigned char> (ACE_OS::strlen(this->output_.cname())),
00671 this->output_.cname());
00672 if (bye)
00673 {
00674 ssrc_list[0] = rtcp_prot_obj->ssrc ();
00675
00676 ACE_NEW_RETURN (bye_packet,
00677 RTCP_BYE_Packet(ssrc_list,
00678 sizeof(ssrc_list)/sizeof(ssrc_list[0]),
00679 "Got bored."),
00680 -1);
00681 }
00682 else
00683 {
00684 unsigned char length = (unsigned char)(value.length() & 0xFF);
00685 sdes.add_item (my_ssrc, sdes_type, length, value.c_str ());
00686 }
00687
00688
00689 char *cp_ptr;
00690 char *sdes_ptr;
00691 char *bye_ptr = 0;
00692 ACE_UINT16 cp_length;
00693 ACE_UINT16 sdes_length;
00694 ACE_UINT16 bye_length = 0;
00695 cp->get_packet_data (&cp_ptr, cp_length);
00696 sdes.get_packet_data (&sdes_ptr, sdes_length);
00697 if (bye_packet)
00698 bye_packet->get_packet_data(&bye_ptr, bye_length);
00699
00700 ACE_Message_Block mb (cp_length + sdes_length + bye_length);
00701
00702 ACE_OS::memcpy (mb.wr_ptr (), cp_ptr, cp_length);
00703 mb.wr_ptr (cp_length);
00704 ACE_OS::memcpy (mb.wr_ptr (), sdes_ptr, sdes_length);
00705 mb.wr_ptr (sdes_length);
00706 if (bye_length)
00707 {
00708 ACE_OS::memcpy (mb.wr_ptr (), bye_ptr, bye_length);
00709 mb.wr_ptr (bye_length);
00710 }
00711
00712
00713 this->protocol_object_->send_frame (&mb);
00714
00715 this->packet_size_ = cp_length + sdes_length + bye_length;
00716
00717 delete cp;
00718 if (bye_packet)
00719 delete bye_packet;
00720
00721 return 0;
00722 }
00723
00724 void
00725
00726 TAO_AV_RTCP_Callback::get_timeout (ACE_Time_Value *&tv,
00727 void *& )
00728 {
00729 int senders = 0;
00730 int members = 1;
00731
00732
00733 double rtcp_bw = 1000;
00734 double interval;
00735
00736 ACE_Hash_Map_Iterator<ACE_UINT32, RTCP_Channel_In*, ACE_Null_Mutex> iter (this->inputs_);
00737 iter = this->inputs_.begin();
00738
00739 if (this->output_.active ())
00740 senders++;
00741
00742
00743 while (iter != this->inputs_.end ())
00744 {
00745 if ((*iter).int_id_->active ())
00746 {
00747 if ((*iter).int_id_->sender ())
00748 senders++;
00749 members++;
00750 }
00751 iter++;
00752 }
00753
00754
00755 interval = TAO_AV_RTCP::rtcp_interval (members,
00756 senders,
00757 rtcp_bw,
00758 this->output_.active (),
00759 this->packet_size_,
00760 &this->avg_rtcp_size_,
00761 this->is_initial_timeout_);
00762
00763 this->is_initial_timeout_ = 0;
00764
00765 ACE_NEW (tv,
00766 ACE_Time_Value);
00767
00768 tv->sec ((int)interval);
00769 tv->usec ((int)((interval - (int)interval) * 1000000));
00770 }
00771
00772 int
00773 TAO_AV_RTCP_Callback::handle_destroy (void)
00774 {
00775 return 0;
00776 }
00777
00778 int
00779 TAO_AV_RTCP_Callback::receive_frame (ACE_Message_Block *frame,
00780 TAO_AV_frame_info *,
00781 const ACE_Addr &peer_address)
00782 {
00783 RTCP_Channel_In *c;
00784
00785 RTP_Packet packet (frame->rd_ptr(), static_cast<int> (frame->length()));
00786
00787 if (this->inputs_.find (packet.ssrc(), c) < 0)
00788 {
00789 ACE_NEW_RETURN (c,
00790 RTCP_Channel_In (packet.ssrc(),
00791 &peer_address),
00792 -1);
00793
00794 this->inputs_.bind (packet.ssrc(), c);
00795 }
00796
00797 c->recv_rtp_packet (frame, &peer_address);
00798 return 0;
00799 }
00800
00801 int
00802 TAO_AV_RTCP_Callback::send_frame (ACE_Message_Block *frame)
00803 {
00804 RTP_Packet packet (frame->rd_ptr(), static_cast<int> (frame->length()));
00805 this->output_.updateStatistics (&packet);
00806
00807 return 0;
00808 }
00809
00810 void
00811 TAO_AV_RTCP_Callback::ts_offset (ACE_UINT32 offset)
00812 {
00813 this->timestamp_offset_ = offset;
00814 }
00815
00816 TAO_END_VERSIONED_NAMESPACE_DECL
00817
00818 ACE_FACTORY_DEFINE (TAO_AV, TAO_AV_RTCP_Flow_Factory)
00819 ACE_STATIC_SVC_DEFINE (TAO_AV_RTCP_Flow_Factory,
00820 ACE_TEXT ("RTCP_Flow_Factory"),
00821 ACE_SVC_OBJ_T,
00822 &ACE_SVC_NAME (TAO_AV_RTCP_Flow_Factory),
00823 ACE_Service_Type::DELETE_THIS |
00824 ACE_Service_Type::DELETE_OBJ,
00825 0)
00826