00001
00002
00003 #include "orbsvcs/AV/UDP.h"
00004 #include "orbsvcs/AV/AVStreams_i.h"
00005 #include "orbsvcs/AV/MCast.h"
00006
00007 #include "tao/debug.h"
00008 #include "ace/OS_NS_strings.h"
00009
00010 #if !defined (__ACE_INLINE__)
00011 #include "orbsvcs/AV/UDP.inl"
00012 #endif
00013
00014 ACE_RCSID (AV,
00015 UDP,
00016 "$Id: UDP.cpp 84563 2009-02-23 08:13:54Z johnnyw $")
00017
00018 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00019
00020
00021
00022
00023
00024 TAO_AV_UDP_Flow_Handler::TAO_AV_UDP_Flow_Handler (void)
00025 {
00026 ACE_NEW (this->transport_,
00027 TAO_AV_UDP_Transport (this));
00028 }
00029
00030 TAO_AV_UDP_Flow_Handler::~TAO_AV_UDP_Flow_Handler (void)
00031 {
00032
00033 TAO_AV_CORE::instance()->reactor ()->remove_handler (this->event_handler(),
00034 ACE_Event_Handler::READ_MASK);
00035
00036
00037 this->close ();
00038 delete this->transport_;
00039 }
00040
00041 TAO_AV_Transport *
00042 TAO_AV_UDP_Flow_Handler::transport (void)
00043 {
00044 return this->transport_;
00045 }
00046
00047 int
00048 TAO_AV_UDP_Flow_Handler::handle_input (ACE_HANDLE )
00049 {
00050 return this->protocol_object_->handle_input ();
00051 }
00052
00053 int
00054 TAO_AV_UDP_Flow_Handler::handle_timeout (const ACE_Time_Value &tv,
00055 const void *arg)
00056 {
00057 return TAO_AV_Flow_Handler::handle_timeout (tv,arg);
00058 }
00059
00060 int
00061 TAO_AV_UDP_Flow_Handler::set_remote_address (ACE_Addr *address)
00062 {
00063 if (TAO_debug_level > 0)
00064 ACE_DEBUG ((LM_DEBUG,"TAO_AV_UDP_Flow_Handler::set_remote_address\n"));
00065
00066 ACE_INET_Addr *inet_addr = dynamic_cast<ACE_INET_Addr*> (address);
00067 this->peer_addr_ = *inet_addr;
00068 TAO_AV_UDP_Transport *transport = dynamic_cast<TAO_AV_UDP_Transport*> (this->transport_);
00069
00070 return transport->set_remote_address (*inet_addr);
00071 }
00072
00073
00074 ACE_HANDLE
00075 TAO_AV_UDP_Flow_Handler::get_handle (void) const
00076 {
00077 if (TAO_debug_level > 0)
00078 ACE_DEBUG ((LM_DEBUG,
00079 "TAO_AV_UDP_Flow_Handler::get_handle:%d\n",
00080 this->sock_dgram_.get_handle ()));
00081
00082 return this->sock_dgram_.get_handle () ;
00083 }
00084
00085 int
00086 TAO_AV_UDP_Flow_Handler::change_qos(AVStreams::QoS qos)
00087 {
00088 if( TAO_debug_level > 0 )
00089 {
00090 ACE_DEBUG ((LM_DEBUG,
00091 "(%N,%l) TAO_AV_UDP_Flow_Handler::change_qos\n"));
00092 }
00093
00094 unsigned int i=0;
00095
00096 int ret = 0;
00097 CORBA::Long dscp = 0;
00098 CORBA::Long ecn = 0;
00099 int dscp_flag=0;
00100 for(i=0; i < qos.QoSParams.length(); i++)
00101 {
00102
00103 if( ACE_OS::strcmp( qos.QoSParams[i].property_name.in(), "Diffserv_Codepoint") == 0)
00104 {
00105 qos.QoSParams[i].property_value >>= dscp;
00106 dscp_flag=1;
00107
00108 if(!((dscp >= 0) && (dscp <= 63)))
00109 {
00110 dscp_flag = 0;
00111 ACE_DEBUG((LM_DEBUG, "(%N,%l) ECN value can only be (0-3) not %d\n", ecn));
00112 return -1;
00113 }
00114 }
00115
00116 if( ACE_OS::strcmp( qos.QoSParams[i].property_name.in(), "ECN") == 0)
00117 {
00118 qos.QoSParams[i].property_value >>= ecn;
00119
00120
00121 if(!((ecn >= 0) && (ecn <= 3)))
00122 {
00123 ACE_DEBUG((LM_DEBUG, "(%N,%l) ECN value can only be (0-3) not %d\n", ecn));
00124 ecn = 0;
00125 }
00126
00127 }
00128 }
00129
00130
00131
00132 if(dscp_flag || ecn)
00133 {
00134 int tos;
00135 tos = (int)(dscp << 2);
00136 if(ecn)
00137 {
00138 tos |= ecn;
00139 }
00140 ret = sock_dgram_.set_option(IPPROTO_IP, IP_TOS, (int *)&tos , (int)sizeof(tos));
00141
00142 if(TAO_debug_level > 1)
00143 {
00144 ACE_DEBUG((LM_DEBUG, "(%N,%l) set tos: ret: %d\n", ret));
00145 }
00146 }
00147
00148 if(TAO_debug_level > 1)
00149 {
00150 if(ret < 0 )
00151 {
00152 ACE_DEBUG((LM_DEBUG, "(%N,%l) errno: %p\n"));
00153 }
00154 }
00155 return ret;
00156 }
00157
00158
00159
00160
00161
00162 TAO_AV_UDP_Transport::TAO_AV_UDP_Transport (void)
00163 :handler_ (0)
00164 {
00165 }
00166
00167 TAO_AV_UDP_Transport::TAO_AV_UDP_Transport (TAO_AV_UDP_Flow_Handler *handler)
00168 :handler_ (handler),
00169 addr_ (0)
00170 {
00171 }
00172
00173 TAO_AV_UDP_Transport::~TAO_AV_UDP_Transport (void)
00174 {
00175 }
00176
00177 int
00178 TAO_AV_UDP_Transport::set_remote_address (const ACE_INET_Addr &address)
00179 {
00180 this->peer_addr_ = address;
00181 return 0;
00182 }
00183
00184 int
00185 TAO_AV_UDP_Transport::open (ACE_Addr * )
00186 {
00187 return 0;
00188 }
00189
00190 int
00191 TAO_AV_UDP_Transport::close (void)
00192 {
00193 return 0;
00194 }
00195
00196 int
00197 TAO_AV_UDP_Transport::mtu (void)
00198 {
00199 return 65535;
00200 }
00201
00202 ACE_Addr*
00203 TAO_AV_UDP_Transport::get_peer_addr (void)
00204 {
00205 return &this->peer_addr_;
00206 }
00207
00208 ssize_t
00209 TAO_AV_UDP_Transport::send (const ACE_Message_Block *mblk, ACE_Time_Value *)
00210 {
00211
00212
00213
00214 iovec iov[ACE_IOV_MAX];
00215 int iovcnt = 0;
00216 ssize_t n = 0;
00217 ssize_t nbytes = 0;
00218
00219 for (const ACE_Message_Block *i = mblk;
00220 i != 0;
00221 i = i->cont ())
00222 {
00223
00224 if (i->length () > 0)
00225 {
00226 iov[iovcnt].iov_base = i->rd_ptr ();
00227 iov[iovcnt].iov_len = static_cast<u_long> (i->length ());
00228 iovcnt++;
00229
00230
00231
00232
00233
00234
00235
00236 if (iovcnt == ACE_IOV_MAX)
00237 {
00238 n = this->handler_->get_socket ()->send ((const iovec *) iov,
00239 iovcnt,
00240 this->peer_addr_);
00241
00242 if (n < 1)
00243 return n;
00244
00245 nbytes += n;
00246 iovcnt = 0;
00247 }
00248 }
00249 }
00250
00251
00252 if (iovcnt != 0)
00253 {
00254 n = this->handler_->get_socket ()->send ((const iovec *) iov,
00255 iovcnt,
00256 this->peer_addr_);
00257
00258 if (n < 1)
00259 return n;
00260
00261 nbytes += n;
00262 }
00263
00264 return nbytes;
00265 }
00266
00267 ssize_t
00268 TAO_AV_UDP_Transport::send (const char *buf,
00269 size_t len,
00270 ACE_Time_Value *)
00271 {
00272 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"TAO_AV_UDP_Transport::send "));
00273 ACE_TCHAR addr [BUFSIZ];
00274 this->peer_addr_.addr_to_string (addr,BUFSIZ);
00275 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"to %s\n",addr));
00276
00277 return this->handler_->get_socket ()->send (buf, len,this->peer_addr_);
00278 }
00279
00280 ssize_t
00281 TAO_AV_UDP_Transport::send (const iovec *iov,
00282 int iovcnt,
00283 ACE_Time_Value *)
00284 {
00285 return this->handler_->get_socket ()->send ((const iovec *) iov,
00286 iovcnt,
00287 this->peer_addr_);
00288
00289 }
00290
00291 ssize_t
00292 TAO_AV_UDP_Transport::recv (char *buf,
00293 size_t len,
00294 ACE_Time_Value *)
00295 {
00296 return this->handler_->get_socket ()->recv (buf, len,this->peer_addr_);
00297 }
00298
00299 ssize_t
00300 TAO_AV_UDP_Transport::recv (char *buf,
00301 size_t len,
00302 int flags,
00303 ACE_Time_Value *timeout)
00304 {
00305 return this->handler_->get_socket ()->recv (buf,
00306 len,
00307 this->peer_addr_,
00308 flags,
00309 timeout);
00310 }
00311
00312 ssize_t
00313 TAO_AV_UDP_Transport::recv (iovec *iov,
00314 int ,
00315 ACE_Time_Value *timeout)
00316 {
00317 return handler_->get_socket ()->recv (iov,this->peer_addr_,0,timeout);
00318 }
00319
00320
00321
00322
00323
00324
00325 TAO_AV_UDP_Acceptor::TAO_AV_UDP_Acceptor (void)
00326 : address_ (0),
00327 control_inet_address_ (0)
00328 {
00329 }
00330
00331 TAO_AV_UDP_Acceptor::~TAO_AV_UDP_Acceptor (void)
00332 {
00333 if (this->flow_component_ == TAO_AV_Core::TAO_AV_CONTROL)
00334 delete this->entry_->control_handler ();
00335
00336 delete this->address_;
00337 delete this->control_inet_address_;
00338 }
00339
00340 int
00341 TAO_AV_UDP_Acceptor::activate_svc_handler (TAO_AV_Flow_Handler *handler)
00342 {
00343 ACE_Event_Handler *event_handler = handler->event_handler ();
00344 int result = this->av_core_->reactor ()->register_handler (event_handler,
00345 ACE_Event_Handler::READ_MASK);
00346
00347 if (this->flow_component_ == TAO_AV_Core::TAO_AV_CONTROL)
00348 handler->schedule_timer ();
00349
00350 return result;
00351 }
00352
00353 int
00354 TAO_AV_UDP_Acceptor::open (TAO_Base_StreamEndPoint *endpoint,
00355 TAO_AV_Core *av_core,
00356 TAO_FlowSpec_Entry *entry,
00357 TAO_AV_Flow_Protocol_Factory *factory,
00358 TAO_AV_Core::Flow_Component flow_comp)
00359 {
00360 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"TAO_AV_UDP_Acceptor::open\n"));
00361 this->av_core_ = av_core;
00362 this->endpoint_ = endpoint;
00363 this->entry_ = entry;
00364 this->flow_component_ = flow_comp;
00365 this->flow_protocol_factory_ = factory;
00366 ACE_INET_Addr *inet_addr;
00367 if (flow_comp == TAO_AV_Core::TAO_AV_CONTROL)
00368 {
00369 this->flowname_ = TAO_AV_Core::get_control_flowname (entry->flowname ());
00370 inet_addr = (ACE_INET_Addr *) entry->control_address ();
00371 }
00372 else
00373 {
00374 this->flowname_ = entry->flowname ();
00375 inet_addr = (ACE_INET_Addr *) entry->address ();
00376 }
00377
00378 if (inet_addr != 0)
00379 {
00380 ACE_TCHAR buf[BUFSIZ];
00381 inet_addr->addr_to_string (buf, BUFSIZ);
00382
00383 if (TAO_debug_level > 0)
00384 ACE_DEBUG ((LM_DEBUG,
00385 "TAO_AV_UDP_Acceptor::open: %s\n",
00386 buf));
00387 }
00388
00389 int result = this->open_i (inet_addr, 0);
00390
00391 if (result < 0)
00392 return result;
00393 return 0;
00394 }
00395
00396 int
00397 TAO_AV_UDP_Acceptor::open_default (TAO_Base_StreamEndPoint *endpoint,
00398 TAO_AV_Core *av_core,
00399 TAO_FlowSpec_Entry *entry,
00400 TAO_AV_Flow_Protocol_Factory *factory,
00401 TAO_AV_Core::Flow_Component flow_comp)
00402 {
00403 this->av_core_ = av_core;
00404 this->endpoint_ = endpoint;
00405 this->entry_ = entry;
00406 this->flow_component_ = flow_comp;
00407 this->flow_protocol_factory_ = factory;
00408 if (flow_comp == TAO_AV_Core::TAO_AV_CONTROL)
00409 {
00410 this->flowname_ = TAO_AV_Core::get_control_flowname (entry->flowname ());
00411 }
00412 else
00413 {
00414 this->flowname_ = entry->flowname ();
00415 ACE_NEW_RETURN (this->address_,
00416 ACE_INET_Addr ("0"),
00417 -1);
00418 }
00419
00420 int result = this->open_i (this->address_, 1);
00421 if (result < 0)
00422 return result;
00423
00424 return 0;
00425 }
00426
00427 int
00428 TAO_AV_UDP_Acceptor::open_i (ACE_INET_Addr *inet_addr,
00429 int is_default_addr)
00430 {
00431 int result = -1;
00432 ACE_INET_Addr *local_addr = 0;
00433
00434 TAO_AV_Flow_Handler *flow_handler = 0;
00435
00436
00437
00438 if (is_default_addr &&
00439 (this->flow_component_ == TAO_AV_Core::TAO_AV_CONTROL) &&
00440 (ACE_OS::strcasecmp(this->entry_->flow_protocol_str (), "RTP") == 0))
00441 {
00442 flow_handler = this->entry_->control_handler ();
00443
00444 local_addr = dynamic_cast<ACE_INET_Addr*> (this->entry_->get_local_control_addr ());
00445 }
00446 else
00447 {
00448
00449
00450 int get_new_port = 1;
00451
00452 while (get_new_port)
00453 {
00454
00455 get_new_port = 0;
00456
00457 result = TAO_AV_UDP_Connection_Setup::setup (flow_handler,
00458 inet_addr,
00459 local_addr,
00460 this->entry_->is_multicast (),
00461 TAO_AV_UDP_Connection_Setup::ACCEPTOR);
00462
00463 if (result < 0)
00464 {
00465 ACE_DEBUG((LM_DEBUG,"(%N,%l) Error during connection setup: %d\n", result));
00466 }
00467
00468 local_addr->set (local_addr->get_port_number (),
00469 local_addr->get_host_name ());
00470
00471 if (is_default_addr)
00472 {
00473 if ((ACE_OS::strcasecmp(this->entry_->flow_protocol_str (), "RTP") == 0) &&
00474 (this->flow_component_ == TAO_AV_Core::TAO_AV_DATA))
00475 {
00476 if (is_default_addr && local_addr->get_port_number ()%2 != 0)
00477 {
00478
00479 delete local_addr;
00480 local_addr = 0;
00481 delete flow_handler;
00482 get_new_port = 1;
00483 }
00484 else
00485 {
00486 ACE_INET_Addr *local_control_addr = 0;
00487 TAO_AV_Flow_Handler *control_flow_handler = 0;
00488
00489 ACE_NEW_RETURN (this->control_inet_address_,
00490 ACE_INET_Addr ("0"),
00491 -1);
00492
00493 TAO_AV_UDP_Connection_Setup::setup(control_flow_handler,
00494 this->control_inet_address_,
00495 local_control_addr,
00496 this->entry_->is_multicast (),
00497 TAO_AV_UDP_Connection_Setup::ACCEPTOR);
00498
00499 if (local_control_addr->get_port_number () !=
00500 local_addr->get_port_number () +1)
00501 {
00502 delete this->control_inet_address_;
00503 delete local_addr;
00504 local_addr = 0;
00505 delete flow_handler;
00506 delete local_control_addr;
00507 delete control_flow_handler;
00508 get_new_port = 1;
00509 }
00510 else
00511 {
00512 this->entry_->control_address (this->control_inet_address_);
00513 this->entry_->set_local_control_addr (local_control_addr);
00514 this->entry_->control_handler (control_flow_handler);
00515 }
00516 }
00517 }
00518 }
00519 }
00520 }
00521
00522 TAO_AV_Protocol_Object *object =
00523 this->flow_protocol_factory_->make_protocol_object (this->entry_,
00524 this->endpoint_,
00525 flow_handler,
00526 flow_handler->transport ());
00527 flow_handler->protocol_object (object);
00528
00529 if (this->flow_component_ == TAO_AV_Core::TAO_AV_DATA)
00530 {
00531 this->endpoint_->set_flow_handler (this->flowname_.c_str (),flow_handler);
00532
00533 this->entry_->protocol_object (object);
00534 this->entry_->set_local_addr (local_addr);
00535 this->entry_->handler (flow_handler);
00536
00537 this->entry_->address (local_addr);
00538 }
00539 else
00540 {
00541 this->endpoint_->set_control_flow_handler (this->flowname_.c_str (),flow_handler);
00542
00543 this->entry_->control_protocol_object (object);
00544 this->entry_->set_local_control_addr (local_addr);
00545 this->entry_->control_handler (flow_handler);
00546 }
00547
00548 if (local_addr != 0)
00549 {
00550 ACE_TCHAR buf[BUFSIZ];
00551 local_addr->addr_to_string (buf,BUFSIZ);
00552 if (TAO_debug_level > 0)
00553 ACE_DEBUG ((LM_DEBUG,
00554 "TAO_AV_UDP_ACCEPTOR::open:%s\n",
00555 buf));
00556 }
00557
00558
00559 return this->activate_svc_handler (flow_handler);
00560 }
00561
00562 int
00563 TAO_AV_UDP_Acceptor::close (void)
00564 {
00565 return 0;
00566 }
00567
00568
00569
00570
00571 TAO_AV_UDP_Connector::TAO_AV_UDP_Connector (void)
00572 : control_inet_address_ (0)
00573 {
00574 }
00575
00576 TAO_AV_UDP_Connector::~TAO_AV_UDP_Connector (void)
00577 {
00578 if (this->flow_component_ == TAO_AV_Core::TAO_AV_CONTROL)
00579 {
00580 delete this->entry_->control_handler ();
00581 }
00582
00583 if (this->control_inet_address_ != 0)
00584 delete this->control_inet_address_;
00585 }
00586
00587 int
00588 TAO_AV_UDP_Connector::open (TAO_Base_StreamEndPoint *endpoint,
00589 TAO_AV_Core *av_core,
00590 TAO_AV_Flow_Protocol_Factory *factory)
00591
00592 {
00593 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"TAO_AV_UDP_Connector::open "));
00594 this->endpoint_ = endpoint;
00595 this->av_core_ = av_core;
00596 this->flow_protocol_factory_ = factory;
00597 return 0;
00598 }
00599
00600 int
00601 TAO_AV_UDP_Connector::connect (TAO_FlowSpec_Entry *entry,
00602 TAO_AV_Transport *&transport,
00603 TAO_AV_Core::Flow_Component flow_component)
00604 {
00605 ACE_INET_Addr *local_addr = 0;
00606 ACE_INET_Addr *control_inet_addr = 0;
00607
00608 this->entry_ = entry;
00609 this->flow_component_ = flow_component;
00610
00611 ACE_INET_Addr *inet_addr;
00612
00613 if (flow_component == TAO_AV_Core::TAO_AV_CONTROL)
00614 {
00615 this->flowname_ = TAO_AV_Core::get_control_flowname (entry->flowname());
00616 inet_addr = dynamic_cast<ACE_INET_Addr*> (entry->control_address ());
00617 }
00618 else
00619 {
00620 this->flowname_ = entry->flowname ();
00621 inet_addr = dynamic_cast<ACE_INET_Addr*> (entry->address ());
00622 control_inet_addr = dynamic_cast<ACE_INET_Addr*> (entry->control_address ());
00623 }
00624
00625 TAO_AV_Flow_Handler *flow_handler = 0;
00626
00627
00628
00629 if ((flow_component == TAO_AV_Core::TAO_AV_CONTROL) &&
00630 (ACE_OS::strcasecmp(this->entry_->flow_protocol_str (), "RTP") == 0) &&
00631 !entry->is_multicast ())
00632 {
00633 flow_handler = this->entry_->control_handler ();
00634 flow_handler->set_remote_address (inet_addr);
00635
00636 local_addr = dynamic_cast<ACE_INET_Addr*> (this->entry_->get_local_control_addr ());
00637 }
00638 else
00639 {
00640
00641
00642 int get_new_port = 1;
00643
00644 while (get_new_port)
00645 {
00646
00647 get_new_port = 0;
00648
00649 ACE_Addr *addr;
00650 if ((addr = entry->get_peer_addr ()) != 0)
00651 {
00652 local_addr = dynamic_cast<ACE_INET_Addr*> (addr);
00653 ACE_TCHAR buf [BUFSIZ];
00654 local_addr->addr_to_string (buf, BUFSIZ);
00655 }
00656
00657 TAO_AV_UDP_Connection_Setup::setup (flow_handler,
00658 inet_addr,
00659 local_addr,
00660 entry->is_multicast (),
00661 TAO_AV_UDP_Connection_Setup::CONNECTOR);
00662
00663 if ((ACE_OS::strcasecmp(this->entry_->flow_protocol_str (), "RTP") == 0) &&
00664 (flow_component == TAO_AV_Core::TAO_AV_DATA) &&
00665 !entry->is_multicast ())
00666 {
00667 if (local_addr->get_port_number ()%2 != 0)
00668 {
00669
00670 delete local_addr;
00671 local_addr = 0;
00672 delete flow_handler;
00673 get_new_port = 1;
00674 }
00675 else
00676 {
00677 ACE_INET_Addr *local_control_addr = 0;
00678 TAO_AV_Flow_Handler *control_flow_handler = 0;
00679
00680 if (entry->is_multicast ())
00681 control_inet_addr = dynamic_cast<ACE_INET_Addr*> (entry->control_address ()) ;
00682 else
00683 {
00684
00685 if (local_addr != 0)
00686 {
00687 ACE_TCHAR buf [BUFSIZ];
00688 ACE_TString addr_str (ACE_TEXT_CHAR_TO_TCHAR(local_addr->get_host_name ()));
00689 addr_str += ACE_TEXT(":");
00690 addr_str += ACE_OS::itoa (local_addr->get_port_number () + 1, buf, 10);
00691 ACE_NEW_RETURN (local_control_addr,
00692 ACE_INET_Addr (addr_str.c_str ()),
00693 -1);
00694 local_control_addr->addr_to_string (buf, BUFSIZ);
00695 }
00696
00697
00698 if (entry->control_address () == 0)
00699 ACE_NEW_RETURN (this->control_inet_address_,
00700 ACE_INET_Addr ("0"),
00701 -1);
00702 else
00703 control_inet_address_ = dynamic_cast<ACE_INET_Addr*> (entry->control_address ());
00704 }
00705
00706 TAO_AV_UDP_Connection_Setup::setup (control_flow_handler,
00707 control_inet_addr,
00708 local_control_addr,
00709 entry->is_multicast (),
00710 TAO_AV_UDP_Connection_Setup::CONNECTOR);
00711
00712 if (local_control_addr->get_port_number () !=
00713 local_addr->get_port_number () +1)
00714 {
00715 delete local_addr;
00716 local_addr = 0;
00717 delete flow_handler;
00718 delete local_control_addr;
00719 delete control_flow_handler;
00720 get_new_port = 1;
00721 }
00722 else
00723 {
00724 this->entry_->set_local_control_addr (local_control_addr);
00725 this->entry_->control_handler (control_flow_handler);
00726 }
00727 }
00728 }
00729 }
00730 }
00731
00732 TAO_AV_Protocol_Object *object =
00733 this->flow_protocol_factory_->make_protocol_object (this->entry_,
00734 this->endpoint_,
00735 flow_handler,
00736 flow_handler->transport ());
00737
00738 flow_handler->protocol_object (object);
00739
00740 if (flow_component == TAO_AV_Core::TAO_AV_DATA)
00741 {
00742 this->endpoint_->set_flow_handler (this->flowname_.c_str (),
00743 flow_handler);
00744 this->entry_->protocol_object (object);
00745 entry->set_local_addr (local_addr);
00746 entry->handler (flow_handler);
00747 transport = flow_handler->transport ();
00748 }
00749 else
00750 {
00751 this->endpoint_->set_control_flow_handler (this->flowname_.c_str (),
00752 flow_handler);
00753 this->entry_->control_protocol_object (object);
00754 entry->set_local_control_addr (local_addr);
00755 entry->control_handler (flow_handler);
00756 transport = flow_handler->transport ();
00757 }
00758
00759 if (local_addr != 0)
00760 {
00761 ACE_TCHAR buf[BUFSIZ];
00762 local_addr->addr_to_string (buf,BUFSIZ);
00763
00764 if (TAO_debug_level > 0)
00765 ACE_DEBUG ((LM_DEBUG,"TAO_AV_UDP_CONNECTOR::connect:%s\n",buf));
00766 }
00767
00768
00769 return this->activate_svc_handler (flow_handler);
00770 }
00771
00772 int
00773 TAO_AV_UDP_Connector::activate_svc_handler (TAO_AV_Flow_Handler *handler)
00774 {
00775 ACE_Event_Handler *event_handler = handler->event_handler ();
00776 int result = this->av_core_->reactor ()->register_handler (event_handler,
00777 ACE_Event_Handler::READ_MASK);
00778
00779 if (this->flow_component_ == TAO_AV_Core::TAO_AV_CONTROL)
00780 handler->schedule_timer ();
00781
00782 return result;
00783 }
00784
00785 int
00786 TAO_AV_UDP_Connector::close (void)
00787 {
00788 return 0;
00789 }
00790
00791
00792
00793
00794
00795 int
00796 TAO_AV_UDP_Connection_Setup::setup (TAO_AV_Flow_Handler *&flow_handler,
00797 ACE_INET_Addr *inet_addr,
00798 ACE_INET_Addr *&local_addr,
00799 int is_multicast,
00800 ConnectionType ct)
00801 {
00802 int result;
00803
00804 if (is_multicast)
00805 {
00806 TAO_AV_UDP_MCast_Flow_Handler *handler;
00807 ACE_NEW_RETURN (handler,
00808 TAO_AV_UDP_MCast_Flow_Handler,
00809 -1);
00810
00811 flow_handler = handler;
00812
00813 result = handler->get_mcast_socket ()->join (*inet_addr);
00814 if (result < 0)
00815 ACE_ERROR_RETURN ((LM_ERROR,"TAO_AV_UDP_MCast_connector::open failed\n"),-1);
00816
00817
00818
00819 #if defined (ACE_HAS_IP_MULTICAST)
00820 if (handler->get_mcast_socket ()->set_option (IP_MULTICAST_LOOP,
00821 0) < 0)
00822 if (TAO_debug_level > 0)
00823 ACE_DEBUG ((LM_DEBUG,"TAO_AV_UDP_MCast_Acceptor::multicast loop disable failed\n"));
00824
00825 #endif
00826
00827 int bufsize = 80 * 1024;
00828 if (handler->get_mcast_socket ()->ACE_SOCK::set_option (SOL_SOCKET,
00829 SO_RCVBUF,
00830 (char *)&bufsize,
00831 sizeof(bufsize)) < 0)
00832 {
00833 bufsize = 32 * 1024;
00834 if (handler->get_mcast_socket ()->ACE_SOCK::set_option (SOL_SOCKET,
00835 SO_RCVBUF,
00836 (char *)&bufsize,
00837 sizeof(bufsize)) < 0)
00838 ACE_OS::perror("SO_RCVBUF");
00839 }
00840 ACE_NEW_RETURN (local_addr,
00841 ACE_INET_Addr ("0"),
00842 -1);
00843
00844 if (ct == ACCEPTOR)
00845 {
00846 result = handler->get_mcast_socket ()->get_local_addr (*local_addr);
00847 if (result < 0)
00848 ACE_ERROR_RETURN ((LM_ERROR,"TAO_AV_Dgram_Connector::open: get_local_addr failed\n"),result);
00849
00850 local_addr->set (local_addr->get_port_number (),
00851 local_addr->get_host_name ());
00852 handler->set_peer_addr (local_addr);
00853 }
00854 }
00855 else
00856 {
00857 if (local_addr == 0)
00858 ACE_NEW_RETURN (local_addr,
00859 ACE_INET_Addr ("0"),
00860 -1);
00861
00862 TAO_AV_UDP_Flow_Handler *handler;
00863 ACE_NEW_RETURN (handler,
00864 TAO_AV_UDP_Flow_Handler,
00865 -1);
00866
00867 flow_handler = handler;
00868
00869 if (ct == ACCEPTOR)
00870 result = handler->open (*inet_addr);
00871 else
00872 result = handler->open (*local_addr);
00873 if (result < 0)
00874 ACE_ERROR_RETURN ((LM_ERROR,"handler::open failed\n"),-1);
00875
00876
00877 int sndbufsize = ACE_DEFAULT_MAX_SOCKET_BUFSIZ;
00878 int rcvbufsize = ACE_DEFAULT_MAX_SOCKET_BUFSIZ;
00879
00880 if (handler->get_socket ()->set_option (SOL_SOCKET,
00881 SO_SNDBUF,
00882 (void *) &sndbufsize,
00883 sizeof (sndbufsize)) == -1
00884 && errno != ENOTSUP)
00885 return 0;
00886 else if (handler->get_socket ()->set_option (SOL_SOCKET,
00887 SO_RCVBUF,
00888 (void *) &rcvbufsize,
00889 sizeof (rcvbufsize)) == -1
00890 && errno != ENOTSUP)
00891 return 0;
00892
00893 if (ct == CONNECTOR)
00894 handler->set_remote_address (inet_addr);
00895
00896 result = handler->get_socket ()->get_local_addr (*local_addr);
00897
00898 local_addr->set (local_addr->get_port_number (),
00899 local_addr->get_host_name ());
00900
00901 ACE_TCHAR buf [BUFSIZ];
00902 local_addr->addr_to_string (buf, BUFSIZ);
00903
00904 if (result < 0)
00905 ACE_ERROR_RETURN ((LM_ERROR,"TAO_AV_Dgram_Connector::open: get_local_addr failed\n"),result);
00906 }
00907
00908 return 1;
00909 }
00910
00911
00912
00913
00914
00915 TAO_AV_UDP_Factory::TAO_AV_UDP_Factory (void)
00916 {
00917 }
00918
00919 TAO_AV_UDP_Factory::~TAO_AV_UDP_Factory (void)
00920 {
00921 }
00922
00923 int
00924 TAO_AV_UDP_Factory::match_protocol (const char *protocol_string)
00925 {
00926 if (ACE_OS::strcasecmp (protocol_string,"UDP") == 0)
00927 return 1;
00928 if (ACE_OS::strcasecmp (protocol_string,"RTP/UDP") == 0)
00929 return 1;
00930 return 0;
00931 }
00932
00933 TAO_AV_Acceptor*
00934 TAO_AV_UDP_Factory::make_acceptor (void)
00935 {
00936 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"TAO_AV_UDP_Factory::make_acceptor\n"));
00937 TAO_AV_Acceptor *acceptor = 0;
00938 ACE_NEW_RETURN (acceptor,
00939 TAO_AV_UDP_Acceptor,
00940 0);
00941 return acceptor;
00942 }
00943
00944 TAO_AV_Connector*
00945 TAO_AV_UDP_Factory::make_connector (void)
00946 {
00947 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"TAO_AV_UDP_Factory::make_connector\n"));
00948 TAO_AV_Connector *connector = 0;
00949 ACE_NEW_RETURN (connector,
00950 TAO_AV_UDP_Connector,
00951 0);
00952 return connector;
00953 }
00954
00955 int
00956 TAO_AV_UDP_Factory::init (int ,
00957 ACE_TCHAR * [])
00958 {
00959 return 0;
00960 }
00961
00962
00963
00964
00965
00966 int
00967 TAO_AV_UDP_Object::handle_input (void)
00968 {
00969 int n = this->transport_->recv (this->frame_.rd_ptr (),
00970 this->frame_.size ());
00971 if (n == -1)
00972 ACE_ERROR_RETURN ((LM_ERROR,"(%N,%l) TAO_AV_UDP_Flow_Handler::handle_input recv failed: errno: %m\n"),-1);
00973
00974 this->frame_.wr_ptr (this->frame_.rd_ptr () + n);
00975
00976 return this->callback_->receive_frame (&this->frame_);
00977 }
00978
00979 int
00980 TAO_AV_UDP_Object::send_frame (ACE_Message_Block *frame,
00981 TAO_AV_frame_info * )
00982 {
00983 if (TAO_debug_level > 0)
00984 ACE_DEBUG ((LM_DEBUG,"TAO_AV_UDP_Object::send_frame\n"));
00985 int const result = this->transport_->send (frame);
00986 if (result < 0)
00987 return result;
00988 return 0;
00989 }
00990
00991 int
00992 TAO_AV_UDP_Object::send_frame (const iovec *iov,
00993 int iovcnt,
00994 TAO_AV_frame_info * )
00995 {
00996 int const result = this->transport_->send (iov,iovcnt);
00997 if (result < 0)
00998 return result;
00999 return 0;
01000 }
01001
01002 int
01003 TAO_AV_UDP_Object::send_frame (const char*buf,
01004 size_t len)
01005 {
01006 int const result = this->transport_->send (buf, len, 0);
01007 if (result < 0)
01008 return result;
01009 return 0;
01010 }
01011
01012 TAO_AV_UDP_Object::TAO_AV_UDP_Object (TAO_AV_Callback *callback,
01013 TAO_AV_Transport *transport)
01014 :TAO_AV_Protocol_Object (callback,transport)
01015 {
01016 this->frame_.size (this->transport_->mtu ());
01017 }
01018
01019 TAO_AV_UDP_Object::~TAO_AV_UDP_Object (void)
01020 {
01021
01022 }
01023
01024 int
01025 TAO_AV_UDP_Object::destroy (void)
01026 {
01027 this->callback_->handle_destroy ();
01028 delete this;
01029
01030 return 0;
01031 }
01032
01033
01034
01035
01036
01037 TAO_AV_UDP_Flow_Factory::TAO_AV_UDP_Flow_Factory (void)
01038 {
01039 }
01040
01041 TAO_AV_UDP_Flow_Factory::~TAO_AV_UDP_Flow_Factory (void)
01042 {
01043 }
01044
01045 int
01046 TAO_AV_UDP_Flow_Factory::init (int ,
01047 ACE_TCHAR * [])
01048 {
01049 return 0;
01050 }
01051
01052 int
01053 TAO_AV_UDP_Flow_Factory::match_protocol (const char *flow_string)
01054 {
01055 if (ACE_OS::strcasecmp (flow_string,"UDP") == 0)
01056 return 1;
01057 return 0;
01058 }
01059
01060 TAO_AV_Protocol_Object*
01061 TAO_AV_UDP_Flow_Factory::make_protocol_object (TAO_FlowSpec_Entry *entry,
01062 TAO_Base_StreamEndPoint *endpoint,
01063 TAO_AV_Flow_Handler *handler,
01064 TAO_AV_Transport *transport)
01065 {
01066 TAO_AV_Callback *callback = 0;
01067 if( endpoint->get_callback (entry->flowname (), callback) ) {
01068 ACE_ERROR_RETURN ((LM_ERROR, "(%N,%l) Invalid callback\n"), 0);
01069 }
01070
01071 TAO_AV_UDP_Object *object = 0;
01072 ACE_NEW_RETURN (object,
01073 TAO_AV_UDP_Object (callback,
01074 transport),
01075 0);
01076 callback->open (object,
01077 handler);
01078 endpoint->set_protocol_object (entry->flowname (),
01079 object);
01080
01081 endpoint->protocol_object_set ();
01082
01083 return object;
01084 }
01085
01086 TAO_END_VERSIONED_NAMESPACE_DECL
01087
01088 ACE_FACTORY_DEFINE (TAO_AV, TAO_AV_UDP_Flow_Factory)
01089 ACE_STATIC_SVC_DEFINE (TAO_AV_UDP_Flow_Factory,
01090 ACE_TEXT ("UDP_Flow_Factory"),
01091 ACE_SVC_OBJ_T,
01092 &ACE_SVC_NAME (TAO_AV_UDP_Flow_Factory),
01093 ACE_Service_Type::DELETE_THIS |
01094 ACE_Service_Type::DELETE_OBJ,
01095 0)
01096
01097 ACE_FACTORY_DEFINE (TAO_AV, TAO_AV_UDP_Factory)
01098 ACE_STATIC_SVC_DEFINE (TAO_AV_UDP_Factory,
01099 ACE_TEXT ("UDP_Factory"),
01100 ACE_SVC_OBJ_T,
01101 &ACE_SVC_NAME (TAO_AV_UDP_Factory),
01102 ACE_Service_Type::DELETE_THIS |
01103 ACE_Service_Type::DELETE_OBJ,
01104 0)