00001
00002
00003 #include "orbsvcs/AV/UDP.h"
00004 #include "orbsvcs/AV/AVStreams_i.h"
00005 #include "orbsvcs/AV/MCast.h"
00006
00007 #include "tao/debug.h"
00008 #include "ace/OS_NS_strings.h"
00009
00010 #if !defined (__ACE_INLINE__)
00011 #include "orbsvcs/AV/UDP.i"
00012 #endif
00013
00014 ACE_RCSID (AV,
00015 UDP,
00016 "UDP.cpp,v 5.35 2006/03/14 06:14:24 jtc Exp")
00017
00018 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00019
00020
00021
00022
00023
00024 TAO_AV_UDP_Flow_Handler::TAO_AV_UDP_Flow_Handler (void)
00025 {
00026 ACE_NEW (this->transport_,
00027 TAO_AV_UDP_Transport (this));
00028 }
00029
00030 TAO_AV_UDP_Flow_Handler::~TAO_AV_UDP_Flow_Handler (void)
00031 {
00032
00033 TAO_AV_CORE::instance()->reactor ()->remove_handler (this->event_handler(),
00034 ACE_Event_Handler::READ_MASK);
00035
00036
00037 this->close ();
00038 delete this->transport_;
00039 }
00040
00041 TAO_AV_Transport *
00042 TAO_AV_UDP_Flow_Handler::transport (void)
00043 {
00044 return this->transport_;
00045 }
00046
00047 int
00048 TAO_AV_UDP_Flow_Handler::handle_input (ACE_HANDLE )
00049 {
00050 return this->protocol_object_->handle_input ();
00051 }
00052
00053 int
00054 TAO_AV_UDP_Flow_Handler::handle_timeout (const ACE_Time_Value &tv,
00055 const void *arg)
00056 {
00057 return TAO_AV_Flow_Handler::handle_timeout (tv,arg);
00058 }
00059
00060 int
00061 TAO_AV_UDP_Flow_Handler::set_remote_address (ACE_Addr *address)
00062 {
00063 if (TAO_debug_level > 0)
00064 ACE_DEBUG ((LM_DEBUG,"TAO_AV_UDP_Flow_Handler::set_remote_address\n"));
00065
00066 ACE_INET_Addr *inet_addr = dynamic_cast<ACE_INET_Addr*> (address);
00067 this->peer_addr_ = *inet_addr;
00068 TAO_AV_UDP_Transport *transport = dynamic_cast<TAO_AV_UDP_Transport*> (this->transport_);
00069
00070 return transport->set_remote_address (*inet_addr);
00071 }
00072
00073
00074 ACE_HANDLE
00075 TAO_AV_UDP_Flow_Handler::get_handle (void) const
00076 {
00077 if (TAO_debug_level > 0)
00078 ACE_DEBUG ((LM_DEBUG,
00079 "TAO_AV_UDP_Flow_Handler::get_handle:%d\n",
00080 this->sock_dgram_.get_handle ()));
00081
00082 return this->sock_dgram_.get_handle () ;
00083 }
00084
00085 int
00086 TAO_AV_UDP_Flow_Handler::change_qos(AVStreams::QoS qos)
00087 {
00088 if( TAO_debug_level > 0 )
00089 {
00090 ACE_DEBUG ((LM_DEBUG,
00091 "(%N,%l) TAO_AV_UDP_Flow_Handler::change_qos\n"));
00092 }
00093
00094 unsigned int i=0;
00095
00096 int ret = 0;
00097 CORBA::Long dscp = 0;
00098 CORBA::Long ecn = 0;
00099 int dscp_flag=0;
00100 for(i=0; i < qos.QoSParams.length(); i++)
00101 {
00102
00103 if( ACE_OS::strcmp( qos.QoSParams[i].property_name.in(), "Diffserv_Codepoint") == 0)
00104 {
00105 qos.QoSParams[i].property_value >>= dscp;
00106 dscp_flag=1;
00107
00108 if(!((dscp >= 0) && (dscp <= 63)))
00109 {
00110 dscp_flag = 0;
00111 ACE_DEBUG((LM_DEBUG, "(%N,%l) ECN value can only be (0-3) not %d\n", ecn));
00112 return -1;
00113 }
00114 }
00115
00116 if( ACE_OS::strcmp( qos.QoSParams[i].property_name.in(), "ECN") == 0)
00117 {
00118 qos.QoSParams[i].property_value >>= ecn;
00119
00120
00121 if(!((ecn >= 0) && (ecn <= 3)))
00122 {
00123 ACE_DEBUG((LM_DEBUG, "(%N,%l) ECN value can only be (0-3) not %d\n", ecn));
00124 ecn = 0;
00125 }
00126
00127 }
00128 }
00129
00130
00131
00132 if(dscp_flag || ecn)
00133 {
00134 int tos;
00135 tos = (int)(dscp << 2);
00136 if(ecn)
00137 {
00138 tos |= ecn;
00139 }
00140 ret = sock_dgram_.set_option(IPPROTO_IP, IP_TOS, (int *)&tos , (int)sizeof(tos));
00141
00142 if(TAO_debug_level > 1)
00143 {
00144 ACE_DEBUG((LM_DEBUG, "(%N,%l) set tos: ret: %d\n", ret));
00145 }
00146 }
00147
00148 if(TAO_debug_level > 1)
00149 {
00150 if(ret < 0 )
00151 {
00152 ACE_DEBUG((LM_DEBUG, "(%N,%l) errno: %p\n"));
00153 }
00154 }
00155 return ret;
00156 }
00157
00158
00159
00160
00161
00162 TAO_AV_UDP_Transport::TAO_AV_UDP_Transport (void)
00163 :handler_ (0)
00164 {
00165 }
00166
00167 TAO_AV_UDP_Transport::TAO_AV_UDP_Transport (TAO_AV_UDP_Flow_Handler *handler)
00168 :handler_ (handler),
00169 addr_ (0)
00170 {
00171 }
00172
00173 TAO_AV_UDP_Transport::~TAO_AV_UDP_Transport (void)
00174 {
00175 }
00176
00177 int
00178 TAO_AV_UDP_Transport::set_remote_address (const ACE_INET_Addr &address)
00179 {
00180 this->peer_addr_ = address;
00181 return 0;
00182 }
00183
00184 int
00185 TAO_AV_UDP_Transport::open (ACE_Addr * )
00186 {
00187 return 0;
00188 }
00189
00190 int
00191 TAO_AV_UDP_Transport::close (void)
00192 {
00193 return 0;
00194 }
00195
00196 int
00197 TAO_AV_UDP_Transport::mtu (void)
00198 {
00199 return 65535;
00200 }
00201
00202 ACE_Addr*
00203 TAO_AV_UDP_Transport::get_peer_addr (void)
00204 {
00205 return &this->peer_addr_;
00206 }
00207
00208 ssize_t
00209 TAO_AV_UDP_Transport::send (const ACE_Message_Block *mblk, ACE_Time_Value *)
00210 {
00211
00212
00213
00214 iovec iov[ACE_IOV_MAX];
00215 int iovcnt = 0;
00216 ssize_t n = 0;
00217 ssize_t nbytes = 0;
00218
00219 for (const ACE_Message_Block *i = mblk;
00220 i != 0;
00221 i = i->cont ())
00222 {
00223
00224 if (i->length () > 0)
00225 {
00226 iov[iovcnt].iov_base = i->rd_ptr ();
00227 iov[iovcnt].iov_len = static_cast<u_long> (i->length ());
00228 iovcnt++;
00229
00230
00231
00232
00233
00234
00235
00236 if (iovcnt == ACE_IOV_MAX)
00237 {
00238 n = this->handler_->get_socket ()->send ((const iovec *) iov,
00239 iovcnt,
00240 this->peer_addr_);
00241
00242 if (n < 1)
00243 return n;
00244
00245 nbytes += n;
00246 iovcnt = 0;
00247 }
00248 }
00249 }
00250
00251
00252 if (iovcnt != 0)
00253 {
00254 n = this->handler_->get_socket ()->send ((const iovec *) iov,
00255 iovcnt,
00256 this->peer_addr_);
00257
00258 if (n < 1)
00259 return n;
00260
00261 nbytes += n;
00262 }
00263
00264 return nbytes;
00265 }
00266
00267 ssize_t
00268 TAO_AV_UDP_Transport::send (const char *buf,
00269 size_t len,
00270 ACE_Time_Value *)
00271 {
00272 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"TAO_AV_UDP_Transport::send "));
00273 char addr [BUFSIZ];
00274 this->peer_addr_.addr_to_string (addr,BUFSIZ);
00275 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"to %s\n",addr));
00276
00277 return this->handler_->get_socket ()->send (buf, len,this->peer_addr_);
00278 }
00279
00280 ssize_t
00281 TAO_AV_UDP_Transport::send (const iovec *iov,
00282 int iovcnt,
00283 ACE_Time_Value *)
00284 {
00285 return this->handler_->get_socket ()->send ((const iovec *) iov,
00286 iovcnt,
00287 this->peer_addr_);
00288
00289 }
00290
00291 ssize_t
00292 TAO_AV_UDP_Transport::recv (char *buf,
00293 size_t len,
00294 ACE_Time_Value *)
00295 {
00296 return this->handler_->get_socket ()->recv (buf, len,this->peer_addr_);
00297 }
00298
00299 ssize_t
00300 TAO_AV_UDP_Transport::recv (char *buf,
00301 size_t len,
00302 int flags,
00303 ACE_Time_Value *timeout)
00304 {
00305 return this->handler_->get_socket ()->recv (buf,
00306 len,
00307 this->peer_addr_,
00308 flags,
00309 timeout);
00310 }
00311
00312 ssize_t
00313 TAO_AV_UDP_Transport::recv (iovec *iov,
00314 int ,
00315 ACE_Time_Value *timeout)
00316 {
00317 return handler_->get_socket ()->recv (iov,this->peer_addr_,0,timeout);
00318 }
00319
00320
00321
00322
00323
00324
00325 TAO_AV_UDP_Acceptor::TAO_AV_UDP_Acceptor (void)
00326 : address_ (0),
00327 control_inet_address_ (0)
00328 {
00329 }
00330
00331 TAO_AV_UDP_Acceptor::~TAO_AV_UDP_Acceptor (void)
00332 {
00333 if (this->flow_component_ == TAO_AV_Core::TAO_AV_CONTROL)
00334 delete this->entry_->control_handler ();
00335
00336 delete this->address_;
00337 delete this->control_inet_address_;
00338 }
00339
00340 int
00341 TAO_AV_UDP_Acceptor::activate_svc_handler (TAO_AV_Flow_Handler *handler)
00342 {
00343 ACE_Event_Handler *event_handler = handler->event_handler ();
00344 int result = this->av_core_->reactor ()->register_handler (event_handler,
00345 ACE_Event_Handler::READ_MASK);
00346
00347 if (this->flow_component_ == TAO_AV_Core::TAO_AV_CONTROL)
00348 handler->schedule_timer ();
00349
00350 return result;
00351 }
00352
00353 int
00354 TAO_AV_UDP_Acceptor::open (TAO_Base_StreamEndPoint *endpoint,
00355 TAO_AV_Core *av_core,
00356 TAO_FlowSpec_Entry *entry,
00357 TAO_AV_Flow_Protocol_Factory *factory,
00358 TAO_AV_Core::Flow_Component flow_comp)
00359 {
00360 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"TAO_AV_UDP_Acceptor::open\n"));
00361 this->av_core_ = av_core;
00362 this->endpoint_ = endpoint;
00363 this->entry_ = entry;
00364 this->flow_component_ = flow_comp;
00365 this->flow_protocol_factory_ = factory;
00366 ACE_INET_Addr *inet_addr;
00367 if (flow_comp == TAO_AV_Core::TAO_AV_CONTROL)
00368 {
00369 this->flowname_ = TAO_AV_Core::get_control_flowname (entry->flowname ());
00370 inet_addr = (ACE_INET_Addr *) entry->control_address ();
00371 }
00372 else
00373 {
00374 this->flowname_ = entry->flowname ();
00375 inet_addr = (ACE_INET_Addr *) entry->address ();
00376 }
00377
00378 if (inet_addr != 0)
00379 {
00380 char buf[BUFSIZ];
00381 inet_addr->addr_to_string (buf,
00382 BUFSIZ);
00383
00384 if (TAO_debug_level > 0)
00385 ACE_DEBUG ((LM_DEBUG,
00386 "TAO_AV_UDP_Acceptor::open: %s\n",
00387 buf));
00388 }
00389
00390 int result = this->open_i (inet_addr, 0);
00391
00392 if (result < 0)
00393 return result;
00394 return 0;
00395 }
00396
00397 int
00398 TAO_AV_UDP_Acceptor::open_default (TAO_Base_StreamEndPoint *endpoint,
00399 TAO_AV_Core *av_core,
00400 TAO_FlowSpec_Entry *entry,
00401 TAO_AV_Flow_Protocol_Factory *factory,
00402 TAO_AV_Core::Flow_Component flow_comp)
00403 {
00404 this->av_core_ = av_core;
00405 this->endpoint_ = endpoint;
00406 this->entry_ = entry;
00407 this->flow_component_ = flow_comp;
00408 this->flow_protocol_factory_ = factory;
00409 if (flow_comp == TAO_AV_Core::TAO_AV_CONTROL)
00410 {
00411 this->flowname_ = TAO_AV_Core::get_control_flowname (entry->flowname ());
00412 }
00413 else
00414 {
00415 this->flowname_ = entry->flowname ();
00416 ACE_NEW_RETURN (this->address_,
00417 ACE_INET_Addr ("0"),
00418 -1);
00419 }
00420
00421 int result = this->open_i (this->address_, 1);
00422 if (result < 0)
00423 return result;
00424
00425 return 0;
00426 }
00427
00428 int
00429 TAO_AV_UDP_Acceptor::open_i (ACE_INET_Addr *inet_addr,
00430 int is_default_addr)
00431 {
00432 int result = -1;
00433 ACE_INET_Addr *local_addr = 0;
00434
00435 TAO_AV_Flow_Handler *flow_handler = 0;
00436
00437
00438
00439 if (is_default_addr &&
00440 (this->flow_component_ == TAO_AV_Core::TAO_AV_CONTROL) &&
00441 (ACE_OS::strcasecmp(this->entry_->flow_protocol_str (), "RTP") == 0))
00442 {
00443 flow_handler = this->entry_->control_handler ();
00444
00445 local_addr = dynamic_cast<ACE_INET_Addr*> (this->entry_->get_local_control_addr ());
00446 }
00447 else
00448 {
00449
00450
00451 int get_new_port = 1;
00452
00453 while (get_new_port)
00454 {
00455
00456 get_new_port = 0;
00457
00458 result = TAO_AV_UDP_Connection_Setup::setup (flow_handler,
00459 inet_addr,
00460 local_addr,
00461 this->entry_->is_multicast (),
00462 TAO_AV_UDP_Connection_Setup::ACCEPTOR);
00463
00464 if (result < 0)
00465 {
00466 ACE_DEBUG((LM_DEBUG,"(%N,%l) Error during connection setup: %d\n", result));
00467 }
00468
00469 local_addr->set (local_addr->get_port_number (),
00470 local_addr->get_host_name ());
00471
00472 if (is_default_addr)
00473 {
00474 if ((ACE_OS::strcasecmp(this->entry_->flow_protocol_str (), "RTP") == 0) &&
00475 (this->flow_component_ == TAO_AV_Core::TAO_AV_DATA))
00476 {
00477 if (is_default_addr && local_addr->get_port_number ()%2 != 0)
00478 {
00479
00480 delete local_addr;
00481 delete flow_handler;
00482 get_new_port = 1;
00483 }
00484 else
00485 {
00486 ACE_INET_Addr *local_control_addr;
00487 TAO_AV_Flow_Handler *control_flow_handler = 0;
00488
00489 ACE_NEW_RETURN (this->control_inet_address_,
00490 ACE_INET_Addr ("0"),
00491 -1);
00492
00493 TAO_AV_UDP_Connection_Setup::setup(control_flow_handler,
00494 this->control_inet_address_,
00495 local_control_addr,
00496 this->entry_->is_multicast (),
00497 TAO_AV_UDP_Connection_Setup::ACCEPTOR);
00498
00499 if (local_control_addr->get_port_number () !=
00500 local_addr->get_port_number () +1)
00501 {
00502 delete this->control_inet_address_;
00503 delete local_addr;
00504 delete flow_handler;
00505 delete local_control_addr;
00506 delete control_flow_handler;
00507 get_new_port = 1;
00508 }
00509 else
00510 {
00511 this->entry_->control_address (this->control_inet_address_);
00512 this->entry_->set_local_control_addr (local_control_addr);
00513 this->entry_->control_handler (control_flow_handler);
00514 }
00515 }
00516 }
00517 }
00518 }
00519 }
00520
00521 TAO_AV_Protocol_Object *object =
00522 this->flow_protocol_factory_->make_protocol_object (this->entry_,
00523 this->endpoint_,
00524 flow_handler,
00525 flow_handler->transport ());
00526 flow_handler->protocol_object (object);
00527
00528 if (this->flow_component_ == TAO_AV_Core::TAO_AV_DATA)
00529 {
00530 this->endpoint_->set_flow_handler (this->flowname_.c_str (),flow_handler);
00531
00532 this->entry_->protocol_object (object);
00533 this->entry_->set_local_addr (local_addr);
00534 this->entry_->handler (flow_handler);
00535
00536 this->entry_->address (local_addr);
00537 }
00538 else
00539 {
00540 this->endpoint_->set_control_flow_handler (this->flowname_.c_str (),flow_handler);
00541
00542 this->entry_->control_protocol_object (object);
00543 this->entry_->set_local_control_addr (local_addr);
00544 this->entry_->control_handler (flow_handler);
00545 }
00546
00547 char buf[BUFSIZ];
00548 local_addr->addr_to_string (buf,BUFSIZ);
00549 if (TAO_debug_level > 0)
00550 ACE_DEBUG ((LM_DEBUG,
00551 "TAO_AV_UDP_ACCEPTOR::open:%s \n",
00552 buf));
00553
00554
00555 return this->activate_svc_handler (flow_handler);
00556 }
00557
00558 int
00559 TAO_AV_UDP_Acceptor::close (void)
00560 {
00561 return 0;
00562 }
00563
00564
00565
00566
00567 TAO_AV_UDP_Connector::TAO_AV_UDP_Connector (void)
00568 : control_inet_address_ (0)
00569 {
00570 }
00571
00572 TAO_AV_UDP_Connector::~TAO_AV_UDP_Connector (void)
00573 {
00574 if (this->flow_component_ == TAO_AV_Core::TAO_AV_CONTROL)
00575 {
00576 delete this->entry_->control_handler ();
00577 }
00578
00579 if (this->control_inet_address_ != 0)
00580 delete this->control_inet_address_;
00581 }
00582
00583 int
00584 TAO_AV_UDP_Connector::open (TAO_Base_StreamEndPoint *endpoint,
00585 TAO_AV_Core *av_core,
00586 TAO_AV_Flow_Protocol_Factory *factory)
00587
00588 {
00589 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"TAO_AV_UDP_Connector::open "));
00590 this->endpoint_ = endpoint;
00591 this->av_core_ = av_core;
00592 this->flow_protocol_factory_ = factory;
00593 return 0;
00594 }
00595
00596 int
00597 TAO_AV_UDP_Connector::connect (TAO_FlowSpec_Entry *entry,
00598 TAO_AV_Transport *&transport,
00599 TAO_AV_Core::Flow_Component flow_component)
00600 {
00601 ACE_INET_Addr *local_addr = 0;
00602 ACE_INET_Addr *control_inet_addr = 0;
00603
00604 this->entry_ = entry;
00605 this->flow_component_ = flow_component;
00606
00607 ACE_INET_Addr *inet_addr;
00608
00609 if (flow_component == TAO_AV_Core::TAO_AV_CONTROL)
00610 {
00611 this->flowname_ = TAO_AV_Core::get_control_flowname (entry->flowname());
00612 inet_addr = dynamic_cast<ACE_INET_Addr*> (entry->control_address ());
00613 }
00614 else
00615 {
00616 this->flowname_ = entry->flowname ();
00617 inet_addr = dynamic_cast<ACE_INET_Addr*> (entry->address ());
00618 control_inet_addr = dynamic_cast<ACE_INET_Addr*> (entry->control_address ());
00619 }
00620
00621 TAO_AV_Flow_Handler *flow_handler = 0;
00622
00623
00624
00625 if ((flow_component == TAO_AV_Core::TAO_AV_CONTROL) &&
00626 (ACE_OS::strcasecmp(this->entry_->flow_protocol_str (), "RTP") == 0) &&
00627 !entry->is_multicast ())
00628 {
00629 flow_handler = this->entry_->control_handler ();
00630 flow_handler->set_remote_address (inet_addr);
00631
00632 local_addr = dynamic_cast<ACE_INET_Addr*> (this->entry_->get_local_control_addr ());
00633 }
00634 else
00635 {
00636
00637
00638 int get_new_port = 1;
00639
00640 while (get_new_port)
00641 {
00642
00643 get_new_port = 0;
00644
00645 ACE_Addr *addr;
00646 if ((addr = entry->get_peer_addr ()) != 0)
00647 {
00648 local_addr = dynamic_cast<ACE_INET_Addr*> (addr);
00649 char buf [BUFSIZ];
00650 local_addr->addr_to_string (buf, BUFSIZ);
00651 }
00652
00653 TAO_AV_UDP_Connection_Setup::setup (flow_handler,
00654 inet_addr,
00655 local_addr,
00656 entry->is_multicast (),
00657 TAO_AV_UDP_Connection_Setup::CONNECTOR);
00658
00659 if ((ACE_OS::strcasecmp(this->entry_->flow_protocol_str (), "RTP") == 0) &&
00660 (flow_component == TAO_AV_Core::TAO_AV_DATA) &&
00661 !entry->is_multicast ())
00662 {
00663 if (local_addr->get_port_number ()%2 != 0)
00664 {
00665
00666 delete local_addr;
00667 delete flow_handler;
00668 get_new_port = 1;
00669 }
00670 else
00671 {
00672 ACE_INET_Addr *local_control_addr;
00673 TAO_AV_Flow_Handler *control_flow_handler = 0;
00674
00675 if (entry->is_multicast ())
00676 control_inet_addr = dynamic_cast<ACE_INET_Addr*> (entry->control_address ()) ;
00677 else
00678 {
00679
00680 if (local_addr != 0)
00681 {
00682 char buf [BUFSIZ];
00683 ACE_CString addr_str (local_addr->get_host_name ());
00684 addr_str += ":";
00685 addr_str += ACE_OS::itoa (local_addr->get_port_number () + 1, buf, 10);
00686 ACE_NEW_RETURN (local_control_addr,
00687 ACE_INET_Addr (addr_str.c_str ()),
00688 -1);
00689 local_control_addr->addr_to_string (buf, BUFSIZ);
00690 }
00691
00692
00693 if (entry->control_address () == 0)
00694 ACE_NEW_RETURN (this->control_inet_address_,
00695 ACE_INET_Addr ("0"),
00696 -1);
00697 else
00698 control_inet_address_ = dynamic_cast<ACE_INET_Addr*> (entry->control_address ());
00699 }
00700
00701 TAO_AV_UDP_Connection_Setup::setup (control_flow_handler,
00702 control_inet_addr,
00703 local_control_addr,
00704 entry->is_multicast (),
00705 TAO_AV_UDP_Connection_Setup::CONNECTOR);
00706
00707 if (local_control_addr->get_port_number () !=
00708 local_addr->get_port_number () +1)
00709 {
00710 delete local_addr;
00711 delete flow_handler;
00712 delete local_control_addr;
00713 delete control_flow_handler;
00714 get_new_port = 1;
00715 }
00716 else
00717 {
00718 this->entry_->set_local_control_addr (local_control_addr);
00719 this->entry_->control_handler (control_flow_handler);
00720 }
00721 }
00722 }
00723 }
00724 }
00725
00726 TAO_AV_Protocol_Object *object =
00727 this->flow_protocol_factory_->make_protocol_object (this->entry_,
00728 this->endpoint_,
00729 flow_handler,
00730 flow_handler->transport ());
00731
00732 flow_handler->protocol_object (object);
00733
00734 if (flow_component == TAO_AV_Core::TAO_AV_DATA)
00735 {
00736 this->endpoint_->set_flow_handler (this->flowname_.c_str (),
00737 flow_handler);
00738 this->entry_->protocol_object (object);
00739 entry->set_local_addr (local_addr);
00740 entry->handler (flow_handler);
00741 transport = flow_handler->transport ();
00742 }
00743 else
00744 {
00745 this->endpoint_->set_control_flow_handler (this->flowname_.c_str (),
00746 flow_handler);
00747 this->entry_->control_protocol_object (object);
00748 entry->set_local_control_addr (local_addr);
00749 entry->control_handler (flow_handler);
00750 transport = flow_handler->transport ();
00751 }
00752
00753 char buf[BUFSIZ];
00754 local_addr->addr_to_string (buf,BUFSIZ);
00755
00756 if (TAO_debug_level > 0)
00757 ACE_DEBUG ((LM_DEBUG,"TAO_AV_UDP_CONNECTOR::connect:%s \n",buf));
00758
00759
00760 return this->activate_svc_handler (flow_handler);
00761 }
00762
00763 int
00764 TAO_AV_UDP_Connector::activate_svc_handler (TAO_AV_Flow_Handler *handler)
00765 {
00766 ACE_Event_Handler *event_handler = handler->event_handler ();
00767 int result = this->av_core_->reactor ()->register_handler (event_handler,
00768 ACE_Event_Handler::READ_MASK);
00769
00770 if (this->flow_component_ == TAO_AV_Core::TAO_AV_CONTROL)
00771 handler->schedule_timer ();
00772
00773 return result;
00774 }
00775
00776 int
00777 TAO_AV_UDP_Connector::close (void)
00778 {
00779 return 0;
00780 }
00781
00782
00783
00784
00785
00786 int
00787 TAO_AV_UDP_Connection_Setup::setup (TAO_AV_Flow_Handler *&flow_handler,
00788 ACE_INET_Addr *inet_addr,
00789 ACE_INET_Addr *&local_addr,
00790 int is_multicast,
00791 ConnectionType ct)
00792 {
00793 int result;
00794
00795 if (is_multicast)
00796 {
00797 TAO_AV_UDP_MCast_Flow_Handler *handler;
00798 ACE_NEW_RETURN (handler,
00799 TAO_AV_UDP_MCast_Flow_Handler,
00800 -1);
00801
00802 flow_handler = handler;
00803
00804 result = handler->get_mcast_socket ()->subscribe (*inet_addr);
00805 if (result < 0)
00806 ACE_ERROR_RETURN ((LM_ERROR,"TAO_AV_UDP_MCast_connector::open failed\n"),-1);
00807
00808
00809
00810 #if defined (ACE_HAS_IP_MULTICAST)
00811 if (handler->get_mcast_socket ()->set_option (IP_MULTICAST_LOOP,
00812 0) < 0)
00813 if (TAO_debug_level > 0)
00814 ACE_DEBUG ((LM_DEBUG,"TAO_AV_UDP_MCast_Acceptor::multicast loop disable failed\n"));
00815
00816 #endif
00817
00818 int bufsize = 80 * 1024;
00819 if (handler->get_mcast_socket ()->ACE_SOCK::set_option (SOL_SOCKET,
00820 SO_RCVBUF,
00821 (char *)&bufsize,
00822 sizeof(bufsize)) < 0)
00823 {
00824 bufsize = 32 * 1024;
00825 if (handler->get_mcast_socket ()->ACE_SOCK::set_option (SOL_SOCKET,
00826 SO_RCVBUF,
00827 (char *)&bufsize,
00828 sizeof(bufsize)) < 0)
00829 perror("SO_RCVBUF");
00830 }
00831 ACE_NEW_RETURN (local_addr,
00832 ACE_INET_Addr ("0"),
00833 -1);
00834
00835 if (ct == ACCEPTOR)
00836 {
00837 result = handler->get_mcast_socket ()->get_local_addr (*local_addr);
00838 if (result < 0)
00839 ACE_ERROR_RETURN ((LM_ERROR,"TAO_AV_Dgram_Connector::open: get_local_addr failed\n"),result);
00840
00841 local_addr->set (local_addr->get_port_number (),
00842 local_addr->get_host_name ());
00843 handler->set_peer_addr (local_addr);
00844 }
00845 }
00846 else
00847 {
00848 if (local_addr == 0)
00849 ACE_NEW_RETURN (local_addr,
00850 ACE_INET_Addr ("0"),
00851 -1);
00852
00853 TAO_AV_UDP_Flow_Handler *handler;
00854 ACE_NEW_RETURN (handler,
00855 TAO_AV_UDP_Flow_Handler,
00856 -1);
00857
00858 flow_handler = handler;
00859
00860 if (ct == ACCEPTOR)
00861 result = handler->open (*inet_addr);
00862 else
00863 result = handler->open (*local_addr);
00864 if (result < 0)
00865 ACE_ERROR_RETURN ((LM_ERROR,"handler::open failed\n"),-1);
00866
00867
00868 int sndbufsize = ACE_DEFAULT_MAX_SOCKET_BUFSIZ;
00869 int rcvbufsize = ACE_DEFAULT_MAX_SOCKET_BUFSIZ;
00870
00871 if (handler->get_socket ()->set_option (SOL_SOCKET,
00872 SO_SNDBUF,
00873 (void *) &sndbufsize,
00874 sizeof (sndbufsize)) == -1
00875 && errno != ENOTSUP)
00876 return 0;
00877 else if (handler->get_socket ()->set_option (SOL_SOCKET,
00878 SO_RCVBUF,
00879 (void *) &rcvbufsize,
00880 sizeof (rcvbufsize)) == -1
00881 && errno != ENOTSUP)
00882 return 0;
00883
00884 if (ct == CONNECTOR)
00885 handler->set_remote_address (inet_addr);
00886
00887 result = handler->get_socket ()->get_local_addr (*local_addr);
00888
00889 local_addr->set (local_addr->get_port_number (),
00890 local_addr->get_host_name ());
00891
00892 char buf [BUFSIZ];
00893 local_addr->addr_to_string (buf, BUFSIZ);
00894
00895 if (result < 0)
00896 ACE_ERROR_RETURN ((LM_ERROR,"TAO_AV_Dgram_Connector::open: get_local_addr failed\n"),result);
00897 }
00898
00899 return 1;
00900 }
00901
00902
00903
00904
00905
00906 TAO_AV_UDP_Factory::TAO_AV_UDP_Factory (void)
00907 {
00908 }
00909
00910 TAO_AV_UDP_Factory::~TAO_AV_UDP_Factory (void)
00911 {
00912 }
00913
00914 int
00915 TAO_AV_UDP_Factory::match_protocol (const char *protocol_string)
00916 {
00917 if (ACE_OS::strcasecmp (protocol_string,"UDP") == 0)
00918 return 1;
00919 if (ACE_OS::strcasecmp (protocol_string,"RTP/UDP") == 0)
00920 return 1;
00921 return 0;
00922 }
00923
00924 TAO_AV_Acceptor*
00925 TAO_AV_UDP_Factory::make_acceptor (void)
00926 {
00927 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"TAO_AV_UDP_Factory::make_acceptor\n"));
00928 TAO_AV_Acceptor *acceptor = 0;
00929 ACE_NEW_RETURN (acceptor,
00930 TAO_AV_UDP_Acceptor,
00931 0);
00932 return acceptor;
00933 }
00934
00935 TAO_AV_Connector*
00936 TAO_AV_UDP_Factory::make_connector (void)
00937 {
00938 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"TAO_AV_UDP_Factory::make_connector\n"));
00939 TAO_AV_Connector *connector = 0;
00940 ACE_NEW_RETURN (connector,
00941 TAO_AV_UDP_Connector,
00942 0);
00943 return connector;
00944 }
00945
00946 int
00947 TAO_AV_UDP_Factory::init (int ,
00948 char * [])
00949 {
00950 return 0;
00951 }
00952
00953
00954
00955
00956
00957 int
00958 TAO_AV_UDP_Object::handle_input (void)
00959 {
00960 int n = this->transport_->recv (this->frame_.rd_ptr (),
00961 this->frame_.size ());
00962 if (n == -1)
00963 ACE_ERROR_RETURN ((LM_ERROR,"(%N,%l) TAO_AV_UDP_Flow_Handler::handle_input recv failed: errno: %m\n"),-1);
00964
00965 this->frame_.wr_ptr (this->frame_.rd_ptr () + n);
00966
00967 return this->callback_->receive_frame (&this->frame_);
00968 }
00969
00970 int
00971 TAO_AV_UDP_Object::send_frame (ACE_Message_Block *frame,
00972 TAO_AV_frame_info * )
00973 {
00974 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"TAO_AV_UDP_Object::send_frame\n"));
00975 int result = this->transport_->send (frame);
00976 if (result < 0)
00977 return result;
00978 return 0;
00979 }
00980
00981 int
00982 TAO_AV_UDP_Object::send_frame (const iovec *iov,
00983 int iovcnt,
00984 TAO_AV_frame_info * )
00985 {
00986 int result = this->transport_->send (iov,iovcnt);
00987 if (result < 0)
00988 return result;
00989 return 0;
00990 }
00991
00992 int
00993 TAO_AV_UDP_Object::send_frame (const char*buf,
00994 size_t len)
00995 {
00996 int result = this->transport_->send (buf, len, 0);
00997 if (result < 0)
00998 return result;
00999 return 0;
01000 }
01001
01002 TAO_AV_UDP_Object::TAO_AV_UDP_Object (TAO_AV_Callback *callback,
01003 TAO_AV_Transport *transport)
01004 :TAO_AV_Protocol_Object (callback,transport)
01005 {
01006 this->frame_.size (this->transport_->mtu ());
01007 }
01008
01009 TAO_AV_UDP_Object::~TAO_AV_UDP_Object (void)
01010 {
01011
01012 }
01013
01014 int
01015 TAO_AV_UDP_Object::destroy (void)
01016 {
01017 this->callback_->handle_destroy ();
01018 delete this;
01019
01020 return 0;
01021 }
01022
01023
01024
01025
01026
01027 TAO_AV_UDP_Flow_Factory::TAO_AV_UDP_Flow_Factory (void)
01028 {
01029 }
01030
01031 TAO_AV_UDP_Flow_Factory::~TAO_AV_UDP_Flow_Factory (void)
01032 {
01033 }
01034
01035 int
01036 TAO_AV_UDP_Flow_Factory::init (int ,
01037 char * [])
01038 {
01039 return 0;
01040 }
01041
01042 int
01043 TAO_AV_UDP_Flow_Factory::match_protocol (const char *flow_string)
01044 {
01045 if (ACE_OS::strcasecmp (flow_string,"UDP") == 0)
01046 return 1;
01047 return 0;
01048 }
01049
01050 TAO_AV_Protocol_Object*
01051 TAO_AV_UDP_Flow_Factory::make_protocol_object (TAO_FlowSpec_Entry *entry,
01052 TAO_Base_StreamEndPoint *endpoint,
01053 TAO_AV_Flow_Handler *handler,
01054 TAO_AV_Transport *transport)
01055 {
01056 TAO_AV_Callback *callback = 0;
01057 if( endpoint->get_callback (entry->flowname (), callback) ) {
01058 ACE_ERROR_RETURN ((LM_ERROR, "(%N,%l) Invalid callback\n"), 0);
01059 }
01060
01061 TAO_AV_UDP_Object *object = 0;
01062 ACE_NEW_RETURN (object,
01063 TAO_AV_UDP_Object (callback,
01064 transport),
01065 0);
01066 callback->open (object,
01067 handler);
01068 endpoint->set_protocol_object (entry->flowname (),
01069 object);
01070
01071 endpoint->protocol_object_set ();
01072
01073 return object;
01074 }
01075
01076 TAO_END_VERSIONED_NAMESPACE_DECL
01077
01078 ACE_FACTORY_DEFINE (TAO_AV, TAO_AV_UDP_Flow_Factory)
01079 ACE_STATIC_SVC_DEFINE (TAO_AV_UDP_Flow_Factory,
01080 ACE_TEXT ("UDP_Flow_Factory"),
01081 ACE_SVC_OBJ_T,
01082 &ACE_SVC_NAME (TAO_AV_UDP_Flow_Factory),
01083 ACE_Service_Type::DELETE_THIS |
01084 ACE_Service_Type::DELETE_OBJ,
01085 0)
01086
01087 ACE_FACTORY_DEFINE (TAO_AV, TAO_AV_UDP_Factory)
01088 ACE_STATIC_SVC_DEFINE (TAO_AV_UDP_Factory,
01089 ACE_TEXT ("UDP_Factory"),
01090 ACE_SVC_OBJ_T,
01091 &ACE_SVC_NAME (TAO_AV_UDP_Factory),
01092 ACE_Service_Type::DELETE_THIS |
01093 ACE_Service_Type::DELETE_OBJ,
01094 0)