00001
00002
00003 #include "orbsvcs/AV/AVStreams_i.h"
00004 #include "orbsvcs/AV/sfp.h"
00005 #include "orbsvcs/AV/MCast.h"
00006 #include "orbsvcs/AV/RTCP.h"
00007 #include "orbsvcs/AV/RTP.h"
00008 #include "orbsvcs/AV/UDP.h"
00009 #include "orbsvcs/AV/TCP.h"
00010 #include "orbsvcs/AV/FlowSpec_Entry.h"
00011 #include "orbsvcs/AV/AV_Core.h"
00012
00013 #if defined (ACE_HAS_RAPI) || defined (ACE_HAS_WINSOCK2_GQOS)
00014 #include "orbsvcs/AV/QoS_UDP.h"
00015 #endif
00016
00017 #include "tao/debug.h"
00018
00019 #include "ace/Dynamic_Service.h"
00020
00021 #if !defined (__ACE_INLINE__)
00022 #include "orbsvcs/AV/Transport.inl"
00023 #endif
00024
00025
00026 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00027
00028
00029
00030
00031 TAO_AV_Transport_Item::TAO_AV_Transport_Item (const ACE_CString &name)
00032 : name_ (name),
00033 factory_ (0)
00034 {
00035 }
00036
00037
00038
00039
00040 TAO_AV_Flow_Protocol_Item::TAO_AV_Flow_Protocol_Item (const ACE_CString &name)
00041 : name_ (name),
00042 factory_ (0)
00043 {
00044 }
00045
00046
00047
00048
00049
00050 TAO_AV_Connector_Registry::TAO_AV_Connector_Registry (void)
00051 {
00052 }
00053
00054 int
00055 TAO_AV_Connector_Registry::open (TAO_Base_StreamEndPoint *endpoint,
00056 TAO_AV_Core* av_core,
00057 TAO_AV_FlowSpecSet &flow_spec_set)
00058 {
00059
00060 TAO_AV_FlowSpecSetItor last_flowspec = flow_spec_set.end ();
00061
00062 for (TAO_AV_FlowSpecSetItor flow_spec = flow_spec_set.begin ();
00063 flow_spec != last_flowspec;
00064 ++flow_spec)
00065 {
00066 TAO_FlowSpec_Entry *entry = (*flow_spec);
00067 ACE_Addr *address = entry->address ();
00068 const char *flow_protocol = entry->flow_protocol_str ();
00069 const char *transport_protocol = entry->carrier_protocol_str ();
00070
00071 if (ACE_OS::strcmp (flow_protocol,"") == 0)
00072 flow_protocol = transport_protocol;
00073
00074 if (address == 0)
00075 {
00076
00077
00078
00079
00080
00081
00082 ACE_ERROR_RETURN ((LM_ERROR,
00083 "Protocol was specified without an endpoint\n"),
00084 -1);
00085 }
00086 else
00087 {
00088 TAO_AV_Flow_Protocol_Factory *flow_factory =
00089 av_core->get_flow_protocol_factory (flow_protocol);
00090 TAO_AV_Transport_Factory *transport_factory =
00091 av_core->get_transport_factory (transport_protocol);
00092
00093 if ((flow_factory != 0) && (transport_factory != 0))
00094 {
00095
00096
00097
00098 TAO_AV_Connector *connector = transport_factory->make_connector ();
00099
00100 if (connector != 0)
00101 {
00102
00103 this->connectors_.insert (connector);
00104
00105 if (connector->open (endpoint,
00106 av_core,
00107 flow_factory) == -1)
00108 return -1;
00109
00110 TAO_AV_Transport *transport = 0;
00111 if (connector->connect (entry,
00112 transport,
00113 TAO_AV_Core::TAO_AV_DATA) == -1)
00114 return -1;
00115 entry->transport (transport);
00116 }
00117 else
00118 ACE_ERROR_RETURN ((LM_ERROR,
00119 "(%P|%t) Unable to create an "
00120 "connector for <%s>\n",
00121 entry->flowname ()),
00122 -1);
00123
00124
00125 TAO_AV_Flow_Protocol_Factory *control_flow_factory =
00126 av_core->get_flow_protocol_factory(flow_factory->control_flow_factory ());
00127
00128 if (control_flow_factory != 0)
00129 {
00130 TAO_AV_Connector *control_connector =
00131 transport_factory->make_connector ();
00132
00133 if (control_connector != 0)
00134 {
00135
00136 this->connectors_.insert (control_connector);
00137
00138 if (control_connector->open (endpoint,
00139 av_core,
00140 control_flow_factory) == -1)
00141 return -1;
00142
00143 TAO_AV_Transport *control_transport = 0;
00144 if (control_connector->connect (entry,
00145 control_transport,
00146 TAO_AV_Core::TAO_AV_CONTROL) == -1)
00147 return -1;
00148 entry->control_transport (control_transport);
00149
00150
00151 entry->protocol_object ()->control_object (entry->control_protocol_object ());
00152 }
00153 else
00154 ACE_ERROR_RETURN ((LM_ERROR,
00155 "(%P|%t) Unable to create an "
00156 "connector for <%s>\n",
00157 entry->flowname ()),
00158 -1);
00159 }
00160 }
00161 }
00162 }
00163 return 0;
00164 }
00165
00166 int
00167 TAO_AV_Connector_Registry::close (TAO_AV_Connector *connector)
00168 {
00169 this->connectors_.remove (connector);
00170
00171 if (connector != 0)
00172 delete connector;
00173 return 0;
00174 }
00175
00176 int
00177 TAO_AV_Connector_Registry::close_all (void)
00178 {
00179 for (TAO_AV_ConnectorSetItor i = this->connectors_.begin ();
00180 i != this->connectors_.end ();
00181 ++i)
00182 {
00183 if (*i != 0)
00184 continue;
00185
00186 (*i)->close ();
00187
00188 this->close (*i);
00189 }
00190
00191 this->connectors_.reset ();
00192 return 0;
00193 }
00194
00195 TAO_AV_Connector_Registry::~TAO_AV_Connector_Registry (void)
00196 {
00197 this->close_all ();
00198 }
00199
00200
00201
00202
00203
00204 TAO_AV_Acceptor_Registry::TAO_AV_Acceptor_Registry (void)
00205 {
00206 }
00207
00208 TAO_AV_Acceptor_Registry::~TAO_AV_Acceptor_Registry (void)
00209 {
00210 this->close_all();
00211 }
00212
00213 int
00214 TAO_AV_Acceptor_Registry::open (TAO_Base_StreamEndPoint *endpoint,
00215 TAO_AV_Core *av_core,
00216 TAO_AV_FlowSpecSet &flow_spec_set)
00217 {
00218 int retv = 0;
00219
00220 if (TAO_debug_level > 0)
00221 ACE_DEBUG ((LM_DEBUG,
00222 "TAO_AV_Acceptor_Registry::open \n"));
00223
00224 TAO_AV_FlowSpecSetItor last_flowspec
00225 = flow_spec_set.end ();
00226
00227 for (TAO_AV_FlowSpecSetItor flow_spec = flow_spec_set.begin ();
00228 flow_spec != last_flowspec;
00229 ++flow_spec)
00230 {
00231 TAO_FlowSpec_Entry *entry = (*flow_spec);
00232 ACE_Addr *address = entry->address ();
00233 const char *flow_protocol = entry->flow_protocol_str ();
00234 const char *transport_protocol = entry->carrier_protocol_str ();
00235
00236 if (ACE_OS::strcmp (flow_protocol,"") == 0)
00237 flow_protocol = transport_protocol;
00238
00239 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,
00240 "TAO_AV_Acceptor_Registry::protocol for flow %s is %s\n",
00241 entry->flowname (),
00242 transport_protocol));
00243
00244 if (address == 0)
00245 {
00246 retv = this->open_default (endpoint,
00247 av_core,
00248 entry);
00249 if(retv < 0)
00250 return retv;
00251 continue;
00252 }
00253 else
00254 {
00255 TAO_AV_Flow_Protocol_Factory *flow_factory =
00256 av_core->get_flow_protocol_factory (flow_protocol);
00257
00258 if (flow_protocol != 0)
00259 {
00260 TAO_AV_Transport_Factory *transport_factory =
00261 av_core->get_transport_factory (transport_protocol);
00262
00263 if (transport_protocol != 0)
00264 {
00265 TAO_AV_Acceptor *acceptor = transport_factory->make_acceptor ();
00266 if (acceptor != 0)
00267 {
00268
00269 this->acceptors_.insert (acceptor);
00270
00271 if (acceptor->open (endpoint,
00272 av_core,
00273 entry,
00274 flow_factory,
00275 TAO_AV_Core::TAO_AV_DATA) == -1)
00276 return -1;
00277
00278 TAO_AV_Flow_Protocol_Factory *control_flow_factory =
00279 av_core->get_flow_protocol_factory (flow_factory->control_flow_factory ());
00280
00281 if (control_flow_factory != 0)
00282 {
00283 TAO_AV_Acceptor *acceptor = transport_factory->make_acceptor ();
00284 if (acceptor != 0)
00285 {
00286 if (acceptor->open (endpoint,
00287 av_core,
00288 entry,
00289 control_flow_factory,
00290 TAO_AV_Core::TAO_AV_CONTROL) == -1)
00291 return -1;
00292
00293 this->acceptors_.insert (acceptor);
00294
00295 entry->protocol_object ()->control_object (entry->control_protocol_object ());
00296
00297 }
00298 else
00299 ACE_ERROR_RETURN ((LM_ERROR,
00300 "(%P|%t) Unable to create an "
00301 "acceptor for <%s>\n",
00302 entry->flowname ()),
00303 -1);
00304 }
00305 }
00306 else
00307 ACE_ERROR_RETURN ((LM_ERROR,
00308 "(%P|%t) Unable to create an "
00309 "acceptor for <%s>\n",
00310 entry->flowname ()),
00311 -1);
00312 }
00313 }
00314 }
00315 }
00316 return 0;
00317 }
00318
00319 int
00320 TAO_AV_Acceptor_Registry::open_default (TAO_Base_StreamEndPoint *endpoint,
00321 TAO_AV_Core *av_core,
00322 TAO_FlowSpec_Entry *entry)
00323 {
00324 if (TAO_debug_level > 0)
00325 ACE_DEBUG ((LM_DEBUG,
00326 "TAO_AV_Acceptor_Registry::open_default "));
00327
00328
00329
00330
00331 const char *flow_protocol = entry->flow_protocol_str ();
00332 const char *transport_protocol = entry->carrier_protocol_str ();
00333
00334 if (ACE_OS::strcmp (flow_protocol,"") == 0)
00335 flow_protocol = transport_protocol;
00336
00337 TAO_AV_Flow_Protocol_Factory *flow_factory =
00338 av_core->get_flow_protocol_factory (flow_protocol);
00339
00340
00341 if (flow_factory == 0)
00342 ACE_ERROR_RETURN ((LM_ERROR,
00343 "TAO (%P|%t) (%N,%l) Unable to match protocol prefix "
00344 "for <%s>\n",
00345 flow_protocol),
00346 -1);
00347
00348 if (TAO_debug_level > 0)
00349 ACE_DEBUG((LM_DEBUG, "(%N,%l) Matched flow_protocol: %s, Looking for transport protocol: %s\n", flow_protocol, transport_protocol));
00350
00351 TAO_AV_Transport_Factory *transport_factory =
00352 av_core->get_transport_factory (transport_protocol);
00353
00354 if (transport_factory == 0)
00355 ACE_ERROR_RETURN ((LM_ERROR,
00356 "TAO (%P|%t) (%N,%l) Unable to match protocol prefix "
00357 "for <%s>\n",
00358 transport_protocol),
00359 -1);
00360
00361
00362 TAO_AV_Acceptor *acceptor =
00363 transport_factory->make_acceptor();
00364
00365 if (acceptor == 0)
00366 ACE_ERROR_RETURN ((LM_ERROR,
00367 "TAO (%P|%t) unable to create "
00368 "an acceptor for <%d>\n",
00369 transport_protocol),
00370 -1);
00371
00372 if (acceptor->open_default (endpoint,
00373 av_core,
00374 entry,
00375 flow_factory,
00376 TAO_AV_Core::TAO_AV_DATA) == -1)
00377 ACE_ERROR_RETURN ((LM_ERROR,
00378 "TAO (%P|%t) unable to open "
00379 "default acceptor for <%s>%p\n",
00380 flow_protocol),
00381 -1);
00382
00383 this->acceptors_.insert (acceptor);
00384
00385 const char *control_flow_factory_name = flow_factory->control_flow_factory ();
00386
00387 if (control_flow_factory_name != 0)
00388 {
00389
00390 TAO_AV_Flow_Protocol_Factory *control_flow_factory =
00391 av_core->get_flow_protocol_factory (control_flow_factory_name);
00392
00393 if (control_flow_factory == 0)
00394 ACE_ERROR_RETURN ((LM_ERROR,
00395 "TAO (%P|%t) Unable to match control flow "
00396 "for <%s>\n",
00397 control_flow_factory_name),
00398 -1);
00399
00400 TAO_AV_Acceptor *control_acceptor = transport_factory->make_acceptor ();
00401
00402 if (control_acceptor == 0)
00403 ACE_ERROR_RETURN ((LM_ERROR,
00404 "TAO (%P|%t) unable to create "
00405 "an acceptor for <%d>\n",
00406 transport_protocol),
00407 -1);
00408
00409 if (control_acceptor->open_default (endpoint,
00410 av_core,
00411 entry,
00412 control_flow_factory,
00413 TAO_AV_Core::TAO_AV_CONTROL) == -1)
00414 ACE_ERROR_RETURN ((LM_ERROR,
00415 "TAO (%P|%t) unable to open "
00416 "default acceptor for <%s>%p\n",
00417 transport_protocol),
00418 -1);
00419
00420 this->acceptors_.insert (control_acceptor);
00421
00422 entry->protocol_object ()->control_object (entry->control_protocol_object ());
00423 }
00424
00425 if (this->acceptors_.size () == 0)
00426 {
00427 if (TAO_debug_level > 0)
00428 ACE_ERROR ((LM_ERROR,
00429 "TAO (%P%t) cannot create any default acceptor\n"));
00430 return -1;
00431 }
00432
00433 return 0;
00434 }
00435
00436 int
00437 TAO_AV_Acceptor_Registry::close (TAO_AV_Acceptor *acceptor)
00438 {
00439 this->acceptors_.remove (acceptor);
00440 delete acceptor;
00441
00442 return 0;
00443 }
00444
00445 int
00446 TAO_AV_Acceptor_Registry::close_all (void)
00447 {
00448 for (TAO_AV_AcceptorSetItor i = this->acceptors_.begin ();
00449 i != this->acceptors_.end ();
00450 ++i)
00451 {
00452 if (*i == 0)
00453 continue;
00454
00455 (*i)->close ();
00456
00457 delete *i;
00458 }
00459
00460 this->acceptors_.reset ();
00461 return 0;
00462 }
00463
00464
00465
00466
00467
00468 TAO_AV_Transport::TAO_AV_Transport (void)
00469 {
00470 }
00471
00472
00473 TAO_AV_Transport::~TAO_AV_Transport (void)
00474 {
00475 }
00476
00477 ACE_Addr*
00478 TAO_AV_Transport::get_local_addr (void)
00479 {
00480 return 0;
00481 }
00482
00483
00484
00485
00486
00487
00488 TAO_AV_Flow_Handler::TAO_AV_Flow_Handler (void)
00489 :transport_ (0),
00490 callback_ (0),
00491 protocol_object_ (0),
00492 timer_id_ (-1)
00493 {
00494 }
00495
00496 TAO_AV_Flow_Handler::~TAO_AV_Flow_Handler(void)
00497 {
00498
00499 this->cancel_timer();
00500 }
00501
00502 int
00503 TAO_AV_Flow_Handler::set_remote_address (ACE_Addr * )
00504 {
00505 return 0;
00506 }
00507
00508 int
00509 TAO_AV_Flow_Handler::start (TAO_FlowSpec_Entry::Role role)
00510 {
00511 this->callback_->handle_start ();
00512 switch (role)
00513 {
00514
00515 case TAO_FlowSpec_Entry::TAO_AV_PRODUCER:
00516 {
00517 this->schedule_timer ();
00518 }
00519 break;
00520 default:
00521 break;
00522 }
00523 return 0;
00524 }
00525
00526 int
00527 TAO_AV_Flow_Handler::schedule_timer (void)
00528 {
00529 ACE_Event_Handler *event_handler = this->event_handler ();
00530 ACE_Time_Value *tv = 0;
00531
00532 this->callback_->get_timeout (tv, this->timeout_arg_);
00533 if (tv == 0)
00534 return 0;
00535
00536 this->timer_id_ =
00537 TAO_AV_CORE::instance()->reactor ()->schedule_timer (event_handler,
00538 0,
00539 *tv);
00540
00541 if (this->timer_id_ < 0)
00542 return -1;
00543
00544 return 0;
00545 }
00546
00547
00548 int
00549 TAO_AV_Flow_Handler::cancel_timer (void)
00550 {
00551 if (this->timer_id_ != -1)
00552 return TAO_AV_CORE::instance()->reactor ()->cancel_timer (this->timer_id_);
00553 else
00554 return 0;
00555 }
00556
00557
00558 int
00559 TAO_AV_Flow_Handler::stop (TAO_FlowSpec_Entry::Role role)
00560 {
00561 this->callback_->handle_stop ();
00562 switch (role)
00563 {
00564 case TAO_FlowSpec_Entry::TAO_AV_PRODUCER:
00565 {
00566 int result = this->event_handler ()->reactor ()->cancel_timer (this->timer_id_);
00567 if (result < 0)
00568 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"TAO_AV_Flow_Handler::stop:cancel_timer failed\n"));
00569 }
00570 break;
00571 default:
00572 break;
00573 }
00574 return 0;
00575 }
00576
00577 int
00578 TAO_AV_Flow_Handler::handle_timeout (const ACE_Time_Value & ,
00579 const void * )
00580 {
00581 int result = this->callback_->handle_timeout (this->timeout_arg_);
00582 if (result < 0)
00583 return result;
00584 ACE_Event_Handler *event_handler = this->event_handler ();
00585 ACE_Time_Value *timeout = 0;
00586
00587 this->callback_->get_timeout (timeout, this->timeout_arg_);
00588 if (timeout == 0)
00589 return 0;
00590
00591 this->timer_id_ = event_handler->reactor ()->schedule_timer (event_handler,
00592 0,
00593 *timeout);
00594
00595 if (this->timer_id_ < 0)
00596 return -1;
00597
00598 return 0;
00599 }
00600
00601 int
00602 TAO_AV_Flow_Handler::change_qos (AVStreams::QoS)
00603 {
00604 return 0;
00605 }
00606
00607 TAO_AV_Transport*
00608 TAO_AV_Flow_Handler::transport (void)
00609 {
00610 return this->transport_;
00611 }
00612
00613 void
00614 TAO_AV_Flow_Handler::protocol_object (TAO_AV_Protocol_Object *protocol_object)
00615 {
00616 this->protocol_object_ = protocol_object;
00617 }
00618
00619 TAO_AV_Protocol_Object*
00620 TAO_AV_Flow_Handler::protocol_object (void)
00621 {
00622 return this->protocol_object_;
00623 }
00624
00625 void
00626 TAO_AV_Flow_Handler::callback (TAO_AV_Callback *callback)
00627 {
00628 this->callback_ = callback;
00629 }
00630
00631
00632 TAO_AV_Connector::TAO_AV_Connector (void)
00633 {
00634 }
00635
00636 TAO_AV_Connector::~TAO_AV_Connector (void)
00637 {
00638 }
00639
00640
00641 TAO_AV_Acceptor::TAO_AV_Acceptor (void)
00642 {
00643 }
00644
00645 TAO_AV_Acceptor::~TAO_AV_Acceptor (void)
00646 {
00647 }
00648
00649 TAO_AV_Transport_Factory::TAO_AV_Transport_Factory (void)
00650 : ref_count (0)
00651 {
00652 }
00653
00654 TAO_AV_Transport_Factory::~TAO_AV_Transport_Factory (void)
00655 {
00656 }
00657
00658 int
00659 TAO_AV_Transport_Factory::init (int ,
00660 char * [])
00661 {
00662 return -1;
00663 }
00664
00665 int
00666 TAO_AV_Transport_Factory::match_protocol (const char * )
00667 {
00668 return 0;
00669 }
00670
00671 TAO_AV_Acceptor *
00672 TAO_AV_Transport_Factory::make_acceptor (void)
00673 {
00674 return 0;
00675 }
00676
00677 TAO_AV_Connector *
00678 TAO_AV_Transport_Factory::make_connector (void)
00679 {
00680 return 0;
00681 }
00682
00683 TAO_END_VERSIONED_NAMESPACE_DECL