00001
00002
00003 #include "orbsvcs/AV/UDP.h"
00004 #include "orbsvcs/AV/AVStreams_i.h"
00005 #include "orbsvcs/AV/MCast.h"
00006
00007 #include "tao/debug.h"
00008 #include "ace/OS_NS_strings.h"
00009
00010 #if !defined (__ACE_INLINE__)
00011 #include "orbsvcs/AV/UDP.inl"
00012 #endif
00013
// Revision-control identification for this translation unit.
// NOTE(review): the exact expansion of ACE_RCSID is defined outside this
// file -- presumably it embeds the "$Id$" string in the object file.
ACE_RCSID (AV,
UDP,
"$Id: UDP.cpp 81401 2008-04-23 18:12:56Z elliott_c $")
00017
00018 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00019
00020
00021
00022
00023
00024 TAO_AV_UDP_Flow_Handler::TAO_AV_UDP_Flow_Handler (void)
00025 {
00026 ACE_NEW (this->transport_,
00027 TAO_AV_UDP_Transport (this));
00028 }
00029
00030 TAO_AV_UDP_Flow_Handler::~TAO_AV_UDP_Flow_Handler (void)
00031 {
00032
00033 TAO_AV_CORE::instance()->reactor ()->remove_handler (this->event_handler(),
00034 ACE_Event_Handler::READ_MASK);
00035
00036
00037 this->close ();
00038 delete this->transport_;
00039 }
00040
00041 TAO_AV_Transport *
00042 TAO_AV_UDP_Flow_Handler::transport (void)
00043 {
00044 return this->transport_;
00045 }
00046
00047 int
00048 TAO_AV_UDP_Flow_Handler::handle_input (ACE_HANDLE )
00049 {
00050 return this->protocol_object_->handle_input ();
00051 }
00052
00053 int
00054 TAO_AV_UDP_Flow_Handler::handle_timeout (const ACE_Time_Value &tv,
00055 const void *arg)
00056 {
00057 return TAO_AV_Flow_Handler::handle_timeout (tv,arg);
00058 }
00059
00060 int
00061 TAO_AV_UDP_Flow_Handler::set_remote_address (ACE_Addr *address)
00062 {
00063 if (TAO_debug_level > 0)
00064 ACE_DEBUG ((LM_DEBUG,"TAO_AV_UDP_Flow_Handler::set_remote_address\n"));
00065
00066 ACE_INET_Addr *inet_addr = dynamic_cast<ACE_INET_Addr*> (address);
00067 this->peer_addr_ = *inet_addr;
00068 TAO_AV_UDP_Transport *transport = dynamic_cast<TAO_AV_UDP_Transport*> (this->transport_);
00069
00070 return transport->set_remote_address (*inet_addr);
00071 }
00072
00073
00074 ACE_HANDLE
00075 TAO_AV_UDP_Flow_Handler::get_handle (void) const
00076 {
00077 if (TAO_debug_level > 0)
00078 ACE_DEBUG ((LM_DEBUG,
00079 "TAO_AV_UDP_Flow_Handler::get_handle:%d\n",
00080 this->sock_dgram_.get_handle ()));
00081
00082 return this->sock_dgram_.get_handle () ;
00083 }
00084
00085 int
00086 TAO_AV_UDP_Flow_Handler::change_qos(AVStreams::QoS qos)
00087 {
00088 if( TAO_debug_level > 0 )
00089 {
00090 ACE_DEBUG ((LM_DEBUG,
00091 "(%N,%l) TAO_AV_UDP_Flow_Handler::change_qos\n"));
00092 }
00093
00094 unsigned int i=0;
00095
00096 int ret = 0;
00097 CORBA::Long dscp = 0;
00098 CORBA::Long ecn = 0;
00099 int dscp_flag=0;
00100 for(i=0; i < qos.QoSParams.length(); i++)
00101 {
00102
00103 if( ACE_OS::strcmp( qos.QoSParams[i].property_name.in(), "Diffserv_Codepoint") == 0)
00104 {
00105 qos.QoSParams[i].property_value >>= dscp;
00106 dscp_flag=1;
00107
00108 if(!((dscp >= 0) && (dscp <= 63)))
00109 {
00110 dscp_flag = 0;
00111 ACE_DEBUG((LM_DEBUG, "(%N,%l) ECN value can only be (0-3) not %d\n", ecn));
00112 return -1;
00113 }
00114 }
00115
00116 if( ACE_OS::strcmp( qos.QoSParams[i].property_name.in(), "ECN") == 0)
00117 {
00118 qos.QoSParams[i].property_value >>= ecn;
00119
00120
00121 if(!((ecn >= 0) && (ecn <= 3)))
00122 {
00123 ACE_DEBUG((LM_DEBUG, "(%N,%l) ECN value can only be (0-3) not %d\n", ecn));
00124 ecn = 0;
00125 }
00126
00127 }
00128 }
00129
00130
00131
00132 if(dscp_flag || ecn)
00133 {
00134 int tos;
00135 tos = (int)(dscp << 2);
00136 if(ecn)
00137 {
00138 tos |= ecn;
00139 }
00140 ret = sock_dgram_.set_option(IPPROTO_IP, IP_TOS, (int *)&tos , (int)sizeof(tos));
00141
00142 if(TAO_debug_level > 1)
00143 {
00144 ACE_DEBUG((LM_DEBUG, "(%N,%l) set tos: ret: %d\n", ret));
00145 }
00146 }
00147
00148 if(TAO_debug_level > 1)
00149 {
00150 if(ret < 0 )
00151 {
00152 ACE_DEBUG((LM_DEBUG, "(%N,%l) errno: %p\n"));
00153 }
00154 }
00155 return ret;
00156 }
00157
00158
00159
00160
00161
00162 TAO_AV_UDP_Transport::TAO_AV_UDP_Transport (void)
00163 :handler_ (0)
00164 {
00165 }
00166
00167 TAO_AV_UDP_Transport::TAO_AV_UDP_Transport (TAO_AV_UDP_Flow_Handler *handler)
00168 :handler_ (handler),
00169 addr_ (0)
00170 {
00171 }
00172
00173 TAO_AV_UDP_Transport::~TAO_AV_UDP_Transport (void)
00174 {
00175 }
00176
00177 int
00178 TAO_AV_UDP_Transport::set_remote_address (const ACE_INET_Addr &address)
00179 {
00180 this->peer_addr_ = address;
00181 return 0;
00182 }
00183
00184 int
00185 TAO_AV_UDP_Transport::open (ACE_Addr * )
00186 {
00187 return 0;
00188 }
00189
00190 int
00191 TAO_AV_UDP_Transport::close (void)
00192 {
00193 return 0;
00194 }
00195
00196 int
00197 TAO_AV_UDP_Transport::mtu (void)
00198 {
00199 return 65535;
00200 }
00201
00202 ACE_Addr*
00203 TAO_AV_UDP_Transport::get_peer_addr (void)
00204 {
00205 return &this->peer_addr_;
00206 }
00207
00208 ssize_t
00209 TAO_AV_UDP_Transport::send (const ACE_Message_Block *mblk, ACE_Time_Value *)
00210 {
00211
00212
00213
00214 iovec iov[ACE_IOV_MAX];
00215 int iovcnt = 0;
00216 ssize_t n = 0;
00217 ssize_t nbytes = 0;
00218
00219 for (const ACE_Message_Block *i = mblk;
00220 i != 0;
00221 i = i->cont ())
00222 {
00223
00224 if (i->length () > 0)
00225 {
00226 iov[iovcnt].iov_base = i->rd_ptr ();
00227 iov[iovcnt].iov_len = static_cast<u_long> (i->length ());
00228 iovcnt++;
00229
00230
00231
00232
00233
00234
00235
00236 if (iovcnt == ACE_IOV_MAX)
00237 {
00238 n = this->handler_->get_socket ()->send ((const iovec *) iov,
00239 iovcnt,
00240 this->peer_addr_);
00241
00242 if (n < 1)
00243 return n;
00244
00245 nbytes += n;
00246 iovcnt = 0;
00247 }
00248 }
00249 }
00250
00251
00252 if (iovcnt != 0)
00253 {
00254 n = this->handler_->get_socket ()->send ((const iovec *) iov,
00255 iovcnt,
00256 this->peer_addr_);
00257
00258 if (n < 1)
00259 return n;
00260
00261 nbytes += n;
00262 }
00263
00264 return nbytes;
00265 }
00266
00267 ssize_t
00268 TAO_AV_UDP_Transport::send (const char *buf,
00269 size_t len,
00270 ACE_Time_Value *)
00271 {
00272 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"TAO_AV_UDP_Transport::send "));
00273 char addr [BUFSIZ];
00274 this->peer_addr_.addr_to_string (addr,BUFSIZ);
00275 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"to %s\n",addr));
00276
00277 return this->handler_->get_socket ()->send (buf, len,this->peer_addr_);
00278 }
00279
00280 ssize_t
00281 TAO_AV_UDP_Transport::send (const iovec *iov,
00282 int iovcnt,
00283 ACE_Time_Value *)
00284 {
00285 return this->handler_->get_socket ()->send ((const iovec *) iov,
00286 iovcnt,
00287 this->peer_addr_);
00288
00289 }
00290
00291 ssize_t
00292 TAO_AV_UDP_Transport::recv (char *buf,
00293 size_t len,
00294 ACE_Time_Value *)
00295 {
00296 return this->handler_->get_socket ()->recv (buf, len,this->peer_addr_);
00297 }
00298
00299 ssize_t
00300 TAO_AV_UDP_Transport::recv (char *buf,
00301 size_t len,
00302 int flags,
00303 ACE_Time_Value *timeout)
00304 {
00305 return this->handler_->get_socket ()->recv (buf,
00306 len,
00307 this->peer_addr_,
00308 flags,
00309 timeout);
00310 }
00311
00312 ssize_t
00313 TAO_AV_UDP_Transport::recv (iovec *iov,
00314 int ,
00315 ACE_Time_Value *timeout)
00316 {
00317 return handler_->get_socket ()->recv (iov,this->peer_addr_,0,timeout);
00318 }
00319
00320
00321
00322
00323
00324
00325 TAO_AV_UDP_Acceptor::TAO_AV_UDP_Acceptor (void)
00326 : address_ (0),
00327 control_inet_address_ (0)
00328 {
00329 }
00330
00331 TAO_AV_UDP_Acceptor::~TAO_AV_UDP_Acceptor (void)
00332 {
00333 if (this->flow_component_ == TAO_AV_Core::TAO_AV_CONTROL)
00334 delete this->entry_->control_handler ();
00335
00336 delete this->address_;
00337 delete this->control_inet_address_;
00338 }
00339
00340 int
00341 TAO_AV_UDP_Acceptor::activate_svc_handler (TAO_AV_Flow_Handler *handler)
00342 {
00343 ACE_Event_Handler *event_handler = handler->event_handler ();
00344 int result = this->av_core_->reactor ()->register_handler (event_handler,
00345 ACE_Event_Handler::READ_MASK);
00346
00347 if (this->flow_component_ == TAO_AV_Core::TAO_AV_CONTROL)
00348 handler->schedule_timer ();
00349
00350 return result;
00351 }
00352
00353 int
00354 TAO_AV_UDP_Acceptor::open (TAO_Base_StreamEndPoint *endpoint,
00355 TAO_AV_Core *av_core,
00356 TAO_FlowSpec_Entry *entry,
00357 TAO_AV_Flow_Protocol_Factory *factory,
00358 TAO_AV_Core::Flow_Component flow_comp)
00359 {
00360 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"TAO_AV_UDP_Acceptor::open\n"));
00361 this->av_core_ = av_core;
00362 this->endpoint_ = endpoint;
00363 this->entry_ = entry;
00364 this->flow_component_ = flow_comp;
00365 this->flow_protocol_factory_ = factory;
00366 ACE_INET_Addr *inet_addr;
00367 if (flow_comp == TAO_AV_Core::TAO_AV_CONTROL)
00368 {
00369 this->flowname_ = TAO_AV_Core::get_control_flowname (entry->flowname ());
00370 inet_addr = (ACE_INET_Addr *) entry->control_address ();
00371 }
00372 else
00373 {
00374 this->flowname_ = entry->flowname ();
00375 inet_addr = (ACE_INET_Addr *) entry->address ();
00376 }
00377
00378 if (inet_addr != 0)
00379 {
00380 char buf[BUFSIZ];
00381 inet_addr->addr_to_string (buf,
00382 BUFSIZ);
00383
00384 if (TAO_debug_level > 0)
00385 ACE_DEBUG ((LM_DEBUG,
00386 "TAO_AV_UDP_Acceptor::open: %s\n",
00387 buf));
00388 }
00389
00390 int result = this->open_i (inet_addr, 0);
00391
00392 if (result < 0)
00393 return result;
00394 return 0;
00395 }
00396
00397 int
00398 TAO_AV_UDP_Acceptor::open_default (TAO_Base_StreamEndPoint *endpoint,
00399 TAO_AV_Core *av_core,
00400 TAO_FlowSpec_Entry *entry,
00401 TAO_AV_Flow_Protocol_Factory *factory,
00402 TAO_AV_Core::Flow_Component flow_comp)
00403 {
00404 this->av_core_ = av_core;
00405 this->endpoint_ = endpoint;
00406 this->entry_ = entry;
00407 this->flow_component_ = flow_comp;
00408 this->flow_protocol_factory_ = factory;
00409 if (flow_comp == TAO_AV_Core::TAO_AV_CONTROL)
00410 {
00411 this->flowname_ = TAO_AV_Core::get_control_flowname (entry->flowname ());
00412 }
00413 else
00414 {
00415 this->flowname_ = entry->flowname ();
00416 ACE_NEW_RETURN (this->address_,
00417 ACE_INET_Addr ("0"),
00418 -1);
00419 }
00420
00421 int result = this->open_i (this->address_, 1);
00422 if (result < 0)
00423 return result;
00424
00425 return 0;
00426 }
00427
00428 int
00429 TAO_AV_UDP_Acceptor::open_i (ACE_INET_Addr *inet_addr,
00430 int is_default_addr)
00431 {
00432 int result = -1;
00433 ACE_INET_Addr *local_addr = 0;
00434
00435 TAO_AV_Flow_Handler *flow_handler = 0;
00436
00437
00438
00439 if (is_default_addr &&
00440 (this->flow_component_ == TAO_AV_Core::TAO_AV_CONTROL) &&
00441 (ACE_OS::strcasecmp(this->entry_->flow_protocol_str (), "RTP") == 0))
00442 {
00443 flow_handler = this->entry_->control_handler ();
00444
00445 local_addr = dynamic_cast<ACE_INET_Addr*> (this->entry_->get_local_control_addr ());
00446 }
00447 else
00448 {
00449
00450
00451 int get_new_port = 1;
00452
00453 while (get_new_port)
00454 {
00455
00456 get_new_port = 0;
00457
00458 result = TAO_AV_UDP_Connection_Setup::setup (flow_handler,
00459 inet_addr,
00460 local_addr,
00461 this->entry_->is_multicast (),
00462 TAO_AV_UDP_Connection_Setup::ACCEPTOR);
00463
00464 if (result < 0)
00465 {
00466 ACE_DEBUG((LM_DEBUG,"(%N,%l) Error during connection setup: %d\n", result));
00467 }
00468
00469 local_addr->set (local_addr->get_port_number (),
00470 local_addr->get_host_name ());
00471
00472 if (is_default_addr)
00473 {
00474 if ((ACE_OS::strcasecmp(this->entry_->flow_protocol_str (), "RTP") == 0) &&
00475 (this->flow_component_ == TAO_AV_Core::TAO_AV_DATA))
00476 {
00477 if (is_default_addr && local_addr->get_port_number ()%2 != 0)
00478 {
00479
00480 delete local_addr;
00481 local_addr = 0;
00482 delete flow_handler;
00483 get_new_port = 1;
00484 }
00485 else
00486 {
00487 ACE_INET_Addr *local_control_addr = 0;
00488 TAO_AV_Flow_Handler *control_flow_handler = 0;
00489
00490 ACE_NEW_RETURN (this->control_inet_address_,
00491 ACE_INET_Addr ("0"),
00492 -1);
00493
00494 TAO_AV_UDP_Connection_Setup::setup(control_flow_handler,
00495 this->control_inet_address_,
00496 local_control_addr,
00497 this->entry_->is_multicast (),
00498 TAO_AV_UDP_Connection_Setup::ACCEPTOR);
00499
00500 if (local_control_addr->get_port_number () !=
00501 local_addr->get_port_number () +1)
00502 {
00503 delete this->control_inet_address_;
00504 delete local_addr;
00505 local_addr = 0;
00506 delete flow_handler;
00507 delete local_control_addr;
00508 delete control_flow_handler;
00509 get_new_port = 1;
00510 }
00511 else
00512 {
00513 this->entry_->control_address (this->control_inet_address_);
00514 this->entry_->set_local_control_addr (local_control_addr);
00515 this->entry_->control_handler (control_flow_handler);
00516 }
00517 }
00518 }
00519 }
00520 }
00521 }
00522
00523 TAO_AV_Protocol_Object *object =
00524 this->flow_protocol_factory_->make_protocol_object (this->entry_,
00525 this->endpoint_,
00526 flow_handler,
00527 flow_handler->transport ());
00528 flow_handler->protocol_object (object);
00529
00530 if (this->flow_component_ == TAO_AV_Core::TAO_AV_DATA)
00531 {
00532 this->endpoint_->set_flow_handler (this->flowname_.c_str (),flow_handler);
00533
00534 this->entry_->protocol_object (object);
00535 this->entry_->set_local_addr (local_addr);
00536 this->entry_->handler (flow_handler);
00537
00538 this->entry_->address (local_addr);
00539 }
00540 else
00541 {
00542 this->endpoint_->set_control_flow_handler (this->flowname_.c_str (),flow_handler);
00543
00544 this->entry_->control_protocol_object (object);
00545 this->entry_->set_local_control_addr (local_addr);
00546 this->entry_->control_handler (flow_handler);
00547 }
00548
00549 if (local_addr != 0)
00550 {
00551 char buf[BUFSIZ];
00552 local_addr->addr_to_string (buf,BUFSIZ);
00553 if (TAO_debug_level > 0)
00554 ACE_DEBUG ((LM_DEBUG,
00555 "TAO_AV_UDP_ACCEPTOR::open:%s \n",
00556 buf));
00557 }
00558
00559
00560 return this->activate_svc_handler (flow_handler);
00561 }
00562
00563 int
00564 TAO_AV_UDP_Acceptor::close (void)
00565 {
00566 return 0;
00567 }
00568
00569
00570
00571
00572 TAO_AV_UDP_Connector::TAO_AV_UDP_Connector (void)
00573 : control_inet_address_ (0)
00574 {
00575 }
00576
00577 TAO_AV_UDP_Connector::~TAO_AV_UDP_Connector (void)
00578 {
00579 if (this->flow_component_ == TAO_AV_Core::TAO_AV_CONTROL)
00580 {
00581 delete this->entry_->control_handler ();
00582 }
00583
00584 if (this->control_inet_address_ != 0)
00585 delete this->control_inet_address_;
00586 }
00587
00588 int
00589 TAO_AV_UDP_Connector::open (TAO_Base_StreamEndPoint *endpoint,
00590 TAO_AV_Core *av_core,
00591 TAO_AV_Flow_Protocol_Factory *factory)
00592
00593 {
00594 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"TAO_AV_UDP_Connector::open "));
00595 this->endpoint_ = endpoint;
00596 this->av_core_ = av_core;
00597 this->flow_protocol_factory_ = factory;
00598 return 0;
00599 }
00600
00601 int
00602 TAO_AV_UDP_Connector::connect (TAO_FlowSpec_Entry *entry,
00603 TAO_AV_Transport *&transport,
00604 TAO_AV_Core::Flow_Component flow_component)
00605 {
00606 ACE_INET_Addr *local_addr = 0;
00607 ACE_INET_Addr *control_inet_addr = 0;
00608
00609 this->entry_ = entry;
00610 this->flow_component_ = flow_component;
00611
00612 ACE_INET_Addr *inet_addr;
00613
00614 if (flow_component == TAO_AV_Core::TAO_AV_CONTROL)
00615 {
00616 this->flowname_ = TAO_AV_Core::get_control_flowname (entry->flowname());
00617 inet_addr = dynamic_cast<ACE_INET_Addr*> (entry->control_address ());
00618 }
00619 else
00620 {
00621 this->flowname_ = entry->flowname ();
00622 inet_addr = dynamic_cast<ACE_INET_Addr*> (entry->address ());
00623 control_inet_addr = dynamic_cast<ACE_INET_Addr*> (entry->control_address ());
00624 }
00625
00626 TAO_AV_Flow_Handler *flow_handler = 0;
00627
00628
00629
00630 if ((flow_component == TAO_AV_Core::TAO_AV_CONTROL) &&
00631 (ACE_OS::strcasecmp(this->entry_->flow_protocol_str (), "RTP") == 0) &&
00632 !entry->is_multicast ())
00633 {
00634 flow_handler = this->entry_->control_handler ();
00635 flow_handler->set_remote_address (inet_addr);
00636
00637 local_addr = dynamic_cast<ACE_INET_Addr*> (this->entry_->get_local_control_addr ());
00638 }
00639 else
00640 {
00641
00642
00643 int get_new_port = 1;
00644
00645 while (get_new_port)
00646 {
00647
00648 get_new_port = 0;
00649
00650 ACE_Addr *addr;
00651 if ((addr = entry->get_peer_addr ()) != 0)
00652 {
00653 local_addr = dynamic_cast<ACE_INET_Addr*> (addr);
00654 char buf [BUFSIZ];
00655 local_addr->addr_to_string (buf, BUFSIZ);
00656 }
00657
00658 TAO_AV_UDP_Connection_Setup::setup (flow_handler,
00659 inet_addr,
00660 local_addr,
00661 entry->is_multicast (),
00662 TAO_AV_UDP_Connection_Setup::CONNECTOR);
00663
00664 if ((ACE_OS::strcasecmp(this->entry_->flow_protocol_str (), "RTP") == 0) &&
00665 (flow_component == TAO_AV_Core::TAO_AV_DATA) &&
00666 !entry->is_multicast ())
00667 {
00668 if (local_addr->get_port_number ()%2 != 0)
00669 {
00670
00671 delete local_addr;
00672 local_addr = 0;
00673 delete flow_handler;
00674 get_new_port = 1;
00675 }
00676 else
00677 {
00678 ACE_INET_Addr *local_control_addr = 0;
00679 TAO_AV_Flow_Handler *control_flow_handler = 0;
00680
00681 if (entry->is_multicast ())
00682 control_inet_addr = dynamic_cast<ACE_INET_Addr*> (entry->control_address ()) ;
00683 else
00684 {
00685
00686 if (local_addr != 0)
00687 {
00688 char buf [BUFSIZ];
00689 ACE_CString addr_str (local_addr->get_host_name ());
00690 addr_str += ":";
00691 addr_str += ACE_OS::itoa (local_addr->get_port_number () + 1, buf, 10);
00692 ACE_NEW_RETURN (local_control_addr,
00693 ACE_INET_Addr (addr_str.c_str ()),
00694 -1);
00695 local_control_addr->addr_to_string (buf, BUFSIZ);
00696 }
00697
00698
00699 if (entry->control_address () == 0)
00700 ACE_NEW_RETURN (this->control_inet_address_,
00701 ACE_INET_Addr ("0"),
00702 -1);
00703 else
00704 control_inet_address_ = dynamic_cast<ACE_INET_Addr*> (entry->control_address ());
00705 }
00706
00707 TAO_AV_UDP_Connection_Setup::setup (control_flow_handler,
00708 control_inet_addr,
00709 local_control_addr,
00710 entry->is_multicast (),
00711 TAO_AV_UDP_Connection_Setup::CONNECTOR);
00712
00713 if (local_control_addr->get_port_number () !=
00714 local_addr->get_port_number () +1)
00715 {
00716 delete local_addr;
00717 local_addr = 0;
00718 delete flow_handler;
00719 delete local_control_addr;
00720 delete control_flow_handler;
00721 get_new_port = 1;
00722 }
00723 else
00724 {
00725 this->entry_->set_local_control_addr (local_control_addr);
00726 this->entry_->control_handler (control_flow_handler);
00727 }
00728 }
00729 }
00730 }
00731 }
00732
00733 TAO_AV_Protocol_Object *object =
00734 this->flow_protocol_factory_->make_protocol_object (this->entry_,
00735 this->endpoint_,
00736 flow_handler,
00737 flow_handler->transport ());
00738
00739 flow_handler->protocol_object (object);
00740
00741 if (flow_component == TAO_AV_Core::TAO_AV_DATA)
00742 {
00743 this->endpoint_->set_flow_handler (this->flowname_.c_str (),
00744 flow_handler);
00745 this->entry_->protocol_object (object);
00746 entry->set_local_addr (local_addr);
00747 entry->handler (flow_handler);
00748 transport = flow_handler->transport ();
00749 }
00750 else
00751 {
00752 this->endpoint_->set_control_flow_handler (this->flowname_.c_str (),
00753 flow_handler);
00754 this->entry_->control_protocol_object (object);
00755 entry->set_local_control_addr (local_addr);
00756 entry->control_handler (flow_handler);
00757 transport = flow_handler->transport ();
00758 }
00759
00760 if (local_addr != 0)
00761 {
00762 char buf[BUFSIZ];
00763 local_addr->addr_to_string (buf,BUFSIZ);
00764
00765 if (TAO_debug_level > 0)
00766 ACE_DEBUG ((LM_DEBUG,"TAO_AV_UDP_CONNECTOR::connect:%s \n",buf));
00767 }
00768
00769
00770 return this->activate_svc_handler (flow_handler);
00771 }
00772
00773 int
00774 TAO_AV_UDP_Connector::activate_svc_handler (TAO_AV_Flow_Handler *handler)
00775 {
00776 ACE_Event_Handler *event_handler = handler->event_handler ();
00777 int result = this->av_core_->reactor ()->register_handler (event_handler,
00778 ACE_Event_Handler::READ_MASK);
00779
00780 if (this->flow_component_ == TAO_AV_Core::TAO_AV_CONTROL)
00781 handler->schedule_timer ();
00782
00783 return result;
00784 }
00785
00786 int
00787 TAO_AV_UDP_Connector::close (void)
00788 {
00789 return 0;
00790 }
00791
00792
00793
00794
00795
00796 int
00797 TAO_AV_UDP_Connection_Setup::setup (TAO_AV_Flow_Handler *&flow_handler,
00798 ACE_INET_Addr *inet_addr,
00799 ACE_INET_Addr *&local_addr,
00800 int is_multicast,
00801 ConnectionType ct)
00802 {
00803 int result;
00804
00805 if (is_multicast)
00806 {
00807 TAO_AV_UDP_MCast_Flow_Handler *handler;
00808 ACE_NEW_RETURN (handler,
00809 TAO_AV_UDP_MCast_Flow_Handler,
00810 -1);
00811
00812 flow_handler = handler;
00813
00814 result = handler->get_mcast_socket ()->join (*inet_addr);
00815 if (result < 0)
00816 ACE_ERROR_RETURN ((LM_ERROR,"TAO_AV_UDP_MCast_connector::open failed\n"),-1);
00817
00818
00819
00820 #if defined (ACE_HAS_IP_MULTICAST)
00821 if (handler->get_mcast_socket ()->set_option (IP_MULTICAST_LOOP,
00822 0) < 0)
00823 if (TAO_debug_level > 0)
00824 ACE_DEBUG ((LM_DEBUG,"TAO_AV_UDP_MCast_Acceptor::multicast loop disable failed\n"));
00825
00826 #endif
00827
00828 int bufsize = 80 * 1024;
00829 if (handler->get_mcast_socket ()->ACE_SOCK::set_option (SOL_SOCKET,
00830 SO_RCVBUF,
00831 (char *)&bufsize,
00832 sizeof(bufsize)) < 0)
00833 {
00834 bufsize = 32 * 1024;
00835 if (handler->get_mcast_socket ()->ACE_SOCK::set_option (SOL_SOCKET,
00836 SO_RCVBUF,
00837 (char *)&bufsize,
00838 sizeof(bufsize)) < 0)
00839 ACE_OS::perror("SO_RCVBUF");
00840 }
00841 ACE_NEW_RETURN (local_addr,
00842 ACE_INET_Addr ("0"),
00843 -1);
00844
00845 if (ct == ACCEPTOR)
00846 {
00847 result = handler->get_mcast_socket ()->get_local_addr (*local_addr);
00848 if (result < 0)
00849 ACE_ERROR_RETURN ((LM_ERROR,"TAO_AV_Dgram_Connector::open: get_local_addr failed\n"),result);
00850
00851 local_addr->set (local_addr->get_port_number (),
00852 local_addr->get_host_name ());
00853 handler->set_peer_addr (local_addr);
00854 }
00855 }
00856 else
00857 {
00858 if (local_addr == 0)
00859 ACE_NEW_RETURN (local_addr,
00860 ACE_INET_Addr ("0"),
00861 -1);
00862
00863 TAO_AV_UDP_Flow_Handler *handler;
00864 ACE_NEW_RETURN (handler,
00865 TAO_AV_UDP_Flow_Handler,
00866 -1);
00867
00868 flow_handler = handler;
00869
00870 if (ct == ACCEPTOR)
00871 result = handler->open (*inet_addr);
00872 else
00873 result = handler->open (*local_addr);
00874 if (result < 0)
00875 ACE_ERROR_RETURN ((LM_ERROR,"handler::open failed\n"),-1);
00876
00877
00878 int sndbufsize = ACE_DEFAULT_MAX_SOCKET_BUFSIZ;
00879 int rcvbufsize = ACE_DEFAULT_MAX_SOCKET_BUFSIZ;
00880
00881 if (handler->get_socket ()->set_option (SOL_SOCKET,
00882 SO_SNDBUF,
00883 (void *) &sndbufsize,
00884 sizeof (sndbufsize)) == -1
00885 && errno != ENOTSUP)
00886 return 0;
00887 else if (handler->get_socket ()->set_option (SOL_SOCKET,
00888 SO_RCVBUF,
00889 (void *) &rcvbufsize,
00890 sizeof (rcvbufsize)) == -1
00891 && errno != ENOTSUP)
00892 return 0;
00893
00894 if (ct == CONNECTOR)
00895 handler->set_remote_address (inet_addr);
00896
00897 result = handler->get_socket ()->get_local_addr (*local_addr);
00898
00899 local_addr->set (local_addr->get_port_number (),
00900 local_addr->get_host_name ());
00901
00902 char buf [BUFSIZ];
00903 local_addr->addr_to_string (buf, BUFSIZ);
00904
00905 if (result < 0)
00906 ACE_ERROR_RETURN ((LM_ERROR,"TAO_AV_Dgram_Connector::open: get_local_addr failed\n"),result);
00907 }
00908
00909 return 1;
00910 }
00911
00912
00913
00914
00915
00916 TAO_AV_UDP_Factory::TAO_AV_UDP_Factory (void)
00917 {
00918 }
00919
00920 TAO_AV_UDP_Factory::~TAO_AV_UDP_Factory (void)
00921 {
00922 }
00923
00924 int
00925 TAO_AV_UDP_Factory::match_protocol (const char *protocol_string)
00926 {
00927 if (ACE_OS::strcasecmp (protocol_string,"UDP") == 0)
00928 return 1;
00929 if (ACE_OS::strcasecmp (protocol_string,"RTP/UDP") == 0)
00930 return 1;
00931 return 0;
00932 }
00933
00934 TAO_AV_Acceptor*
00935 TAO_AV_UDP_Factory::make_acceptor (void)
00936 {
00937 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"TAO_AV_UDP_Factory::make_acceptor\n"));
00938 TAO_AV_Acceptor *acceptor = 0;
00939 ACE_NEW_RETURN (acceptor,
00940 TAO_AV_UDP_Acceptor,
00941 0);
00942 return acceptor;
00943 }
00944
00945 TAO_AV_Connector*
00946 TAO_AV_UDP_Factory::make_connector (void)
00947 {
00948 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"TAO_AV_UDP_Factory::make_connector\n"));
00949 TAO_AV_Connector *connector = 0;
00950 ACE_NEW_RETURN (connector,
00951 TAO_AV_UDP_Connector,
00952 0);
00953 return connector;
00954 }
00955
00956 int
00957 TAO_AV_UDP_Factory::init (int ,
00958 char * [])
00959 {
00960 return 0;
00961 }
00962
00963
00964
00965
00966
00967 int
00968 TAO_AV_UDP_Object::handle_input (void)
00969 {
00970 int n = this->transport_->recv (this->frame_.rd_ptr (),
00971 this->frame_.size ());
00972 if (n == -1)
00973 ACE_ERROR_RETURN ((LM_ERROR,"(%N,%l) TAO_AV_UDP_Flow_Handler::handle_input recv failed: errno: %m\n"),-1);
00974
00975 this->frame_.wr_ptr (this->frame_.rd_ptr () + n);
00976
00977 return this->callback_->receive_frame (&this->frame_);
00978 }
00979
00980 int
00981 TAO_AV_UDP_Object::send_frame (ACE_Message_Block *frame,
00982 TAO_AV_frame_info * )
00983 {
00984 if (TAO_debug_level > 0)
00985 ACE_DEBUG ((LM_DEBUG,"TAO_AV_UDP_Object::send_frame\n"));
00986 int const result = this->transport_->send (frame);
00987 if (result < 0)
00988 return result;
00989 return 0;
00990 }
00991
00992 int
00993 TAO_AV_UDP_Object::send_frame (const iovec *iov,
00994 int iovcnt,
00995 TAO_AV_frame_info * )
00996 {
00997 int const result = this->transport_->send (iov,iovcnt);
00998 if (result < 0)
00999 return result;
01000 return 0;
01001 }
01002
01003 int
01004 TAO_AV_UDP_Object::send_frame (const char*buf,
01005 size_t len)
01006 {
01007 int const result = this->transport_->send (buf, len, 0);
01008 if (result < 0)
01009 return result;
01010 return 0;
01011 }
01012
01013 TAO_AV_UDP_Object::TAO_AV_UDP_Object (TAO_AV_Callback *callback,
01014 TAO_AV_Transport *transport)
01015 :TAO_AV_Protocol_Object (callback,transport)
01016 {
01017 this->frame_.size (this->transport_->mtu ());
01018 }
01019
01020 TAO_AV_UDP_Object::~TAO_AV_UDP_Object (void)
01021 {
01022
01023 }
01024
01025 int
01026 TAO_AV_UDP_Object::destroy (void)
01027 {
01028 this->callback_->handle_destroy ();
01029 delete this;
01030
01031 return 0;
01032 }
01033
01034
01035
01036
01037
01038 TAO_AV_UDP_Flow_Factory::TAO_AV_UDP_Flow_Factory (void)
01039 {
01040 }
01041
01042 TAO_AV_UDP_Flow_Factory::~TAO_AV_UDP_Flow_Factory (void)
01043 {
01044 }
01045
01046 int
01047 TAO_AV_UDP_Flow_Factory::init (int ,
01048 char * [])
01049 {
01050 return 0;
01051 }
01052
01053 int
01054 TAO_AV_UDP_Flow_Factory::match_protocol (const char *flow_string)
01055 {
01056 if (ACE_OS::strcasecmp (flow_string,"UDP") == 0)
01057 return 1;
01058 return 0;
01059 }
01060
01061 TAO_AV_Protocol_Object*
01062 TAO_AV_UDP_Flow_Factory::make_protocol_object (TAO_FlowSpec_Entry *entry,
01063 TAO_Base_StreamEndPoint *endpoint,
01064 TAO_AV_Flow_Handler *handler,
01065 TAO_AV_Transport *transport)
01066 {
01067 TAO_AV_Callback *callback = 0;
01068 if( endpoint->get_callback (entry->flowname (), callback) ) {
01069 ACE_ERROR_RETURN ((LM_ERROR, "(%N,%l) Invalid callback\n"), 0);
01070 }
01071
01072 TAO_AV_UDP_Object *object = 0;
01073 ACE_NEW_RETURN (object,
01074 TAO_AV_UDP_Object (callback,
01075 transport),
01076 0);
01077 callback->open (object,
01078 handler);
01079 endpoint->set_protocol_object (entry->flowname (),
01080 object);
01081
01082 endpoint->protocol_object_set ();
01083
01084 return object;
01085 }
01086
01087 TAO_END_VERSIONED_NAMESPACE_DECL
01088
// Service registration for TAO_AV_UDP_Flow_Factory under the name
// "UDP_Flow_Factory".
// NOTE(review): the macros are defined outside this file -- presumably
// they generate the factory function and the static service descriptor
// used by the ACE Service Configurator; confirm against the ACE headers.
ACE_FACTORY_DEFINE (TAO_AV, TAO_AV_UDP_Flow_Factory)
ACE_STATIC_SVC_DEFINE (TAO_AV_UDP_Flow_Factory,
ACE_TEXT ("UDP_Flow_Factory"),
ACE_SVC_OBJ_T,
&ACE_SVC_NAME (TAO_AV_UDP_Flow_Factory),
ACE_Service_Type::DELETE_THIS |
ACE_Service_Type::DELETE_OBJ,
0)
01097
// Service registration for TAO_AV_UDP_Factory under the name
// "UDP_Factory".
// NOTE(review): the macros are defined outside this file -- presumably
// they generate the factory function and the static service descriptor
// used by the ACE Service Configurator; confirm against the ACE headers.
ACE_FACTORY_DEFINE (TAO_AV, TAO_AV_UDP_Factory)
ACE_STATIC_SVC_DEFINE (TAO_AV_UDP_Factory,
ACE_TEXT ("UDP_Factory"),
ACE_SVC_OBJ_T,
&ACE_SVC_NAME (TAO_AV_UDP_Factory),
ACE_Service_Type::DELETE_THIS |
ACE_Service_Type::DELETE_OBJ,
0)