AVStreams_i.cpp

Go to the documentation of this file.
00001 
00002 //=============================================================================
00003 /**
00004  *  @file   AVStreams_i.cpp
00005  *
00006  *  $Id: AVStreams_i.cpp 78820 2007-07-07 20:06:46Z sowayaa $
00007  *
00008  *  @author Sumedh Mungee <sumedh@cs.wustl.edu> Nagarajan Surendran <naga@cs.wustl.edu>
00009  */
00010 //=============================================================================
00011 
00012 
00013 #include "orbsvcs/AV/AVStreams_i.h"
00014 #include "orbsvcs/AV/sfp.h"
00015 #include "orbsvcs/AV/MCast.h"
00016 #include "orbsvcs/AV/RTCP.h"
00017 
00018 #include "tao/debug.h"
00019 #include "tao/ORB_Core.h"
00020 #include "tao/AnyTypeCode/Any.h"
00021 #include "ace/OS_NS_arpa_inet.h"
00022 
00023 #if !defined (__ACE_INLINE__)
00024 #include "orbsvcs/AV/AVStreams_i.inl"
00025 #endif /* __ACE_INLINE__ */
00026 
00027 ACE_RCSID (AV,
00028            AVStreams_i,
00029            "$Id: AVStreams_i.cpp 78820 2007-07-07 20:06:46Z sowayaa $")
00030 
00031 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00032 
00033 //------------------------------------------------------------
00034 // TAO_AV_QoS
00035 //------------------------------------------------------------
00036 
00037 TAO_AV_QoS::TAO_AV_QoS (void)
00038 {
00039 }
00040 
00041 TAO_AV_QoS::TAO_AV_QoS (AVStreams::streamQoS &stream_qos)
00042 {
00043   this->set (stream_qos);
00044 }
00045 
00046 int
00047 TAO_AV_QoS::convert (AVStreams::streamQoS &/*network_qos*/)
00048 {
00049   return -1;
00050 }
00051 
00052 
00053 // ----------------------------------------------------------------------
00054 // AV_Null_MediaCtrl
00055 // ----------------------------------------------------------------------
00056 AV_Null_MediaCtrl::AV_Null_MediaCtrl (void)
00057 {
00058 }
00059 
00060 AV_Null_MediaCtrl::~AV_Null_MediaCtrl (void)
00061 {
00062 }
00063 
00064 
00065 // ----------------------------------------------------------------------
00066 // TAO_Basic_StreamCtrl
00067 // ----------------------------------------------------------------------
00068 
00069 // Constructor
00070 TAO_Basic_StreamCtrl::TAO_Basic_StreamCtrl (void)
00071   :flow_count_ (0)
00072 {
00073 }
00074 
00075 
00076 // Stop the transfer of data of the stream
00077 // Empty the_spec means apply operation to all flows
00078 void
00079 TAO_Basic_StreamCtrl::stop (const AVStreams::flowSpec &flow_spec)
00080 {
00081   try
00082     {
00083       // @@Call stop on the Related MediaCtrl.  call stop on the flow
00084       // connections.
00085       if (this->flow_connection_map_.current_size () > 0)
00086         {
00087           if (flow_spec.length () > 0)
00088             for (u_int i=0;i<flow_spec.length ();i++)
00089               {
00090                 char *flowname = TAO_AV_Core::get_flowname (flow_spec[i]);
00091                 ACE_CString flow_name_key (flowname);
00092                 AVStreams::FlowConnection_var flow_connection_entry;
00093                 if (this->flow_connection_map_.find (flow_name_key,
00094                                                      flow_connection_entry) == 0)
00095                   {
00096                     flow_connection_entry->stop ();
00097                   }
00098               }
00099           else
00100             {
00101               // call stop on all the flows.
00102               FlowConnection_Map_Iterator iterator (this->flow_connection_map_);
00103               FlowConnection_Map_Entry *entry;
00104               for (;iterator.next (entry) !=  0;iterator.advance ())
00105                 {
00106                   entry->int_id_->stop ();
00107                 }
00108             }
00109         }
00110     }
00111   catch(AVStreams::noSuchFlow& ex)
00112       {
00113       throw; //ACS mod 2007-08
00114       }
00115   catch (const CORBA::Exception& ex)
00116     {
00117     ex._tao_print_exception ("TAO_Basic_StreamCtrl::stop");
00118     throw;  //ACS mod 2007-08
00119     }
00120   catch(...)
00121     {
00122       printf ("TAO_Basic_StreamCtrl::stop - unknown exception\n");
00123     }
00124 }
00125 
00126 // Start the transfer of data in the stream.
00127 // Empty the_spec means apply operation to all flows
00128 void
00129 TAO_Basic_StreamCtrl::start (const AVStreams::flowSpec &flow_spec)
00130 {
00131   try
00132     {
00133       // @@Call start on the Related MediaCtrl.
00134 
00135       // call start on the flow connections.
00136       if (this->flow_connection_map_.current_size () > 0)
00137         {
00138           if (flow_spec.length () > 0)
00139             for (u_int i = 0; i < flow_spec.length (); i++)
00140               {
00141                 char *flowname = TAO_AV_Core::get_flowname (flow_spec[i]);
00142                 ACE_CString flow_name_key (flowname);
00143                 FlowConnection_Map::ENTRY *flow_connection_entry = 0;
00144                 if (this->flow_connection_map_.find (flow_name_key,
00145                                                      flow_connection_entry) == 0)
00146                   {
00147                     flow_connection_entry->int_id_->start ();
00148                   }
00149               }
00150           else
00151             {
00152               // call start on all the flows.
00153               FlowConnection_Map_Iterator iterator (this->flow_connection_map_);
00154               FlowConnection_Map_Entry *entry = 0;
00155               for (;iterator.next (entry) !=  0;iterator.advance ())
00156                 {
00157                   entry->int_id_->start ();
00158                 }
00159             }
00160         }
00161     }
00162  catch(AVStreams::noSuchFlow& ex)
00163       {
00164       throw; //ACS mod 2007-08
00165       }
00166   catch (const CORBA::Exception& ex)
00167     {
00168     ex._tao_print_exception ("TAO_Basic_StreamCtrl::start");
00169     throw;  //ACS mod 2007-08
00170     }
00171   catch(...)
00172     {
00173       printf ("TAO_Basic_StreamCtrl::start - unknown exception\n");
00174     }
00175 }
00176 
00177 // Tears down the stream. This will close the connection, and delete
00178 // the streamendpoint and vdev associated with this stream Empty
00179 // the_spec means apply operation to all flows
00180 
00181 void
00182 TAO_Basic_StreamCtrl::destroy (const AVStreams::flowSpec &flow_spec)
00183 {
00184   try
00185     {
00186       // call stop on the flow connections.
00187       if (this->flow_connection_map_.current_size () > 0)
00188         {
00189           if (flow_spec.length () > 0)
00190           {
00191             for (u_int i=0;i<flow_spec.length ();i++)
00192               {
00193                 char *flowname = TAO_AV_Core::get_flowname (flow_spec[i]);
00194                 ACE_CString flow_name_key (flowname);
00195                 FlowConnection_Map::ENTRY *flow_connection_entry = 0;
00196                 if (this->flow_connection_map_.find (flow_name_key, flow_connection_entry) == 0)
00197                   {
00198                     flow_connection_entry->int_id_->destroy ();
00199                   }
00200               }
00201           }
00202           else
00203             {
00204               // call destroy on all the flows.
00205               FlowConnection_Map_Iterator iterator (this->flow_connection_map_);
00206               FlowConnection_Map_Entry *entry = 0;
00207               for (;iterator.next (entry) !=  0;iterator.advance ())
00208                 {
00209                   entry->int_id_->destroy ();
00210                 }
00211             }
00212         }
00213     }
00214   catch (const CORBA::Exception& ex)
00215     {
00216       ex._tao_print_exception ("TAO_Basic_StreamCtrl::destroy");
00217       return;
00218     }
00219 }
00220 
00221 // Changes the QoS associated with the stream
00222 // Empty the_spec means apply operation to all flows
00223 CORBA::Boolean
00224 
00225 TAO_Basic_StreamCtrl::modify_QoS (AVStreams::streamQoS & /*new_qos*/,
00226                                   const AVStreams::flowSpec &/*flowspec*/)
00227 {
00228   return 1;
00229 }
00230 
00231 // Used by StreamEndPoint and VDev to inform StreamCtrl of events.
00232 // E.g., loss of flow, reestablishment of flow, etc..
00233 void
00234 TAO_Basic_StreamCtrl::push_event (const struct CosPropertyService::Property &/*the_event*/)
00235 {
00236   if (TAO_debug_level > 0)
00237     ACE_DEBUG ((LM_DEBUG, "\n(%P|%t) Recieved event \""));
00238 }
00239 
00240 // Sets the flow protocol status.
00241 void
00242 TAO_Basic_StreamCtrl::set_FPStatus (const AVStreams::flowSpec &flow_spec,
00243                                     const char  *fp_name,
00244                                     const CORBA::Any &fp_settings)
00245 
00246 {
00247   if (!CORBA::is_nil (this->sep_a_.in ()))
00248     {
00249       this->sep_a_->set_FPStatus (flow_spec, fp_name, fp_settings);
00250     }
00251 }
00252 
00253 // Gets the flow connection.
00254 CORBA::Object_ptr
00255 TAO_Basic_StreamCtrl::get_flow_connection (const char *flow_name)
00256 {
00257   ACE_CString flow_name_key (flow_name);
00258   AVStreams::FlowConnection_var flow_connection_entry;
00259 
00260   if (this->flow_connection_map_.find (flow_name_key, flow_connection_entry) == 0){
00261     return flow_connection_entry._retn();
00262   }
00263   else{
00264     if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "(%N,%l) Cannot find flow: %s\n", flow_name ));
00265     throw AVStreams::noSuchFlow ();
00266   }
00267 }
00268 
00269 // Sets the flow connection.
00270 void
00271 TAO_Basic_StreamCtrl::set_flow_connection (const char *flow_name,
00272                                            CORBA::Object_ptr flow_connection_obj)
00273 {
00274   AVStreams::FlowConnection_var flow_connection;
00275   try
00276     {
00277       flow_connection = AVStreams::FlowConnection::_narrow (flow_connection_obj);
00278     }
00279   catch (const CORBA::Exception& ex)
00280     {
00281       ex._tao_print_exception (
00282         "TAO_Basic_StreamCtrl::set_flow_connection");
00283       return;
00284     }
00285   // add the flowname and the flowconnection to the hashtable.
00286   this->flows_.length (this->flow_count_ + 1);
00287   this->flows_ [this->flow_count_++] = CORBA::string_dup (flow_name);
00288   ACE_CString flow_name_key (flow_name);
00289   if (this->flow_connection_map_.bind (flow_name_key, flow_connection) != 0)
00290   {
00291     if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "(%N,%l) Cannot find flow: %s\n", flow_name ));
00292     throw AVStreams::noSuchFlow ();// is this right?
00293   }
00294 }
00295 
00296 TAO_Basic_StreamCtrl::~TAO_Basic_StreamCtrl (void)
00297 {
00298 }
00299 
00300 // ----------------------------------------------------------------------
00301 // TAO_Negotiator
00302 // ----------------------------------------------------------------------
00303 
00304 CORBA::Boolean
00305 TAO_Negotiator::negotiate (AVStreams::Negotiator_ptr /* remote_negotiator */,
00306                            const AVStreams::streamQoS &/* qos_spec */)
00307 {
00308   ACE_DEBUG ((LM_DEBUG,
00309               "TAO_Negotiator::negotiate\n"));
00310   return 0;
00311 }
00312 
00313 // ----------------------------------------------------------------------
00314 // MMDevice_Map_Hash_Key
00315 // ----------------------------------------------------------------------
00316 
00317 const int MMDevice_Map_Hash_Key::hash_maximum_ = 10000;
00318 
00319 //default constructor.
00320 MMDevice_Map_Hash_Key::MMDevice_Map_Hash_Key (void)
00321 {
00322   this->mmdevice_ = AVStreams::MMDevice::_nil ();
00323 }
00324 
00325 // constructor.
00326 MMDevice_Map_Hash_Key::MMDevice_Map_Hash_Key (AVStreams::MMDevice_ptr mmdevice)
00327 {
00328   this->mmdevice_ = AVStreams::MMDevice::_duplicate (mmdevice);
00329 }
00330 
00331 // copy constructor.
00332 MMDevice_Map_Hash_Key::MMDevice_Map_Hash_Key (const MMDevice_Map_Hash_Key& hash_key)
00333 {
00334   this->mmdevice_ = AVStreams::MMDevice::_duplicate (hash_key.mmdevice_);
00335 }
00336 
00337 // destructor.
00338 MMDevice_Map_Hash_Key::~MMDevice_Map_Hash_Key (void)
00339 {
00340   CORBA::release (this->mmdevice_);
00341 }
00342 
00343 bool
00344 MMDevice_Map_Hash_Key::operator == (const MMDevice_Map_Hash_Key &hash_key) const
00345 {
00346   CORBA::Boolean result = 0;
00347   try
00348     {
00349       result =
00350         this->mmdevice_->_is_equivalent (hash_key.mmdevice_);
00351     }
00352   catch (const CORBA::Exception& ex)
00353     {
00354       ex._tao_print_exception (
00355         "MMDevice_Map_Hash_Key::operator == ");
00356       return false;
00357     }
00358 
00359   return result;
00360 }
00361 
00362 bool
00363 operator < (const MMDevice_Map_Hash_Key &left,
00364             const MMDevice_Map_Hash_Key &right)
00365 {
00366   bool result = false;
00367 
00368   try
00369     {
00370       const CORBA::ULong left_hash =
00371         left.mmdevice_->_hash (left.hash_maximum_);
00372 
00373       const CORBA::ULong right_hash =
00374         right.mmdevice_->_hash (right.hash_maximum_);
00375 
00376       result = left_hash < right_hash;
00377     }
00378   catch (const CORBA::Exception& ex)
00379     {
00380       ex._tao_print_exception ("operator < for MMDevice_Map_Hash_Key");
00381       return false;
00382     }
00383 
00384   return result;
00385 }
00386 
00387 u_long
00388 MMDevice_Map_Hash_Key::hash (void)  const
00389 {
00390   u_long result = 0;
00391   try
00392     {
00393       result = this->mmdevice_->_hash (this->hash_maximum_);
00394     }
00395   catch (const CORBA::Exception& ex)
00396     {
00397       ex._tao_print_exception ("MMDevice_Map_Hash_Key::hash");
00398       return 0;
00399     }
00400   return result;
00401 }
00402 
00403 // ----------------------------------------------------------------------
00404 // TAO_StreamCtrl
00405 // ----------------------------------------------------------------------
00406 
00407 TAO_StreamCtrl::TAO_StreamCtrl (void)
00408   :mcastconfigif_ (0)
00409 {
00410   try
00411     {
00412       this->streamctrl_ = this->_this ();
00413       char buf [BUFSIZ];
00414       int result = ACE_OS::hostname (buf, BUFSIZ);
00415       unsigned long ipaddr = 0;
00416       if (result == 0)
00417         ipaddr = ACE_OS::inet_addr (buf);
00418       this->source_id_ = TAO_AV_RTCP::alloc_srcid (ipaddr);
00419     }
00420   catch (const CORBA::Exception& ex)
00421     {
00422       ex._tao_print_exception ("TAO_StreamCtrl::TAO_StreamCtrl");
00423     }
00424 }
00425 
00426 TAO_StreamCtrl::~TAO_StreamCtrl (void)
00427 {
00428   delete this->mcastconfigif_;
00429 }
00430 
00431 
00432 // Stop the transfer of data of the stream
00433 // Empty the_spec means apply operation to all flows
00434 void
00435 TAO_StreamCtrl::stop (const AVStreams::flowSpec &flow_spec)
00436 {
00437   try
00438     {
00439       TAO_Basic_StreamCtrl::stop (flow_spec);
00440       if (this->flow_connection_map_.current_size () > 0)
00441         return;
00442       MMDevice_Map_Iterator a_iterator (this->mmdevice_a_map_);
00443       MMDevice_Map::ENTRY *entry = 0;
00444       for (;a_iterator.next (entry)!= 0;a_iterator.advance ())
00445         {
00446           entry->int_id_.sep_->stop (flow_spec);
00447         }
00448       MMDevice_Map_Iterator b_iterator (this->mmdevice_b_map_);
00449       for (;b_iterator.next (entry)!= 0;b_iterator.advance ())
00450         {
00451           entry->int_id_.sep_->stop (flow_spec);
00452         }
00453     }
00454   catch(AVStreams::noSuchFlow& ex)
00455     {
00456     throw; //ACS mod 2007-08
00457     }
00458   catch (const CORBA::Exception& ex)
00459     {
00460     ex._tao_print_exception ("TAO_StreamCtrl::stop");
00461     throw;  //ACS mod 2007-08
00462     }
00463   catch(...)
00464     {
00465       printf ("TAO_StreamCtrl::stop - unknow exception\n");
00466     }
00467 }
00468 
00469 // Start the transfer of data in the stream.
00470 // Empty the_spec means apply operation to all flows
00471 void
00472 TAO_StreamCtrl::start (const AVStreams::flowSpec &flow_spec)
00473 {
00474   try
00475     {
00476       TAO_Basic_StreamCtrl::start (flow_spec);
00477       if (this->flow_connection_map_.current_size () > 0)
00478         return;
00479 
00480       MMDevice_Map_Iterator a_iterator (this->mmdevice_a_map_);
00481       MMDevice_Map::ENTRY *entry = 0;
00482       for (;a_iterator.next (entry)!= 0;a_iterator.advance ())
00483         {
00484           entry->int_id_.sep_->start (flow_spec);
00485         }
00486       MMDevice_Map_Iterator b_iterator (this->mmdevice_b_map_);
00487       for (;b_iterator.next (entry)!= 0;b_iterator.advance ())
00488         {
00489           entry->int_id_.sep_->start (flow_spec);
00490         }
00491     }
00492  catch(AVStreams::noSuchFlow& ex)
00493     {
00494     throw; //ACS mod 2007-08
00495     }
00496   catch (const CORBA::Exception& ex)
00497     {
00498     ex._tao_print_exception ("TAO_StreamCtrl::start");
00499     throw;  //ACS mod 2007-08
00500     }
00501   catch(...)
00502     {
00503       printf ("TAO_StreamCtrl::start - unknow exception\n");
00504     }
00505 }
00506 
00507 // Tears down the stream. This will close the connection, and delete
00508 // the streamendpoint and vdev associated with this stream
00509 // Empty the_spec means apply operation to all flows
00510 void
00511 TAO_StreamCtrl::destroy (const AVStreams::flowSpec &flow_spec)
00512 {
00513   try
00514     {
00515       TAO_Basic_StreamCtrl::destroy (flow_spec);
00516       if (this->flow_connection_map_.current_size () > 0)
00517         return;
00518 
00519       MMDevice_Map_Iterator a_iterator (this->mmdevice_a_map_);
00520       MMDevice_Map::ENTRY *entry = 0;
00521       for (;a_iterator.next (entry)!= 0;a_iterator.advance ())
00522         {
00523           entry->int_id_.sep_->destroy (flow_spec);
00524         }
00525       MMDevice_Map_Iterator b_iterator (this->mmdevice_b_map_);
00526       for (;b_iterator.next (entry)!= 0;b_iterator.advance ())
00527         {
00528           entry->int_id_.sep_->destroy (flow_spec);
00529         }
00530     }
00531   catch (const CORBA::Exception& ex)
00532     {
00533       ex._tao_print_exception ("TAO_StreamCtrl::destroy");
00534       return;
00535     }
00536 
00537   int result = TAO_AV_Core::deactivate_servant (this);
00538   if (result < 0)
00539     if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_StreamCtrl::destroy failed\n"));
00540 }
00541 
00542 // request the two MMDevices to create vdev and stream endpoints. save
00543 // the references returned.
00544 
00545 // The interaction diagram for this method is on page 13 of the spec
00546 CORBA::Boolean
00547 TAO_StreamCtrl::bind_devs (AVStreams::MMDevice_ptr a_party,
00548                            AVStreams::MMDevice_ptr b_party,
00549                            AVStreams::streamQoS &the_qos,
00550                            const AVStreams::flowSpec &the_flows)
00551 {
00552   try
00553     {
00554       if (CORBA::is_nil (a_party) && CORBA::is_nil (b_party))
00555         ACE_ERROR_RETURN ((LM_ERROR, "Both parties are nil\n"), 0);
00556       // Check to see if we have non-nil parties to bind!
00557       if (TAO_debug_level > 0)
00558         if (CORBA::is_nil (a_party) ||
00559             CORBA::is_nil (b_party))
00560           if (TAO_debug_level > 0)
00561             ACE_DEBUG ((LM_DEBUG,
00562                         "(%P|%t) TAO_StreamCtrl::bind_devs: "
00563                         "a_party or b_party is null"
00564                         "Multicast mode\n"));
00565 
00566       // Request a_party to create the endpoint and vdev
00567       CORBA::Boolean met_qos;
00568       CORBA::String_var named_vdev;
00569 
00570       if (!CORBA::is_nil (a_party))
00571         {
00572           MMDevice_Map_Hash_Key find_key (a_party);
00573           MMDevice_Map_Entry find_entry;
00574           int result =
00575             this->mmdevice_a_map_.find (find_key, find_entry);
00576           if (result == 0)
00577             {
00578               if (TAO_debug_level > 0)
00579                 {
00580                   // Already in the map.
00581                   if (TAO_debug_level > 0)
00582                     ACE_DEBUG ((LM_DEBUG, "mmdevice a_party is already bound\n"));
00583                 }
00584               return 1;
00585             }
00586           else
00587             {
00588               this->sep_a_ =
00589                 a_party-> create_A (this->streamctrl_.in (),
00590                                     this->vdev_a_.out (),
00591                                     the_qos,
00592                                     met_qos,
00593                                     named_vdev.inout (),
00594                                     the_flows);
00595 
00596               if (TAO_debug_level > 0)
00597                 ACE_DEBUG ((LM_DEBUG, "(%P|%t) TAO_StreamCtrl::create_A: succeeded\n"));
00598               // Define ourselves as the related_streamctrl property of the sep.
00599               CORBA::Any streamctrl_any;
00600               streamctrl_any <<= this->streamctrl_.in ();
00601               this->sep_a_->define_property ("Related_StreamCtrl",
00602                                              streamctrl_any);
00603 
00604               CORBA::Any vdev_a_any;
00605               vdev_a_any <<= this->vdev_a_.in ();
00606               this->sep_a_->define_property ("Related_VDev",
00607                                              vdev_a_any);
00608 
00609               CORBA::Any streamendpoint_a_any;
00610               streamendpoint_a_any <<= this->sep_a_.in ();
00611               this->vdev_a_->define_property ("Related_StreamEndpoint",
00612                                               streamendpoint_a_any);
00613 
00614 
00615               CORBA::Any mmdevice_a_any;
00616               mmdevice_a_any <<= a_party;
00617               this->vdev_a_->define_property ("Related_MMDevice",
00618                                               mmdevice_a_any);
00619 
00620               // add the mmdevice, sep and vdev to the map.
00621               MMDevice_Map_Entry map_entry;
00622               MMDevice_Map_Hash_Key key (a_party);
00623               map_entry.sep_ = AVStreams::StreamEndPoint_A::_duplicate (this->sep_a_.in ());
00624               map_entry.vdev_ = AVStreams::VDev::_duplicate (this->vdev_a_.in ());
00625               map_entry.flowspec_ = the_flows;
00626               map_entry.qos_ = the_qos;
00627               result =
00628                 this->mmdevice_a_map_.bind (key, map_entry);
00629               if (result < 0)
00630                 if (TAO_debug_level > 0)
00631                   ACE_DEBUG ((LM_DEBUG, "Error binding mmdevice entry in the a_map"));
00632             }
00633         }
00634       // Request b_party to create the endpoint and vdev
00635       if (!CORBA::is_nil (b_party))
00636         {
00637           MMDevice_Map_Hash_Key find_key (b_party);
00638           MMDevice_Map_Entry find_entry;
00639           int result =
00640             this->mmdevice_b_map_.find (find_key, find_entry);
00641           if (result == 0)
00642             {
00643               // Already in the map.
00644               if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "mmdevice b_party is already bound\n"));
00645               return 1;
00646             }
00647           else
00648             {
00649               this->sep_b_ =
00650                 b_party-> create_B (this->streamctrl_.in (),
00651                                     this->vdev_b_.out (),
00652                                     the_qos,
00653                                     met_qos,
00654                                     named_vdev.inout (),
00655                                     the_flows);
00656 
00657               if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "(%P|%t) TAO_StreamCtrl::create_B: succeeded\n"));
00658 
00659               if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,
00660                           "\n(%P|%t)stream_endpoint_b_ = %s",
00661                           TAO_ORB_Core_instance ()->orb ()->object_to_string (this->sep_b_.in ())));
00662               // Define ourselves as the related_streamctrl property of the sep.
00663               CORBA::Any streamctrl_any;
00664               streamctrl_any <<= this->streamctrl_.in ();
00665               this->sep_b_->define_property ("Related_StreamCtrl",
00666                                              streamctrl_any);
00667 
00668               CORBA::Any vdev_b_any;
00669               vdev_b_any <<= this->vdev_b_.in ();
00670               this->sep_b_->define_property ("Related_VDev",
00671                                              vdev_b_any);
00672 
00673               CORBA::Any streamendpoint_b_any;
00674               streamendpoint_b_any <<= this->sep_b_.in ();
00675               this->vdev_b_->define_property ("Related_StreamEndpoint",
00676                                               streamendpoint_b_any);
00677 
00678 
00679               CORBA::Any mmdevice_b_any;
00680               mmdevice_b_any <<= b_party;
00681               this->vdev_b_->define_property ("Related_MMDevice",
00682                                               mmdevice_b_any);
00683               // add the mmdevice, sep and vdev to the map.
00684               MMDevice_Map_Entry map_entry;
00685               MMDevice_Map_Hash_Key key (b_party);
00686               map_entry.sep_ = AVStreams::StreamEndPoint::_duplicate (this->sep_b_.in ());
00687               map_entry.vdev_ = AVStreams::VDev::_duplicate(this->vdev_b_.in ());
00688               map_entry.flowspec_ = the_flows;
00689               map_entry.qos_ = the_qos;
00690               int result =
00691                 this->mmdevice_b_map_.bind (key, map_entry);
00692               if (result < 0)
00693                 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "Error binding mmdevice entry in the b_map"));
00694             }
00695         }
00696 
00697       // Tell the endpoints about each other.
00698       if ((!CORBA::is_nil (a_party)) && (!CORBA::is_nil (b_party)))
00699         {
00700           CORBA::Any sep_a_peer_any;
00701           CORBA::Any sep_b_peer_any;
00702 
00703           sep_a_peer_any <<= this->sep_b_.in();
00704           sep_b_peer_any <<= this->sep_a_.in();
00705           this->sep_a_->define_property ("PeerAdapter",
00706                                           sep_a_peer_any);
00707 
00708           this->sep_b_->define_property ("PeerAdapter",
00709                                          sep_b_peer_any);
00710         }
00711 
00712       // In the full profile case there's no VDev.
00713       if (CORBA::is_nil (b_party) && (!CORBA::is_nil (this->vdev_a_.in ())))
00714         {
00715           // Now set the source id for this A endpoint.
00716           // If the sep contains flow producers then set the source ids for those
00717           // instead.
00718           try
00719             {
00720               CORBA::Any_ptr flows_any = this->sep_a_->get_property_value ("Flows");
00721               AVStreams::flowSpec_var flows;
00722               *flows_any >>= flows.out ();
00723               for (CORBA::ULong i=0; i< flows->length ();++i)
00724                 {
00725                   CORBA::Object_var fep_obj =
00726                     this->sep_a_->get_fep (flows [i]);
00727                   try
00728                     {
00729                       AVStreams::FlowProducer_var producer =
00730                         AVStreams::FlowProducer::_narrow (fep_obj.in ());
00731                       producer->set_source_id (this->source_id_++);
00732                     }
00733                   catch (const CORBA::Exception& ex)
00734                     {
00735                       if (TAO_debug_level > 0)
00736                         ACE_DEBUG ((LM_DEBUG, " %s ", static_cast<char const*>(flows[i])));
00737 
00738                       ex._tao_print_exception (
00739                         "producer_check: not a producer");
00740 
00741                     }
00742                 }
00743             }
00744           catch (const CORBA::Exception&)
00745             {
00746               // Since the full profile failed try setting the source id
00747               // for the sep instead.
00748               // @@Naga: What do we do if in the light profile the sep has
00749               // many producers who do not have flow interfaces. Then
00750               // the streamctrl has to give an array of source ids to
00751               // the sep.
00752               this->sep_a_->set_source_id (this->source_id_++);
00753             }
00754           if (!this->mcastconfigif_)
00755             {
00756               ACE_NEW_RETURN (this->mcastconfigif_,
00757                               TAO_MCastConfigIf,
00758                               0);
00759               // @@: Deactivating the object thru poa means calling remove_ref after _this.
00760               this->mcastconfigif_ptr_ = this->mcastconfigif_->_this ();
00761             }
00762           // Multicast source being added.
00763           CORBA::Boolean result = this->vdev_a_->set_Mcast_peer (this->streamctrl_.in (),
00764                                                                  this->mcastconfigif_ptr_.in (),
00765                                                                  the_qos,
00766                                                                  the_flows);
00767           if (!result)
00768             ACE_ERROR_RETURN ((LM_ERROR, "set_Mcast_peer failed\n"), 0);
00769         }
00770 
00771       if (CORBA::is_nil (a_party))
00772         {
00773           if (!CORBA::is_nil (this->vdev_b_.in ()))
00774             {
00775               // Multicast sink being added.
00776               if (!this->mcastconfigif_)
00777                 ACE_ERROR_RETURN ((LM_ERROR, "first add a source and then a sink\n"), 0);
00778               this->mcastconfigif_->set_peer (this->vdev_b_.in (),
00779                                               the_qos,
00780                                               the_flows);
00781             }
00782 
00783           int connect_leaf_success = 0;
00784           try
00785             {
00786               // @@: define null interfaces for Atm so that they can be implemented once
00787               //  ACE adds support for ATM multicast.
00788               connect_leaf_success = this->sep_a_->connect_leaf (this->sep_b_.in (),
00789                                                                  the_qos,
00790                                                                  the_flows);
00791               connect_leaf_success = 1;
00792             }
00793           catch (const AVStreams::notSupported&)
00794             {
00795               if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "connect_leaf failed\n"));
00796               connect_leaf_success = 0;
00797             }
00798           catch (const CORBA::Exception& ex)
00799             {
00800               ex._tao_print_exception (
00801                 "TAO_StreamCtrl::bind_devs");
00802             }
00803           if (!connect_leaf_success)
00804             {
00805               if (TAO_debug_level > 0)
00806                 ACE_DEBUG ((LM_DEBUG,"TAO_StreamCtrl::bind_devs Multiconnect\n"));
00807               AVStreams::flowSpec connect_flows = the_flows;
00808               this->sep_a_->multiconnect (the_qos, connect_flows);
00809               this->sep_b_->multiconnect (the_qos, connect_flows);
00810             }
00811         }
00812 
00813       if (!CORBA::is_nil (a_party) && !CORBA::is_nil (b_party))
00814         {
00815           // Check to see if the MMDevice contains FDev objects
00816           // If it contains FDev objects, then we are using the
00817           // Full profile, and we want to call bind() instead
00818           // of connect() on the the streamctrl
00819           if( a_party->is_property_defined("Flows") &&
00820               b_party->is_property_defined("Flows") )
00821           {
00822               if (TAO_debug_level > 0) {
00823                 //FUZZ: disable check_for_lack_ACE_OS
00824                 ACE_DEBUG ((LM_DEBUG, "(%N,%l) Full profile, invoking bind()\n"));
00825                 //FUZZ: enable check_for_lack_ACE_OS
00826               }
00827 
00828               // It is full profile
00829               // we have feps in the sep then dont call connect
00830               // instead call bind on the streamctrl.
00831               this->bind (this->sep_a_.in (),
00832                           this->sep_b_.in (),
00833                           the_qos,
00834                           the_flows);
00835 
00836 
00837 
00838           }
00839           // This is the light profile, call connect()
00840           else  if (!CORBA::is_nil (this->vdev_a_.in ()) && !CORBA::is_nil (this->vdev_b_.in ()))
00841           {
00842               if (TAO_debug_level > 0) {
00843                 //FUZZ: disable check_for_lack_ACE_OS
00844                 ACE_DEBUG ((LM_DEBUG, "(%N,%l) Light profile, invoking connect()\n"));
00845                 //FUZZ: enable check_for_lack_ACE_OS
00846               }
00847 
00848               // Tell the 2 VDev's about one another
00849               this->vdev_a_->set_peer (this->streamctrl_.in (),
00850                                        this->vdev_b_.in (),
00851                                        the_qos,
00852                                        the_flows);
00853 
00854               this->vdev_b_->set_peer (this->streamctrl_.in (),
00855                                        this->vdev_a_.in (),
00856                                        the_qos,
00857                                        the_flows);
00858 
00859 
00860               // Now connect the streams together. This will
00861               // establish the connection
00862               CORBA::Boolean result  =
00863                 this->sep_a_->connect (this->sep_b_.in (),
00864                                        the_qos,
00865                                        the_flows);
00866               if (result == 0)
00867                 ACE_ERROR_RETURN ((LM_ERROR, "sep_a->connect (sep_b) failed\n"), 0);
00868           }
00869         }
00870     }
00871   catch (const CORBA::Exception& ex)
00872     {
00873       ex._tao_print_exception ("TAO_StreamCtrl::bind_devs");
00874       return 0;
00875     }
00876   return 1;
00877 }
00878 
00879 // Used to establish a connection between two endpoints
00880 // directly, i.e. without a MMDevice
00881 CORBA::Boolean
00882 TAO_StreamCtrl::bind (AVStreams::StreamEndPoint_A_ptr sep_a,
00883                       AVStreams::StreamEndPoint_B_ptr sep_b,
00884                       AVStreams::streamQoS &stream_qos,
00885                       const AVStreams::flowSpec &flow_spec)
00886 {
00887   this->sep_a_ = AVStreams::StreamEndPoint_A::_duplicate(sep_a);
00888   this->sep_b_ = AVStreams::StreamEndPoint_B::_duplicate(sep_b);
00889 
00890   int result = 0;
00891   try
00892     {
00893       if (CORBA::is_nil (sep_a_.in() ) ||
00894           CORBA::is_nil (sep_b_.in() ))
00895         ACE_ERROR_RETURN ((LM_ERROR,
00896                            "(%P|%t) TAO_StreamCtrl::bind:"
00897                            "a_party or b_party null!"),
00898                           0);
00899 
00900       // Define each other as their peers.
00901       CORBA::Any sep_any;
00902       sep_any <<= sep_b;
00903       sep_a_->define_property ("PeerAdapter",
00904                               sep_any);
00905       sep_any <<= sep_a;
00906       sep_b_->define_property ("PeerAdapter",
00907                               sep_any);
00908       // since its full profile we do the viable stream setup algorithm.
00909       // get the flows for the A streamendpoint.
00910       // the flows spec is empty and hence we do a exhaustive match.
00911       AVStreams::flowSpec a_flows, b_flows;
00912       CORBA::Any_var flows_any;
00913       flows_any = sep_a_->get_property_value ("Flows");
00914       AVStreams::flowSpec *temp_flows;
00915       flows_any.in () >>= temp_flows;
00916       a_flows = *temp_flows;
00917       flows_any = sep_b_->get_property_value ("Flows");
00918       flows_any.in () >>= temp_flows;
00919       b_flows = *temp_flows;
00920       u_int i;
00921       FlowEndPoint_Map *a_fep_map;
00922       FlowEndPoint_Map *b_fep_map;
00923       ACE_NEW_RETURN (a_fep_map,
00924                       FlowEndPoint_Map,
00925                       0);
00926       ACE_NEW_RETURN (b_fep_map,
00927                       FlowEndPoint_Map,
00928                       0);
00929       for (i=0;i<a_flows.length ();i++)
00930         {
00931           const char *flowname = a_flows[i];
00932           // get the flowendpoint references.
00933           CORBA::Object_var fep_obj =
00934             sep_a_->get_fep (flowname);
00935           AVStreams::FlowEndPoint_var fep =
00936             AVStreams::FlowEndPoint::_narrow (fep_obj.in ());
00937           ACE_CString fep_key (flowname);
00938           result = a_fep_map->bind (fep_key, fep);
00939           if (result == -1)
00940             if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_StreamCtrl::bind failed for %s\n", flowname));
00941         }
00942       // get the flowendpoints for streamendpoint_b
00943       for (i=0;i<b_flows.length ();i++)
00944         {
00945           const char *flowname = b_flows[i];
00946           // get the flowendpoint references.
00947           CORBA::Object_var fep_obj =
00948             sep_b->get_fep (flowname);
00949           AVStreams::FlowEndPoint_var fep =
00950             AVStreams::FlowEndPoint::_narrow (fep_obj.in ());
00951           ACE_CString fep_key (flowname);
00952           result = b_fep_map->bind (fep_key, fep);
00953           if (result == -1)
00954             if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_StreamCtrl::bind failed for %s\n", flowname));
00955         }
00956       FlowEndPoint_Map *map_a = 0, *map_b = 0;
00957       if (flow_spec.length () == 0)
00958         {
00959           map_a = a_fep_map;
00960           map_b = b_fep_map;
00961         }
00962       else
00963         {
00964           FlowEndPoint_Map *spec_fep_map_a, *spec_fep_map_b;
00965           ACE_NEW_RETURN (spec_fep_map_a,
00966                           FlowEndPoint_Map,
00967                           0);
00968           ACE_NEW_RETURN (spec_fep_map_b,
00969                           FlowEndPoint_Map,
00970                           0);
00971           for (i=0; i< flow_spec.length ();i++)
00972             {
00973               TAO_Forward_FlowSpec_Entry *entry = 0;
00974               ACE_NEW_RETURN (entry,
00975                               TAO_Forward_FlowSpec_Entry,
00976                               0);
00977               entry->parse (flow_spec[i]);
00978               ACE_CString fep_key (entry->flowname ());
00979               AVStreams::FlowEndPoint_var fep;
00980               result = a_fep_map->find (fep_key, fep);
00981               if (result == -1)
00982                 ACE_ERROR_RETURN ((LM_ERROR, "Fep not found on A side for flowname: %s\n", flow_spec[i].in ()), 0);
00983 
00984               result = spec_fep_map_a->bind (fep_key, fep);
00985               if (result == -1)
00986                 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "Bind failed for %s\n", flow_spec[i].in ()));
00987 
00988               result = b_fep_map->find (fep_key, fep);
00989               if (result == -1)
00990                 ACE_ERROR_RETURN ((LM_ERROR, "Fep not found on B side for flowname: %s\n", flow_spec[i].in ()), 0);
00991 
00992               result = spec_fep_map_b->bind (fep_key, fep);
00993               if (result == -1)
00994                 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "Bind failed for %s\n", flow_spec[i].in ()));
00995             }
00996           map_a = spec_fep_map_a;
00997           map_b = spec_fep_map_b;
00998         }
00999 
01000       TAO_AV_QoS qos (stream_qos);
01001       // Now go thru the list of flow endpoint and match them.
01002       // uses the first match policy.
01003       FlowEndPoint_Map_Iterator a_feps_iterator (*map_a);
01004       FlowEndPoint_Map_Entry *a_feps_entry, *b_feps_entry;
01005       try
01006         {
01007 
01008           for (;a_feps_iterator.next (a_feps_entry) != 0;
01009                a_feps_iterator.advance ())
01010             {
01011               AVStreams::FlowEndPoint_var fep_a = a_feps_entry->int_id_;
01012               AVStreams::FlowEndPoint_var connected_to =
01013                 fep_a->get_connected_fep ();
01014 
01015               if (!CORBA::is_nil (connected_to.in ()))
01016                 {
01017                   // Skip this one, it is already connected...
01018                   continue;
01019                 }
01020 
01021               FlowEndPoint_Map_Iterator b_feps_iterator (*map_b);
01022               for (;b_feps_iterator.next (b_feps_entry) != 0;
01023                    b_feps_iterator.advance ())
01024                 {
01025                   AVStreams::FlowEndPoint_var fep_b = b_feps_entry->int_id_;
01026                   AVStreams::FlowConnection_var flow_connection;
01027 
01028                   AVStreams::FlowEndPoint_var connected_to =
01029                     fep_b->get_connected_fep ();
01030 
01031                   if (!CORBA::is_nil (connected_to.in ()))
01032                     {
01033                       // Skip this one, it is already connected...
01034                       continue;
01035                     }
01036 
01037                   if (fep_a->is_fep_compatible (fep_b.in()) == 1)
01038                     {
01039                       // assume that flow names are same so that we
01040                       // can use either of them.
01041                       CORBA::Object_var flow_connection_obj;
01042                       CORBA::Any_var flowname_any =
01043                         fep_a->get_property_value ("FlowName");
01044                       const char *flowname = 0;
01045                       flowname_any.in () >>= flowname;
01046                       try
01047                         {
01048                           flow_connection_obj =
01049                             this->get_flow_connection (flowname);
01050                           flow_connection =
01051                             AVStreams::FlowConnection::_narrow (flow_connection_obj.in ());
01052                         }
01053                       catch (const CORBA::Exception&)
01054                         {
01055                           TAO_FlowConnection *flowConnection;
01056                           ACE_NEW_RETURN (flowConnection,
01057                                           TAO_FlowConnection,
01058                                           0);
01059                           flow_connection = flowConnection->_this ();
01060                           this->set_flow_connection (flowname,
01061                                                      flow_connection.in ());
01062                         }
01063 
01064                       // make sure that a_feps is flow_producer
01065                       // and b_feps is flow_consumer
01066                       // There should be a way to find which flow
01067                       // endpoint is producer and which is
01068                       // consumer.
01069 
01070                       AVStreams::FlowProducer_var producer;
01071                       AVStreams::FlowConsumer_var consumer;
01072 
01073                       try
01074                         {
01075                           producer =
01076                             AVStreams::FlowProducer::_narrow (fep_a.in());
01077                           consumer =
01078                             AVStreams::FlowConsumer::_narrow (fep_b.in());
01079 
01080                           // If the types don't match then try in
01081                           // the opposite order
01082                           if (CORBA::is_nil (producer.in ()))
01083                             {
01084                               producer =
01085                                 AVStreams::FlowProducer::_narrow (fep_b.in());
01086                               consumer =
01087                                 AVStreams::FlowConsumer::_narrow (fep_a.in());
01088                             }
01089                           // At this point they should both be
01090                           // non-nil
01091                           // @@ raise an exception (which one?) if
01092                           // this is not true...
01093                           ACE_ASSERT (!CORBA::is_nil (producer.in ()));
01094                           ACE_ASSERT (!CORBA::is_nil (consumer.in ()));
01095                         }
01096                       catch (const CORBA::Exception&)
01097                         {
01098                           //Yamuna : Recheck this
01099                           throw;//_EX (producer_check);
01100                         }
01101                       CORBA::String_var fep_a_name, fep_b_name;
01102                       flowname_any = fep_a->get_property_value ("FlowName");
01103                       const char *temp_name;
01104                       flowname_any.in () >>= temp_name;
01105                       fep_a_name = CORBA::string_dup (temp_name);
01106                       flowname_any = fep_b->get_property_value ("FlowName");
01107                       flowname_any.in () >>= temp_name;
01108                       fep_b_name = CORBA::string_dup (temp_name);
01109                       AVStreams::QoS flow_qos;
01110                       flow_qos.QoSType = fep_a_name;
01111                       flow_qos.QoSParams.length (0);
01112                       result = qos.get_flow_qos (fep_a_name.in (), flow_qos);
01113                       if (result == -1)
01114                         {
01115                           flow_qos.QoSType = fep_b_name;
01116                           result = qos.get_flow_qos (fep_b_name.in (),
01117                                                      flow_qos);
01118                           if (result == -1 && TAO_debug_level > 0)
01119                             ACE_DEBUG ((LM_DEBUG,
01120                                         "No QoS Specified for this flow <%s>\n", flowname));
01121                         }
01122                       flow_connection->connect (producer.in (),
01123                                                 consumer.in (),
01124                                                 flow_qos);
01125                     }
01126                 }
01127             }
01128         }
01129       catch (const CORBA::Exception& ex)
01130         {
01131           ex._tao_print_exception ("TAO_StreamCtrl::bind:flow_connect block");
01132           return 0;
01133         }
01134     }
01135   catch (const CORBA::Exception&)
01136     {
01137       // error was thrown because one of the streamendpoints is light profile.
01138       // Now connect the streams together
01139       this->sep_a_->connect (this->sep_b_.in (),
01140                              stream_qos,
01141                              flow_spec);
01142     }
01143   return 1;
01144 }
01145 
01146 void
01147 TAO_StreamCtrl::unbind (void)
01148 {
01149   try
01150     {
01151       if (this->flow_connection_map_.current_size () > 0)
01152         return;
01153 
01154       AVStreams::flowSpec flow_spec;
01155       flow_spec.length(0);
01156 
01157       MMDevice_Map_Iterator a_iterator (this->mmdevice_a_map_);
01158       MMDevice_Map::ENTRY *entry = 0;
01159       for (;a_iterator.next (entry)!= 0;a_iterator.advance ())
01160         {
01161           entry->int_id_.sep_->destroy (flow_spec);
01162         }
01163       MMDevice_Map_Iterator b_iterator (this->mmdevice_b_map_);
01164       for (;b_iterator.next (entry)!= 0;b_iterator.advance ())
01165         {
01166           entry->int_id_.sep_->destroy (flow_spec);
01167         }
01168     }
01169   catch (const CORBA::Exception& ex)
01170     {
01171       ex._tao_print_exception ("TAO_StreamCtrl::unbind");
01172       return;
01173     }
01174 }
01175 
01176 void
01177 TAO_StreamCtrl::unbind_party (AVStreams::StreamEndPoint_ptr /* the_ep */,
01178                               const AVStreams::flowSpec &/* the_spec */)
01179 {
01180 }
01181 
01182 void
01183 TAO_StreamCtrl::unbind_dev (AVStreams::MMDevice_ptr /* dev */,
01184                             const AVStreams::flowSpec & /* the_spec */)
01185 {
01186 }
01187 
01188 AVStreams::VDev_ptr
01189 TAO_StreamCtrl::get_related_vdev (AVStreams::MMDevice_ptr adev,
01190                                   AVStreams::StreamEndPoint_out sep)
01191 {
01192   MMDevice_Map_Hash_Key key (adev);
01193   MMDevice_Map_Entry entry;
01194   int result = -1;
01195   result = this->mmdevice_a_map_.find (key, entry);
01196   if (result < 0)
01197     {
01198       result = this->mmdevice_a_map_.find (key, entry);
01199       if (result < 0)
01200         return AVStreams::VDev::_nil ();
01201     }
01202   sep = AVStreams::StreamEndPoint::_duplicate (entry.sep_.in ());
01203   return AVStreams::VDev::_duplicate (entry.vdev_.in ());
01204 }
01205 
01206 CORBA::Boolean
01207 
01208 TAO_StreamCtrl::modify_QoS (AVStreams::streamQoS &new_qos,
01209                             const AVStreams::flowSpec &the_spec)
01210 {
01211   if (TAO_debug_level > 0)
01212   ACE_DEBUG ((LM_DEBUG,
01213               "TAO_StreamCtrl::modify_QoS\n"));
01214 
01215 
01216   if (this->mcastconfigif_ != 0)
01217     {
01218       // call modify_Qos on the root VDev which is the mcast configif.
01219       if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "Cannot Modify the Qos for multipoint streams\n"));
01220     }
01221   else
01222     {
01223       try
01224         {
01225           AVStreams::flowSpec in_flowspec;
01226           AVStreams::flowSpec out_flowspec;
01227 
01228           in_flowspec.length (0);
01229           out_flowspec.length (0);
01230 
01231           int in_index = 0;
01232           int out_index = 0;
01233 
01234           AVStreams::flowSpec flowspec;
01235           if (the_spec.length () == 0)
01236             {
01237               // Apply modify_qos to all the flows
01238               flowspec = this->flows_;
01239               MMDevice_Map_Iterator iterator (this->mmdevice_a_map_);
01240               MMDevice_Map::ENTRY *entry = 0;
01241               for (;iterator.next (entry) !=  0;iterator.advance ())
01242                 {
01243                   flowspec = entry->int_id_.flowspec_;
01244                 }
01245             }
01246           else
01247             {
01248               flowspec = the_spec;
01249             }
01250 
01251           if (TAO_debug_level > 0)
01252             ACE_DEBUG ((LM_DEBUG,
01253                         "TAO_StreamCtrl::modify_QoS\n"));
01254 
01255 
01256           for (u_int i=0;i < flowspec.length ();i++)
01257             {
01258               TAO_Forward_FlowSpec_Entry entry;
01259               entry.parse (flowspec [i]);
01260               int direction = entry.direction ();
01261               if (direction == 0)
01262                 {
01263                   in_flowspec.length (in_index + 1);
01264                   in_flowspec [in_index++] = CORBA::string_dup (entry.entry_to_string ());
01265                 }
01266               else
01267                 {
01268                   out_flowspec.length (out_index + 1);
01269                   out_flowspec [out_index++] = CORBA::string_dup (entry.entry_to_string ());
01270                 }
01271             }
01272 
01273           if (in_flowspec.length () != 0)
01274             {
01275               this->vdev_a_->modify_QoS (new_qos, in_flowspec);
01276             }
01277 
01278           if (out_flowspec.length () != 0)
01279             {
01280               this->vdev_b_->modify_QoS (new_qos, out_flowspec);
01281             }
01282         }
01283       catch (const CORBA::Exception& ex)
01284         {
01285           ex._tao_print_exception ("TAO_StreamCtrl::modify_QoS");
01286           return 0;
01287         }
01288 
01289     }
01290   return 1;
01291 }
01292 
01293 // ----------------------------------------------------------------------
01294 // TAO_MCastConfigIf
01295 // ----------------------------------------------------------------------
01296 
01297 TAO_MCastConfigIf::TAO_MCastConfigIf (void)
01298   :peer_list_iterator_ (peer_list_)
01299 {
01300 }
01301 
01302 TAO_MCastConfigIf::~TAO_MCastConfigIf (void)
01303 {
01304   //no-op
01305 }
01306 
01307 // In future this should be a multicast message instead of point-to-point unicasts.
01308 CORBA::Boolean
01309 TAO_MCastConfigIf::set_peer (CORBA::Object_ptr peer,
01310                              AVStreams::streamQoS & qos,
01311                              const AVStreams::flowSpec & flow_spec)
01312 {
01313   try
01314     {
01315       Peer_Info *info;
01316       ACE_NEW_RETURN (info,
01317                       Peer_Info,
01318                       0);
01319       info->peer_ = AVStreams::VDev::_narrow (peer);
01320       info->qos_ = qos;
01321       info->flow_spec_ = flow_spec;
01322       this->peer_list_.insert_tail (info);
01323     }
01324   catch (const CORBA::Exception& ex)
01325     {
01326       ex._tao_print_exception ("TAO_MCastConfigIf::set_peer");
01327       return 0;
01328     }
01329   return 1;
01330 }
01331 
01332 // In future this should be a multicast message instead of point-to-point unicasts.
01333 void
01334 TAO_MCastConfigIf::configure (const CosPropertyService::Property & a_configuration)
01335 {
01336   Peer_Info *info;
01337   try
01338     {
01339       for (this->peer_list_iterator_.first ();
01340            (info = this->peer_list_iterator_.next ()) != 0;
01341            this->peer_list_iterator_.advance ())
01342         {
01343           info->peer_->configure (a_configuration);
01344         }
01345     }
01346   catch (const CORBA::Exception& ex)
01347     {
01348       ex._tao_print_exception (
01349         "TAO_MCastConfigIf::set_configure");
01350       return;
01351     }
01352 }
01353 
01354 
01355 void
01356 TAO_MCastConfigIf::set_initial_configuration (const CosPropertyService::Properties &initial)
01357 {
01358   this->initial_configuration_ = initial;
01359 }
01360 
01361 // In future this should be a multicast message instead of point-to-point unicasts.
01362 void
01363 TAO_MCastConfigIf::set_format (const char * flowName,
01364                                const char * format_name)
01365 {
01366   Peer_Info *info;
01367   try
01368     {
01369       for (this->peer_list_iterator_.first ();
01370            (info = this->peer_list_iterator_.next ()) != 0;
01371            this->peer_list_iterator_.advance ())
01372         {
01373           if (this->in_flowSpec (info->flow_spec_, flowName))
01374             {
01375               info->peer_->set_format (flowName, format_name);
01376             }
01377         }
01378     }
01379   catch (const CORBA::Exception& ex)
01380     {
01381       ex._tao_print_exception ("TAO_MCastConfigIf::set_format");
01382       return;
01383     }
01384 }
01385 
01386 // In future this should be a multicast message instead of point-to-point unicasts.
01387 void
01388 TAO_MCastConfigIf::set_dev_params (const char * flowName,
01389                                    const CosPropertyService::Properties & new_params)
01390 {
01391   Peer_Info *info;
01392   try
01393     {
01394 
01395       for (this->peer_list_iterator_.first ();
01396            (info = this->peer_list_iterator_.next ()) != 0;
01397            this->peer_list_iterator_.advance ())
01398         {
01399           if (this->in_flowSpec (info->flow_spec_, flowName))
01400             {
01401               info->peer_->set_dev_params (flowName, new_params);
01402             }
01403         }
01404     }
01405   catch (const CORBA::Exception& ex)
01406     {
01407       ex._tao_print_exception (
01408         "TAO_MCastConfigIf::set_dev_params");
01409       return;
01410     }
01411 }
01412 
01413 int
01414 TAO_MCastConfigIf::in_flowSpec (const AVStreams::flowSpec& flow_spec, const char *flow_name)
01415 {
01416   size_t len = ACE_OS::strlen (flow_name);
01417   for (CORBA::ULong i = 0; i < flow_spec.length (); i++)
01418     if (ACE_OS::strncmp (flow_spec[i], flow_name, len) == 0)
01419       {
01420         return 1;
01421       }
01422   return 0;
01423 }
01424 
01425 // ----------------------------------------------------------------------
01426 // TAO_Base_StreamEndPoint
01427 // ----------------------------------------------------------------------
01428 
01429 TAO_Base_StreamEndPoint::TAO_Base_StreamEndPoint (void)
01430   : protocol_object_set_ (0)
01431 {
01432 }
01433 
01434 TAO_Base_StreamEndPoint::~TAO_Base_StreamEndPoint (void)
01435 {
01436 }
01437 
01438 int
01439 TAO_Base_StreamEndPoint::handle_close (void)
01440 {
01441   // This method should not be defined, but EGCS complains endlessly
01442   // about it.
01443   return -1;
01444 }
01445 
01446 int
01447 TAO_Base_StreamEndPoint::handle_open (void)
01448 {
01449   return 0;
01450 }
01451 
01452 int
01453 TAO_Base_StreamEndPoint::handle_stop (const AVStreams::flowSpec &)
01454 {
01455   return 0;
01456 }
01457 
01458 int
01459 TAO_Base_StreamEndPoint::handle_start (const AVStreams::flowSpec &)
01460 {
01461   return 0;
01462 }
01463 
01464 int
01465 TAO_Base_StreamEndPoint::handle_destroy (const AVStreams::flowSpec &)
01466 {
01467   return 0;
01468 }
01469 
01470 // The following function is for backward compatibility.
01471 CORBA::Boolean
01472 TAO_Base_StreamEndPoint::handle_preconnect (AVStreams::flowSpec &)
01473 {
01474   return 1;
01475 }
01476 
01477 // The following function is for backward compatibility.
01478 CORBA::Boolean
01479 TAO_Base_StreamEndPoint::handle_postconnect (AVStreams::flowSpec &)
01480 {
01481 
01482   while (!this->is_protocol_object_set ())
01483     TAO_AV_CORE::instance ()->orb ()->perform_work ();
01484   return 1;
01485 }
01486 
01487 // The following function is for backward compatibility.
01488 CORBA::Boolean
01489 TAO_Base_StreamEndPoint::handle_connection_requested (AVStreams::flowSpec &)
01490 {
01491   return 1;
01492 }
01493 
01494 int
01495 TAO_Base_StreamEndPoint::set_protocol_object (const char * /*flowname*/,
01496                                               TAO_AV_Protocol_Object * /*sfp_object*/)
01497 {
01498   return -1;
01499 }
01500 
01501 void
01502 TAO_Base_StreamEndPoint::protocol_object_set (void)
01503 {
01504   this->protocol_object_set_ = 1;
01505 }
01506 
01507 
01508 int
01509 TAO_Base_StreamEndPoint::is_protocol_object_set (void)
01510 {
01511   return this->protocol_object_set_;
01512 }
01513 
01514 int
01515 TAO_Base_StreamEndPoint::get_callback (const char * /*flowname*/,
01516                                        TAO_AV_Callback *&/*sfp_callback*/)
01517 {
01518   return -1;
01519 }
01520 
01521 int
01522 TAO_Base_StreamEndPoint::get_control_callback (const char * /*flowname*/,
01523                                                TAO_AV_Callback *&/*sfp_callback*/)
01524 {
01525   return -1;
01526 }
01527 
01528 void
01529 TAO_Base_StreamEndPoint::set_flow_handler (const char *flowname,
01530                                            TAO_AV_Flow_Handler *handler)
01531 {
01532   if(TAO_debug_level > 1)
01533   {
01534      ACE_DEBUG ((LM_DEBUG, "(%N,%l) TAO_Base_StreamEndPoint::set_flow_handler(), flowname: %s\n", flowname));
01535   }
01536   ACE_CString flow_name_key (flowname);
01537   if (this->flow_handler_map_.bind (flow_name_key, handler) != 0)
01538     ACE_ERROR ((LM_ERROR,
01539                 "Error in storing flow handler\n"));
01540 }
01541 
01542 void
01543 TAO_Base_StreamEndPoint::set_control_flow_handler (const char *flowname,
01544                                                    TAO_AV_Flow_Handler *handler)
01545 {
01546   ACE_CString flow_name_key (flowname);
01547   if (this->control_flow_handler_map_.bind (flow_name_key, handler) != 0)
01548     ACE_ERROR ((LM_ERROR,
01549                 "Error in storing control flow handler\n"));
01550 }
01551 
01552 // ----------------------------------------------------------------------
01553 // TAO_StreamEndPoint
01554 // ----------------------------------------------------------------------
01555 
01556 // constructor.
01557 
01558 TAO_StreamEndPoint::TAO_StreamEndPoint (void)
01559   :flow_count_ (0),
01560    flow_num_ (0),
01561    mcast_port_ (ACE_DEFAULT_MULTICAST_PORT+1)
01562 {
01563   //is->mcast_addr_ = ACE_OS::inet_addr (ACE_DEFAULT_MULTICAST_ADDR);
01564   this->mcast_addr_.set (ACE_DEFAULT_MULTICAST_ADDR);
01565   if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_StreamEndPoint::TAO_StreamEndPoint::mcast_addr = %s", this->mcast_addr_.c_str ()));
01566   //  this->handle_open ();
01567 }
01568 
01569 
01570 CORBA::Boolean
01571 TAO_StreamEndPoint::connect (AVStreams::StreamEndPoint_ptr responder,
01572                              AVStreams::streamQoS &qos,
01573                              const AVStreams::flowSpec &the_spec)
01574 {
01575   if (TAO_debug_level > 0)
01576     ACE_DEBUG ((LM_DEBUG, "TAO_StreamEndPoint::Connect ()\n"));
01577   CORBA::Boolean retv = 0;
01578   this->peer_sep_ = AVStreams::StreamEndPoint::_duplicate (responder);
01579   try
01580     {
01581       if (!CORBA::is_nil (this->negotiator_.in ()))
01582         {
01583           ACE_DEBUG ((LM_DEBUG,
01584                       "NEGOTIATOR AVIALABLE\n"));
01585 
01586           CORBA::Any_var negotiator_any = responder->get_property_value ("Negotiator");
01587 
01588           AVStreams::Negotiator_ptr peer_negotiator;
01589           negotiator_any.in () >>= peer_negotiator;
01590           if (!CORBA::is_nil (peer_negotiator))
01591             {
01592               CORBA::Boolean result =
01593                 this->negotiator_->negotiate (peer_negotiator,
01594                                               qos);
01595               if (!result)
01596                 if (TAO_debug_level > 0)
01597                   ACE_DEBUG ((LM_DEBUG, "TAO_StreamEndPoint::Connect (): negotiate failed\n"));
01598             }
01599         }
01600     }
01601   catch (const CORBA::Exception& ex)
01602     {
01603       ex._tao_print_exception ("TAO_StreamEndPoint::negotiate");
01604     }
01605 
01606   try
01607     {
01608       if (this->protocols_.length () > 0)
01609         {
01610           // choose protocols based on what the remote endpoint can support.
01611           CORBA::Any_var protocols_any =
01612             responder->get_property_value ("AvailableProtocols");
01613           AVStreams::protocolSpec peer_protocols;
01614           AVStreams::protocolSpec *temp_protocols;
01615           protocols_any.in () >>= temp_protocols;
01616           peer_protocols = *temp_protocols;
01617           for (u_int i=0;i<peer_protocols.length ();i++)
01618             {
01619               for (u_int j=0;j<this->protocols_.length ();j++)
01620                 if (ACE_OS::strcmp (peer_protocols [i],
01621                                     this->protocols_[j]) == 0)
01622                   {
01623                     // we'll agree upon the first protocol that matches.
01624                     this->protocol_ = CORBA::string_dup (peer_protocols [i]);
01625                     break;
01626                   }
01627             }
01628         }
01629     }
01630   catch (const CORBA::Exception&)
01631     {
01632       if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "Availableprotocols property not defined\n"));
01633     }
01634   try
01635     {
01636       AVStreams::streamQoS network_qos;
01637       if (qos.length () > 0)
01638         {
01639           if (TAO_debug_level > 0)
01640             ACE_DEBUG ((LM_DEBUG,
01641                         "QoS is Specified\n"));
01642 
01643           int result = this->translate_qos (qos,
01644                                             network_qos);
01645           if (result != 0)
01646             if (TAO_debug_level > 0)
01647               ACE_DEBUG ((LM_DEBUG,
01648                           "QoS translation failed\n"));
01649 
01650           this->qos ().set (network_qos);
01651         }
01652 
01653 
01654       AVStreams::flowSpec flow_spec (the_spec);
01655       this->handle_preconnect (flow_spec);
01656 
01657       if (TAO_debug_level > 0)
01658         ACE_DEBUG ((LM_DEBUG,
01659                     "TAO_StreamEndPoint::connect: flow_spec_length = %d\n",
01660                     flow_spec.length ()));
01661       u_int i;
01662       for (i=0;i<flow_spec.length ();i++)
01663         {
01664           TAO_Forward_FlowSpec_Entry *entry = 0;
01665           ACE_NEW_RETURN (entry,
01666                           TAO_Forward_FlowSpec_Entry,
01667                           0);
01668 
01669           if (entry->parse (flow_spec[i]) == -1)
01670             return 0;
01671 
01672           if (TAO_debug_level > 0)
01673             ACE_DEBUG ((LM_DEBUG, "TAO_StreamEndPoint::Connect: %s\n",  entry->entry_to_string ()));
01674 
01675           this->forward_flow_spec_set.insert (entry);
01676         }
01677 
01678       int result =TAO_AV_CORE::instance ()->init_forward_flows (this,
01679                                                                 this->forward_flow_spec_set,
01680                                                                 TAO_AV_Core::TAO_AV_ENDPOINT_A,
01681                                                                 flow_spec);
01682 
01683 
01684       if (result < 0)
01685         ACE_ERROR_RETURN ((LM_ERROR, "%N:%l TAO_AV_Core::init_forward_flows failed\n"), 0);
01686 
01687 
01688       AVStreams::StreamEndPoint_var streamendpoint = this->_this ();
01689 
01690       retv = responder->request_connection (streamendpoint.in (),
01691                                             0,
01692                                             network_qos,
01693                                             flow_spec);
01694 
01695       if (TAO_debug_level > 0)
01696          ACE_DEBUG ((LM_DEBUG, "%N:%l request_connection returned %d\n", retv));
01697 
01698       if (retv == 0)
01699         return retv;
01700       for (i=0;i<flow_spec.length ();i++)
01701         {
01702           TAO_Reverse_FlowSpec_Entry *entry = 0;
01703           ACE_NEW_RETURN (entry,
01704                           TAO_Reverse_FlowSpec_Entry,
01705                           0);
01706           if (entry->parse (flow_spec[i]) == -1)
01707             ACE_ERROR_RETURN ((LM_ERROR,
01708                                "Reverse_Flow_Spec_Set::parse failed\n"),
01709                               0);
01710 
01711           if (TAO_debug_level > 0)
01712             ACE_DEBUG ((LM_DEBUG,
01713                         "TAO_StreamEndPoint::Connect: Reverse Flow Spec %s\n",
01714                         entry->entry_to_string ()));
01715 
01716           this->reverse_flow_spec_set.insert (entry);
01717         }
01718 
01719       result = TAO_AV_CORE::instance ()->init_reverse_flows (this,
01720                                                              this->forward_flow_spec_set,
01721                                                              this->reverse_flow_spec_set,
01722                                                              TAO_AV_Core::TAO_AV_ENDPOINT_A);
01723       if (result < 0)
01724         ACE_ERROR_RETURN ((LM_ERROR,
01725                            "TAO_AV_Core::init_reverse_flows failed\n"),
01726                           0);
01727 
01728       // Make the upcall to the app
01729       retv = this->handle_postconnect (flow_spec);
01730     }
01731   catch (const CORBA::Exception& ex)
01732     {
01733       ex._tao_print_exception ("TAO_StreamEndPoint::connect");
01734       return 0;
01735     }
01736   return retv;
01737 }
01738 
01739 int
01740 TAO_StreamEndPoint::translate_qos (const AVStreams::streamQoS& application_qos,
01741                                    AVStreams::streamQoS& network_qos)
01742 {
01743   u_int len = application_qos.length ();
01744   network_qos.length (len);
01745   for (u_int i=0;i<len;i++)
01746     {
01747       network_qos [i].QoSType = application_qos [i].QoSType;
01748       network_qos [i].QoSParams = application_qos [i].QoSParams;
01749     }
01750   return 0;
01751 }
01752 
01753 // Stop the physical flow of data on the stream
01754 // Empty the_spec --> apply to all flows
01755 
01756 void
01757 TAO_StreamEndPoint::stop (const AVStreams::flowSpec &flow_spec)
01758 {
01759   // Make the upcall into the app
01760   this->handle_stop (flow_spec);
01761 
01762   if (flow_spec.length () > 0)
01763     {
01764 
01765       for (u_int i=0;i<flow_spec.length ();i++)
01766         {
01767           TAO_AV_FlowSpecSetItor end = this->forward_flow_spec_set.end ();
01768           for (TAO_AV_FlowSpecSetItor begin = this->forward_flow_spec_set.begin ();
01769                begin != end; ++begin)
01770             {
01771               TAO_Forward_FlowSpec_Entry entry;
01772               entry.parse (flow_spec[i]);
01773               if (ACE_OS::strcmp ((*begin)->flowname (), entry.flowname ()) == 0)
01774                {
01775                  TAO_FlowSpec_Entry *entry = *begin;
01776                  //                  (*begin)->protocol_object ()->stop ();
01777                  if (entry->handler() != 0)
01778                    entry->handler ()->stop (entry->role ());
01779                  if (entry->control_handler () != 0)
01780                    entry->control_handler ()->stop (entry->role ());
01781                  break;
01782                }
01783             }
01784         }
01785     }
01786   else
01787     {
01788       TAO_AV_FlowSpecSetItor end = this->forward_flow_spec_set.end ();
01789       for (TAO_AV_FlowSpecSetItor begin = this->forward_flow_spec_set.begin ();
01790            begin != end; ++begin)
01791         {
01792           TAO_FlowSpec_Entry *entry = *begin;
01793           //          entry->protocol_object ()->stop ();
01794           if (entry->handler() != 0)
01795             entry->handler ()->stop (entry->role ());
01796           if (entry->control_handler () != 0)
01797             entry->control_handler ()->stop (entry->role ());
01798         }
01799     }
01800 }
01801 
01802 // Start the physical flow of data on the stream
01803 // Empty the_spec --> apply to all flows
01804 void
01805 TAO_StreamEndPoint::start (const AVStreams::flowSpec &flow_spec)
01806 {
01807   if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_StreamEndPoint::start\n"));
01808   // Make the upcall into the app
01809   this->handle_start (flow_spec);
01810 
01811   if (flow_spec.length () > 0)
01812     {
01813       // Now call start on all the flow handlers.
01814       for (u_int i=0;i<flow_spec.length ();i++)
01815         {
01816           TAO_AV_FlowSpecSetItor end = this->forward_flow_spec_set.end ();
01817           for (TAO_AV_FlowSpecSetItor forward_begin = this->forward_flow_spec_set.begin ();
01818                forward_begin != end; ++forward_begin)
01819             {
01820               TAO_FlowSpec_Entry *entry = *forward_begin;
01821               if (ACE_OS::strcmp (entry->flowname (), flow_spec [i]) == 0)
01822                 {
01823                   //                  entry->protocol_object ()->start ();
01824                   if (entry->handler () != 0)
01825                   {
01826                     entry->handler ()->start (entry->role ());
01827                   }
01828                   if (entry->control_handler () != 0)
01829                   {
01830                     entry->control_handler ()->start (entry->role ());
01831                   }
01832                 }
01833             }
01834 
01835           end = this->reverse_flow_spec_set.end ();
01836           for (TAO_AV_FlowSpecSetItor reverse_begin = this->reverse_flow_spec_set.begin ();
01837                reverse_begin != end; ++reverse_begin)
01838             {
01839               TAO_FlowSpec_Entry *entry = *reverse_begin;
01840               if (ACE_OS::strcmp (entry->flowname (), flow_spec [i]) == 0)
01841                 {
01842                   //                  entry->protocol_object ()->start ();
01843                   if (entry->handler () != 0)
01844                   {
01845                     entry->handler ()->start (entry->role ());
01846                   }
01847                   if (entry->control_handler () != 0)
01848                   {
01849                     entry->control_handler ()->start (entry->role ());
01850                   }
01851                 }
01852             }
01853         }
01854     }
01855   else
01856     {
01857       TAO_AV_FlowSpecSetItor end = this->forward_flow_spec_set.end ();
01858       for (TAO_AV_FlowSpecSetItor forwardbegin = this->forward_flow_spec_set.begin ();
01859            forwardbegin != end; ++forwardbegin)
01860         {
01861           TAO_FlowSpec_Entry *entry = *forwardbegin;
01862           if (entry->handler () != 0)
01863             {
01864               entry->handler ()->start (entry->role ());
01865             }
01866           if (entry->control_handler () != 0)
01867             {
01868               entry->control_handler ()->start (entry->role ());
01869             }
01870         }
01871 
01872       end = this->reverse_flow_spec_set.end ();
01873       for (TAO_AV_FlowSpecSetItor reversebegin = this->reverse_flow_spec_set.begin ();
01874            reversebegin != end; ++reversebegin)
01875         {
01876           TAO_FlowSpec_Entry *entry = *reversebegin;
01877           //          entry->protocol_object ()->start ();
01878           if (entry->handler () != 0)
01879             {
01880               entry->handler ()->start (entry->role ());
01881             }
01882           if (entry->control_handler () != 0)
01883             {
01884               entry->control_handler ()->start (entry->role ());
01885             }
01886         }
01887     }
01888 }
01889 
01890 // Close the connection
01891 void
01892 TAO_StreamEndPoint::destroy (const AVStreams::flowSpec &flow_spec)
01893 {
01894   CORBA::Any_var vdev_any = this->get_property_value ("Related_VDev");
01895 
01896   AVStreams::VDev_ptr vdev;
01897 
01898   vdev_any.in() >>= vdev;
01899   CORBA::Any_var mc_any = vdev->get_property_value ("Related_MediaCtrl");
01900 
01901   // The Related_MediaCtrl property was inserted as a CORBA::Object, so we
01902   // must extract it as the same type.
01903   CORBA::Object_var obj;
01904   mc_any.in() >>= CORBA::Any::to_object( obj.out() );
01905 
01906   AVStreams::MediaControl_var media_ctrl =
01907           AVStreams::MediaControl::_narrow( obj.in() );
01908 
01909   // deactivate the associated vdev and media ctrl
01910 
01911   if ( !CORBA::is_nil( vdev ) )
01912   {
01913     PortableServer::ServantBase_var vdev_servant =
01914         TAO_AV_CORE::instance()->poa()->reference_to_servant ( vdev );
01915     TAO_AV_Core::deactivate_servant (vdev_servant.in());
01916   }
01917 
01918   if ( !CORBA::is_nil ( media_ctrl.in () ) )
01919   {
01920     PortableServer::ServantBase_var mc_servant =
01921         TAO_AV_CORE::instance()->poa()->reference_to_servant (media_ctrl.in());
01922     TAO_AV_Core::deactivate_servant (mc_servant.in());
01923   }
01924 
01925   int result = TAO_AV_Core::deactivate_servant (this);
01926   if (result < 0)
01927     if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_StreamEndPoint::destroy failed\n"));
01928 
01929   if (flow_spec.length () > 0)
01930     {
01931       for (u_int i=0;i<flow_spec.length ();i++)
01932         {
01933           {
01934             TAO_AV_FlowSpecSetItor end = this->forward_flow_spec_set.end ();
01935             for (TAO_AV_FlowSpecSetItor begin = this->forward_flow_spec_set.begin ();
01936                  begin != end; ++begin)
01937               {
01938                 TAO_FlowSpec_Entry *entry = *begin;
01939                 TAO_Tokenizer flow_name (flow_spec [i], '\\');
01940                 if (ACE_OS::strcmp (entry->flowname (), flow_name[0]) == 0)
01941                   {
01942                     if (entry->protocol_object ())
01943                       {
01944                         entry->protocol_object ()->destroy ();
01945                       }
01946                     break;
01947                   }
01948               }
01949           }
01950           {
01951             TAO_AV_FlowSpecSetItor end = this->reverse_flow_spec_set.end ();
01952             for (TAO_AV_FlowSpecSetItor begin = this->reverse_flow_spec_set.begin ();
01953                  begin != end; ++begin)
01954               {
01955                 TAO_FlowSpec_Entry *entry = *begin;
01956                 TAO_Tokenizer flow_name (flow_spec [i], '\\');
01957                 if (ACE_OS::strcmp (entry->flowname (), flow_name[0]) == 0)
01958                   {
01959                     if (entry->protocol_object ())
01960                       {
01961                         entry->protocol_object ()->destroy ();
01962                       }
01963                     break;
01964                   }
01965               }
01966           }
01967         }
01968     }
01969   else
01970     {
01971       {
01972         TAO_AV_FlowSpecSetItor end = this->forward_flow_spec_set.end ();
01973         for (TAO_AV_FlowSpecSetItor begin = this->forward_flow_spec_set.begin ();
01974              begin != end; ++begin)
01975           {
01976             TAO_FlowSpec_Entry *entry = *begin;
01977             if (entry->protocol_object ())
01978               {
01979                 entry->protocol_object ()->stop ();
01980 
01981                 ACE_CString control_flowname =
01982                     TAO_AV_Core::get_control_flowname (entry->flowname ());
01983                 TAO_AV_CORE::instance()->remove_acceptor(entry->flowname());
01984                 TAO_AV_CORE::instance()->remove_acceptor(control_flowname.c_str());
01985 
01986                 entry->protocol_object ()->destroy ();
01987               }
01988           }
01989       }
01990       {
01991         TAO_AV_FlowSpecSetItor end = this->reverse_flow_spec_set.end ();
01992         for (TAO_AV_FlowSpecSetItor begin = this->reverse_flow_spec_set.begin ();
01993              begin != end; ++begin)
01994           {
01995             TAO_FlowSpec_Entry *entry = *begin;
01996             if (entry->protocol_object ())
01997               {
01998                 entry->protocol_object ()->stop ();
01999 
02000                 ACE_CString control_flowname =
02001                     TAO_AV_Core::get_control_flowname (entry->flowname ());
02002                 TAO_AV_CORE::instance()->remove_connector(entry->flowname());
02003                 TAO_AV_CORE::instance()->remove_connector(control_flowname.c_str());
02004                 entry->protocol_object ()->destroy ();
02005 
02006               }
02007           }
02008       }
02009     }
02010 
02011   // Make the upcall into the app
02012   //  this->handle_destroy (the_spec);
02013   //
02014 }
02015 
02016 // Called by our peer endpoint, requesting us to establish
02017 // a connection
02018 CORBA::Boolean
02019 TAO_StreamEndPoint::request_connection (AVStreams::StreamEndPoint_ptr /*initiator*/,
02020                                         CORBA::Boolean /*is_mcast*/,
02021                                         AVStreams::streamQoS &qos,
02022                                         AVStreams::flowSpec &flow_spec)
02023 
02024 {
02025   if (TAO_debug_level > 0)
02026     ACE_DEBUG ((LM_DEBUG,
02027                 "\n(%P|%t) TAO_StreamEndPoint::request_connection called"));
02028 
02029   int result = 0;
02030   try
02031     {
02032       AVStreams::streamQoS network_qos;
02033       if (qos.length () > 0)
02034         {
02035          if (TAO_debug_level > 0)
02036           ACE_DEBUG ((LM_DEBUG,
02037                       "QoS is Specified\n"));
02038 
02039           int result = this->translate_qos (qos, network_qos);
02040           if (result != 0)
02041             if (TAO_debug_level > 0)
02042               ACE_DEBUG ((LM_DEBUG, "QoS translation failed\n"));
02043 
02044           this->qos ().set (network_qos);
02045         }
02046 
02047       if (TAO_debug_level > 0)
02048         ACE_DEBUG ((LM_DEBUG,
02049                     "\n(%P|%t) TAO_StreamEndPoint::request_connection: "
02050                     "flowspec has length = %d and the strings are:\n",
02051                     flow_spec.length ()));
02052       CORBA::ULong i;
02053 
02054       for (i=0;i<flow_spec.length ();i++)
02055         {
02056           TAO_Forward_FlowSpec_Entry *entry = 0;
02057           ACE_NEW_RETURN (entry,
02058                           TAO_Forward_FlowSpec_Entry,
02059                           0);
02060 
02061           CORBA::String_var string_entry = CORBA::string_dup (flow_spec[i]);
02062 
02063           if(TAO_debug_level > 0)
02064              ACE_DEBUG(( LM_DEBUG,
02065                          "%N:%l Parsing flow spec: [%s]\n",
02066                          string_entry.in ()));
02067 
02068           if (entry->parse (string_entry.in ()) == -1)
02069           {
02070             if (TAO_debug_level > 0)
02071               ACE_DEBUG ((LM_DEBUG,
02072                           "%N:%l Error parsing flow_spec: [%s]\n",
02073                           string_entry.in ()));
02074             return 0;
02075           }
02076           if (TAO_debug_level > 0)
02077             ACE_DEBUG ((LM_DEBUG,
02078                         "TAO_StreamEndPoint::request_connection flow spec [%s]\n",
02079                         entry->entry_to_string ()));
02080 
02081           this->forward_flow_spec_set.insert (entry);
02082         }
02083 
02084       result = TAO_AV_CORE::instance ()->init_forward_flows (this,
02085                                                              this->forward_flow_spec_set,
02086                                                              TAO_AV_Core::TAO_AV_ENDPOINT_B,
02087                                                              flow_spec);
02088 
02089       if (result < 0)
02090         return 0;
02091 
02092       // Make the upcall to the app
02093       result = this->handle_connection_requested (flow_spec);
02094     }
02095   catch (const CORBA::Exception& ex)
02096     {
02097       ex._tao_print_exception ("TAO_StreamEndpoint::request_connection");
02098       return 0;
02099     }
02100   return result;
02101 }
02102 
02103 int
02104 TAO_StreamEndPoint::change_qos (AVStreams::streamQoS &new_qos,
02105                                 const AVStreams::flowSpec &the_flows)
02106 {
02107   if (TAO_debug_level > 0)
02108     ACE_DEBUG ((LM_DEBUG,
02109                 "TAO_StreamEndPoint::change_qos\n"));
02110 
02111   TAO_AV_QoS qos (new_qos);
02112   for (int i = 0; (unsigned) i < the_flows.length (); i++)
02113     {
02114       TAO_Forward_FlowSpec_Entry entry;
02115       entry.parse (the_flows [i]);
02116       ACE_CString flow_name_key (entry.flowname ());
02117       Flow_Handler_Map_Entry *handler_entry;
02118       if (this->flow_handler_map_.find (flow_name_key,
02119                                         handler_entry) == 0)
02120         {
02121           AVStreams::QoS flow_qos;
02122           if (qos.get_flow_qos (entry.flowname (), flow_qos) != 0)
02123             ACE_DEBUG ((LM_DEBUG,
02124                         "New QoS for the flow %s is not specified\n",
02125                         entry.flowname ()));
02126           int result;
02127           result = handler_entry->int_id_->change_qos (flow_qos);
02128           if (result != 0)
02129             ACE_ERROR_RETURN ((LM_ERROR,
02130                                "Modifying QoS Failed\n"),
02131                               -1);
02132 
02133         }
02134     }
02135   return 0;
02136 }
02137 
02138 // Refers to modification of transport QoS.
02139 CORBA::Boolean
02140 TAO_StreamEndPoint::modify_QoS (AVStreams::streamQoS &new_qos,
02141                                 const AVStreams::flowSpec &the_flows)
02142 {
02143   if (TAO_debug_level > 0)
02144   ACE_DEBUG ((LM_DEBUG,
02145               "TAO_StreamEndPoint::modify_QoS\n"));
02146 
02147   int result =  this->change_qos (new_qos, the_flows);
02148 
02149   if (result != 0)
02150     return 0;
02151 
02152   return 1;
02153 
02154 }
02155 
02156 // Sets the list of protocols this streamendpoint can understand.
02157 
02158 CORBA::Boolean
02159 TAO_StreamEndPoint::set_protocol_restriction (const AVStreams::protocolSpec &protocols)
02160 {
02161   try
02162     {
02163       CORBA::Any protocol_restriction_any;
02164 
02165       protocol_restriction_any <<= protocols;
02166       this->define_property ("ProtocolRestriction",
02167                              protocol_restriction_any);
02168       this->protocols_ = protocols;
02169     }
02170   catch (const CORBA::Exception& ex)
02171     {
02172       ex._tao_print_exception (
02173         "TAO_StreamEndPoint::set_protocol_restriction");
02174       return 0;
02175     }
02176   return 1;
02177 }
02178 
02179 
02180 void
02181 TAO_StreamEndPoint::disconnect (const AVStreams::flowSpec &the_spec)
02182 {
02183   ACE_UNUSED_ARG (the_spec);
02184 }
02185 
02186 // Sets the status of the flow protocol.
02187 
02188 void
02189 TAO_StreamEndPoint::set_FPStatus (const AVStreams::flowSpec &/*the_spec*/,
02190                                   const char *fp_name,
02191                                   const CORBA::Any &fp_settings)
02192 {
02193   if (ACE_OS::strcmp (fp_name, "SFP1.0") != 0)
02194     return;
02195   fp_settings >>= this->sfp_status_;
02196   // @@Naga: We should call set_FPStatus on all the protocol objects.
02197 }
02198 
02199 
02200 CORBA::Object_ptr
02201 TAO_StreamEndPoint::get_fep (const char *flow_name)
02202 {
02203   ACE_CString fep_name_key (flow_name);
02204   AVStreams::FlowEndPoint_var fep_entry;
02205   if (this->fep_map_.find (fep_name_key, fep_entry) == 0)
02206     return fep_entry._retn();
02207   return 0;
02208 }
02209 
02210 char*
02211 TAO_StreamEndPoint::add_fep_i_add_property (AVStreams::FlowEndPoint_ptr fep)
02212 {
02213   ACE_CString flow_name;
02214 
02215   try
02216     {
02217       // exception implies the flow name is not defined and is system
02218       // generated.
02219       flow_name = "flow";
02220       char tmp[255];
02221       ACE_OS::sprintf (tmp, "%u", this->flow_num_++);
02222       flow_name += tmp;
02223 
02224       CORBA::Any flowname_any;
02225       flowname_any <<= flow_name.c_str ();
02226       fep->define_property ("Flow",
02227                             flowname_any);
02228     }
02229   catch (const CORBA::Exception& ex)
02230     {
02231       ex._tao_print_exception ("TAO_StreamEndPoint::add_fep");
02232       return 0;
02233     }
02234   return ACE_OS::strdup( flow_name.c_str () );
02235 }
02236 
02237 char*
02238 TAO_StreamEndPoint::add_fep_i (AVStreams::FlowEndPoint_ptr fep)
02239 {
02240   CORBA::String_var flow_name;
02241   try
02242     {
02243       CORBA::Any_var flow_name_any =
02244         fep->get_property_value ("FlowName");
02245 
02246       const char *tmp;
02247       flow_name_any >>= tmp;
02248       flow_name = CORBA::string_dup (tmp);
02249     }
02250   catch (const CORBA::Exception&)
02251     {
02252       flow_name =
02253         this->add_fep_i_add_property (fep);
02254     }
02255   return flow_name._retn ();
02256 }
02257 
02258 char *
02259 TAO_StreamEndPoint::add_fep (CORBA::Object_ptr fep_obj)
02260 {
02261   AVStreams::FlowEndPoint_var fep =
02262     AVStreams::FlowEndPoint::_narrow (fep_obj);
02263 
02264   CORBA::String_var flow_name =
02265     this->add_fep_i (fep.in ());
02266 
02267   try
02268     {
02269       fep->lock ();
02270       // Add it to the sequence of flowNames supported.
02271       // put the flowname and the flowendpoint in a hashtable.
02272       ACE_CString fep_name_key (CORBA::string_dup (flow_name.in ()));
02273       if (this->fep_map_.bind (fep_name_key, AVStreams::FlowEndPoint::_duplicate (fep.in ())) != 0)
02274         {
02275           throw AVStreams::streamOpFailed ();
02276         }
02277       // increment the flow count.
02278       this->flow_count_++;
02279       this->flows_.length (this->flow_count_);
02280       this->flows_[this->flow_count_-1] = flow_name;
02281       // define/modify the "Flows" property.
02282       CORBA::Any flows_any;
02283       flows_any <<= this->flows_;
02284       this->define_property ("Flows",
02285                              flows_any);
02286     }
02287   catch (const CORBA::Exception& ex)
02288     {
02289       ex._tao_print_exception ("TAO_StreamEndPoint::add_fep");
02290       return 0;
02291     }
02292   return flow_name._retn ();
02293 }
02294 
02295 
02296 void
02297 TAO_StreamEndPoint::remove_fep (const char *flow_name)
02298 {
02299   try
02300     {
02301       ACE_CString fep_name_key (flow_name);
02302       AVStreams::FlowEndPoint_var fep_entry;
02303       // Remove the fep from the hash table.
02304       if (this->fep_map_.unbind (fep_name_key, fep_entry)!= 0)
02305         throw AVStreams::streamOpFailed ();
02306       // redefine the "Flows" property
02307       AVStreams::flowSpec new_flows (this->flows_.length ());
02308       for (u_int i=0, j=0 ; i <this->flows_.length (); i++)
02309         if (ACE_OS::strcmp (flow_name, this->flows_[i]) != 0)
02310           new_flows[j++] = this->flows_[i];
02311 
02312       CORBA::Any flows;
02313       flows <<= new_flows;
02314       this->flows_ = new_flows;
02315       this->define_property ("Flows",
02316                              flows);
02317     }
02318   catch (const CORBA::Exception& ex)
02319     {
02320       ex._tao_print_exception ("TAO_StreamEndPoint::remove_fep");
02321     }
02322 }
02323 
02324 // Sets the negotiator object.
02325 void
02326 TAO_StreamEndPoint::set_negotiator (AVStreams::Negotiator_ptr new_negotiator)
02327 {
02328   try
02329     {
02330       CORBA::Any negotiator;
02331       negotiator <<= new_negotiator;
02332       this->define_property ("Negotiator",
02333                              negotiator);
02334       this->negotiator_ = AVStreams::Negotiator::_duplicate (new_negotiator);
02335     }
02336   catch (const CORBA::Exception& ex)
02337     {
02338       ex._tao_print_exception (
02339         "TAO_StreamEndPoint::set_negotiator");
02340     }
02341 }
02342 
02343 
02344 // Sets the public key used for this streamendpoint.
02345 void
02346 TAO_StreamEndPoint::set_key (const char *flow_name,
02347                              const AVStreams::key & the_key)
02348 {
02349   try
02350     {
02351       this->key_ = the_key;
02352       CORBA::Any PublicKey;
02353       PublicKey <<= the_key;
02354       char PublicKey_property [BUFSIZ];
02355       ACE_OS::sprintf (PublicKey_property, "%s_PublicKey", flow_name);
02356       this->define_property (PublicKey_property,
02357                              PublicKey);
02358     }
02359   catch (const CORBA::Exception& ex)
02360     {
02361       ex._tao_print_exception ("TAO_StreamEndPoint::set_key");
02362     }
02363 }
02364 
02365 // Set the source id.
02366 void
02367 TAO_StreamEndPoint::set_source_id (CORBA::Long source_id)
02368 {
02369   this->source_id_ = source_id;
02370 }
02371 
02372 CORBA::Boolean
02373 TAO_StreamEndPoint::multiconnect (AVStreams::streamQoS &/*the_qos*/,
02374                                   AVStreams::flowSpec &/*flow_spec*/)
02375 {
02376   if (TAO_debug_level > 0)
02377     ACE_DEBUG ((LM_DEBUG, "TAO_StreamEndPoint::multiconnect\n"));
02378   return 0;
02379 }
02380 
02381 TAO_StreamEndPoint::~TAO_StreamEndPoint (void)
02382 {
02383   //this->handle_close ();
02384   TAO_AV_FlowSpecSetItor begin = this->forward_flow_spec_set.begin ();
02385   TAO_AV_FlowSpecSetItor end = this->forward_flow_spec_set.end ();
02386 
02387   int i=0;
02388   // @@ Naga: Will the iterator always give the entries in the order of insertion.
02389   // or is it an implementation fact of ACE containers.
02390   for ( ; begin != end; ++begin, ++i)
02391     {
02392 //       if (i >= FLOWSPEC_MAX)
02393 //         {
02394           TAO_FlowSpec_Entry *entry = *begin;
02395           delete entry;
02396           //        }
02397     }
02398   begin = this->reverse_flow_spec_set.begin ();
02399   end = this->reverse_flow_spec_set.end ();
02400   i = 0;
02401   for (; begin != end; ++begin)
02402     {
02403 //       if (i >= FLOWSPEC_MAX)
02404 //         {
02405           TAO_FlowSpec_Entry *entry = *begin;
02406           delete entry;
02407           //        }
02408     }
02409 }
02410 
02411 
02412 // ----------------------------------------------------------------------
02413 // TAO_StreamEndPoint_A
02414 // ----------------------------------------------------------------------
02415 
02416 TAO_StreamEndPoint_A::TAO_StreamEndPoint_A (void)
02417 {
02418   if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "(%P|%t) TAO_StreamEndPoint_A::TAO_StreamEndPoint_A: created\n"));
02419 }
02420 
02421 // IP Multicast style connect.
02422 CORBA::Boolean
02423 TAO_StreamEndPoint_A::multiconnect (AVStreams::streamQoS &stream_qos,
02424                                     AVStreams::flowSpec &flow_spec)
02425 {
02426   if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_StreamEndPointA::multiconnect\n"));
02427   try
02428     {
02429       int result = 0;
02430       TAO_AV_QoS qos (stream_qos);
02431       for (u_int i=0;i< flow_spec.length ();i++)
02432         {
02433           TAO_Forward_FlowSpec_Entry *forward_entry = 0;
02434           ACE_NEW_RETURN (forward_entry,
02435                           TAO_Forward_FlowSpec_Entry,
02436                           0);
02437           forward_entry->parse (flow_spec[i]);
02438           ACE_CString mcast_key (forward_entry->flowname ());
02439           AVStreams::FlowEndPoint_var flow_endpoint;
02440 
02441           // @@Naga: There is a problem in the full profile case for multiconnect. Since
02442           // multiconnect on sep_a is called everytime a sink is added and if called for
02443           // the same flow twice, the following code will just call add producer on the flow connection.
02444           // It is however very hard to find out if the flow producer is already in the flow connection
02445           // since comparing object references will not work and the flowproducer reference is
02446           // generated by _narrow. Our only hope is that _narrow on the same fep will return the same
02447           // pointer for the flowproducer in which case we can find out if the flowproducer exists in
02448           // fep set for that flowconnection.
02449           if (this->fep_map_.find (mcast_key, flow_endpoint) == 0)
02450             {
02451               try
02452                 {
02453                   AVStreams::QoS flow_qos;
02454                   result = qos.get_flow_qos (forward_entry->flowname (), flow_qos);
02455                   if (result < 0)
02456                     if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "QoS not found for %s\n", forward_entry->flowname ()));
02457                   // Narrow it to FlowProducer.
02458                   AVStreams::FlowProducer_var producer;
02459                   producer = AVStreams::FlowProducer::_narrow (flow_endpoint.in());
02460                   //
02461                   // Else narrow succeeeded.
02462                   if (!CORBA::is_nil (producer.in ()))
02463                     {
02464                       AVStreams::FlowConnection_var flow_connection;
02465                       try
02466                         {
02467                           if (CORBA::is_nil (this->streamctrl_.in ()))
02468                               {
02469                                 CORBA::Any_var streamctrl_any;
02470                                 streamctrl_any = this->get_property_value ("Related_StreamCtrl");
02471                                 AVStreams::StreamCtrl_ptr streamctrl;
02472                                 streamctrl_any.in () >>= streamctrl;
02473                                 this->streamctrl_ = AVStreams::StreamCtrl::_duplicate (streamctrl);
02474                               }
02475 
02476                           CORBA::Object_var flow_connection_obj =
02477                             this->streamctrl_->get_flow_connection (forward_entry->flowname ());
02478                           flow_connection = AVStreams::FlowConnection::_narrow (flow_connection_obj.in ());
02479                         }
02480                       catch (const CORBA::Exception&)
02481                         {
02482                           TAO_FlowConnection *flowConnection;
02483                           ACE_NEW_RETURN (flowConnection,
02484                                           TAO_FlowConnection,
02485                                           0);
02486                           //@@ Strategize the multicast address allocation.
02487                           flowConnection->set_mcast_addr (this->mcast_addr_, this->mcast_port_);
02488                           this->mcast_port_++;
02489                           flowConnection->set_protocol (forward_entry->carrier_protocol_str ());
02490                           flow_connection = flowConnection->_this ();
02491                           this->streamctrl_->set_flow_connection (forward_entry->flowname (),
02492                                                                   flow_connection.in ());
02493                         }
02494                       if (ACE_OS::strcmp (forward_entry->flow_protocol_str (), "") != 0)
02495                         {
02496                           CORBA::Any fp_settings;
02497                           flow_connection->use_flow_protocol (forward_entry->flow_protocol_str (),
02498                                                               fp_settings);
02499                         }
02500                       result = flow_connection->add_producer (producer.in (),
02501                                                               flow_qos);
02502                       if (result == 0)
02503                         ACE_ERROR_RETURN ((LM_ERROR, "TAO_StreamEndPoint_A::multiconnect: add_producer failed\n"), 0);
02504                     }
02505                 }
02506               catch (const CORBA::Exception& ex)
02507                 {
02508                   // Narrow failed and since its not a flowproducer its an error.
02509                   ex._tao_print_exception (
02510                     "FlowProducer::_narrow");
02511                   ACE_ERROR_RETURN ((LM_ERROR, "sep_a doesn't contain a flowproducer"), 0);
02512                 }
02513             }
02514           else
02515             {
02516               ACE_INET_Addr *mcast_addr;
02517               TAO_FlowSpec_Entry *entry = 0;
02518               result = this->mcast_entry_map_.find (mcast_key, entry);
02519               if (result == 0)
02520                 {
02521                   mcast_addr = dynamic_cast<ACE_INET_Addr *> (entry->address ());
02522                   char str_addr [BUFSIZ];
02523                   result = mcast_addr->addr_to_string (str_addr, BUFSIZ);
02524                   if (result < 0)
02525                     ACE_ERROR_RETURN ((LM_ERROR, "TAO_StreamEndPointA::multiconnect ::addr_to_string failed\n"), 0);
02526                   if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_StreamEndPoint_A::multiconnect:%s\n", str_addr));
02527                   TAO_Forward_FlowSpec_Entry new_entry (entry->flowname (),
02528                                                         entry->direction_str (),
02529                                                         entry->format (),
02530                                                         entry->flow_protocol_str (),
02531                                                         entry->carrier_protocol_str (),
02532                                                         entry->address ());
02533                   flow_spec[i] = CORBA::string_dup (new_entry.entry_to_string ());
02534                 }
02535               else
02536                 {
02537 
02538                   switch (forward_entry->direction ())
02539                     {
02540                     case TAO_FlowSpec_Entry::TAO_AV_DIR_IN:
02541                       {
02542                         ACE_NEW_RETURN (mcast_addr,
02543                                         ACE_INET_Addr,
02544                                         0);
02545                         mcast_addr->set (this->mcast_port_, this->mcast_addr_.c_str ());
02546                         this->mcast_port_++;
02547                         char buf[BUFSIZ];
02548                         mcast_addr->addr_to_string (buf, BUFSIZ);
02549                         if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "%s\n", buf));
02550                         TAO_Forward_FlowSpec_Entry *new_entry;
02551                         ACE_NEW_RETURN (new_entry,
02552                                         TAO_Forward_FlowSpec_Entry (forward_entry->flowname (),
02553                                                                     forward_entry->direction_str (),
02554                                                                     forward_entry->format (),
02555                                                                     forward_entry->flow_protocol_str (),
02556                                                                     forward_entry->carrier_protocol_str (),
02557                                                                     mcast_addr),
02558                                         0);
02559                         flow_spec[i] = CORBA::string_dup (new_entry->entry_to_string ());
02560                         //new_entry->is_multicast (1);
02561 
02562                         this->forward_flow_spec_set.insert (new_entry);
02563                         TAO_AV_Acceptor_Registry *acceptor_registry = TAO_AV_CORE::instance ()->acceptor_registry ();
02564                         result = acceptor_registry->open (this,
02565                                                           TAO_AV_CORE::instance (),
02566                                                           this->forward_flow_spec_set);
02567                         if (result < 0)
02568                           ACE_ERROR_RETURN ((LM_ERROR, "Acceptor_Registry::open failed\n"), 0);
02569                         result = this->mcast_entry_map_.bind (mcast_key, new_entry);
02570                         if (result < 0)
02571                           ACE_ERROR_RETURN ((LM_ERROR, "mcast_entry::bind failed"), 0);
02572                       }
02573                       break;
02574                     case TAO_FlowSpec_Entry::TAO_AV_DIR_OUT:
02575                       // OUT implies we're the sink.
02576                       break;
02577                     default:
02578                       break;
02579                     }
02580                 }
02581             }
02582         }
02583     }
02584   catch (const CORBA::Exception& ex)
02585     {
02586       ex._tao_print_exception (
02587         "TAO_StreamEndPoint_A::multiconnect");
02588       return 0;
02589     }
02590   return 1;
02591 }
02592 
02593 // ATM style Multicast is not supported yet.
02594 CORBA::Boolean
02595 TAO_StreamEndPoint_A::connect_leaf (AVStreams::StreamEndPoint_B_ptr /* the_ep */,
02596                                     AVStreams::streamQoS & /* the_qos */,
02597                                     const AVStreams::flowSpec & /* the_flows */)
02598 {
02599   throw AVStreams::notSupported ();
02600 }
02601 
02602 // Multicast not supported yet.
02603 void
02604 TAO_StreamEndPoint_A::disconnect_leaf (AVStreams::StreamEndPoint_B_ptr /* the_ep */,
02605                                        const AVStreams::flowSpec & /* theSpec */)
02606 
02607 {
02608 
02609   throw AVStreams::notSupported ();
02610 
02611 }
02612 
02613 TAO_StreamEndPoint_A::~TAO_StreamEndPoint_A (void)
02614 {
02615 }
02616 
02617 // ----------------------------------------------------------------------
02618 // TAO_StreamEndPoint_B
02619 // ----------------------------------------------------------------------
02620 
02621 TAO_StreamEndPoint_B::TAO_StreamEndPoint_B (void)
02622 {
02623   if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,
02624                                        "\n(%P|%t) TAO_StreamEndPoint_B::TAO_StreamEndPoint_B: created"));
02625 }
02626 
02627 CORBA::Boolean
02628 TAO_StreamEndPoint_B::multiconnect (AVStreams::streamQoS &stream_qos,
02629                                     AVStreams::flowSpec &flow_spec)
02630 {
02631   if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_StreamEndPoint_B::multiconnect\n"));
02632   try
02633     {
02634       int result = 0;
02635       TAO_AV_QoS qos (stream_qos);
02636       for (u_int i=0;i< flow_spec.length ();i++)
02637         {
02638           TAO_Forward_FlowSpec_Entry *forward_entry;
02639           ACE_NEW_RETURN (forward_entry,
02640                           TAO_Forward_FlowSpec_Entry,
02641                           0);
02642           forward_entry->parse (flow_spec[i]);
02643           ACE_CString mcast_key (forward_entry->flowname ());
02644           AVStreams::FlowEndPoint_var flow_endpoint;
02645           if (this->fep_map_.find (mcast_key, flow_endpoint ) == 0)
02646             {
02647               AVStreams::FlowConsumer_var consumer;
02648               try
02649                 {
02650                   consumer = AVStreams::FlowConsumer::_narrow (flow_endpoint.in ());
02651                 }
02652               catch (const CORBA::Exception& ex)
02653                 {
02654                   ex._tao_print_exception (
02655                     "FlowConsumer::_narrow");
02656                   ACE_ERROR_RETURN ((LM_ERROR, "sep_b doesn't contain a flowconsumer"), 0);
02657                 }
02658               AVStreams::QoS flow_qos;
02659               result = qos.get_flow_qos (forward_entry->flowname (), flow_qos);
02660               if (result < 0)
02661                 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "QoS not found for %s", forward_entry->flowname ()));
02662               AVStreams::FlowConnection_var flow_connection;
02663               try
02664                 {
02665                   if (CORBA::is_nil (this->streamctrl_.in ()))
02666                     {
02667                       CORBA::Any_var streamctrl_any;
02668                       streamctrl_any = this->get_property_value ("Related_StreamCtrl");
02669                       AVStreams::StreamCtrl_ptr streamctrl;
02670                       streamctrl_any.in () >>= streamctrl;
02671                       this->streamctrl_ = AVStreams::StreamCtrl::_duplicate (streamctrl);
02672                     }
02673                   CORBA::Object_var flow_connection_obj =
02674                     this->streamctrl_->get_flow_connection (forward_entry->flowname ());
02675                   flow_connection = AVStreams::FlowConnection::_narrow (flow_connection_obj.in ());
02676                 }
02677               catch (const CORBA::Exception& ex)
02678                 {
02679                   ex._tao_print_exception (
02680                     "TAO_StreamEndPoint_B::multiconnect::get_flow_connection");
02681                   return 0;
02682                 }
02683               result = flow_connection->add_consumer (consumer.in (),
02684                                                       flow_qos);
02685               if (result == 0)
02686                 ACE_ERROR_RETURN ((LM_ERROR, "TAO_StreamEndPoint_B::multiconnect:add_consumer failed\n"), 0);
02687             }
02688           else
02689             {
02690               TAO_FlowSpec_Entry *mcast_entry = 0;
02691               ACE_INET_Addr *mcast_addr;
02692               mcast_addr = dynamic_cast<ACE_INET_Addr *> (forward_entry->address ());
02693               if (mcast_addr == 0)
02694                 ACE_ERROR_RETURN ((LM_ERROR, "TAO_StreamEndPoint_B::multiconnect::Address missing in flowspec_entry\n"), 0);
02695               result = this->mcast_entry_map_.find (mcast_key, mcast_entry);
02696               if (result == 0)
02697                 {
02698                   ACE_ERROR_RETURN ((LM_ERROR, "TAO_StreamEndPoint_B::multiconnect::handler already found\n"), 0);
02699                 }
02700               else
02701                 {
02702                   switch (forward_entry->direction ())
02703                     {
02704                     case TAO_FlowSpec_Entry::TAO_AV_DIR_IN:
02705                       {
02706                         // IN means we're the sink.
02707                         // @@ We have to take care of this.
02708                         //                 result = this->make_dgram_mcast_flow_handler (mcast_dgram);
02709                         //                 if (result < 0)
02710                         //                   return 0;
02711 
02712                         this->forward_flow_spec_set.insert (forward_entry);
02713                         TAO_AV_Connector_Registry *connector_registry = TAO_AV_CORE::instance ()->connector_registry ();
02714                         result = connector_registry->open (this,
02715                                                            TAO_AV_CORE::instance (),
02716                                                            this->forward_flow_spec_set);
02717                         if (result < 0)
02718                           ACE_ERROR_RETURN ((LM_ERROR, "connector_registry::open failed\n"), 0);
02719                         result = this->mcast_entry_map_.bind (mcast_key, forward_entry);
02720                         if (result < 0)
02721                           ACE_ERROR_RETURN ((LM_ERROR, "dgram_mcast_handler::bind failed"), 0);
02722                       }
02723                       break;
02724                     case TAO_FlowSpec_Entry::TAO_AV_DIR_OUT:
02725                       // OUT implies we're the source., which is an error.
02726                       break;
02727                     default:
02728                       break;
02729                     }
02730                 }
02731             }
02732         }
02733     }
02734   catch (const CORBA::Exception& ex)
02735     {
02736       ex._tao_print_exception (
02737         "TAO_StreamEndPoint_B::multiconnect");
02738       return 0;
02739     }
02740   return 1;
02741 }
02742 
02743 TAO_StreamEndPoint_B::~TAO_StreamEndPoint_B (void)
02744 {
02745 }
02746 
02747 // ----------------------------------------------------------------------
02748 // TAO_VDev
02749 // ----------------------------------------------------------------------
02750 
02751 TAO_VDev::TAO_VDev (void)
02752 {
02753   if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,
02754               "(%P|%t) TAO_VDev::TAO_VDev: created\n"));
02755 }
02756 
02757 // StreamCtrl will call this to give us a reference to itself, and to
02758 // our peer vdev..
02759 CORBA::Boolean
02760 TAO_VDev::set_peer (AVStreams::StreamCtrl_ptr the_ctrl,
02761                     AVStreams::VDev_ptr the_peer_dev,
02762                     AVStreams::streamQoS &the_qos,
02763                     const AVStreams::flowSpec &the_spec)
02764 {
02765   ACE_UNUSED_ARG (the_qos);
02766   ACE_UNUSED_ARG (the_spec);
02767 
02768   CORBA::Boolean result = 0;
02769   try
02770     {
02771       if (TAO_debug_level > 0)
02772         ACE_DEBUG ((LM_DEBUG, "(%P|%t) TAO_VDev::set_peer: called\n"));
02773 
02774 
02775       CORBA::Any anyval;
02776       anyval <<= the_peer_dev;
02777       this->define_property ("Related_VDev",
02778                              anyval);
02779 
02780 
02781       this->streamctrl_ = AVStreams::StreamCtrl::_duplicate (the_ctrl);
02782       this->peer_ = AVStreams::VDev::_duplicate (the_peer_dev);
02783 
02784       CORBA::Any_var anyptr;
02785       anyptr = this->peer_->get_property_value ("Related_MediaCtrl");
02786 
02787       CORBA::Object_ptr media_ctrl_obj = 0;
02788 
02789       anyptr.in () >>= CORBA::Any::to_object(media_ctrl_obj);
02790 
02791       result = this->set_media_ctrl (media_ctrl_obj);
02792     }
02793   catch (const CORBA::Exception& ex)
02794     {
02795       ex._tao_print_exception ("TAO_VDev::set_peer");
02796       return 0;
02797     }
02798   return result;
02799 }
02800 
02801 CORBA::Boolean
02802 TAO_VDev::set_media_ctrl (CORBA::Object_ptr media_ctrl)
02803 
02804 {
02805   //  since the media ctrl is not stored or used, delete it.
02806 
02807   CORBA::release( media_ctrl);
02808 
02809   return 1;
02810 }
02811 
02812 // Sets the multicast VDev peer.
02813 CORBA::Boolean
02814 TAO_VDev::set_Mcast_peer (AVStreams::StreamCtrl_ptr /* the_ctrl */,
02815                           AVStreams::MCastConfigIf_ptr mcast_peer,
02816                           AVStreams::streamQoS &/* the_qos */,
02817                           const AVStreams::flowSpec &/* the_spec */)
02818 {
02819   this->mcast_peer_ = AVStreams::MCastConfigIf::_duplicate (mcast_peer);
02820   return 1;
02821 }
02822 
02823 // applications should override this to handle configuration changes.
02824 void
02825 TAO_VDev::configure (const CosPropertyService::Property &/*the_config_mesg*/)
02826 {
02827 }
02828 
02829 // sets the media format used for the flowname as a property.
02830 void
02831 TAO_VDev::set_format (const char *flowName,
02832                       const char *format_name)
02833 {
02834   try
02835     {
02836       if (flowName == 0 || format_name == 0)
02837         ACE_ERROR ((LM_ERROR, "TAO_VDev::set_format: flowName or format_name is null\n"));
02838       char format_property [BUFSIZ];
02839       ACE_OS::sprintf (format_property, "%s_currFormat", flowName);
02840       CORBA::Any format;
02841       format <<= format_name;
02842       this->define_property (format_property,
02843                              format);
02844     }
02845   catch (const CORBA::Exception& ex)
02846     {
02847       ex._tao_print_exception ("TAO_VDev::set_format");
02848       return;
02849     }
02850   return;
02851 }
02852 
02853 // sets the device parameters for the flowname as a property.
02854 void
02855 TAO_VDev::set_dev_params (const char *flowName,
02856                           const CosPropertyService::Properties &new_params)
02857 {
02858   try
02859     {
02860       if (flowName == 0)
02861         ACE_ERROR ((LM_ERROR, "TAO_VDev::set_dev_params:flowName is null\n"));
02862       char devParams_property[BUFSIZ];
02863       ACE_OS::sprintf (devParams_property, "%s_devParams", flowName);
02864       CORBA::Any devParams;
02865       devParams <<= new_params;
02866       this->define_property (devParams_property,
02867                              devParams);
02868     }
02869   catch (const CORBA::Exception& ex)
02870     {
02871       ex._tao_print_exception ("TAO_VDev::set_dev_params");
02872       return;
02873     }
02874   return;
02875 }
02876 
02877 // QoS Modification should be handled by the application currently.
02878 CORBA::Boolean
02879 TAO_VDev::modify_QoS (AVStreams::streamQoS &the_qos,
02880                       const AVStreams::flowSpec &flowspec)
02881 {
02882   if (TAO_debug_level > 0)
02883   ACE_DEBUG ((LM_DEBUG,
02884              "TAO_VDev::modify_QoS\n"));
02885 
02886   if (flowspec.length () != 0)
02887     {
02888       TAO_Forward_FlowSpec_Entry entry;
02889       entry.parse (flowspec [0]);
02890       int direction = entry.direction ();
02891       if (direction == 0)
02892         {
02893           AVStreams::StreamEndPoint_A_ptr sep_a;
02894 
02895           CORBA::Any_ptr streamendpoint_a_any =
02896           this->get_property_value ("Related_StreamEndpoint");
02897 
02898           *streamendpoint_a_any >>= sep_a;
02899           if (sep_a != 0)
02900             {
02901               sep_a->modify_QoS (the_qos, flowspec);
02902             }
02903           else ACE_DEBUG ((LM_DEBUG,
02904                            "Stream EndPoint Not Found\n"));
02905         }
02906       else
02907         {
02908           AVStreams::StreamEndPoint_B_ptr sep_b;
02909 
02910           CORBA::Any_ptr streamendpoint_b_any =
02911           this->get_property_value ("Related_StreamEndpoint");
02912           *streamendpoint_b_any >>= sep_b;
02913           sep_b->modify_QoS (the_qos, flowspec);
02914         }
02915   }
02916   return 1;
02917 }
02918 
02919 TAO_VDev::~TAO_VDev (void)
02920 {
02921 }
02922 
02923 // ----------------------------------------------------------------------
02924 // TAO_MMDevice
02925 // ----------------------------------------------------------------------
02926 
02927 
02928 TAO_MMDevice::TAO_MMDevice (TAO_AV_Endpoint_Strategy *endpoint_strategy)
02929   : endpoint_strategy_ (endpoint_strategy),
02930     flow_count_ (0),
02931     flow_num_ (0),
02932     stream_ctrl_ (0)
02933 {
02934 }
02935 
02936 // create a streamctrl which is colocated with me, use that streamctrl
02937 // to bind the peer_device with me.
02938 AVStreams::StreamCtrl_ptr
02939 TAO_MMDevice::bind (AVStreams::MMDevice_ptr peer_device,
02940                     AVStreams::streamQoS &the_qos,
02941                     CORBA::Boolean_out is_met,
02942                     const AVStreams::flowSpec &the_spec)
02943 {
02944   AVStreams::StreamCtrl_ptr streamctrl (AVStreams::StreamCtrl::_nil ());
02945   try
02946     {
02947       ACE_UNUSED_ARG (is_met);
02948       ACE_NEW_RETURN (this->stream_ctrl_,
02949                       TAO_StreamCtrl,
02950                       0);
02951       AVStreams::MMDevice_var mmdevice = this->_this ();
02952       this->stream_ctrl_->bind_devs (peer_device,
02953                                      mmdevice.in (),
02954                                      the_qos,
02955                                      the_spec);
02956       streamctrl = this->stream_ctrl_->_this ();
02957     }
02958   catch (const CORBA::Exception& ex)
02959     {
02960       ex._tao_print_exception ("TAO_MMDevice::bind");
02961       return streamctrl;
02962     }
02963   return streamctrl;
02964 }
02965 
02966 // Multicast is not supported yet.
02967 AVStreams::StreamCtrl_ptr
02968 TAO_MMDevice::bind_mcast (AVStreams::MMDevice_ptr first_peer,
02969                           AVStreams::streamQoS &the_qos,
02970                           CORBA::Boolean_out is_met,
02971                           const AVStreams::flowSpec &the_spec)
02972 {
02973   ACE_UNUSED_ARG (first_peer);
02974   ACE_UNUSED_ARG (the_qos);
02975   ACE_UNUSED_ARG (is_met);
02976   ACE_UNUSED_ARG (the_spec);
02977 
02978   return 0;
02979 }
02980 
02981 AVStreams::StreamEndPoint_ptr
02982 TAO_MMDevice::create_A_B (MMDevice_Type type,
02983                           AVStreams::StreamCtrl_ptr streamctrl,
02984                           AVStreams::VDev_out the_vdev,
02985                           AVStreams::streamQoS &the_qos,
02986                           CORBA::Boolean_out met_qos,
02987                           char *&/*named_vdev*/,
02988                           const AVStreams::flowSpec &flow_spec)
02989 {
02990   AVStreams::StreamEndPoint_A_ptr sep_a (AVStreams::StreamEndPoint_A::_nil ());
02991   AVStreams::StreamEndPoint_B_ptr sep_b (AVStreams::StreamEndPoint_B::_nil ());
02992   AVStreams::StreamEndPoint_ptr sep (AVStreams::StreamEndPoint::_nil ());
02993   try
02994     {
02995       switch (type)
02996         {
02997         case MMDEVICE_A:
02998           {
02999             if (this->endpoint_strategy_->create_A (sep_a,
03000                                                     the_vdev) == -1)
03001               ACE_ERROR_RETURN ((LM_ERROR,
03002                                  "TAO_MMDevice::create_A_B (%P|%t) - "
03003                                  "error in create_A\n"),
03004                                 0);
03005             sep = sep_a;
03006           }
03007           break;
03008         case MMDEVICE_B:
03009           {
03010             if (this->endpoint_strategy_->create_B (sep_b,
03011                                                     the_vdev) == -1)
03012               ACE_ERROR_RETURN ((LM_ERROR,
03013                                  "TAO_MMDevice::create_A_B (%P|%t) - "
03014                                  "error in create_B\n"),
03015                                 0);
03016             sep = sep_b;
03017           }
03018           break;
03019         default:
03020           break;
03021         }
03022       if (this->fdev_map_.current_size () > 0)
03023         {
03024           TAO_AV_QoS qos (the_qos);
03025           // create flowendpoints from the FDevs.
03026           for (u_int i=0;i<flow_spec.length ();i++)
03027             {
03028               TAO_Forward_FlowSpec_Entry forward_entry;
03029               forward_entry.parse (flow_spec[i]);
03030               ACE_CString flow_key (forward_entry.flowname ());
03031               AVStreams::FDev_var flow_dev;
03032               AVStreams::FlowConnection_var flowconnection;
03033               try
03034                 {
03035                   // Get the flowconnection for this flow.
03036                   //static int blah = 0; if(blah == 1){blah=0; abort();}else{blah=1;}
03037                   CORBA::Object_var flowconnection_obj =
03038                     streamctrl->get_flow_connection (forward_entry.flowname ());
03039                   ACE_OS::printf("successfully called get_flow_connection\n");
03040                   if (!CORBA::is_nil (flowconnection_obj.in ()))
03041                     {
03042                       flowconnection = AVStreams::FlowConnection::_narrow (flowconnection_obj.in ());
03043                     }
03044                 }
03045               catch (const AVStreams::noSuchFlow&)
03046                 {
03047                           TAO_FlowConnection *flowConnection;
03048                           ACE_NEW_RETURN (flowConnection,
03049                                           TAO_FlowConnection,
03050                                           0);
03051                           flowconnection = flowConnection->_this ();
03052                           streamctrl->set_flow_connection (forward_entry.flowname(),
03053                                                      flowconnection.in ());
03054                 }
03055               catch (const CORBA::Exception& ex)
03056                 {
03057                   //if (TAO_debug_level >= 0)
03058                     ex._tao_print_exception (
03059                       "TAO_MMDevice::create_a::get_flow_connection");
03060                 }
03061 
03062               int result = this->fdev_map_.find (flow_key, flow_dev);
03063               if (result < 0)
03064                 ACE_ERROR_RETURN ((LM_ERROR, "(%N,%l) fdev_map::find failed\n"), 0);
03065 
03066               CORBA::String_var named_fdev;
03067               AVStreams::FlowEndPoint_var flow_endpoint;
03068               AVStreams::QoS flow_qos;
03069               result = qos.get_flow_qos (forward_entry.flowname (), flow_qos);
03070               if (result < 0)
03071                 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "(%N,%l) get_flow_qos failed for %s\n", forward_entry.flowname ()));
03072               switch (forward_entry.direction ())
03073                 {
03074                 case TAO_FlowSpec_Entry::TAO_AV_DIR_IN:
03075                   {
03076                     switch (type)
03077                       {
03078                       case MMDEVICE_A:
03079                         {
03080                           // In implies flow is from A to B and
03081                           // hence A is the producer for this flow and B is the consumer for this flow.
03082                           // We have to create a producer from the FDev for this flow.
03083                           flow_endpoint =
03084                             flow_dev->create_producer (flowconnection.in (),
03085                                                        flow_qos,
03086                                                        met_qos,
03087                                                        named_fdev.inout ());
03088                         }
03089                         break;
03090                       case MMDEVICE_B:
03091                         {
03092                           flow_endpoint =
03093                             flow_dev->create_consumer (flowconnection.in (),
03094                                                        flow_qos,
03095                                                        met_qos,
03096                                                        named_fdev.inout ());
03097                         }
03098                         break;
03099                       }
03100                   }
03101                   break;
03102                 case TAO_FlowSpec_Entry::TAO_AV_DIR_OUT:
03103                   {
03104                     switch (type)
03105                       {
03106                       case MMDEVICE_A:
03107                         {
03108                           // OUT implies flow is from B to A and
03109                           // hence B is the producer for this flow and A is the consumer for this flow.
03110                           // We have to create a consumer from the FDev for this flow.
03111                           flow_endpoint =
03112                             flow_dev->create_consumer (flowconnection.in (),
03113                                                        flow_qos,
03114                                                        met_qos,
03115                                                        named_fdev.inout ());
03116                         }
03117                         break;
03118                       case MMDEVICE_B:
03119                         {
03120                           // In implies flow is from A to B and
03121                           // hence A is the producer for this flow and B is the consumer for this flow.
03122                           // We have to create a producer from the FDev for this flow.
03123                           flow_endpoint =
03124                             flow_dev->create_producer (flowconnection.in (),
03125                                                        flow_qos,
03126                                                        met_qos,
03127                                                        named_fdev.inout ());
03128                         }
03129                         break;
03130                       }
03131                   }
03132                   break;
03133                 default:
03134                   break;
03135                 }
03136               CORBA::Any flowname_any;
03137               flowname_any <<= forward_entry.flowname ();
03138               flow_endpoint->define_property ("FlowName", flowname_any);
03139               sep->add_fep (flow_endpoint.in ());
03140             }
03141         }
03142     }
03143   catch (const CORBA::Exception& ex)
03144     {
03145       ex._tao_print_exception ("TAO_MMDevice::create_A");
03146       return sep;
03147     }
03148   return sep;
03149 }
03150 
03151 AVStreams::StreamEndPoint_A_ptr
03152 TAO_MMDevice::create_A (AVStreams::StreamCtrl_ptr streamctrl,
03153                         AVStreams::VDev_out the_vdev,
03154                         AVStreams::streamQoS &stream_qos,
03155                         CORBA::Boolean_out met_qos,
03156                         char *&named_vdev,
03157                         const AVStreams::flowSpec &flow_spec)
03158 {
03159   AVStreams::StreamEndPoint_A_ptr sep_a = 0;
03160   AVStreams::StreamEndPoint_var sep;
03161   try
03162     {
03163       sep = this->create_A_B (MMDEVICE_A, streamctrl, the_vdev, stream_qos, met_qos, named_vdev, flow_spec);
03164       sep_a = AVStreams::StreamEndPoint_A::_narrow (sep.in());
03165 
03166       ACE_ASSERT( !CORBA::is_nil( sep_a ) );
03167     }
03168   catch (const CORBA::Exception& ex)
03169     {
03170       ex._tao_print_exception ("TAO_MMDevice::create_A");
03171       return sep_a;
03172     }
03173 
03174   return sep_a;
03175 }
03176 
03177 
03178 AVStreams::StreamEndPoint_B_ptr
03179 TAO_MMDevice::create_B (AVStreams::StreamCtrl_ptr streamctrl,
03180                         AVStreams::VDev_out the_vdev,
03181                         AVStreams::streamQoS &stream_qos,
03182                         CORBA::Boolean_out met_qos,
03183                         char *&named_vdev,
03184                         const AVStreams::flowSpec &flow_spec)
03185 {
03186   AVStreams::StreamEndPoint_B_ptr sep_b = AVStreams::StreamEndPoint_B::_nil ();
03187   AVStreams::StreamEndPoint_var sep;
03188 
03189   try
03190     {
03191       sep = this->create_A_B (MMDEVICE_B, streamctrl, the_vdev, stream_qos, met_qos, named_vdev, flow_spec);
03192       sep_b = AVStreams::StreamEndPoint_B::_narrow (sep.in());
03193 
03194       ACE_ASSERT ( !CORBA::is_nil( sep_b ) );
03195     }
03196   catch (const CORBA::Exception& ex)
03197     {
03198       ex._tao_print_exception ("TAO_MMDevice::create_B");
03199       return sep_b;
03200     }
03201   return sep_b;
03202 }
03203 
03204 
03205 // destroys the streamendpoint and the Vdev.
03206 void
03207 TAO_MMDevice::destroy (AVStreams::StreamEndPoint_ptr /* the_ep */,
03208                        const char * /* vdev_name */)
03209 {
03210   // Remove self from POA.  Because of reference counting, the POA
03211   // will automatically delete the servant when all pending requests
03212   // on this servant are complete.
03213   int result = TAO_AV_Core::deactivate_servant (this);
03214   if (result < 0)
03215     if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_MMDevice::destroy failed\n"));
03216 }
03217 
03218 char *
03219 TAO_MMDevice::add_fdev_i (AVStreams::FDev_ptr fdev)
03220 {
03221   char* tmp;
03222   ACE_NEW_RETURN (tmp,
03223                   char[64],
03224                   0);
03225   CORBA::String_var flow_name = tmp;
03226 
03227   try
03228     {
03229       // exception implies the flow name is not defined and is system
03230       // generated.
03231       ACE_OS::sprintf (tmp, "flow%d", flow_num_++);
03232       CORBA::Any flowname_any;
03233       flowname_any <<= flow_name.in ();
03234       fdev->define_property ("Flow", flowname_any);
03235     }
03236   catch (const CORBA::Exception& ex)
03237     {
03238       ex._tao_print_exception ("TAO_MMDevice::add_fdev");
03239       return 0;
03240     }
03241   return flow_name._retn ();
03242 }
03243 
03244 // Adds the fdev object to the MMDevice.
03245 char *
03246 TAO_MMDevice::add_fdev (CORBA::Object_ptr fdev_obj)
03247 {
03248   CORBA::String_var flow_name;
03249   AVStreams::FDev_var fdev;
03250   try
03251     {
03252       CORBA::Any_ptr flow_name_any;
03253       fdev = AVStreams::FDev::_narrow (fdev_obj);
03254 
03255       if (CORBA::is_nil (fdev.in ()))
03256           return 0;
03257 
03258 
03259       flow_name_any = fdev->get_property_value ("Flow");
03260 
03261       const char *tmp;
03262       *flow_name_any >>= tmp;
03263       flow_name = CORBA::string_dup (tmp);
03264     }
03265   catch (const CORBA::Exception&)
03266     {
03267       flow_name =
03268         this->add_fdev_i (fdev.in ());
03269     }
03270 
03271 
03272   // Add it to the sequence of flowNames supported.
03273   // put the flowname and the fdev in a hashtable.
03274   ACE_CString fdev_name_key ( flow_name.in () );
03275 
03276 
03277   if ( (this->fdev_map_.bind (fdev_name_key, fdev )) != 0)
03278     throw AVStreams::streamOpFailed ();
03279   // increment the flow count.
03280   this->flow_count_++;
03281   this->flows_.length (this->flow_count_);
03282   this->flows_ [this->flow_count_-1] = flow_name;
03283   // define/modify the "Flows" property.
03284   CORBA::Any flows_any;
03285   flows_any <<= this->flows_;
03286   try
03287     {
03288       this->define_property ("Flows",
03289                              flows_any);
03290     }
03291   catch (const CORBA::Exception& ex)
03292     {
03293       ex._tao_print_exception ("TAO_MMDevice::add_fdev");
03294       return 0;
03295     }
03296   return flow_name._retn ();
03297 }
03298 
03299 // Gets the FDev object associated with this flow.
03300 CORBA::Object_ptr
03301 TAO_MMDevice::get_fdev (const char *flow_name)
03302 {
03303 
03304   ACE_CString fdev_name_key (flow_name);
03305   AVStreams::FDev_var fdev_entry;
03306   if (this->fdev_map_.find (fdev_name_key, fdev_entry) == 0)
03307     return fdev_entry._retn() ;
03308   return 0;
03309 }
03310 
03311 // Removes the fdev from this MMDevice.
03312 void
03313 TAO_MMDevice::remove_fdev (const char *flow_name)
03314 {
03315   try
03316     {
03317       ACE_CString fdev_name_key (flow_name);
03318       AVStreams::FDev_var fdev_entry;
03319       // Remove the fep from the hash table.
03320       if (this->fdev_map_.unbind (fdev_name_key, fdev_entry)!= 0)
03321         throw AVStreams::streamOpFailed ();
03322 
03323       AVStreams::flowSpec new_flows (this->flows_.length ());
03324       for (u_int i=0, j=0 ; i <this->flows_.length (); i++)
03325         if (ACE_OS::strcmp (flow_name, this->flows_[i]) != 0)
03326           new_flows[j++] = this->flows_[i];
03327 
03328       CORBA::Any flows;
03329       flows <<= new_flows;
03330       this->flows_ = new_flows;
03331       this->define_property ("Flows",
03332                              flows);
03333     }
03334   catch (const CORBA::Exception& ex)
03335     {
03336       ex._tao_print_exception ("TAO_MMDevice::remove_fdev");
03337     }
03338 }
03339 
03340 // destructor.
03341 TAO_MMDevice::~TAO_MMDevice (void)
03342 {
03343   delete this->stream_ctrl_;
03344 }
03345 
03346 //------------------------------------------------------------------
03347 // TAO_FlowConnection
03348 //------------------------------------------------------------------
03349 
03350 // default constructor.
03351 TAO_FlowConnection::TAO_FlowConnection (void)
03352   :fp_name_ (CORBA::string_dup ("")),
03353    ip_multicast_ (0)
03354 {
03355 }
03356 
03357 // int
03358 // TAO_FlowConnection::set_mcast_addr (ACE_UINT32 mcast_addr, u_short mcast_port)
03359 // {
03360 //   this->mcast_addr_ = mcast_addr;
03361 //   this->mcast_port_ = mcast_port;
03362 //   return 0;
03363 // }
03364 
03365 int
03366 TAO_FlowConnection::set_mcast_addr (ACE_CString mcast_addr, u_short mcast_port)
03367 {
03368   this->mcast_addr_ = mcast_addr;
03369   this->mcast_port_ = mcast_port;
03370   return 0;
03371 }
03372 
03373 void
03374 TAO_FlowConnection::set_protocol (const char *protocol)
03375 {
03376   this->protocol_ = protocol;
03377 }
03378 
03379 // stop this flow.
03380 void
03381 TAO_FlowConnection::stop (void)
03382 {
03383   try
03384     {
03385       FlowProducer_SetItor producer_begin = this->flow_producer_set_.begin
03386         ();
03387       for (FlowProducer_SetItor producer_end =
03388              this->flow_producer_set_.end ();
03389            producer_begin != producer_end; ++producer_begin)
03390         {
03391           (*producer_begin)->stop ();
03392         }
03393       FlowConsumer_SetItor consumer_begin = this->flow_consumer_set_.begin
03394         ();
03395       for (FlowConsumer_SetItor consumer_end =
03396              this->flow_consumer_set_.end ();
03397            consumer_begin != consumer_end; ++consumer_begin)
03398         {
03399           (*consumer_begin)->stop ();
03400         }
03401     }
03402  catch(AVStreams::noSuchFlow& ex)
03403     {
03404     throw; //ACS mod 2007-08
03405     }
03406   catch (const CORBA::Exception& ex)
03407     {
03408     ex._tao_print_exception ("TAO_FlowConnection::stop");
03409     throw;  //ACS mod 2007-08
03410     }
03411   catch(...)
03412     {
03413       printf ("TAO_FlowConnection::stop - unknown exception\n");
03414     }
03415 }
03416 
03417 // start this flow.
03418 void
03419 TAO_FlowConnection::start (void)
03420 {
03421   try
03422     {
03423       FlowConsumer_SetItor consumer_begin = this->flow_consumer_set_.begin
03424         ();
03425       for (FlowConsumer_SetItor consumer_end =
03426              this->flow_consumer_set_.end ();
03427            consumer_begin != consumer_end; ++consumer_begin)
03428         {
03429           (*consumer_begin)->start ();
03430         }
03431       FlowProducer_SetItor producer_begin = this->flow_producer_set_.begin
03432         ();
03433       for (FlowProducer_SetItor producer_end =
03434              this->flow_producer_set_.end ();
03435            producer_begin != producer_end; ++producer_begin)
03436         {
03437           (*producer_begin)->start ();
03438         }
03439     }
03440  catch(AVStreams::noSuchFlow& ex)
03441     {
03442     throw; //ACS mod 2007-08
03443     }
03444   catch (const CORBA::Exception& ex)
03445     {
03446     ex._tao_print_exception ("TAO_FlowConnection::start");
03447     throw;  //ACS mod 2007-08
03448     }
03449   catch(...)
03450     {
03451       printf ("TAO_FlowConnection::start - unknown exception\n");
03452     }
03453 }
03454 
03455 // destroy this flow.
03456 void
03457 TAO_FlowConnection::destroy (void)
03458 {
03459   try
03460     {
03461       FlowProducer_SetItor producer_begin = this->flow_producer_set_.begin
03462         ();
03463       for (FlowProducer_SetItor producer_end =
03464              this->flow_producer_set_.end ();
03465            producer_begin != producer_end; ++producer_begin)
03466         {
03467           (*producer_begin)->destroy ();
03468         }
03469       FlowConsumer_SetItor consumer_begin = this->flow_consumer_set_.begin
03470         ();
03471       for (FlowConsumer_SetItor consumer_end =
03472              this->flow_consumer_set_.end ();
03473            consumer_begin != consumer_end; ++consumer_begin)
03474         {
03475           (*consumer_begin)->destroy ();
03476         }
03477     }
03478   catch (const CORBA::Exception& ex)
03479     {
03480       ex._tao_print_exception ("TAO_FlowConnection::destroy");
03481       return;
03482     }
03483   int result = TAO_AV_Core::deactivate_servant (this);
03484   if (result < 0)
03485     if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_FlowConnection::destroy failed\n"));
03486 }
03487 
03488 // modify the QoS for this flow.
03489 CORBA::Boolean
03490 TAO_FlowConnection::modify_QoS (AVStreams::QoS & new_qos)
03491 {
03492   ACE_UNUSED_ARG (new_qos);
03493   return 0;
03494 }
03495 
03496 // use the specified flow protocol for this flow.
03497 CORBA::Boolean
03498 TAO_FlowConnection::use_flow_protocol (const char * fp_name,
03499                                        const CORBA::Any & fp_settings)
03500 {
03501   this->fp_name_ = fp_name;
03502   this->fp_settings_ = fp_settings;
03503   FlowProducer_SetItor producer_begin = this->flow_producer_set_.begin
03504     ();
03505   for (FlowProducer_SetItor producer_end =
03506          this->flow_producer_set_.end ();
03507        producer_begin != producer_end; ++producer_begin)
03508     {
03509       (*producer_begin)->use_flow_protocol
03510         (fp_name, fp_settings);
03511     }
03512   FlowConsumer_SetItor consumer_begin = this->flow_consumer_set_.begin
03513     ();
03514   for (FlowConsumer_SetItor consumer_end =
03515          this->flow_consumer_set_.end ();
03516        consumer_begin != consumer_end; ++consumer_begin)
03517     {
03518       (*consumer_begin)->use_flow_protocol
03519         (fp_name, fp_settings);
03520     }
03521   return 1;
03522 }
03523 
03524 void
03525 TAO_FlowConnection::push_event (const AVStreams::streamEvent & the_event)
03526 {
03527   ACE_UNUSED_ARG (the_event);
03528 }
03529 
03530 CORBA::Boolean
03531 TAO_FlowConnection::connect_devs (AVStreams::FDev_ptr a_party,
03532                                   AVStreams::FDev_ptr b_party,
03533                                   AVStreams::QoS & flow_qos)
03534 {
03535   CORBA::Boolean result = 0;
03536   try
03537     {
03538       AVStreams::FlowConnection_var flowconnection = this->_this ();
03539       CORBA::Boolean met_qos;
03540       CORBA::String_var named_fdev ((const char *)"");
03541       AVStreams::FlowProducer_var producer =
03542         a_party->create_producer (flowconnection.in (),
03543                                   flow_qos,
03544                                   met_qos,
03545                                   named_fdev.inout ());
03546       AVStreams::FlowConsumer_var consumer =
03547         b_party->create_consumer (flowconnection.in (),
03548                                   flow_qos,
03549                                   met_qos,
03550                                   named_fdev.inout ());
03551       result = this->connect (producer.in (),
03552                               consumer.in (),
03553                               flow_qos);
03554     }
03555   catch (const CORBA::Exception& ex)
03556     {
03557       ex._tao_print_exception (
03558         "TAO_FlowConnection::connect_devs");
03559       return 0;
03560     }
03561   return result;
03562 }
03563 
03564 // connect the producer and the consumer
03565 CORBA::Boolean
03566 TAO_FlowConnection::connect (AVStreams::FlowProducer_ptr producer,
03567                              AVStreams::FlowConsumer_ptr consumer,
03568                              AVStreams::QoS & the_qos)
03569 {
03570   try
03571     {
03572 
03573       AVStreams::FlowProducer_ptr flow_producer =
03574         AVStreams::FlowProducer::_duplicate (producer);
03575       AVStreams::FlowConsumer_ptr flow_consumer =
03576         AVStreams::FlowConsumer::_duplicate (consumer);
03577 
03578       this->flow_producer_set_.insert (flow_producer);
03579       this->flow_consumer_set_.insert (flow_consumer);
03580       AVStreams::FlowConnection_var flowconnection =
03581         this->_this ();
03582 
03583       flow_producer->set_peer (flowconnection.in (),
03584                                flow_consumer,
03585                                the_qos);
03586 
03587       flow_consumer->set_peer (flowconnection.in (),
03588                                flow_producer,
03589                                the_qos);
03590 
03591       char *consumer_address =
03592         flow_consumer->go_to_listen (the_qos,
03593                                      0, // false for is_mcast
03594                                      flow_producer,
03595                                      this->fp_name_.inout ());
03596 
03597       if (ACE_OS::strcmp (consumer_address, "") == 0)
03598         {
03599           // Consumer is not willing to listen, so try the producer.
03600           consumer_address = flow_producer->go_to_listen (the_qos,
03601                                                           0, // false for is_mcast
03602                                                           flow_consumer,
03603                                                           this->fp_name_.inout ());
03604           flow_consumer->connect_to_peer (the_qos,
03605                                           consumer_address,
03606                                           this->fp_name_.inout ());
03607           // @@ Naga: We have to find means to set the reverse channel for the producer.
03608           // Its broken in the point-to_point case for UDP.
03609         }
03610       else
03611         {
03612           if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_FlowConnection::connect_to_peer addres: %s", consumer_address));
03613           flow_producer->connect_to_peer (the_qos,
03614                                           consumer_address,
03615                                           this->fp_name_.inout ());
03616         }
03617     }
03618   catch (const CORBA::Exception& ex)
03619     {
03620       ex._tao_print_exception ("TAO_FlowConnection::connect");
03621     }
03622   return 1;
03623 }
03624 
03625 
03626 CORBA::Boolean
03627 TAO_FlowConnection::disconnect (void)
03628 {
03629   return  0;
03630 }
03631 
03632 CORBA::Boolean
03633 TAO_FlowConnection::add_producer (AVStreams::FlowProducer_ptr producer,
03634                                   AVStreams::QoS & the_qos)
03635 {
03636   try
03637     {
03638           AVStreams::FlowProducer_ptr flow_producer =
03639             AVStreams::FlowProducer::_duplicate (producer);
03640           // @@Naga:Sometimes the same producer could be added with a different object reference.
03641           // There's no portable way of comparing obj refs. but we have to do this till we find
03642           // a permanent solution.For eg. 2 different flowproducers for the same flow in a
03643           // Multipoint to Multipoint binding will have the same flowname and hence cannot be
03644           // used for resolving ties.
03645           FlowProducer_SetItor begin = this->flow_producer_set_.begin ();
03646           FlowProducer_SetItor end = this->flow_producer_set_.end ();
03647           for (; begin != end; ++begin)
03648             {
03649               if ((*begin)->_is_equivalent (producer))
03650               // producer exists in the set, a duplicate.
03651               ACE_ERROR_RETURN ((LM_WARNING, "TAO_FlowConnection::add_producer: producer already exists\n"), 1);
03652             }
03653           // We need to check the return value of the insert into the flow producer
03654           // set, since multiconnect could be called many times which will lead to
03655           // a call to add_producer every time a sink is added. If the producer is already
03656           // present in our list we just return immediately.
03657           int result = this->flow_producer_set_.insert (flow_producer);
03658           if (result == 1)
03659             {
03660               // producer exists in the set, a duplicate.
03661               ACE_ERROR_RETURN ((LM_WARNING, "TAO_FlowConnection::add_producer: producer already exists\n"), 1);
03662             }
03663           CORBA::Boolean met_qos;
03664           char mcast_address[BUFSIZ];
03665           if (this->producer_address_.in () == 0)
03666             {
03667               ACE_INET_Addr mcast_addr;
03668               mcast_addr.set (this->mcast_port_,
03669                               this->mcast_addr_.c_str ()
03670                               );
03671 
03672               char buf [BUFSIZ];
03673               mcast_addr.addr_to_string (buf, BUFSIZ);
03674               ACE_OS::sprintf (mcast_address, "%s=%s", this->protocol_.in (), buf);
03675             }
03676           else
03677             {
03678               ACE_OS::strcpy (mcast_address, this->producer_address_.in ());
03679             }
03680           char *address = flow_producer->connect_mcast (the_qos,
03681                                                         met_qos,
03682                                                         mcast_address,
03683                                                         this->fp_name_.in ());
03684 
03685           if (this->producer_address_.in () == 0)
03686             {
03687               TAO_Forward_FlowSpec_Entry entry ("", "", "", "", address);
03688               if (entry.address () != 0)
03689                 {
03690                   // Internet multicasting is in use.
03691                   this->producer_address_ = address;
03692                 }
03693               else
03694                 {
03695                   // ATM Multicasting is in use.
03696                   this->ip_multicast_ = 0;
03697                 }
03698             }
03699           // set the multicast peer.
03700           if (CORBA::is_nil (this->mcastconfigif_.in ()))
03701             {
03702               ACE_NEW_RETURN (this->mcastconfigif_i_,
03703                               TAO_MCastConfigIf,
03704                               0);
03705               this->mcastconfigif_ = this->mcastconfigif_i_->_this ();
03706             }
03707           AVStreams::FlowConnection_var flowconnection = this->_this ();
03708           flow_producer->set_Mcast_peer (flowconnection.in (),
03709                                          this->mcastconfigif_.in (),
03710                                          the_qos);
03711     }
03712   catch (const CORBA::Exception& ex)
03713     {
03714       ex._tao_print_exception (
03715         "TAO_FlowConnection::add_producer");
03716       return 0;
03717     }
03718   return 1;
03719 }
03720 
03721 CORBA::Boolean
03722 TAO_FlowConnection::add_consumer (AVStreams::FlowConsumer_ptr consumer,
03723                                   AVStreams::QoS & the_qos)
03724 {
03725   try
03726     {
03727       AVStreams::FlowConsumer_ptr flow_consumer =
03728         AVStreams::FlowConsumer::_duplicate (consumer);
03729       FlowConsumer_SetItor begin = this->flow_consumer_set_.begin ();
03730       FlowConsumer_SetItor end = this->flow_consumer_set_.end ();
03731       for (; begin != end; ++begin)
03732         {
03733           if ((*begin)->_is_equivalent (consumer))
03734             // Consumer exists in the set, a duplicate.
03735             ACE_ERROR_RETURN ((LM_WARNING, "TAO_FlowConnection::add_Consumer: Consumer already exists\n"), 1);
03736         }
03737       int result = this->flow_consumer_set_.insert (flow_consumer);
03738       if (result == 1)
03739         {
03740           // consumer exists in the set, a duplicate.
03741           ACE_ERROR_RETURN ((LM_WARNING, "TAO_FlowConnection::add_consumer: consumer already exists\n"), 1);
03742         }
03743 
03744       FlowProducer_SetItor producer_begin = this->flow_producer_set_.begin ();
03745       // @@Lets take that the first entry as the only producer. We're
03746       // not sure if we can have multiple flow producers in a
03747       // flowconnection. We can have multiple producer in the MtM binding,
03748       // in which case the first producer that gets added is the leader.
03749       AVStreams::FlowProducer_ptr flow_producer = (*producer_begin);
03750 
03751       AVStreams::protocolSpec protocols (1);
03752       protocols.length (1);
03753       protocols [0] = CORBA::string_dup (this->producer_address_.in ());
03754 
03755       if (!this->ip_multicast_)
03756         {
03757           flow_consumer->set_protocol_restriction (protocols);
03758           char * address =
03759             flow_consumer->go_to_listen (the_qos,
03760                                          1,
03761                                          flow_producer,
03762                                          this->fp_name_.inout ());
03763           CORBA::Boolean is_met;
03764           flow_producer->connect_mcast (the_qos,
03765                                         is_met,
03766                                         address,
03767                                         this->fp_name_.inout ());
03768         }
03769        else
03770         {
03771           // The spec says go_to_listen is called with the multicast
03772           // address returned from the connect_mcast call called
03773           // during add_producer. But go_to_listen doesn't have a
03774           // address parameter. I guess it should be connect_to_peer.
03775           // IP Multicasting.
03776           flow_consumer->connect_to_peer (the_qos,
03777                                           this->producer_address_.in (),
03778                                           this->fp_name_.inout ());
03779 
03780           //  char * address =
03781           //             flow_consumer->go_to_listen (the_qos,
03782           //                                          1,
03783           //                                          flow_producer,
03784           //                                          this->fp_name_.inout ()
03785           //);
03786 
03787         }
03788       if (CORBA::is_nil (this->mcastconfigif_.in ()))
03789         ACE_ERROR_RETURN ((LM_ERROR, "TAO_FlowConnection::add_consumer: first add a producer and then a consumer\n"), 0);
03790       // @@ Is this the right place to do set_peer?
03791       AVStreams::flowSpec flow_spec;
03792       AVStreams::streamQoS stream_qos (1);
03793       stream_qos.length (1);
03794       stream_qos [0] = the_qos;
03795       this->mcastconfigif_->set_peer (flow_consumer,
03796                                       stream_qos,
03797                                       flow_spec);
03798     }
03799   catch (const CORBA::Exception& ex)
03800     {
03801       ex._tao_print_exception (
03802         "TAO_FlowConnection::add_consumer");
03803       return 0;
03804     }
03805   return 1;
03806 }
03807 
03808 CORBA::Boolean
03809 TAO_FlowConnection::drop (AVStreams::FlowEndPoint_ptr target)
03810 {
03811   ACE_UNUSED_ARG (target);
03812   return 0;
03813 }
03814 
03815 // -----------------------------------------------------------------
03816 // TAO_FlowEndPoint
03817 // -----------------------------------------------------------------
03818 
03819 //default constructor.
03820 TAO_FlowEndPoint::TAO_FlowEndPoint (void)
03821   :lock_ (0)
03822 {
03823 }
03824 
03825 TAO_FlowEndPoint::TAO_FlowEndPoint (const char *flowname,
03826                                     AVStreams::protocolSpec &protocols,
03827                                     const char *format)
03828 {
03829   this->open (flowname, protocols, format);
03830 }
03831 
03832 void
03833 TAO_FlowEndPoint::set_flow_handler (const char * /*flowname*/,
03834                                     TAO_AV_Flow_Handler * /*handler*/)
03835 {
03836 }
03837 
03838 int
03839 TAO_FlowEndPoint::open (const char *flowname,
03840                         AVStreams::protocolSpec &protocols,
03841                         const char *format)
03842 {
03843   this->flowname_ = flowname;
03844   this->format_ = format;
03845 
03846   if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_FlowEndPoint::open\n"));
03847   try
03848     {
03849       CORBA::Any flowname_any;
03850       flowname_any <<= flowname;
03851       this->define_property ("FlowName",
03852                              flowname_any);
03853       this->set_format (format);
03854       this->protocol_addresses_ = protocols;
03855       AVStreams::protocolSpec protocol_spec (protocols.length ());
03856       protocol_spec.length (protocols.length ());
03857       if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "%N:%l\n"));
03858       for (u_int i=0;i<protocols.length ();i++)
03859         {
03860           CORBA::String_var address = CORBA::string_dup (protocols [i]);
03861           TAO_Forward_FlowSpec_Entry entry ("", "", "", "", address.in ());
03862           protocol_spec [i] = CORBA::string_dup (entry.carrier_protocol_str ());
03863           if (TAO_debug_level > 0)
03864             ACE_DEBUG ((LM_DEBUG,
03865                         "[%s]\n",
03866                         static_cast<char const*>(protocol_spec[i])));
03867         }
03868       this->set_protocol_restriction (protocol_spec);
03869     }
03870   catch (const CORBA::Exception& ex)
03871     {
03872       ex._tao_print_exception ("TAO_FlowEndPoint::open");
03873       return -1;
03874     }
03875   return 0;
03876 }
03877 
03878 
03879 int
03880 TAO_FlowEndPoint::set_flowname (const char *flowname)
03881 {
03882   this->flowname_ = flowname;
03883   return 0;
03884 }
03885 
03886 // used by one flowconnection so that multiple connections cant use
03887 // the same flowendpoint.
03888 CORBA::Boolean
03889 TAO_FlowEndPoint::lock (void)
03890 {
03891   // lock the current flowendpoint
03892 
03893   if (this->lock_)
03894     return 0;
03895   this->lock_ = 1;
03896   return 1;
03897 }
03898 
03899 // unlocks the flowendpoint , becomes free to be used in another flow.
03900 void
03901 TAO_FlowEndPoint::unlock (void)
03902 {
03903   this->lock_ = 0;
03904 }
03905 
03906 
03907 void
03908 TAO_FlowEndPoint::destroy (void)
03909 {
03910   int result = TAO_AV_Core::deactivate_servant (this);
03911   if (result < 0)
03912     if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_StreamEndPoint::destroy failed\n"));
03913   TAO_AV_FlowSpecSetItor end = this->flow_spec_set_.end ();
03914   for (TAO_AV_FlowSpecSetItor begin = this->flow_spec_set_.begin ();
03915        begin != end; ++begin)
03916     (*begin)->protocol_object ()->destroy ();
03917 }
03918 
03919 AVStreams::StreamEndPoint_ptr
03920 TAO_FlowEndPoint::related_sep (void)
03921 {
03922 
03923   return AVStreams::StreamEndPoint::_duplicate (this->related_sep_.in ());
03924 }
03925 
03926 void
03927 TAO_FlowEndPoint::related_sep (AVStreams::StreamEndPoint_ptr related_sep)
03928 {
03929   this->related_sep_ = AVStreams::StreamEndPoint::_duplicate (related_sep);
03930 }
03931 
03932 AVStreams::FlowConnection_ptr
03933 TAO_FlowEndPoint::related_flow_connection (void)
03934 {
03935   return AVStreams::FlowConnection::_duplicate (this->related_flow_connection_.in ());
03936 }
03937 
03938 void
03939 TAO_FlowEndPoint::related_flow_connection (AVStreams::FlowConnection_ptr related_flow_connection)
03940 {
03941   this->related_flow_connection_ = AVStreams::FlowConnection::_duplicate (related_flow_connection);
03942 }
03943 
03944 // returns the connected peer for this flow
03945 AVStreams::FlowEndPoint_ptr
03946 TAO_FlowEndPoint::get_connected_fep (void)
03947 {
03948   return AVStreams::FlowEndPoint::_duplicate (this->peer_fep_.in ());
03949 }
03950 
03951 CORBA::Boolean
03952 TAO_FlowEndPoint::use_flow_protocol (const char * fp_name,
03953                                      const CORBA::Any &)
03954 {
03955   try
03956     {
03957       // Define the property called FlowProtocol
03958       CORBA::Any flowname_property;
03959       flowname_property <<= fp_name;
03960       this->define_property ("FlowProtocol",
03961                              flowname_property);
03962     }
03963   catch (const CORBA::Exception& ex)
03964     {
03965       ex._tao_print_exception (
03966         "TAO_FlowEndPoint::use_flow_protocol");
03967       return 0;
03968     }
03969   return 1;
03970 }
03971 
03972 void
03973 TAO_FlowEndPoint::set_format (const char * format)
03974 {
03975   this->format_ = format;
03976   try
03977     {
03978       // make this a property so that is_fep_compatible can query this and
03979       // check if 2 flowendpoints are compatible.
03980       CORBA::Any format_val;
03981       format_val <<= format;
03982       this->define_property ("Format",
03983                              format_val);
03984     }
03985   catch (const CORBA::Exception& ex)
03986     {
03987       ex._tao_print_exception ("TAO_FlowEndpoint::set_format");
03988     }
03989 }
03990 
03991 void
03992 TAO_FlowEndPoint::set_dev_params (const CosPropertyService::Properties & new_settings)
03993 {
03994   this->dev_params_ = new_settings;
03995   try
03996     {
03997       CORBA::Any DevParams_property;
03998       DevParams_property <<= new_settings;
03999       this->define_property ("DevParams",
04000                              DevParams_property);
04001     }
04002   catch (const CORBA::Exception& ex)
04003     {
04004       ex._tao_print_exception (
04005         "TAO_FlowEndPoint::set_dev_params");
04006     }
04007 }
04008 
04009 void
04010 TAO_FlowEndPoint::set_protocol_restriction (const AVStreams::protocolSpec & protocols)
04011 {
04012   try
04013     {
04014       u_int i = 0;
04015       if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "%N:%l\n"));
04016       for (i=0;i<protocols.length ();i++)
04017         {
04018           const char *protocol = (protocols)[i];
04019           if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "%s\n", protocol));
04020         }
04021       CORBA::Any AvailableProtocols_property;
04022       AvailableProtocols_property <<= protocols;
04023       this->define_property ("AvailableProtocols",
04024                              AvailableProtocols_property);
04025       AVStreams::protocolSpec *temp_spec;
04026       CORBA::Any_var temp_any = this->get_property_value ("AvailableProtocols");
04027       temp_any.in () >>= temp_spec;
04028       if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "%N:%l\n"));
04029       for (i=0;i<temp_spec->length ();i++)
04030         {
04031           const char *protocol = (*temp_spec)[i];
04032           if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "%s\n", protocol));
04033         }
04034       this->protocols_ = protocols;
04035     }
04036   catch (const CORBA::Exception& ex)
04037     {
04038       ex._tao_print_exception (
04039         "TAO_FlowEndpoint::set_protocol_restriction");
04040     }
04041 }
04042 
04043 CORBA::Boolean
04044 TAO_FlowEndPoint::is_fep_compatible (AVStreams::FlowEndPoint_ptr peer_fep)
04045 {
04046   const char *exception_message = "";
04047   try
04048     {
04049       // check whether the passed flowendpoint is compatible with this flowendpoint.
04050       // should we check for the availableFormats and choose one format.
04051       // get my format value
04052       CORBA::Any_var format_ptr;
04053       CORBA::String_var my_format, peer_format;
04054 
04055       exception_message = "TAO_FlowEndPoint::is_fep_compatible - Format";
04056       format_ptr = this->get_property_value ("Format");
04057 
04058       const char *temp_format;
04059       format_ptr.in () >>= temp_format;
04060       my_format = CORBA::string_dup (temp_format);
04061       // get my peer's format value
04062       exception_message = "TAO_FlowEndPoint::is_fep_compatible - Format[2]";
04063       format_ptr = peer_fep->get_property_value ("Format");
04064       format_ptr.in () >>= temp_format;
04065       peer_format = CORBA::string_dup (temp_format);
04066       if (ACE_OS::strcmp (my_format.in (),
04067                           peer_format.in ()) != 0)
04068         return 0;
04069 
04070       // since formats are same, check for a common protocol
04071       CORBA::Any_var AvailableProtocols_ptr;
04072       AVStreams::protocolSpec my_protocol_spec, peer_protocol_spec;
04073       AVStreams::protocolSpec *temp_protocols;;
04074 
04075       exception_message =
04076         "TAO_FlowEndPoint::is_fep_compatible - AvailableProtocols";
04077       AvailableProtocols_ptr = this->get_property_value ("AvailableProtocols");
04078       AvailableProtocols_ptr.in () >>= temp_protocols;
04079       my_protocol_spec = *temp_protocols;
04080 
04081       exception_message =
04082         "TAO_FlowEndPoint::is_fep_compatible - AvailableProtocols[2]";
04083       AvailableProtocols_ptr = peer_fep->get_property_value ("AvailableProtocols");
04084       AvailableProtocols_ptr.in () >>= temp_protocols;
04085       peer_protocol_spec = *temp_protocols;
04086 
04087       int protocol_match = 0;
04088       for (u_int i=0;i<my_protocol_spec.length ();i++)
04089         {
04090           CORBA::String_var my_protocol_string;
04091           for (u_int j=0;j<peer_protocol_spec.length ();j++)
04092             {
04093               CORBA::String_var peer_protocol_string;
04094               my_protocol_string = CORBA::string_dup (my_protocol_spec[i]);
04095               peer_protocol_string = CORBA::string_dup (peer_protocol_spec[j]);
04096               if (ACE_OS::strcmp (my_protocol_string.in (), peer_protocol_string.in ()) == 0)
04097                 {
04098                   protocol_match = 1;
04099                   break;
04100                 }
04101             }
04102           if (protocol_match)
04103             break;
04104         }
04105       if (!protocol_match)
04106         return 0;
04107     }
04108   catch (const CosPropertyService::PropertyNotFound& nf)
04109     {
04110       nf._tao_print_exception (exception_message);
04111     }
04112   catch (const CORBA::Exception& ex)
04113     {
04114       ex._tao_print_exception ("TAO_FlowEndPoint::is_fep_compatible");
04115       return 0;
04116     }
04117   return 1;
04118 }
04119 
04120 CORBA::Boolean
04121 TAO_FlowEndPoint::set_peer (AVStreams::FlowConnection_ptr /* the_fc */,
04122                             AVStreams::FlowEndPoint_ptr the_peer_fep,
04123                             AVStreams::QoS & /* the_qos */)
04124 {
04125   this->peer_fep_ =
04126     AVStreams::FlowEndPoint::_duplicate (the_peer_fep);
04127   return 1;
04128 }
04129 
04130 CORBA::Boolean
04131 TAO_FlowEndPoint::set_Mcast_peer (AVStreams::FlowConnection_ptr /* the_fc */,
04132                                   AVStreams::MCastConfigIf_ptr mcast_peer,
04133                                   AVStreams::QoS & /* the_qos */)
04134 {
04135   this->mcast_peer_ = AVStreams::MCastConfigIf::_duplicate (mcast_peer);
04136   return 0;
04137 }
04138 
04139 char *
04140 TAO_FlowEndPoint::go_to_listen_i (TAO_FlowSpec_Entry::Role role,
04141                                   AVStreams::QoS & /*the_qos*/,
04142                                   CORBA::Boolean /*is_mcast*/,
04143                                   AVStreams::FlowEndPoint_ptr peer_fep,
04144                                   char *& flowProtocol)
04145 {
04146   char direction [BUFSIZ];
04147   switch (role)
04148     {
04149     case TAO_FlowSpec_Entry::TAO_AV_PRODUCER:
04150       ACE_OS::strcpy (direction, "IN");
04151       break;
04152     case TAO_FlowSpec_Entry::TAO_AV_CONSUMER:
04153       ACE_OS::strcpy (direction, "OUT");
04154       break;
04155     default:
04156       break;
04157     }
04158   AVStreams::protocolSpec my_protocol_spec, peer_protocol_spec;
04159   AVStreams::protocolSpec *temp_protocols;
04160   CORBA::Any_var AvailableProtocols_ptr =
04161     peer_fep->get_property_value ("AvailableProtocols");
04162   AvailableProtocols_ptr.in () >>= temp_protocols;
04163   peer_protocol_spec = *temp_protocols;
04164   AvailableProtocols_ptr =
04165     this->get_property_value ("AvailableProtocols");
04166   AvailableProtocols_ptr.in () >>= temp_protocols;
04167   my_protocol_spec = *temp_protocols;
04168   int protocol_match = 0;
04169   CORBA::String_var listen_protocol;
04170   u_int i =0;
04171   for (i=0;i<my_protocol_spec.length ();i++)
04172     {
04173       CORBA::String_var my_protocol_string;
04174       for (u_int j=0;j<peer_protocol_spec.length ();j++)
04175         {
04176           CORBA::String_var peer_protocol_string;
04177           my_protocol_string = CORBA::string_dup (my_protocol_spec[i]);
04178           peer_protocol_string = CORBA::string_dup (peer_protocol_spec[j]);
04179           if (ACE_OS::strcmp (my_protocol_string.in (), peer_protocol_string.in ()) == 0)
04180             {
04181               listen_protocol = my_protocol_string;
04182               protocol_match = 1;
04183               break;
04184             }
04185         }
04186       if (protocol_match)
04187         break;
04188     }
04189   if (!protocol_match)
04190     ACE_ERROR_RETURN ((LM_ERROR, "TAO_FlowEndPoint::go_to_listen failed: no protoocol match\n"), 0);
04191 
04192   for (u_int j=0;j<this->protocol_addresses_.length ();j++)
04193     if (ACE_OS::strncmp (this->protocol_addresses_ [j], listen_protocol.in (), ACE_OS::strlen (listen_protocol.in ())) == 0)
04194       {
04195         // Now listen on that protocol.
04196         TAO_Forward_FlowSpec_Entry *entry;
04197         ACE_NEW_RETURN (entry,
04198                         TAO_Forward_FlowSpec_Entry (this->flowname_.in (),
04199                                                     direction,
04200                                                     this->format_.in (),
04201                                                     flowProtocol,
04202                                                     this->protocol_addresses_ [j]),
04203                         0);
04204 
04205         TAO_AV_Acceptor_Registry *acceptor_registry = TAO_AV_CORE::instance ()->acceptor_registry ();
04206         this->flow_spec_set_.insert (entry);
04207         int result = acceptor_registry->open (this,
04208                                               TAO_AV_CORE::instance (),
04209                                               this->flow_spec_set_);
04210         if (result < 0)
04211           return 0;
04212         char *listen_address = entry->get_local_addr_str ();
04213         char *address;
04214         ACE_NEW_RETURN (address,
04215                         char [BUFSIZ],
04216                         0);
04217         ACE_OS::sprintf (address, "%s=%s", listen_protocol.in (), listen_address);
04218         return address;
04219       }
04220   return 0;
04221 }
04222 
04223 
04224 CORBA::Boolean
04225 TAO_FlowEndPoint::connect_to_peer_i (TAO_FlowSpec_Entry::Role role,
04226                                      AVStreams::QoS & /*the_qos*/,
04227                                      const char * address,
04228                                      const char * use_flow_protocol)
04229 {
04230   char direction [BUFSIZ];
04231   switch (role)
04232     {
04233     case TAO_FlowSpec_Entry::TAO_AV_PRODUCER:
04234       ACE_OS::strcpy (direction, "IN");
04235       break;
04236     case TAO_FlowSpec_Entry::TAO_AV_CONSUMER:
04237       ACE_OS::strcpy (direction, "OUT");
04238       break;
04239     default:
04240       break;
04241     }
04242   TAO_Forward_FlowSpec_Entry *entry;
04243   ACE_NEW_RETURN (entry,
04244                   TAO_Forward_FlowSpec_Entry (this->flowname_.in (),
04245                                               direction,
04246                                               this->format_.in (),
04247                                               use_flow_protocol,
04248                                               address),
04249                   0);
04250   this->flow_spec_set_.insert (entry);
04251   TAO_AV_Connector_Registry *connector_registry = TAO_AV_CORE::instance ()->connector_registry ();
04252   int result = connector_registry->open (this,
04253                                          TAO_AV_CORE::instance (),
04254                                          this->flow_spec_set_);
04255   if (result < 0)
04256     ACE_ERROR_RETURN ((LM_ERROR, "TAO_FlowEndPoint::connector_registry::open failed\n"), 0);
04257   this->reverse_channel_ = entry->get_local_addr_str ();
04258   return 1;
04259 }
04260 
04261 int
04262 TAO_FlowEndPoint::set_protocol_object (const char * /*flowname*/,
04263                                        TAO_AV_Protocol_Object * /*object*/)
04264 {
04265   return 0;
04266 }
04267 
04268 
04269 // ------------------------------------------------------------
04270 // TAO_FlowProducer class
04271 // ------------------------------------------------------------
04272 
04273 //default constructor
04274 TAO_FlowProducer::TAO_FlowProducer (void)
04275 {
04276 }
04277 
04278 TAO_FlowProducer::TAO_FlowProducer (const char *flowname,
04279                                     AVStreams::protocolSpec protocols,
04280                                     const char *format)
04281 {
04282   this->open (flowname, protocols, format);
04283 }
04284 
04285 // gets the reverse channel for feedback.
04286 char *
04287 TAO_FlowProducer::get_rev_channel (const char * /*pcol_name*/)
04288 {
04289   return 0;
04290 }
04291 
04292 // The start, stop and destroy are to be handled by the application.
04293 void
04294 TAO_FlowProducer::stop (void)
04295 {
04296   TAO_AV_FlowSpecSetItor end = this->flow_spec_set_.end ();
04297   for (TAO_AV_FlowSpecSetItor begin = this->flow_spec_set_.begin ();
04298        begin != end; ++begin)
04299     {
04300       TAO_FlowSpec_Entry *entry = (*begin);
04301       entry->handler ()->stop (TAO_FlowSpec_Entry::TAO_AV_PRODUCER);
04302     }
04303 }
04304 
04305 void
04306 TAO_FlowProducer::start (void)
04307 {
04308   TAO_AV_FlowSpecSetItor end = this->flow_spec_set_.end ();
04309   for (TAO_AV_FlowSpecSetItor begin = this->flow_spec_set_.begin ();
04310        begin != end; ++begin)
04311     {
04312       TAO_FlowSpec_Entry *entry = (*begin);
04313       if (entry->handler () != 0)
04314         {
04315           entry->handler ()->start (TAO_FlowSpec_Entry::TAO_AV_PRODUCER);
04316         }
04317       if (entry->control_handler () != 0)
04318         {
04319           entry->control_handler ()->start (TAO_FlowSpec_Entry::TAO_AV_PRODUCER);
04320         }
04321     }
04322 }
04323 
04324 char *
04325 TAO_FlowProducer::go_to_listen (AVStreams::QoS & the_qos,
04326                                 CORBA::Boolean is_mcast,
04327                                 AVStreams::FlowEndPoint_ptr peer_fep,
04328                                 char *& flowProtocol)
04329 {
04330   return this->go_to_listen_i (TAO_FlowSpec_Entry::TAO_AV_PRODUCER,
04331                                the_qos,
04332                                is_mcast,
04333                                peer_fep,
04334                                flowProtocol);
04335 }
04336 
04337 CORBA::Boolean
04338 TAO_FlowProducer::connect_to_peer (AVStreams::QoS & the_qos,
04339                                    const char * address,
04340                                    const char * use_flow_protocol)
04341 {
04342   return this->connect_to_peer_i (TAO_FlowSpec_Entry::TAO_AV_PRODUCER,
04343                                   the_qos,
04344                                   address,
04345                                   use_flow_protocol);
04346 }
04347 //  Connect to a IP multicast address.
04348 char *
04349 TAO_FlowProducer::connect_mcast (AVStreams::QoS & /* the_qos */,
04350                                  CORBA::Boolean_out /* is_met */,
04351                                  const char *address,
04352                                  const char * use_flow_protocol)
04353 {
04354   // The address variable gives the multicast address to subscribe to.
04355   for (u_int i=0;i<this->protocols_.length ();i++)
04356     {
04357       // choose the protocol which supports multicast.
04358     }
04359 
04360   if (address == 0)
04361     if (TAO_debug_level > 0)
04362       ACE_DEBUG ((LM_DEBUG, "TAO_FlowProducer::connect_mcast address is 0\n"));
04363   TAO_Forward_FlowSpec_Entry  *entry;
04364   ACE_NEW_RETURN (entry,
04365                   TAO_Forward_FlowSpec_Entry(this->flowname_.in (),
04366                                              "IN",
04367                                              this->format_.in (),
04368                                              use_flow_protocol,
04369                                              address),
04370                   0);
04371 
04372   this->flow_spec_set_.insert (entry);
04373   TAO_AV_Acceptor_Registry *acceptor_registry =
04374     TAO_AV_CORE::instance ()->acceptor_registry ();
04375   int result = acceptor_registry->open (this,
04376                                         TAO_AV_CORE::instance (),
04377                                         this->flow_spec_set_);
04378   if (result < 0)
04379     ACE_ERROR_RETURN ((LM_ERROR, "TAO_FlowProducer::connect_mcast:acceptor_registry open failed\n"), 0);
04380   // Now remove our handler from the reactor since we're a producer and dont want to get called for
04381   // multicast packets.
04382   ACE_Event_Handler *event_handler = entry->handler ()->event_handler ();
04383   event_handler->reactor ()->remove_handler (event_handler,
04384                                              ACE_Event_Handler::READ_MASK);
04385   return CORBA::string_dup (address);
04386 }
04387 
04388 // sets the key for this flow.
04389 void
04390 TAO_FlowProducer::set_key (const AVStreams::key & the_key)
04391 {
04392   try
04393     {
04394       CORBA::Any anyval;
04395       anyval <<= the_key;
04396       this->define_property ("PublicKey",
04397                              anyval);
04398     }
04399   catch (const CORBA::Exception& ex)
04400     {
04401       ex._tao_print_exception ("TAO_FlowProducer::set_key");
04402     }
04403 }
04404 
04405 // source id to be used to distinguish this source from others.
04406 void
04407 TAO_FlowProducer::set_source_id (CORBA::Long source_id)
04408 {
04409   this->source_id_ = source_id;
04410 }
04411 
04412 // ------------------------------------------------------------
04413 // TAO_FlowConsumer
04414 // ------------------------------------------------------------
04415 
04416 
04417 // default constructor.
04418 TAO_FlowConsumer::TAO_FlowConsumer (void)
04419 {
04420 }
04421 
04422 TAO_FlowConsumer::TAO_FlowConsumer (const char *flowname,
04423                                     AVStreams::protocolSpec protocols,
04424                                     const char *format)
04425 {
04426   this->open (flowname, protocols, format);
04427 }
04428 
04429 // The start, stop and destroy are to be handled by the application.
04430 void
04431 TAO_FlowConsumer::stop (void)
04432 {
04433   TAO_AV_FlowSpecSetItor end = this->flow_spec_set_.end ();
04434   for (TAO_AV_FlowSpecSetItor begin = this->flow_spec_set_.begin ();
04435        begin != end; ++begin)
04436     (*begin)->handler ()->stop (TAO_FlowSpec_Entry::TAO_AV_CONSUMER);
04437 }
04438 
04439 void
04440 TAO_FlowConsumer::start (void)
04441 {
04442   TAO_AV_FlowSpecSetItor end = this->flow_spec_set_.end ();
04443   for (TAO_AV_FlowSpecSetItor begin = this->flow_spec_set_.begin ();
04444        begin != end; ++begin)
04445     {
04446       (*begin)->handler ()->start (TAO_FlowSpec_Entry::TAO_AV_CONSUMER);
04447     }
04448 }
04449 
04450 char *
04451 TAO_FlowConsumer::go_to_listen (AVStreams::QoS & the_qos,
04452                                 CORBA::Boolean is_mcast,
04453                                 AVStreams::FlowEndPoint_ptr peer_fep,
04454                                 char *& flowProtocol)
04455 {
04456   return this->go_to_listen_i (TAO_FlowSpec_Entry::TAO_AV_CONSUMER,
04457                                the_qos,
04458                                is_mcast,
04459                                peer_fep,
04460                                flowProtocol);
04461 }
04462 
04463 CORBA::Boolean
04464 TAO_FlowConsumer::connect_to_peer (AVStreams::QoS & the_qos,
04465                                    const char * address,
04466                                    const char * use_flow_protocol)
04467 {
04468   return this->connect_to_peer_i (TAO_FlowSpec_Entry::TAO_AV_CONSUMER,
04469                                   the_qos,
04470                                   address,
04471                                   use_flow_protocol);
04472 }
04473 
04474 //------------------------------------------------------------
04475 // TAO_Tokenizer
04476 //------------------------------------------------------------
04477 TAO_Tokenizer::TAO_Tokenizer (const char *string, char delimiter)
04478   :token_array_ (10),
04479    count_ (0)
04480 {
04481   this->parse (string, delimiter);
04482 }
04483 
04484 TAO_Tokenizer::~TAO_Tokenizer ()
04485 {
04486   for (unsigned int i=0; i<this->num_tokens_; i++)
04487     CORBA::string_free (this->token_array_[i]);
04488 }
04489 
04490 
04491 int
04492 TAO_Tokenizer::parse (const char *string, char delimiter)
04493 {
04494   ACE_CString new_string (string);
04495   u_int pos =0;
04496   ACE_CString::size_type slash_pos = 0;
04497   u_int count = 0;
04498   int result;
04499   while (pos < new_string.length ())
04500     {
04501       slash_pos = new_string.find (delimiter, pos);
04502       ACE_CString substring;
04503       if (slash_pos != new_string.npos)
04504         {
04505           substring = new_string.substring (pos,
04506                                             slash_pos - pos);
04507           pos = slash_pos + 1;
04508         }
04509       else
04510         {
04511           substring = new_string.substring (pos);
04512           pos = static_cast<int> (new_string.length ());
04513         }
04514       char *token = CORBA::string_dup (substring.c_str ());
04515       result = this->token_array_.set (token, count);
04516       if (result == -1)
04517         {
04518           this->token_array_.size (this->token_array_.size ()*2);
04519           result = this->token_array_.set (token, count);
04520           if (result == -1)
04521             ACE_ERROR_RETURN ((LM_ERROR, "TAO_Tokenizer::parse error"), -1);
04522         }
04523       count++;
04524     }
04525 
04526   /*
04527   ACE_OS::strcpy (this->string_ , string);
04528   char delimiter_str [2] = {0, 0};
04529   delimiter_str [0] = delimiter;
04530   char *token = ACE_OS::strtok (this->string_, delimiter_str);
04531 
04532   while (token != 0)
04533     {
04534       result = this->token_array_.set (token, count);
04535       if (result == -1)
04536         {
04537           this->token_array_.size (this->token_array_.size ()*2);
04538           result = this->token_array_.set (token, count);
04539           if (result == -1)
04540             ACE_ERROR_RETURN ((LM_ERROR, "TAO_Tokenizer::parse error"), -1);
04541         }
04542       token = ACE_OS::strtok (0, delimiter_str);
04543       count++;
04544     }
04545   */
04546   this->num_tokens_ = count;
04547   return 0;
04548 }
04549 
04550 char*
04551 TAO_Tokenizer::token (void)
04552 {
04553   if (count_ < num_tokens_)
04554     return CORBA::string_dup (this->token_array_[this->count_++]);
04555   else
04556     return 0;
04557 }
04558 
04559 int
04560 TAO_Tokenizer::num_tokens (void)
04561 {
04562   return static_cast<int> (this->num_tokens_);
04563 }
04564 
04565 const char *
04566 TAO_Tokenizer::operator [] (size_t index) const
04567 {
04568   if (index >= this->num_tokens_)
04569     return 0;
04570 
04571   return this->token_array_[index];
04572 }
04573 
04574 TAO_END_VERSIONED_NAMESPACE_DECL

Generated on Tue Feb 2 17:47:49 2010 for TAO_AV by  doxygen 1.4.7