Transport.cpp

Go to the documentation of this file.
00001 // $Id: Transport.cpp 81401 2008-04-23 18:12:56Z elliott_c $
00002 
00003 #include "orbsvcs/AV/AVStreams_i.h"
00004 #include "orbsvcs/AV/sfp.h"
00005 #include "orbsvcs/AV/MCast.h"
00006 #include "orbsvcs/AV/RTCP.h"
00007 #include "orbsvcs/AV/RTP.h"
00008 #include "orbsvcs/AV/UDP.h"
00009 #include "orbsvcs/AV/TCP.h"
00010 #include "orbsvcs/AV/FlowSpec_Entry.h"
00011 #include "orbsvcs/AV/AV_Core.h"
00012 
00013 #if defined (ACE_HAS_RAPI) || defined (ACE_HAS_WINSOCK2_GQOS)
00014 #include "orbsvcs/AV/QoS_UDP.h"
00015 #endif /* defined (ACE_HAS_RAPI) || defined (ACE_HAS_WINSOCK2_GQOS) */
00016 
00017 #include "tao/debug.h"
00018 
00019 #include "ace/Dynamic_Service.h"
00020 
00021 #if !defined (__ACE_INLINE__)
00022 #include "orbsvcs/AV/Transport.inl"
00023 #endif /* __ACE_INLINE__ */
00024 
00025 
00026 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00027 
00028 //------------------------------------------------------------
00029 // TAO_AV_Transport_Item
00030 //------------------------------------------------------------
00031 TAO_AV_Transport_Item::TAO_AV_Transport_Item (const ACE_CString &name)
00032   :   name_ (name),
00033       factory_ (0)
00034 {
00035 }
00036 
00037 //------------------------------------------------------------
00038 // TAO_AV_Transport_Item
00039 //------------------------------------------------------------
00040 TAO_AV_Flow_Protocol_Item::TAO_AV_Flow_Protocol_Item (const ACE_CString &name)
00041   :   name_ (name),
00042       factory_ (0)
00043 {
00044 }
00045 
00046 //------------------------------------------------------------
00047 // TAO_AV_Connector_Registry
00048 //------------------------------------------------------------
00049 
00050 TAO_AV_Connector_Registry::TAO_AV_Connector_Registry (void)
00051 {
00052 }
00053 
00054 int
00055 TAO_AV_Connector_Registry::open (TAO_Base_StreamEndPoint *endpoint,
00056                                  TAO_AV_Core* av_core,
00057                                  TAO_AV_FlowSpecSet &flow_spec_set)
00058 {
00059 
00060   TAO_AV_FlowSpecSetItor last_flowspec =  flow_spec_set.end ();
00061 
00062   for (TAO_AV_FlowSpecSetItor flow_spec = flow_spec_set.begin ();
00063        flow_spec != last_flowspec;
00064        ++flow_spec)
00065     {
00066       TAO_FlowSpec_Entry *entry = (*flow_spec);
00067       ACE_Addr *address = entry->address ();
00068       const char *flow_protocol = entry->flow_protocol_str ();
00069       const char *transport_protocol = entry->carrier_protocol_str ();
00070 
00071       if (ACE_OS::strcmp (flow_protocol,"") == 0)
00072         flow_protocol = transport_protocol;
00073 
00074       if (address == 0)
00075         {
00076           // Protocol was specified without an endpoint.  According to
00077           // the "iioploc" spec, this is valid.  As such, we extend
00078           // this feature to all pluggable protocols.  All TAO
00079            // pluggable protocols are expected to have the ability to
00080           // create a default endpoint.
00081 
00082           ACE_ERROR_RETURN ((LM_ERROR,
00083                              "Protocol was specified without an endpoint\n"),
00084                             -1);
00085         }
00086       else
00087         {
00088           TAO_AV_Flow_Protocol_Factory *flow_factory =
00089             av_core->get_flow_protocol_factory (flow_protocol);
00090           TAO_AV_Transport_Factory *transport_factory =
00091             av_core->get_transport_factory (transport_protocol);
00092 
00093           if ((flow_factory != 0) && (transport_factory != 0))
00094             {
00095               // @@Naga:Instead of making a new connector every time we should try and see if a connector exists
00096               // for this transport already and hence we can reuse it.
00097 
00098               TAO_AV_Connector *connector = transport_factory->make_connector ();
00099 
00100               if (connector != 0)
00101                 {
00102                   // add connector to list.
00103                   this->connectors_.insert (connector);
00104 
00105                   if (connector->open (endpoint,
00106                                        av_core,
00107                                        flow_factory) == -1)
00108                     return -1;
00109 
00110                   TAO_AV_Transport *transport = 0;
00111                   if (connector->connect (entry,
00112                                           transport,
00113                                           TAO_AV_Core::TAO_AV_DATA) == -1)
00114                     return -1;
00115                   entry->transport (transport);
00116                 }
00117               else
00118                 ACE_ERROR_RETURN ((LM_ERROR,
00119                                    "(%P|%t) Unable to create an "
00120                                    "connector for <%s>\n",
00121                                    entry->flowname ()),
00122                                   -1);
00123 
00124               // Now see if the flow factory has a control flow factory.
00125               TAO_AV_Flow_Protocol_Factory *control_flow_factory =
00126                 av_core->get_flow_protocol_factory(flow_factory->control_flow_factory ());
00127 
00128               if (control_flow_factory != 0)
00129                 {
00130                   TAO_AV_Connector *control_connector =
00131                     transport_factory->make_connector ();
00132 
00133                   if (control_connector != 0)
00134                     {
00135                       // add connector to list.
00136                       this->connectors_.insert (control_connector);
00137 
00138                       if (control_connector->open (endpoint,
00139                                                    av_core,
00140                                                    control_flow_factory) == -1)
00141                         return -1;
00142 
00143                       TAO_AV_Transport *control_transport = 0;
00144                       if (control_connector->connect (entry,
00145                                                       control_transport,
00146                                                       TAO_AV_Core::TAO_AV_CONTROL) == -1)
00147                         return -1;
00148                       entry->control_transport (control_transport);
00149 
00150                       // Now set the control object on the data flow object.
00151                       entry->protocol_object ()->control_object (entry->control_protocol_object ());
00152                     }
00153                   else
00154                     ACE_ERROR_RETURN ((LM_ERROR,
00155                                        "(%P|%t) Unable to create an "
00156                                        "connector for <%s>\n",
00157                                        entry->flowname ()),
00158                                       -1);
00159                 }
00160             }
00161         }
00162     }
00163   return 0;
00164 }
00165 
00166 int
00167 TAO_AV_Connector_Registry::close (TAO_AV_Connector *connector)
00168 {
00169   this->connectors_.remove (connector);
00170 
00171   if (connector != 0)
00172     delete connector;
00173   return 0;
00174 }
00175 
00176 int
00177 TAO_AV_Connector_Registry::close_all (void)
00178 {
00179   for (TAO_AV_ConnectorSetItor i = this->connectors_.begin ();
00180        i != this->connectors_.end ();
00181        ++i)
00182     {
00183       if (*i != 0)
00184         continue;
00185 
00186       (*i)->close ();
00187 
00188       this->close (*i);
00189     }
00190 
00191   this->connectors_.reset ();
00192   return 0;
00193 }
00194 
00195 TAO_AV_Connector_Registry::~TAO_AV_Connector_Registry (void)
00196 {
00197   this->close_all ();
00198 }
00199 
00200 //------------------------------------------------------------
00201 // TAO_AV_Acceptor_Registry
00202 //------------------------------------------------------------
00203 
00204 TAO_AV_Acceptor_Registry::TAO_AV_Acceptor_Registry (void)
00205 {
00206 }
00207 
00208 TAO_AV_Acceptor_Registry::~TAO_AV_Acceptor_Registry (void)
00209 {
00210   this->close_all();
00211 }
00212 
00213 int
00214 TAO_AV_Acceptor_Registry::open (TAO_Base_StreamEndPoint *endpoint,
00215                                 TAO_AV_Core *av_core,
00216                                 TAO_AV_FlowSpecSet &flow_spec_set)
00217 {
00218   int retv = 0;
00219 
00220   if (TAO_debug_level > 0)
00221     ACE_DEBUG ((LM_DEBUG,
00222                 "TAO_AV_Acceptor_Registry::open \n"));
00223 
00224   TAO_AV_FlowSpecSetItor last_flowspec
00225     = flow_spec_set.end ();
00226 
00227   for (TAO_AV_FlowSpecSetItor flow_spec = flow_spec_set.begin ();
00228        flow_spec != last_flowspec;
00229        ++flow_spec)
00230     {
00231       TAO_FlowSpec_Entry *entry = (*flow_spec);
00232       ACE_Addr *address = entry->address ();
00233       const char *flow_protocol = entry->flow_protocol_str ();
00234       const char *transport_protocol = entry->carrier_protocol_str ();
00235 
00236       if (ACE_OS::strcmp (flow_protocol,"") == 0)
00237         flow_protocol = transport_protocol;
00238 
00239       if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,
00240                                            "TAO_AV_Acceptor_Registry::protocol for flow %s is %s\n",
00241                                            entry->flowname (),
00242                                            transport_protocol));
00243 
00244       if (address == 0)
00245         {
00246           retv = this->open_default (endpoint,
00247                                      av_core,
00248                                      entry);
00249           if(retv < 0)
00250                   return retv;
00251           continue;
00252         }
00253       else
00254         {
00255           TAO_AV_Flow_Protocol_Factory *flow_factory =
00256             av_core->get_flow_protocol_factory (flow_protocol);
00257 
00258           if (flow_protocol != 0)
00259             {
00260               TAO_AV_Transport_Factory *transport_factory =
00261                 av_core->get_transport_factory (transport_protocol);
00262 
00263               if (transport_protocol != 0)
00264                 {
00265                   TAO_AV_Acceptor *acceptor = transport_factory->make_acceptor ();
00266                   if (acceptor != 0)
00267                     {
00268                       // add acceptor to list.
00269                       this->acceptors_.insert (acceptor);
00270 
00271                       if (acceptor->open (endpoint,
00272                                           av_core,
00273                                           entry,
00274                                           flow_factory,
00275                                           TAO_AV_Core::TAO_AV_DATA) == -1)
00276                         return -1;
00277 
00278                       TAO_AV_Flow_Protocol_Factory *control_flow_factory =
00279                         av_core->get_flow_protocol_factory (flow_factory->control_flow_factory ());
00280 
00281                       if (control_flow_factory != 0)
00282                         {
00283                           TAO_AV_Acceptor *acceptor = transport_factory->make_acceptor ();
00284                           if (acceptor != 0)
00285                             {
00286                               if (acceptor->open (endpoint,
00287                                                   av_core,
00288                                                   entry,
00289                                                   control_flow_factory,
00290                                                   TAO_AV_Core::TAO_AV_CONTROL) == -1)
00291                                 return -1;
00292                               // add acceptor to list.
00293                               this->acceptors_.insert (acceptor);
00294 
00295                               entry->protocol_object ()->control_object (entry->control_protocol_object ());
00296 
00297                              }
00298                            else
00299                              ACE_ERROR_RETURN ((LM_ERROR,
00300                                                 "(%P|%t) Unable to create an "
00301                                                 "acceptor for <%s>\n",
00302                                                 entry->flowname ()),
00303                                                -1);
00304                         }
00305                     }
00306                   else
00307                     ACE_ERROR_RETURN ((LM_ERROR,
00308                                        "(%P|%t) Unable to create an "
00309                                        "acceptor for <%s>\n",
00310                                        entry->flowname ()),
00311                                       -1);
00312                 }
00313             }
00314         }
00315     }
00316   return 0;
00317 }
00318 
00319 int
00320 TAO_AV_Acceptor_Registry::open_default (TAO_Base_StreamEndPoint *endpoint,
00321                                         TAO_AV_Core *av_core,
00322                                         TAO_FlowSpec_Entry *entry)
00323 {
00324   if (TAO_debug_level > 0)
00325     ACE_DEBUG ((LM_DEBUG,
00326                 "TAO_AV_Acceptor_Registry::open_default "));
00327 
00328   // No endpoints were specified, we let each protocol pick its own
00329   // default...
00330 
00331   const char *flow_protocol = entry->flow_protocol_str ();
00332   const char *transport_protocol = entry->carrier_protocol_str ();
00333 
00334   if (ACE_OS::strcmp (flow_protocol,"") == 0)
00335     flow_protocol = transport_protocol;
00336 
00337   TAO_AV_Flow_Protocol_Factory *flow_factory =
00338     av_core->get_flow_protocol_factory (flow_protocol);
00339 
00340   // No matching flow protocol.
00341   if (flow_factory == 0)
00342     ACE_ERROR_RETURN ((LM_ERROR,
00343                        "TAO (%P|%t) (%N,%l) Unable to match protocol prefix "
00344                        "for <%s>\n",
00345                        flow_protocol),
00346                       -1);
00347 
00348   if (TAO_debug_level > 0)
00349     ACE_DEBUG((LM_DEBUG, "(%N,%l) Matched flow_protocol: %s, Looking for transport protocol: %s\n", flow_protocol, transport_protocol));
00350 
00351   TAO_AV_Transport_Factory *transport_factory =
00352     av_core->get_transport_factory (transport_protocol);
00353 
00354   if (transport_factory == 0)
00355     ACE_ERROR_RETURN ((LM_ERROR,
00356                         "TAO (%P|%t) (%N,%l) Unable to match protocol prefix "
00357                         "for <%s>\n",
00358                         transport_protocol),
00359                        -1);
00360 
00361   // make an acceptor
00362   TAO_AV_Acceptor *acceptor =
00363     transport_factory->make_acceptor();
00364 
00365   if (acceptor == 0)
00366     ACE_ERROR_RETURN ((LM_ERROR,
00367                         "TAO (%P|%t) unable to create "
00368                         "an acceptor for <%d>\n",
00369                         transport_protocol),
00370                        -1);
00371 
00372   if (acceptor->open_default (endpoint,
00373                               av_core,
00374                               entry,
00375                               flow_factory,
00376                               TAO_AV_Core::TAO_AV_DATA) == -1)
00377     ACE_ERROR_RETURN ((LM_ERROR,
00378                        "TAO (%P|%t) unable to open "
00379                        "default acceptor for <%s>%p\n",
00380                        flow_protocol),
00381                       -1);
00382 
00383   this->acceptors_.insert (acceptor);
00384 
00385   const char *control_flow_factory_name = flow_factory->control_flow_factory ();
00386 
00387   if (control_flow_factory_name != 0)
00388     {
00389 
00390       TAO_AV_Flow_Protocol_Factory *control_flow_factory =
00391         av_core->get_flow_protocol_factory (control_flow_factory_name);
00392 
00393       if (control_flow_factory == 0)
00394         ACE_ERROR_RETURN ((LM_ERROR,
00395                            "TAO (%P|%t) Unable to match control flow "
00396                            "for <%s>\n",
00397                            control_flow_factory_name),
00398                           -1);
00399 
00400       TAO_AV_Acceptor *control_acceptor = transport_factory->make_acceptor ();
00401 
00402       if (control_acceptor == 0)
00403         ACE_ERROR_RETURN ((LM_ERROR,
00404                            "TAO (%P|%t) unable to create "
00405                            "an acceptor for <%d>\n",
00406                            transport_protocol),
00407                           -1);
00408 
00409       if (control_acceptor->open_default (endpoint,
00410                                           av_core,
00411                                           entry,
00412                                           control_flow_factory,
00413                                           TAO_AV_Core::TAO_AV_CONTROL) == -1)
00414         ACE_ERROR_RETURN ((LM_ERROR,
00415                            "TAO (%P|%t) unable to open "
00416                            "default acceptor for <%s>%p\n",
00417                            transport_protocol),
00418                           -1);
00419 
00420       this->acceptors_.insert (control_acceptor);
00421 
00422       entry->protocol_object ()->control_object (entry->control_protocol_object ());
00423     }
00424 
00425   if (this->acceptors_.size () == 0)
00426     {
00427       if (TAO_debug_level > 0)
00428         ACE_ERROR ((LM_ERROR,
00429                     "TAO (%P%t) cannot create any default acceptor\n"));
00430       return -1;
00431     }
00432 
00433   return 0;
00434 }
00435 
00436 int
00437 TAO_AV_Acceptor_Registry::close (TAO_AV_Acceptor *acceptor)
00438 {
00439   this->acceptors_.remove (acceptor);
00440   delete acceptor;
00441 
00442   return 0;
00443 }
00444 
00445 int
00446 TAO_AV_Acceptor_Registry::close_all (void)
00447 {
00448   for (TAO_AV_AcceptorSetItor i = this->acceptors_.begin ();
00449        i != this->acceptors_.end ();
00450        ++i)
00451     {
00452       if (*i == 0)
00453         continue;
00454 
00455       (*i)->close ();
00456 
00457       delete *i;
00458     }
00459 
00460   this->acceptors_.reset ();
00461   return 0;
00462 }
00463 
00464 //----------------------------------------------------------------------
00465 // TAO_AV_Transport
00466 //----------------------------------------------------------------------
00467 
00468 TAO_AV_Transport::TAO_AV_Transport (void)
00469 {
00470 }
00471 
00472 // Virtual destructor.
00473 TAO_AV_Transport::~TAO_AV_Transport (void)
00474 {
00475 }
00476 
00477 ACE_Addr*
00478 TAO_AV_Transport::get_local_addr (void)
00479 {
00480   return 0;
00481 }
00482 
00483 //----------------------------------------------------------------------
00484 // TAO_AV_Flow_Handler
00485 //----------------------------------------------------------------------
00486 
00487 //TAO_AV_Flow_Handler::TAO_AV_Flow_Handler (TAO_AV_Callback *callback)
00488 TAO_AV_Flow_Handler::TAO_AV_Flow_Handler (void)
00489   :transport_ (0),
00490    callback_ (0),
00491    protocol_object_ (0),
00492    timer_id_ (-1)
00493 {
00494 }
00495 
00496 TAO_AV_Flow_Handler::~TAO_AV_Flow_Handler(void)
00497 {
00498   // cancel the timer (if there is one)
00499   this->cancel_timer();
00500 }
00501 
00502 int
00503 TAO_AV_Flow_Handler::set_remote_address (ACE_Addr * /* address */)
00504 {
00505   return 0;
00506 }
00507 
00508 int
00509 TAO_AV_Flow_Handler::start (TAO_FlowSpec_Entry::Role role)
00510 {
00511   this->callback_->handle_start ();
00512   switch (role)
00513     {
00514       // only for producer we register for the timeout.
00515     case TAO_FlowSpec_Entry::TAO_AV_PRODUCER:
00516       {
00517         this->schedule_timer ();
00518       }
00519       break;
00520     default:
00521       break;
00522     }
00523   return 0;
00524 }
00525 
00526 int
00527 TAO_AV_Flow_Handler::schedule_timer (void)
00528 {
00529   ACE_Event_Handler *event_handler = this->event_handler ();
00530   ACE_Time_Value *tv = 0;
00531 
00532   this->callback_->get_timeout (tv, this->timeout_arg_);
00533   if (tv == 0)
00534     return 0;
00535 
00536   this->timer_id_ =
00537       TAO_AV_CORE::instance()->reactor ()->schedule_timer (event_handler,
00538                                                            0,
00539                                                            *tv);
00540 
00541   if (this->timer_id_ < 0)
00542     return -1;
00543 
00544   return 0;
00545 }
00546 
00547 
00548 int
00549 TAO_AV_Flow_Handler::cancel_timer (void)
00550 {
00551   if (this->timer_id_ != -1)
00552   return TAO_AV_CORE::instance()->reactor ()->cancel_timer (this->timer_id_);
00553   else
00554     return 0;
00555 }
00556 
00557 
00558 int
00559 TAO_AV_Flow_Handler::stop (TAO_FlowSpec_Entry::Role role)
00560 {
00561   this->callback_->handle_stop ();
00562   switch (role)
00563     {
00564     case TAO_FlowSpec_Entry::TAO_AV_PRODUCER:
00565       {
00566         int result =  this->event_handler ()->reactor ()->cancel_timer (this->timer_id_);
00567         if (result <  0)
00568           if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"TAO_AV_Flow_Handler::stop:cancel_timer failed\n"));
00569       }
00570       break;
00571     default:
00572       break;
00573     }
00574   return 0;
00575 }
00576 
00577 int
00578 TAO_AV_Flow_Handler::handle_timeout (const ACE_Time_Value & /*tv*/,
00579                                      const void * /*arg*/)
00580 {
00581   int result = this->callback_->handle_timeout (this->timeout_arg_);
00582   if (result < 0)
00583     return result;
00584   ACE_Event_Handler *event_handler = this->event_handler ();
00585   ACE_Time_Value *timeout = 0;
00586 
00587   this->callback_->get_timeout (timeout,  this->timeout_arg_);
00588   if (timeout == 0)
00589     return 0;
00590 
00591   this->timer_id_ =  event_handler->reactor ()->schedule_timer (event_handler,
00592                                                                 0,
00593                                                                 *timeout);
00594 
00595   if (this->timer_id_ < 0)
00596     return -1;
00597 
00598   return 0;
00599 }
00600 
00601 int
00602 TAO_AV_Flow_Handler::change_qos (AVStreams::QoS)
00603 {
00604   return 0;
00605 }
00606 
00607 TAO_AV_Transport*
00608 TAO_AV_Flow_Handler::transport (void)
00609 {
00610   return this->transport_;
00611 }
00612 
00613 void
00614 TAO_AV_Flow_Handler::protocol_object (TAO_AV_Protocol_Object *protocol_object)
00615 {
00616   this->protocol_object_ = protocol_object;
00617 }
00618 
00619 TAO_AV_Protocol_Object*
00620 TAO_AV_Flow_Handler::protocol_object (void)
00621 {
00622   return this->protocol_object_;
00623 }
00624 
00625 void
00626 TAO_AV_Flow_Handler::callback (TAO_AV_Callback *callback)
00627 {
00628   this->callback_ = callback;
00629 }
00630 
00631 // TAO_AV_Connector
00632 TAO_AV_Connector::TAO_AV_Connector (void)
00633 {
00634 }
00635 
00636 TAO_AV_Connector::~TAO_AV_Connector (void)
00637 {
00638 }
00639 
00640 // TAO_AV_Acceptor
00641 TAO_AV_Acceptor::TAO_AV_Acceptor (void)
00642 {
00643 }
00644 
00645 TAO_AV_Acceptor::~TAO_AV_Acceptor (void)
00646 {
00647 }
00648 
00649 TAO_AV_Transport_Factory::TAO_AV_Transport_Factory (void)
00650  : ref_count (0)
00651 {
00652 }
00653 
00654 TAO_AV_Transport_Factory::~TAO_AV_Transport_Factory (void)
00655 {
00656 }
00657 
00658 int
00659 TAO_AV_Transport_Factory::init (int /* argc */,
00660                                 char * /* argv */ [])
00661 {
00662   return -1;
00663 }
00664 
00665 int
00666 TAO_AV_Transport_Factory::match_protocol (const char * /* protocol_string */)
00667 {
00668   return 0;
00669 }
00670 
00671 TAO_AV_Acceptor *
00672 TAO_AV_Transport_Factory::make_acceptor (void)
00673 {
00674   return 0;
00675 }
00676 
00677 TAO_AV_Connector *
00678 TAO_AV_Transport_Factory::make_connector (void)
00679 {
00680   return 0;
00681 }
00682 
00683 TAO_END_VERSIONED_NAMESPACE_DECL

Generated on Tue Feb 2 17:47:49 2010 for TAO_AV by  doxygen 1.4.7