AVStreams_i.cpp

Go to the documentation of this file.
00001 
00002 //=============================================================================
00003 /**
00004  *  @file   AVStreams_i.cpp
00005  *
00006  *  $Id: AVStreams_i.cpp 78820 2007-07-07 20:06:46Z sowayaa $
00007  *
00008  *  @author Sumedh Mungee <sumedh@cs.wustl.edu> Nagarajan Surendran <naga@cs.wustl.edu>
00009  */
00010 //=============================================================================
00011 
00012 
00013 #include "orbsvcs/AV/AVStreams_i.h"
00014 #include "orbsvcs/AV/sfp.h"
00015 #include "orbsvcs/AV/MCast.h"
00016 #include "orbsvcs/AV/RTCP.h"
00017 
00018 #include "tao/debug.h"
00019 #include "tao/ORB_Core.h"
00020 #include "tao/AnyTypeCode/Any.h"
00021 #include "ace/OS_NS_arpa_inet.h"
00022 
00023 #if !defined (__ACE_INLINE__)
00024 #include "orbsvcs/AV/AVStreams_i.inl"
00025 #endif /* __ACE_INLINE__ */
00026 
00027 ACE_RCSID (AV,
00028            AVStreams_i,
00029            "$Id: AVStreams_i.cpp 78820 2007-07-07 20:06:46Z sowayaa $")
00030 
00031 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00032 
00033 //------------------------------------------------------------
00034 // TAO_AV_Qos
00035 //------------------------------------------------------------
00036 
00037 TAO_AV_QoS::TAO_AV_QoS (void)
00038 {
00039 }
00040 
00041 TAO_AV_QoS::TAO_AV_QoS (AVStreams::streamQoS &stream_qos)
00042 {
00043   this->set (stream_qos);
00044 }
00045 
00046 int
00047 TAO_AV_QoS::convert (AVStreams::streamQoS &/*network_qos*/)
00048 {
00049   return -1;
00050 }
00051 
00052 
00053 // ----------------------------------------------------------------------
00054 // AV_Null_MediaCtrl
00055 // ----------------------------------------------------------------------
00056 AV_Null_MediaCtrl::AV_Null_MediaCtrl (void)
00057 {
00058 }
00059 
00060 AV_Null_MediaCtrl::~AV_Null_MediaCtrl (void)
00061 {
00062 }
00063 
00064 
00065 // ----------------------------------------------------------------------
00066 // TAO_Basic_StreamCtrl
00067 // ----------------------------------------------------------------------
00068 
00069 // Constructor
00070 TAO_Basic_StreamCtrl::TAO_Basic_StreamCtrl (void)
00071   :flow_count_ (0)
00072 {
00073 }
00074 
00075 
00076 // Stop the transfer of data of the stream
00077 // Empty the_spec means apply operation to all flows
00078 void
00079 TAO_Basic_StreamCtrl::stop (const AVStreams::flowSpec &flow_spec)
00080 {
00081   try
00082     {
00083       // @@Call stop on the Related MediaCtrl.  call stop on the flow
00084       // connections.
00085       if (this->flow_connection_map_.current_size () > 0)
00086         {
00087           if (flow_spec.length () > 0)
00088             for (u_int i=0;i<flow_spec.length ();i++)
00089               {
00090                 char *flowname = TAO_AV_Core::get_flowname (flow_spec[i]);
00091                 ACE_CString flow_name_key (flowname);
00092                 AVStreams::FlowConnection_var flow_connection_entry;
00093                 if (this->flow_connection_map_.find (flow_name_key,
00094                                                      flow_connection_entry) == 0)
00095                   {
00096                     flow_connection_entry->stop ();
00097                   }
00098               }
00099           else
00100             {
00101               // call stop on all the flows.
00102               FlowConnection_Map_Iterator iterator (this->flow_connection_map_);
00103               FlowConnection_Map_Entry *entry;
00104               for (;iterator.next (entry) !=  0;iterator.advance ())
00105                 {
00106                   entry->int_id_->stop ();
00107                 }
00108             }
00109         }
00110     }
00111   catch (const CORBA::Exception& ex)
00112     {
00113       ex._tao_print_exception ("TAO_Basic_StreamCtrl::stop");
00114       return;
00115     }
00116 }
00117 
00118 // Start the transfer of data in the stream.
00119 // Empty the_spec means apply operation to all flows
00120 void
00121 TAO_Basic_StreamCtrl::start (const AVStreams::flowSpec &flow_spec)
00122 {
00123   try
00124     {
00125       // @@Call start on the Related MediaCtrl.
00126 
00127       // call start on the flow connections.
00128       if (this->flow_connection_map_.current_size () > 0)
00129         {
00130           if (flow_spec.length () > 0)
00131             for (u_int i = 0; i < flow_spec.length (); i++)
00132               {
00133                 char *flowname = TAO_AV_Core::get_flowname (flow_spec[i]);
00134                 ACE_CString flow_name_key (flowname);
00135                 FlowConnection_Map::ENTRY *flow_connection_entry = 0;
00136                 if (this->flow_connection_map_.find (flow_name_key,
00137                                                      flow_connection_entry) == 0)
00138                   {
00139                     flow_connection_entry->int_id_->start ();
00140                   }
00141               }
00142           else
00143             {
00144               // call start on all the flows.
00145               FlowConnection_Map_Iterator iterator (this->flow_connection_map_);
00146               FlowConnection_Map_Entry *entry = 0;
00147               for (;iterator.next (entry) !=  0;iterator.advance ())
00148                 {
00149                   entry->int_id_->start ();
00150                 }
00151             }
00152         }
00153     }
00154   catch (const CORBA::Exception& ex)
00155     {
00156       ex._tao_print_exception ("TAO_Basic_StreamCtrl::start");
00157       return;
00158     }
00159 }
00160 
00161 // Tears down the stream. This will close the connection, and delete
00162 // the streamendpoint and vdev associated with this stream Empty
00163 // the_spec means apply operation to all flows
00164 
00165 void
00166 TAO_Basic_StreamCtrl::destroy (const AVStreams::flowSpec &flow_spec)
00167 {
00168   try
00169     {
00170       // call stop on the flow connections.
00171       if (this->flow_connection_map_.current_size () > 0)
00172         {
00173           if (flow_spec.length () > 0)
00174           {
00175             for (u_int i=0;i<flow_spec.length ();i++)
00176               {
00177                 char *flowname = TAO_AV_Core::get_flowname (flow_spec[i]);
00178                 ACE_CString flow_name_key (flowname);
00179                 FlowConnection_Map::ENTRY *flow_connection_entry = 0;
00180                 if (this->flow_connection_map_.find (flow_name_key, flow_connection_entry) == 0)
00181                   {
00182                     flow_connection_entry->int_id_->destroy ();
00183                   }
00184               }
00185           }
00186           else
00187             {
00188               // call destroy on all the flows.
00189               FlowConnection_Map_Iterator iterator (this->flow_connection_map_);
00190               FlowConnection_Map_Entry *entry = 0;
00191               for (;iterator.next (entry) !=  0;iterator.advance ())
00192                 {
00193                   entry->int_id_->destroy ();
00194                 }
00195             }
00196         }
00197     }
00198   catch (const CORBA::Exception& ex)
00199     {
00200       ex._tao_print_exception ("TAO_Basic_StreamCtrl::destroy");
00201       return;
00202     }
00203 }
00204 
00205 // Changes the QoS associated with the stream
00206 // Empty the_spec means apply operation to all flows
00207 CORBA::Boolean
00208 
00209 TAO_Basic_StreamCtrl::modify_QoS (AVStreams::streamQoS & /*new_qos*/,
00210                                   const AVStreams::flowSpec &/*flowspec*/)
00211 {
00212   return 1;
00213 }
00214 
00215 // Used by StreamEndPoint and VDev to inform StreamCtrl of events.
00216 // E.g., loss of flow, reestablishment of flow, etc..
00217 void
00218 TAO_Basic_StreamCtrl::push_event (const struct CosPropertyService::Property &/*the_event*/)
00219 {
00220   if (TAO_debug_level > 0)
00221     ACE_DEBUG ((LM_DEBUG, "\n(%P|%t) Recieved event \""));
00222 }
00223 
00224 // Sets the flow protocol status.
00225 void
00226 TAO_Basic_StreamCtrl::set_FPStatus (const AVStreams::flowSpec &flow_spec,
00227                                     const char  *fp_name,
00228                                     const CORBA::Any &fp_settings)
00229 
00230 {
00231   if (!CORBA::is_nil (this->sep_a_.in ()))
00232     {
00233       this->sep_a_->set_FPStatus (flow_spec, fp_name, fp_settings);
00234     }
00235 }
00236 
00237 // Gets the flow connection.
00238 CORBA::Object_ptr
00239 TAO_Basic_StreamCtrl::get_flow_connection (const char *flow_name)
00240 {
00241   ACE_CString flow_name_key (flow_name);
00242   AVStreams::FlowConnection_var flow_connection_entry;
00243 
00244   if (this->flow_connection_map_.find (flow_name_key, flow_connection_entry) == 0){
00245     return flow_connection_entry._retn();
00246   }
00247   else{
00248     if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "(%N,%l) Cannot find flow: %s\n", flow_name ));
00249     throw AVStreams::noSuchFlow ();
00250   }
00251 }
00252 
00253 // Sets the flow connection.
00254 void
00255 TAO_Basic_StreamCtrl::set_flow_connection (const char *flow_name,
00256                                            CORBA::Object_ptr flow_connection_obj)
00257 {
00258   AVStreams::FlowConnection_var flow_connection;
00259   try
00260     {
00261       flow_connection = AVStreams::FlowConnection::_narrow (flow_connection_obj);
00262     }
00263   catch (const CORBA::Exception& ex)
00264     {
00265       ex._tao_print_exception (
00266         "TAO_Basic_StreamCtrl::set_flow_connection");
00267       return;
00268     }
00269   // add the flowname and the flowconnection to the hashtable.
00270   this->flows_.length (this->flow_count_ + 1);
00271   this->flows_ [this->flow_count_++] = CORBA::string_dup (flow_name);
00272   ACE_CString flow_name_key (flow_name);
00273   if (this->flow_connection_map_.bind (flow_name_key, flow_connection) != 0)
00274   {
00275     if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "(%N,%l) Cannot find flow: %s\n", flow_name ));
00276     throw AVStreams::noSuchFlow ();// is this right?
00277   }
00278 }
00279 
00280 TAO_Basic_StreamCtrl::~TAO_Basic_StreamCtrl (void)
00281 {
00282 }
00283 
00284 // ----------------------------------------------------------------------
00285 // TAO_Negotiator
00286 // ----------------------------------------------------------------------
00287 
00288 CORBA::Boolean
00289 TAO_Negotiator::negotiate (AVStreams::Negotiator_ptr /* remote_negotiator */,
00290                            const AVStreams::streamQoS &/* qos_spec */)
00291 {
00292   ACE_DEBUG ((LM_DEBUG,
00293               "TAO_Negotiator::negotiate\n"));
00294   return 0;
00295 }
00296 
00297 // ----------------------------------------------------------------------
00298 // MMDevice_Map_Hash_Key
00299 // ----------------------------------------------------------------------
00300 
00301 const int MMDevice_Map_Hash_Key::hash_maximum_ = 10000;
00302 
00303 //default constructor.
00304 MMDevice_Map_Hash_Key::MMDevice_Map_Hash_Key (void)
00305 {
00306   this->mmdevice_ = AVStreams::MMDevice::_nil ();
00307 }
00308 
00309 // constructor.
00310 MMDevice_Map_Hash_Key::MMDevice_Map_Hash_Key (AVStreams::MMDevice_ptr mmdevice)
00311 {
00312   this->mmdevice_ = AVStreams::MMDevice::_duplicate (mmdevice);
00313 }
00314 
00315 // copy constructor.
00316 MMDevice_Map_Hash_Key::MMDevice_Map_Hash_Key (const MMDevice_Map_Hash_Key& hash_key)
00317 {
00318   this->mmdevice_ = AVStreams::MMDevice::_duplicate (hash_key.mmdevice_);
00319 }
00320 
00321 // destructor.
00322 MMDevice_Map_Hash_Key::~MMDevice_Map_Hash_Key (void)
00323 {
00324   CORBA::release (this->mmdevice_);
00325 }
00326 
00327 bool
00328 MMDevice_Map_Hash_Key::operator == (const MMDevice_Map_Hash_Key &hash_key) const
00329 {
00330   CORBA::Boolean result = 0;
00331   try
00332     {
00333       result =
00334         this->mmdevice_->_is_equivalent (hash_key.mmdevice_);
00335     }
00336   catch (const CORBA::Exception& ex)
00337     {
00338       ex._tao_print_exception (
00339         "MMDevice_Map_Hash_Key::operator == ");
00340       return false;
00341     }
00342 
00343   return result;
00344 }
00345 
00346 bool
00347 operator < (const MMDevice_Map_Hash_Key &left,
00348             const MMDevice_Map_Hash_Key &right)
00349 {
00350   bool result = false;
00351 
00352   try
00353     {
00354       const CORBA::ULong left_hash =
00355         left.mmdevice_->_hash (left.hash_maximum_);
00356 
00357       const CORBA::ULong right_hash =
00358         right.mmdevice_->_hash (right.hash_maximum_);
00359 
00360       result = left_hash < right_hash;
00361     }
00362   catch (const CORBA::Exception& ex)
00363     {
00364       ex._tao_print_exception ("operator < for MMDevice_Map_Hash_Key");
00365       return false;
00366     }
00367 
00368   return result;
00369 }
00370 
00371 u_long
00372 MMDevice_Map_Hash_Key::hash (void)  const
00373 {
00374   u_long result = 0;
00375   try
00376     {
00377       result = this->mmdevice_->_hash (this->hash_maximum_);
00378     }
00379   catch (const CORBA::Exception& ex)
00380     {
00381       ex._tao_print_exception ("MMDevice_Map_Hash_Key::hash");
00382       return 0;
00383     }
00384   return result;
00385 }
00386 
00387 // ----------------------------------------------------------------------
00388 // TAO_StreamCtrl
00389 // ----------------------------------------------------------------------
00390 
00391 TAO_StreamCtrl::TAO_StreamCtrl (void)
00392   :mcastconfigif_ (0)
00393 {
00394   try
00395     {
00396       this->streamctrl_ = this->_this ();
00397       char buf [BUFSIZ];
00398       int result = ACE_OS::hostname (buf, BUFSIZ);
00399       unsigned long ipaddr = 0;
00400       if (result == 0)
00401         ipaddr = ACE_OS::inet_addr (buf);
00402       this->source_id_ = TAO_AV_RTCP::alloc_srcid (ipaddr);
00403     }
00404   catch (const CORBA::Exception& ex)
00405     {
00406       ex._tao_print_exception ("TAO_StreamCtrl::TAO_StreamCtrl");
00407     }
00408 }
00409 
00410 TAO_StreamCtrl::~TAO_StreamCtrl (void)
00411 {
00412   delete this->mcastconfigif_;
00413 }
00414 
00415 
00416 // Stop the transfer of data of the stream
00417 // Empty the_spec means apply operation to all flows
00418 void
00419 TAO_StreamCtrl::stop (const AVStreams::flowSpec &flow_spec)
00420 {
00421   try
00422     {
00423       TAO_Basic_StreamCtrl::stop (flow_spec);
00424       if (this->flow_connection_map_.current_size () > 0)
00425         return;
00426       MMDevice_Map_Iterator a_iterator (this->mmdevice_a_map_);
00427       MMDevice_Map::ENTRY *entry = 0;
00428       for (;a_iterator.next (entry)!= 0;a_iterator.advance ())
00429         {
00430           entry->int_id_.sep_->stop (flow_spec);
00431         }
00432       MMDevice_Map_Iterator b_iterator (this->mmdevice_b_map_);
00433       for (;b_iterator.next (entry)!= 0;b_iterator.advance ())
00434         {
00435           entry->int_id_.sep_->stop (flow_spec);
00436         }
00437     }
00438   catch (const CORBA::Exception& ex)
00439     {
00440       ex._tao_print_exception ("TAO_Basic_StreamCtrl::stop");
00441       return;
00442     }
00443 }
00444 
00445 // Start the transfer of data in the stream.
00446 // Empty the_spec means apply operation to all flows
00447 void
00448 TAO_StreamCtrl::start (const AVStreams::flowSpec &flow_spec)
00449 {
00450   try
00451     {
00452       TAO_Basic_StreamCtrl::start (flow_spec);
00453       if (this->flow_connection_map_.current_size () > 0)
00454         return;
00455 
00456       MMDevice_Map_Iterator a_iterator (this->mmdevice_a_map_);
00457       MMDevice_Map::ENTRY *entry = 0;
00458       for (;a_iterator.next (entry)!= 0;a_iterator.advance ())
00459         {
00460           entry->int_id_.sep_->start (flow_spec);
00461         }
00462       MMDevice_Map_Iterator b_iterator (this->mmdevice_b_map_);
00463       for (;b_iterator.next (entry)!= 0;b_iterator.advance ())
00464         {
00465           entry->int_id_.sep_->start (flow_spec);
00466         }
00467     }
00468   catch (const CORBA::Exception& ex)
00469     {
00470       ex._tao_print_exception ("TAO_StreamCtrl::start");
00471       return;
00472     }
00473 }
00474 
00475 // Tears down the stream. This will close the connection, and delete
00476 // the streamendpoint and vdev associated with this stream
00477 // Empty the_spec means apply operation to all flows
00478 void
00479 TAO_StreamCtrl::destroy (const AVStreams::flowSpec &flow_spec)
00480 {
00481   try
00482     {
00483       TAO_Basic_StreamCtrl::destroy (flow_spec);
00484       if (this->flow_connection_map_.current_size () > 0)
00485         return;
00486 
00487       MMDevice_Map_Iterator a_iterator (this->mmdevice_a_map_);
00488       MMDevice_Map::ENTRY *entry = 0;
00489       for (;a_iterator.next (entry)!= 0;a_iterator.advance ())
00490         {
00491           entry->int_id_.sep_->destroy (flow_spec);
00492         }
00493       MMDevice_Map_Iterator b_iterator (this->mmdevice_b_map_);
00494       for (;b_iterator.next (entry)!= 0;b_iterator.advance ())
00495         {
00496           entry->int_id_.sep_->destroy (flow_spec);
00497         }
00498     }
00499   catch (const CORBA::Exception& ex)
00500     {
00501       ex._tao_print_exception ("TAO_StreamCtrl::destroy");
00502       return;
00503     }
00504 
00505   int result = TAO_AV_Core::deactivate_servant (this);
00506   if (result < 0)
00507     if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_StreamCtrl::destroy failed\n"));
00508 }
00509 
00510 // request the two MMDevices to create vdev and stream endpoints. save
00511 // the references returned.
00512 
00513 // The interaction diagram for this method is on page 13 of the spec
00514 CORBA::Boolean
00515 TAO_StreamCtrl::bind_devs (AVStreams::MMDevice_ptr a_party,
00516                            AVStreams::MMDevice_ptr b_party,
00517                            AVStreams::streamQoS &the_qos,
00518                            const AVStreams::flowSpec &the_flows)
00519 {
00520   try
00521     {
00522       if (CORBA::is_nil (a_party) && CORBA::is_nil (b_party))
00523         ACE_ERROR_RETURN ((LM_ERROR, "Both parties are nil\n"), 0);
00524       // Check to see if we have non-nil parties to bind!
00525       if (TAO_debug_level > 0)
00526         if (CORBA::is_nil (a_party) ||
00527             CORBA::is_nil (b_party))
00528           if (TAO_debug_level > 0)
00529             ACE_DEBUG ((LM_DEBUG,
00530                         "(%P|%t) TAO_StreamCtrl::bind_devs: "
00531                         "a_party or b_party is null"
00532                         "Multicast mode\n"));
00533 
00534       // Request a_party to create the endpoint and vdev
00535       CORBA::Boolean met_qos;
00536       CORBA::String_var named_vdev;
00537 
00538       if (!CORBA::is_nil (a_party))
00539         {
00540           MMDevice_Map_Hash_Key find_key (a_party);
00541           MMDevice_Map_Entry find_entry;
00542           int result =
00543             this->mmdevice_a_map_.find (find_key, find_entry);
00544           if (result == 0)
00545             {
00546               if (TAO_debug_level > 0)
00547                 {
00548                   // Already in the map.
00549                   if (TAO_debug_level > 0)
00550                     ACE_DEBUG ((LM_DEBUG, "mmdevice a_party is already bound\n"));
00551                 }
00552               return 1;
00553             }
00554           else
00555             {
00556               this->sep_a_ =
00557                 a_party-> create_A (this->streamctrl_.in (),
00558                                     this->vdev_a_.out (),
00559                                     the_qos,
00560                                     met_qos,
00561                                     named_vdev.inout (),
00562                                     the_flows);
00563 
00564               if (TAO_debug_level > 0)
00565                 ACE_DEBUG ((LM_DEBUG, "(%P|%t) TAO_StreamCtrl::create_A: succeeded\n"));
00566               // Define ourselves as the related_streamctrl property of the sep.
00567               CORBA::Any streamctrl_any;
00568               streamctrl_any <<= this->streamctrl_.in ();
00569               this->sep_a_->define_property ("Related_StreamCtrl",
00570                                              streamctrl_any);
00571 
00572               CORBA::Any vdev_a_any;
00573               vdev_a_any <<= this->vdev_a_.in ();
00574               this->sep_a_->define_property ("Related_VDev",
00575                                              vdev_a_any);
00576 
00577               CORBA::Any streamendpoint_a_any;
00578               streamendpoint_a_any <<= this->sep_a_.in ();
00579               this->vdev_a_->define_property ("Related_StreamEndpoint",
00580                                               streamendpoint_a_any);
00581 
00582 
00583               CORBA::Any mmdevice_a_any;
00584               mmdevice_a_any <<= a_party;
00585               this->vdev_a_->define_property ("Related_MMDevice",
00586                                               mmdevice_a_any);
00587 
00588               // add the mmdevice, sep and vdev to the map.
00589               MMDevice_Map_Entry map_entry;
00590               MMDevice_Map_Hash_Key key (a_party);
00591               map_entry.sep_ = AVStreams::StreamEndPoint_A::_duplicate (this->sep_a_.in ());
00592               map_entry.vdev_ = AVStreams::VDev::_duplicate (this->vdev_a_.in ());
00593               map_entry.flowspec_ = the_flows;
00594               map_entry.qos_ = the_qos;
00595               result =
00596                 this->mmdevice_a_map_.bind (key, map_entry);
00597               if (result < 0)
00598                 if (TAO_debug_level > 0)
00599                   ACE_DEBUG ((LM_DEBUG, "Error binding mmdevice entry in the a_map"));
00600             }
00601         }
00602       // Request b_party to create the endpoint and vdev
00603       if (!CORBA::is_nil (b_party))
00604         {
00605           MMDevice_Map_Hash_Key find_key (b_party);
00606           MMDevice_Map_Entry find_entry;
00607           int result =
00608             this->mmdevice_b_map_.find (find_key, find_entry);
00609           if (result == 0)
00610             {
00611               // Already in the map.
00612               if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "mmdevice b_party is already bound\n"));
00613               return 1;
00614             }
00615           else
00616             {
00617               this->sep_b_ =
00618                 b_party-> create_B (this->streamctrl_.in (),
00619                                     this->vdev_b_.out (),
00620                                     the_qos,
00621                                     met_qos,
00622                                     named_vdev.inout (),
00623                                     the_flows);
00624 
00625               if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "(%P|%t) TAO_StreamCtrl::create_B: succeeded\n"));
00626 
00627               if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,
00628                           "\n(%P|%t)stream_endpoint_b_ = %s",
00629                           TAO_ORB_Core_instance ()->orb ()->object_to_string (this->sep_b_.in ())));
00630               // Define ourselves as the related_streamctrl property of the sep.
00631               CORBA::Any streamctrl_any;
00632               streamctrl_any <<= this->streamctrl_.in ();
00633               this->sep_b_->define_property ("Related_StreamCtrl",
00634                                              streamctrl_any);
00635 
00636               CORBA::Any vdev_b_any;
00637               vdev_b_any <<= this->vdev_b_.in ();
00638               this->sep_b_->define_property ("Related_VDev",
00639                                              vdev_b_any);
00640 
00641               CORBA::Any streamendpoint_b_any;
00642               streamendpoint_b_any <<= this->sep_b_.in ();
00643               this->vdev_b_->define_property ("Related_StreamEndpoint",
00644                                               streamendpoint_b_any);
00645 
00646 
00647               CORBA::Any mmdevice_b_any;
00648               mmdevice_b_any <<= b_party;
00649               this->vdev_b_->define_property ("Related_MMDevice",
00650                                               mmdevice_b_any);
00651               // add the mmdevice, sep and vdev to the map.
00652               MMDevice_Map_Entry map_entry;
00653               MMDevice_Map_Hash_Key key (b_party);
00654               map_entry.sep_ = AVStreams::StreamEndPoint::_duplicate (this->sep_b_.in ());
00655               map_entry.vdev_ = AVStreams::VDev::_duplicate(this->vdev_b_.in ());
00656               map_entry.flowspec_ = the_flows;
00657               map_entry.qos_ = the_qos;
00658               int result =
00659                 this->mmdevice_b_map_.bind (key, map_entry);
00660               if (result < 0)
00661                 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "Error binding mmdevice entry in the b_map"));
00662             }
00663         }
00664 
00665       // Tell the endpoints about each other.
00666       if ((!CORBA::is_nil (a_party)) && (!CORBA::is_nil (b_party)))
00667         {
00668           CORBA::Any sep_a_peer_any;
00669           CORBA::Any sep_b_peer_any;
00670 
00671           sep_a_peer_any <<= this->sep_b_.in();
00672           sep_b_peer_any <<= this->sep_a_.in();
00673           this->sep_a_->define_property ("PeerAdapter",
00674                                           sep_a_peer_any);
00675 
00676           this->sep_b_->define_property ("PeerAdapter",
00677                                          sep_b_peer_any);
00678         }
00679 
00680       // In the full profile case there's no VDev.
00681       if (CORBA::is_nil (b_party) && (!CORBA::is_nil (this->vdev_a_.in ())))
00682         {
00683           // Now set the source id for this A endpoint.
00684           // If the sep contains flow producers then set the source ids for those
00685           // instead.
00686           try
00687             {
00688               CORBA::Any_ptr flows_any = this->sep_a_->get_property_value ("Flows");
00689               AVStreams::flowSpec_var flows;
00690               *flows_any >>= flows.out ();
00691               for (CORBA::ULong i=0; i< flows->length ();++i)
00692                 {
00693                   CORBA::Object_var fep_obj =
00694                     this->sep_a_->get_fep (flows [i]);
00695                   try
00696                     {
00697                       AVStreams::FlowProducer_var producer =
00698                         AVStreams::FlowProducer::_narrow (fep_obj.in ());
00699                       producer->set_source_id (this->source_id_++);
00700                     }
00701                   catch (const CORBA::Exception& ex)
00702                     {
00703                       if (TAO_debug_level > 0)
00704                         ACE_DEBUG ((LM_DEBUG, " %s ", static_cast<char const*>(flows[i])));
00705 
00706                       ex._tao_print_exception (
00707                         "producer_check: not a producer");
00708 
00709                     }
00710                 }
00711             }
00712           catch (const CORBA::Exception&)
00713             {
00714               // Since the full profile failed try setting the source id
00715               // for the sep instead.
00716               // @@Naga: What do we do if in the light profile the sep has
00717               // many producers who do not have flow interfaces. Then
00718               // the streamctrl has to give an array of source ids to
00719               // the sep.
00720               this->sep_a_->set_source_id (this->source_id_++);
00721             }
00722           if (!this->mcastconfigif_)
00723             {
00724               ACE_NEW_RETURN (this->mcastconfigif_,
00725                               TAO_MCastConfigIf,
00726                               0);
00727               // @@: Deactivating the object thru poa means calling remove_ref after _this.
00728               this->mcastconfigif_ptr_ = this->mcastconfigif_->_this ();
00729             }
00730           // Multicast source being added.
00731           CORBA::Boolean result = this->vdev_a_->set_Mcast_peer (this->streamctrl_.in (),
00732                                                                  this->mcastconfigif_ptr_.in (),
00733                                                                  the_qos,
00734                                                                  the_flows);
00735           if (!result)
00736             ACE_ERROR_RETURN ((LM_ERROR, "set_Mcast_peer failed\n"), 0);
00737         }
00738 
00739       if (CORBA::is_nil (a_party))
00740         {
00741           if (!CORBA::is_nil (this->vdev_b_.in ()))
00742             {
00743               // Multicast sink being added.
00744               if (!this->mcastconfigif_)
00745                 ACE_ERROR_RETURN ((LM_ERROR, "first add a source and then a sink\n"), 0);
00746               this->mcastconfigif_->set_peer (this->vdev_b_.in (),
00747                                               the_qos,
00748                                               the_flows);
00749             }
00750 
00751           int connect_leaf_success = 0;
00752           try
00753             {
00754               // @@: define null interfaces for Atm so that they can be implemented once
00755               //  ACE adds support for ATM multicast.
00756               connect_leaf_success = this->sep_a_->connect_leaf (this->sep_b_.in (),
00757                                                                  the_qos,
00758                                                                  the_flows);
00759               connect_leaf_success = 1;
00760             }
00761           catch (const AVStreams::notSupported&)
00762             {
00763               if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "connect_leaf failed\n"));
00764               connect_leaf_success = 0;
00765             }
00766           catch (const CORBA::Exception& ex)
00767             {
00768               ex._tao_print_exception (
00769                 "TAO_StreamCtrl::bind_devs");
00770             }
00771           if (!connect_leaf_success)
00772             {
00773               if (TAO_debug_level > 0)
00774                 ACE_DEBUG ((LM_DEBUG,"TAO_StreamCtrl::bind_devs Multiconnect\n"));
00775               AVStreams::flowSpec connect_flows = the_flows;
00776               this->sep_a_->multiconnect (the_qos, connect_flows);
00777               this->sep_b_->multiconnect (the_qos, connect_flows);
00778             }
00779         }
00780 
00781       if (!CORBA::is_nil (a_party) && !CORBA::is_nil (b_party))
00782         {
00783           // Check to see if the MMDevice contains FDev objects
00784           // If it contains FDev objects, then we are using the
00785           // Full profile, and we want to call bind() instead
00786           // of connect() on the the streamctrl
00787           if( a_party->is_property_defined("Flows") &&
00788               b_party->is_property_defined("Flows") )
00789           {
00790               if (TAO_debug_level > 0) {
00791                 //FUZZ: disable check_for_lack_ACE_OS
00792                 ACE_DEBUG ((LM_DEBUG, "(%N,%l) Full profile, invoking bind()\n"));
00793                 //FUZZ: enable check_for_lack_ACE_OS
00794               }
00795 
00796               // It is full profile
00797               // we have feps in the sep then dont call connect
00798               // instead call bind on the streamctrl.
00799               this->bind (this->sep_a_.in (),
00800                           this->sep_b_.in (),
00801                           the_qos,
00802                           the_flows);
00803 
00804 
00805 
00806           }
00807           // This is the light profile, call connect()
00808           else  if (!CORBA::is_nil (this->vdev_a_.in ()) && !CORBA::is_nil (this->vdev_b_.in ()))
00809           {
00810               if (TAO_debug_level > 0) {
00811                 //FUZZ: disable check_for_lack_ACE_OS
00812                 ACE_DEBUG ((LM_DEBUG, "(%N,%l) Light profile, invoking connect()\n"));
00813                 //FUZZ: enable check_for_lack_ACE_OS
00814               }
00815 
00816               // Tell the 2 VDev's about one another
00817               this->vdev_a_->set_peer (this->streamctrl_.in (),
00818                                        this->vdev_b_.in (),
00819                                        the_qos,
00820                                        the_flows);
00821 
00822               this->vdev_b_->set_peer (this->streamctrl_.in (),
00823                                        this->vdev_a_.in (),
00824                                        the_qos,
00825                                        the_flows);
00826 
00827 
00828               // Now connect the streams together. This will
00829               // establish the connection
00830               CORBA::Boolean result  =
00831                 this->sep_a_->connect (this->sep_b_.in (),
00832                                        the_qos,
00833                                        the_flows);
00834               if (result == 0)
00835                 ACE_ERROR_RETURN ((LM_ERROR, "sep_a->connect (sep_b) failed\n"), 0);
00836           }
00837         }
00838     }
00839   catch (const CORBA::Exception& ex)
00840     {
00841       ex._tao_print_exception ("TAO_StreamCtrl::bind_devs");
00842       return 0;
00843     }
00844   return 1;
00845 }
00846 
00847 // Used to establish a connection between two endpoints
00848 // directly, i.e. without a MMDevice
00849 CORBA::Boolean
00850 TAO_StreamCtrl::bind (AVStreams::StreamEndPoint_A_ptr sep_a,
00851                       AVStreams::StreamEndPoint_B_ptr sep_b,
00852                       AVStreams::streamQoS &stream_qos,
00853                       const AVStreams::flowSpec &flow_spec)
00854 {
00855   this->sep_a_ = AVStreams::StreamEndPoint_A::_duplicate(sep_a);
00856   this->sep_b_ = AVStreams::StreamEndPoint_B::_duplicate(sep_b);
00857 
00858   int result = 0;
00859   try
00860     {
00861       if (CORBA::is_nil (sep_a_.in() ) ||
00862           CORBA::is_nil (sep_b_.in() ))
00863         ACE_ERROR_RETURN ((LM_ERROR,
00864                            "(%P|%t) TAO_StreamCtrl::bind:"
00865                            "a_party or b_party null!"),
00866                           0);
00867 
00868       // Define each other as their peers.
00869       CORBA::Any sep_any;
00870       sep_any <<= sep_b;
00871       sep_a_->define_property ("PeerAdapter",
00872                               sep_any);
00873       sep_any <<= sep_a;
00874       sep_b_->define_property ("PeerAdapter",
00875                               sep_any);
00876       // since its full profile we do the viable stream setup algorithm.
00877       // get the flows for the A streamendpoint.
00878       // the flows spec is empty and hence we do a exhaustive match.
00879       AVStreams::flowSpec a_flows, b_flows;
00880       CORBA::Any_var flows_any;
00881       flows_any = sep_a_->get_property_value ("Flows");
00882       AVStreams::flowSpec *temp_flows;
00883       flows_any.in () >>= temp_flows;
00884       a_flows = *temp_flows;
00885       flows_any = sep_b_->get_property_value ("Flows");
00886       flows_any.in () >>= temp_flows;
00887       b_flows = *temp_flows;
00888       u_int i;
00889       FlowEndPoint_Map *a_fep_map;
00890       FlowEndPoint_Map *b_fep_map;
00891       ACE_NEW_RETURN (a_fep_map,
00892                       FlowEndPoint_Map,
00893                       0);
00894       ACE_NEW_RETURN (b_fep_map,
00895                       FlowEndPoint_Map,
00896                       0);
00897       for (i=0;i<a_flows.length ();i++)
00898         {
00899           const char *flowname = a_flows[i];
00900           // get the flowendpoint references.
00901           CORBA::Object_var fep_obj =
00902             sep_a_->get_fep (flowname);
00903           AVStreams::FlowEndPoint_var fep =
00904             AVStreams::FlowEndPoint::_narrow (fep_obj.in ());
00905           ACE_CString fep_key (flowname);
00906           result = a_fep_map->bind (fep_key, fep);
00907           if (result == -1)
00908             if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_StreamCtrl::bind failed for %s\n", flowname));
00909         }
00910       // get the flowendpoints for streamendpoint_b
00911       for (i=0;i<b_flows.length ();i++)
00912         {
00913           const char *flowname = b_flows[i];
00914           // get the flowendpoint references.
00915           CORBA::Object_var fep_obj =
00916             sep_b->get_fep (flowname);
00917           AVStreams::FlowEndPoint_var fep =
00918             AVStreams::FlowEndPoint::_narrow (fep_obj.in ());
00919           ACE_CString fep_key (flowname);
00920           result = b_fep_map->bind (fep_key, fep);
00921           if (result == -1)
00922             if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_StreamCtrl::bind failed for %s\n", flowname));
00923         }
00924       FlowEndPoint_Map *map_a = 0, *map_b = 0;
00925       if (flow_spec.length () == 0)
00926         {
00927           map_a = a_fep_map;
00928           map_b = b_fep_map;
00929         }
00930       else
00931         {
00932           FlowEndPoint_Map *spec_fep_map_a, *spec_fep_map_b;
00933           ACE_NEW_RETURN (spec_fep_map_a,
00934                           FlowEndPoint_Map,
00935                           0);
00936           ACE_NEW_RETURN (spec_fep_map_b,
00937                           FlowEndPoint_Map,
00938                           0);
00939           for (i=0; i< flow_spec.length ();i++)
00940             {
00941               TAO_Forward_FlowSpec_Entry *entry = 0;
00942               ACE_NEW_RETURN (entry,
00943                               TAO_Forward_FlowSpec_Entry,
00944                               0);
00945               entry->parse (flow_spec[i]);
00946               ACE_CString fep_key (entry->flowname ());
00947               AVStreams::FlowEndPoint_var fep;
00948               result = a_fep_map->find (fep_key, fep);
00949               if (result == -1)
00950                 ACE_ERROR_RETURN ((LM_ERROR, "Fep not found on A side for flowname: %s\n", flow_spec[i].in ()), 0);
00951 
00952               result = spec_fep_map_a->bind (fep_key, fep);
00953               if (result == -1)
00954                 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "Bind failed for %s\n", flow_spec[i].in ()));
00955 
00956               result = b_fep_map->find (fep_key, fep);
00957               if (result == -1)
00958                 ACE_ERROR_RETURN ((LM_ERROR, "Fep not found on B side for flowname: %s\n", flow_spec[i].in ()), 0);
00959 
00960               result = spec_fep_map_b->bind (fep_key, fep);
00961               if (result == -1)
00962                 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "Bind failed for %s\n", flow_spec[i].in ()));
00963             }
00964           map_a = spec_fep_map_a;
00965           map_b = spec_fep_map_b;
00966         }
00967 
00968       TAO_AV_QoS qos (stream_qos);
00969       // Now go thru the list of flow endpoint and match them.
00970       // uses the first match policy.
00971       FlowEndPoint_Map_Iterator a_feps_iterator (*map_a);
00972       FlowEndPoint_Map_Entry *a_feps_entry, *b_feps_entry;
00973       try
00974         {
00975 
00976           for (;a_feps_iterator.next (a_feps_entry) != 0;
00977                a_feps_iterator.advance ())
00978             {
00979               AVStreams::FlowEndPoint_var fep_a = a_feps_entry->int_id_;
00980               AVStreams::FlowEndPoint_var connected_to =
00981                 fep_a->get_connected_fep ();
00982 
00983               if (!CORBA::is_nil (connected_to.in ()))
00984                 {
00985                   // Skip this one, it is already connected...
00986                   continue;
00987                 }
00988 
00989               FlowEndPoint_Map_Iterator b_feps_iterator (*map_b);
00990               for (;b_feps_iterator.next (b_feps_entry) != 0;
00991                    b_feps_iterator.advance ())
00992                 {
00993                   AVStreams::FlowEndPoint_var fep_b = b_feps_entry->int_id_;
00994                   AVStreams::FlowConnection_var flow_connection;
00995 
00996                   AVStreams::FlowEndPoint_var connected_to =
00997                     fep_b->get_connected_fep ();
00998 
00999                   if (!CORBA::is_nil (connected_to.in ()))
01000                     {
01001                       // Skip this one, it is already connected...
01002                       continue;
01003                     }
01004 
01005                   if (fep_a->is_fep_compatible (fep_b.in()) == 1)
01006                     {
01007                       // assume that flow names are same so that we
01008                       // can use either of them.
01009                       CORBA::Object_var flow_connection_obj;
01010                       CORBA::Any_var flowname_any =
01011                         fep_a->get_property_value ("FlowName");
01012                       const char *flowname = 0;
01013                       flowname_any.in () >>= flowname;
01014                       try
01015                         {
01016                           flow_connection_obj =
01017                             this->get_flow_connection (flowname);
01018                           flow_connection =
01019                             AVStreams::FlowConnection::_narrow (flow_connection_obj.in ());
01020                         }
01021                       catch (const CORBA::Exception&)
01022                         {
01023                           TAO_FlowConnection *flowConnection;
01024                           ACE_NEW_RETURN (flowConnection,
01025                                           TAO_FlowConnection,
01026                                           0);
01027                           flow_connection = flowConnection->_this ();
01028                           this->set_flow_connection (flowname,
01029                                                      flow_connection.in ());
01030                         }
01031 
01032                       // make sure that a_feps is flow_producer
01033                       // and b_feps is flow_consumer
01034                       // There should be a way to find which flow
01035                       // endpoint is producer and which is
01036                       // consumer.
01037 
01038                       AVStreams::FlowProducer_var producer;
01039                       AVStreams::FlowConsumer_var consumer;
01040 
01041                       try
01042                         {
01043                           producer =
01044                             AVStreams::FlowProducer::_narrow (fep_a.in());
01045                           consumer =
01046                             AVStreams::FlowConsumer::_narrow (fep_b.in());
01047 
01048                           // If the types don't match then try in
01049                           // the opposite order
01050                           if (CORBA::is_nil (producer.in ()))
01051                             {
01052                               producer =
01053                                 AVStreams::FlowProducer::_narrow (fep_b.in());
01054                               consumer =
01055                                 AVStreams::FlowConsumer::_narrow (fep_a.in());
01056                             }
01057                           // At this point they should both be
01058                           // non-nil
01059                           // @@ raise an exception (which one?) if
01060                           // this is not true...
01061                           ACE_ASSERT (!CORBA::is_nil (producer.in ()));
01062                           ACE_ASSERT (!CORBA::is_nil (consumer.in ()));
01063                         }
01064                       catch (const CORBA::Exception&)
01065                         {
01066                           //Yamuna : Recheck this
01067                           throw;//_EX (producer_check);
01068                         }
01069                       CORBA::String_var fep_a_name, fep_b_name;
01070                       flowname_any = fep_a->get_property_value ("FlowName");
01071                       const char *temp_name;
01072                       flowname_any.in () >>= temp_name;
01073                       fep_a_name = CORBA::string_dup (temp_name);
01074                       flowname_any = fep_b->get_property_value ("FlowName");
01075                       flowname_any.in () >>= temp_name;
01076                       fep_b_name = CORBA::string_dup (temp_name);
01077                       AVStreams::QoS flow_qos;
01078                       flow_qos.QoSType = fep_a_name;
01079                       flow_qos.QoSParams.length (0);
01080                       result = qos.get_flow_qos (fep_a_name.in (), flow_qos);
01081                       if (result == -1)
01082                         {
01083                           flow_qos.QoSType = fep_b_name;
01084                           result = qos.get_flow_qos (fep_b_name.in (),
01085                                                      flow_qos);
01086                           if (result == -1 && TAO_debug_level > 0)
01087                             ACE_DEBUG ((LM_DEBUG,
01088                                         "No QoS Specified for this flow <%s>\n", flowname));
01089                         }
01090                       flow_connection->connect (producer.in (),
01091                                                 consumer.in (),
01092                                                 flow_qos);
01093                     }
01094                 }
01095             }
01096         }
01097       catch (const CORBA::Exception& ex)
01098         {
01099           ex._tao_print_exception ("TAO_StreamCtrl::bind:flow_connect block");
01100           return 0;
01101         }
01102     }
01103   catch (const CORBA::Exception&)
01104     {
01105       // error was thrown because one of the streamendpoints is light profile.
01106       // Now connect the streams together
01107       this->sep_a_->connect (this->sep_b_.in (),
01108                              stream_qos,
01109                              flow_spec);
01110     }
01111   return 1;
01112 }
01113 
01114 void
01115 TAO_StreamCtrl::unbind (void)
01116 {
01117   try
01118     {
01119       if (this->flow_connection_map_.current_size () > 0)
01120         return;
01121 
01122       AVStreams::flowSpec flow_spec;
01123       flow_spec.length(0);
01124 
01125       MMDevice_Map_Iterator a_iterator (this->mmdevice_a_map_);
01126       MMDevice_Map::ENTRY *entry = 0;
01127       for (;a_iterator.next (entry)!= 0;a_iterator.advance ())
01128         {
01129           entry->int_id_.sep_->destroy (flow_spec);
01130         }
01131       MMDevice_Map_Iterator b_iterator (this->mmdevice_b_map_);
01132       for (;b_iterator.next (entry)!= 0;b_iterator.advance ())
01133         {
01134           entry->int_id_.sep_->destroy (flow_spec);
01135         }
01136     }
01137   catch (const CORBA::Exception& ex)
01138     {
01139       ex._tao_print_exception ("TAO_StreamCtrl::unbind");
01140       return;
01141     }
01142 }
01143 
01144 void
01145 TAO_StreamCtrl::unbind_party (AVStreams::StreamEndPoint_ptr /* the_ep */,
01146                               const AVStreams::flowSpec &/* the_spec */)
01147 {
01148 }
01149 
01150 void
01151 TAO_StreamCtrl::unbind_dev (AVStreams::MMDevice_ptr /* dev */,
01152                             const AVStreams::flowSpec & /* the_spec */)
01153 {
01154 }
01155 
01156 AVStreams::VDev_ptr
01157 TAO_StreamCtrl::get_related_vdev (AVStreams::MMDevice_ptr adev,
01158                                   AVStreams::StreamEndPoint_out sep)
01159 {
01160   MMDevice_Map_Hash_Key key (adev);
01161   MMDevice_Map_Entry entry;
01162   int result = -1;
01163   result = this->mmdevice_a_map_.find (key, entry);
01164   if (result < 0)
01165     {
01166       result = this->mmdevice_a_map_.find (key, entry);
01167       if (result < 0)
01168         return AVStreams::VDev::_nil ();
01169     }
01170   sep = AVStreams::StreamEndPoint::_duplicate (entry.sep_.in ());
01171   return AVStreams::VDev::_duplicate (entry.vdev_.in ());
01172 }
01173 
01174 CORBA::Boolean
01175 
01176 TAO_StreamCtrl::modify_QoS (AVStreams::streamQoS &new_qos,
01177                             const AVStreams::flowSpec &the_spec)
01178 {
01179   if (TAO_debug_level > 0)
01180   ACE_DEBUG ((LM_DEBUG,
01181               "TAO_StreamCtrl::modify_QoS\n"));
01182 
01183 
01184   if (this->mcastconfigif_ != 0)
01185     {
01186       // call modify_Qos on the root VDev which is the mcast configif.
01187       if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "Cannot Modify the Qos for multipoint streams\n"));
01188     }
01189   else
01190     {
01191       try
01192         {
01193           AVStreams::flowSpec in_flowspec;
01194           AVStreams::flowSpec out_flowspec;
01195 
01196           in_flowspec.length (0);
01197           out_flowspec.length (0);
01198 
01199           int in_index = 0;
01200           int out_index = 0;
01201 
01202           AVStreams::flowSpec flowspec;
01203           if (the_spec.length () == 0)
01204             {
01205               // Apply modify_qos to all the flows
01206               flowspec = this->flows_;
01207               MMDevice_Map_Iterator iterator (this->mmdevice_a_map_);
01208               MMDevice_Map::ENTRY *entry = 0;
01209               for (;iterator.next (entry) !=  0;iterator.advance ())
01210                 {
01211                   flowspec = entry->int_id_.flowspec_;
01212                 }
01213             }
01214           else
01215             {
01216               flowspec = the_spec;
01217             }
01218 
01219           if (TAO_debug_level > 0)
01220             ACE_DEBUG ((LM_DEBUG,
01221                         "TAO_StreamCtrl::modify_QoS\n"));
01222 
01223 
01224           for (u_int i=0;i < flowspec.length ();i++)
01225             {
01226               TAO_Forward_FlowSpec_Entry entry;
01227               entry.parse (flowspec [i]);
01228               int direction = entry.direction ();
01229               if (direction == 0)
01230                 {
01231                   in_flowspec.length (in_index + 1);
01232                   in_flowspec [in_index++] = CORBA::string_dup (entry.entry_to_string ());
01233                 }
01234               else
01235                 {
01236                   out_flowspec.length (out_index + 1);
01237                   out_flowspec [out_index++] = CORBA::string_dup (entry.entry_to_string ());
01238                 }
01239             }
01240 
01241           if (in_flowspec.length () != 0)
01242             {
01243               this->vdev_a_->modify_QoS (new_qos, in_flowspec);
01244             }
01245 
01246           if (out_flowspec.length () != 0)
01247             {
01248               this->vdev_b_->modify_QoS (new_qos, out_flowspec);
01249             }
01250         }
01251       catch (const CORBA::Exception& ex)
01252         {
01253           ex._tao_print_exception ("TAO_StreamCtrl::modify_QoS");
01254           return 0;
01255         }
01256 
01257     }
01258   return 1;
01259 }
01260 
01261 // ----------------------------------------------------------------------
01262 // TAO_MCastConfigIf
01263 // ----------------------------------------------------------------------
01264 
01265 TAO_MCastConfigIf::TAO_MCastConfigIf (void)
01266   :peer_list_iterator_ (peer_list_)
01267 {
01268 }
01269 
01270 TAO_MCastConfigIf::~TAO_MCastConfigIf (void)
01271 {
01272   //no-op
01273 }
01274 
01275 // In future this should be a multicast message instead of point-to-point unicasts.
01276 CORBA::Boolean
01277 TAO_MCastConfigIf::set_peer (CORBA::Object_ptr peer,
01278                              AVStreams::streamQoS & qos,
01279                              const AVStreams::flowSpec & flow_spec)
01280 {
01281   try
01282     {
01283       Peer_Info *info;
01284       ACE_NEW_RETURN (info,
01285                       Peer_Info,
01286                       0);
01287       info->peer_ = AVStreams::VDev::_narrow (peer);
01288       info->qos_ = qos;
01289       info->flow_spec_ = flow_spec;
01290       this->peer_list_.insert_tail (info);
01291     }
01292   catch (const CORBA::Exception& ex)
01293     {
01294       ex._tao_print_exception ("TAO_MCastConfigIf::set_peer");
01295       return 0;
01296     }
01297   return 1;
01298 }
01299 
01300 // In future this should be a multicast message instead of point-to-point unicasts.
01301 void
01302 TAO_MCastConfigIf::configure (const CosPropertyService::Property & a_configuration)
01303 {
01304   Peer_Info *info;
01305   try
01306     {
01307       for (this->peer_list_iterator_.first ();
01308            (info = this->peer_list_iterator_.next ()) != 0;
01309            this->peer_list_iterator_.advance ())
01310         {
01311           info->peer_->configure (a_configuration);
01312         }
01313     }
01314   catch (const CORBA::Exception& ex)
01315     {
01316       ex._tao_print_exception (
01317         "TAO_MCastConfigIf::set_configure");
01318       return;
01319     }
01320 }
01321 
01322 
01323 void
01324 TAO_MCastConfigIf::set_initial_configuration (const CosPropertyService::Properties &initial)
01325 {
01326   this->initial_configuration_ = initial;
01327 }
01328 
01329 // In future this should be a multicast message instead of point-to-point unicasts.
01330 void
01331 TAO_MCastConfigIf::set_format (const char * flowName,
01332                                const char * format_name)
01333 {
01334   Peer_Info *info;
01335   try
01336     {
01337       for (this->peer_list_iterator_.first ();
01338            (info = this->peer_list_iterator_.next ()) != 0;
01339            this->peer_list_iterator_.advance ())
01340         {
01341           if (this->in_flowSpec (info->flow_spec_, flowName))
01342             {
01343               info->peer_->set_format (flowName, format_name);
01344             }
01345         }
01346     }
01347   catch (const CORBA::Exception& ex)
01348     {
01349       ex._tao_print_exception ("TAO_MCastConfigIf::set_format");
01350       return;
01351     }
01352 }
01353 
01354 // In future this should be a multicast message instead of point-to-point unicasts.
01355 void
01356 TAO_MCastConfigIf::set_dev_params (const char * flowName,
01357                                    const CosPropertyService::Properties & new_params)
01358 {
01359   Peer_Info *info;
01360   try
01361     {
01362 
01363       for (this->peer_list_iterator_.first ();
01364            (info = this->peer_list_iterator_.next ()) != 0;
01365            this->peer_list_iterator_.advance ())
01366         {
01367           if (this->in_flowSpec (info->flow_spec_, flowName))
01368             {
01369               info->peer_->set_dev_params (flowName, new_params);
01370             }
01371         }
01372     }
01373   catch (const CORBA::Exception& ex)
01374     {
01375       ex._tao_print_exception (
01376         "TAO_MCastConfigIf::set_dev_params");
01377       return;
01378     }
01379 }
01380 
01381 int
01382 TAO_MCastConfigIf::in_flowSpec (const AVStreams::flowSpec& flow_spec, const char *flow_name)
01383 {
01384   size_t len = ACE_OS::strlen (flow_name);
01385   for (CORBA::ULong i = 0; i < flow_spec.length (); i++)
01386     if (ACE_OS::strncmp (flow_spec[i], flow_name, len) == 0)
01387       {
01388         return 1;
01389       }
01390   return 0;
01391 }
01392 
01393 // ----------------------------------------------------------------------
01394 // TAO_Base_StreamEndPoint
01395 // ----------------------------------------------------------------------
01396 
01397 TAO_Base_StreamEndPoint::TAO_Base_StreamEndPoint (void)
01398   : protocol_object_set_ (0)
01399 {
01400 }
01401 
01402 TAO_Base_StreamEndPoint::~TAO_Base_StreamEndPoint (void)
01403 {
01404 }
01405 
01406 int
01407 TAO_Base_StreamEndPoint::handle_close (void)
01408 {
01409   // This method should not be defined, but EGCS complains endlessly
01410   // about it.
01411   return -1;
01412 }
01413 
01414 int
01415 TAO_Base_StreamEndPoint::handle_open (void)
01416 {
01417   return 0;
01418 }
01419 
01420 int
01421 TAO_Base_StreamEndPoint::handle_stop (const AVStreams::flowSpec &)
01422 {
01423   return 0;
01424 }
01425 
01426 int
01427 TAO_Base_StreamEndPoint::handle_start (const AVStreams::flowSpec &)
01428 {
01429   return 0;
01430 }
01431 
01432 int
01433 TAO_Base_StreamEndPoint::handle_destroy (const AVStreams::flowSpec &)
01434 {
01435   return 0;
01436 }
01437 
01438 // The following function is for backward compatibility.
01439 CORBA::Boolean
01440 TAO_Base_StreamEndPoint::handle_preconnect (AVStreams::flowSpec &)
01441 {
01442   return 1;
01443 }
01444 
01445 // The following function is for backward compatibility.
01446 CORBA::Boolean
01447 TAO_Base_StreamEndPoint::handle_postconnect (AVStreams::flowSpec &)
01448 {
01449 
01450   while (!this->is_protocol_object_set ())
01451     TAO_AV_CORE::instance ()->orb ()->perform_work ();
01452   return 1;
01453 }
01454 
01455 // The following function is for backward compatibility.
01456 CORBA::Boolean
01457 TAO_Base_StreamEndPoint::handle_connection_requested (AVStreams::flowSpec &)
01458 {
01459   return 1;
01460 }
01461 
01462 int
01463 TAO_Base_StreamEndPoint::set_protocol_object (const char * /*flowname*/,
01464                                               TAO_AV_Protocol_Object * /*sfp_object*/)
01465 {
01466   return -1;
01467 }
01468 
01469 void
01470 TAO_Base_StreamEndPoint::protocol_object_set (void)
01471 {
01472   this->protocol_object_set_ = 1;
01473 }
01474 
01475 
01476 int
01477 TAO_Base_StreamEndPoint::is_protocol_object_set (void)
01478 {
01479   return this->protocol_object_set_;
01480 }
01481 
01482 int
01483 TAO_Base_StreamEndPoint::get_callback (const char * /*flowname*/,
01484                                        TAO_AV_Callback *&/*sfp_callback*/)
01485 {
01486   return -1;
01487 }
01488 
01489 int
01490 TAO_Base_StreamEndPoint::get_control_callback (const char * /*flowname*/,
01491                                                TAO_AV_Callback *&/*sfp_callback*/)
01492 {
01493   return -1;
01494 }
01495 
01496 void
01497 TAO_Base_StreamEndPoint::set_flow_handler (const char *flowname,
01498                                            TAO_AV_Flow_Handler *handler)
01499 {
01500   if(TAO_debug_level > 1)
01501   {
01502      ACE_DEBUG ((LM_DEBUG, "(%N,%l) TAO_Base_StreamEndPoint::set_flow_handler(), flowname: %s\n", flowname));
01503   }
01504   ACE_CString flow_name_key (flowname);
01505   if (this->flow_handler_map_.bind (flow_name_key, handler) != 0)
01506     ACE_ERROR ((LM_ERROR,
01507                 "Error in storing flow handler\n"));
01508 }
01509 
01510 void
01511 TAO_Base_StreamEndPoint::set_control_flow_handler (const char *flowname,
01512                                                    TAO_AV_Flow_Handler *handler)
01513 {
01514   ACE_CString flow_name_key (flowname);
01515   if (this->control_flow_handler_map_.bind (flow_name_key, handler) != 0)
01516     ACE_ERROR ((LM_ERROR,
01517                 "Error in storing control flow handler\n"));
01518 }
01519 
01520 // ----------------------------------------------------------------------
01521 // TAO_StreamEndPoint
01522 // ----------------------------------------------------------------------
01523 
01524 // constructor.
01525 
01526 TAO_StreamEndPoint::TAO_StreamEndPoint (void)
01527   :flow_count_ (0),
01528    flow_num_ (0),
01529    mcast_port_ (ACE_DEFAULT_MULTICAST_PORT+1)
01530 {
01531   //is->mcast_addr_ = ACE_OS::inet_addr (ACE_DEFAULT_MULTICAST_ADDR);
01532   this->mcast_addr_.set (ACE_DEFAULT_MULTICAST_ADDR);
01533   if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_StreamEndPoint::TAO_StreamEndPoint::mcast_addr = %s", this->mcast_addr_.c_str ()));
01534   //  this->handle_open ();
01535 }
01536 
01537 
01538 CORBA::Boolean
01539 TAO_StreamEndPoint::connect (AVStreams::StreamEndPoint_ptr responder,
01540                              AVStreams::streamQoS &qos,
01541                              const AVStreams::flowSpec &the_spec)
01542 {
01543   if (TAO_debug_level > 0)
01544     ACE_DEBUG ((LM_DEBUG, "TAO_StreamEndPoint::Connect ()\n"));
01545   CORBA::Boolean retv = 0;
01546   this->peer_sep_ = AVStreams::StreamEndPoint::_duplicate (responder);
01547   try
01548     {
01549       if (!CORBA::is_nil (this->negotiator_.in ()))
01550         {
01551           ACE_DEBUG ((LM_DEBUG,
01552                       "NEGOTIATOR AVIALABLE\n"));
01553 
01554           CORBA::Any_var negotiator_any = responder->get_property_value ("Negotiator");
01555 
01556           AVStreams::Negotiator_ptr peer_negotiator;
01557           negotiator_any.in () >>= peer_negotiator;
01558           if (!CORBA::is_nil (peer_negotiator))
01559             {
01560               CORBA::Boolean result =
01561                 this->negotiator_->negotiate (peer_negotiator,
01562                                               qos);
01563               if (!result)
01564                 if (TAO_debug_level > 0)
01565                   ACE_DEBUG ((LM_DEBUG, "TAO_StreamEndPoint::Connect (): negotiate failed\n"));
01566             }
01567         }
01568     }
01569   catch (const CORBA::Exception& ex)
01570     {
01571       ex._tao_print_exception ("TAO_StreamEndPoint::negotiate");
01572     }
01573 
01574   try
01575     {
01576       if (this->protocols_.length () > 0)
01577         {
01578           // choose protocols based on what the remote endpoint can support.
01579           CORBA::Any_var protocols_any =
01580             responder->get_property_value ("AvailableProtocols");
01581           AVStreams::protocolSpec peer_protocols;
01582           AVStreams::protocolSpec *temp_protocols;
01583           protocols_any.in () >>= temp_protocols;
01584           peer_protocols = *temp_protocols;
01585           for (u_int i=0;i<peer_protocols.length ();i++)
01586             {
01587               for (u_int j=0;j<this->protocols_.length ();j++)
01588                 if (ACE_OS::strcmp (peer_protocols [i],
01589                                     this->protocols_[j]) == 0)
01590                   {
01591                     // we'll agree upon the first protocol that matches.
01592                     this->protocol_ = CORBA::string_dup (peer_protocols [i]);
01593                     break;
01594                   }
01595             }
01596         }
01597     }
01598   catch (const CORBA::Exception&)
01599     {
01600       if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "Availableprotocols property not defined\n"));
01601     }
01602   try
01603     {
01604       AVStreams::streamQoS network_qos;
01605       if (qos.length () > 0)
01606         {
01607           if (TAO_debug_level > 0)
01608             ACE_DEBUG ((LM_DEBUG,
01609                         "QoS is Specified\n"));
01610 
01611           int result = this->translate_qos (qos,
01612                                             network_qos);
01613           if (result != 0)
01614             if (TAO_debug_level > 0)
01615               ACE_DEBUG ((LM_DEBUG,
01616                           "QoS translation failed\n"));
01617 
01618           this->qos ().set (network_qos);
01619         }
01620 
01621 
01622       AVStreams::flowSpec flow_spec (the_spec);
01623       this->handle_preconnect (flow_spec);
01624 
01625       if (TAO_debug_level > 0)
01626         ACE_DEBUG ((LM_DEBUG,
01627                     "TAO_StreamEndPoint::connect: flow_spec_length = %d\n",
01628                     flow_spec.length ()));
01629       u_int i;
01630       for (i=0;i<flow_spec.length ();i++)
01631         {
01632           TAO_Forward_FlowSpec_Entry *entry = 0;
01633           ACE_NEW_RETURN (entry,
01634                           TAO_Forward_FlowSpec_Entry,
01635                           0);
01636 
01637           if (entry->parse (flow_spec[i]) == -1)
01638             return 0;
01639 
01640           if (TAO_debug_level > 0)
01641             ACE_DEBUG ((LM_DEBUG, "TAO_StreamEndPoint::Connect: %s\n",  entry->entry_to_string ()));
01642 
01643           this->forward_flow_spec_set.insert (entry);
01644         }
01645 
01646       int result =TAO_AV_CORE::instance ()->init_forward_flows (this,
01647                                                                 this->forward_flow_spec_set,
01648                                                                 TAO_AV_Core::TAO_AV_ENDPOINT_A,
01649                                                                 flow_spec);
01650 
01651 
01652       if (result < 0)
01653         ACE_ERROR_RETURN ((LM_ERROR, "%N:%l TAO_AV_Core::init_forward_flows failed\n"), 0);
01654 
01655 
01656       AVStreams::StreamEndPoint_var streamendpoint = this->_this ();
01657 
01658       retv = responder->request_connection (streamendpoint.in (),
01659                                             0,
01660                                             network_qos,
01661                                             flow_spec);
01662 
01663       if (TAO_debug_level > 0)
01664          ACE_DEBUG ((LM_DEBUG, "%N:%l request_connection returned %d\n", retv));
01665 
01666       if (retv == 0)
01667         return retv;
01668       for (i=0;i<flow_spec.length ();i++)
01669         {
01670           TAO_Reverse_FlowSpec_Entry *entry = 0;
01671           ACE_NEW_RETURN (entry,
01672                           TAO_Reverse_FlowSpec_Entry,
01673                           0);
01674           if (entry->parse (flow_spec[i]) == -1)
01675             ACE_ERROR_RETURN ((LM_ERROR,
01676                                "Reverse_Flow_Spec_Set::parse failed\n"),
01677                               0);
01678 
01679           if (TAO_debug_level > 0)
01680             ACE_DEBUG ((LM_DEBUG,
01681                         "TAO_StreamEndPoint::Connect: Reverse Flow Spec %s\n",
01682                         entry->entry_to_string ()));
01683 
01684           this->reverse_flow_spec_set.insert (entry);
01685         }
01686 
01687       result = TAO_AV_CORE::instance ()->init_reverse_flows (this,
01688                                                              this->forward_flow_spec_set,
01689                                                              this->reverse_flow_spec_set,
01690                                                              TAO_AV_Core::TAO_AV_ENDPOINT_A);
01691       if (result < 0)
01692         ACE_ERROR_RETURN ((LM_ERROR,
01693                            "TAO_AV_Core::init_reverse_flows failed\n"),
01694                           0);
01695 
01696       // Make the upcall to the app
01697       retv = this->handle_postconnect (flow_spec);
01698     }
01699   catch (const CORBA::Exception& ex)
01700     {
01701       ex._tao_print_exception ("TAO_StreamEndPoint::connect");
01702       return 0;
01703     }
01704   return retv;
01705 }
01706 
01707 int
01708 TAO_StreamEndPoint::translate_qos (const AVStreams::streamQoS& application_qos,
01709                                    AVStreams::streamQoS& network_qos)
01710 {
01711   u_int len = application_qos.length ();
01712   network_qos.length (len);
01713   for (u_int i=0;i<len;i++)
01714     {
01715       network_qos [i].QoSType = application_qos [i].QoSType;
01716       network_qos [i].QoSParams = application_qos [i].QoSParams;
01717     }
01718   return 0;
01719 }
01720 
01721 // Stop the physical flow of data on the stream
01722 // Empty the_spec --> apply to all flows
01723 
01724 void
01725 TAO_StreamEndPoint::stop (const AVStreams::flowSpec &flow_spec)
01726 {
01727   // Make the upcall into the app
01728   this->handle_stop (flow_spec);
01729 
01730   if (flow_spec.length () > 0)
01731     {
01732 
01733       for (u_int i=0;i<flow_spec.length ();i++)
01734         {
01735           TAO_AV_FlowSpecSetItor end = this->forward_flow_spec_set.end ();
01736           for (TAO_AV_FlowSpecSetItor begin = this->forward_flow_spec_set.begin ();
01737                begin != end; ++begin)
01738             {
01739               TAO_Forward_FlowSpec_Entry entry;
01740               entry.parse (flow_spec[i]);
01741               if (ACE_OS::strcmp ((*begin)->flowname (), entry.flowname ()) == 0)
01742                {
01743                  TAO_FlowSpec_Entry *entry = *begin;
01744                  //                  (*begin)->protocol_object ()->stop ();
01745                  if (entry->handler() != 0)
01746                    entry->handler ()->stop (entry->role ());
01747                  if (entry->control_handler () != 0)
01748                    entry->control_handler ()->stop (entry->role ());
01749                  break;
01750                }
01751             }
01752         }
01753     }
01754   else
01755     {
01756       TAO_AV_FlowSpecSetItor end = this->forward_flow_spec_set.end ();
01757       for (TAO_AV_FlowSpecSetItor begin = this->forward_flow_spec_set.begin ();
01758            begin != end; ++begin)
01759         {
01760           TAO_FlowSpec_Entry *entry = *begin;
01761           //          entry->protocol_object ()->stop ();
01762           if (entry->handler() != 0)
01763             entry->handler ()->stop (entry->role ());
01764           if (entry->control_handler () != 0)
01765             entry->control_handler ()->stop (entry->role ());
01766         }
01767     }
01768 }
01769 
01770 // Start the physical flow of data on the stream
01771 // Empty the_spec --> apply to all flows
01772 void
01773 TAO_StreamEndPoint::start (const AVStreams::flowSpec &flow_spec)
01774 {
01775   if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_StreamEndPoint::start\n"));
01776   // Make the upcall into the app
01777   this->handle_start (flow_spec);
01778 
01779   if (flow_spec.length () > 0)
01780     {
01781       // Now call start on all the flow handlers.
01782       for (u_int i=0;i<flow_spec.length ();i++)
01783         {
01784           TAO_AV_FlowSpecSetItor end = this->forward_flow_spec_set.end ();
01785           for (TAO_AV_FlowSpecSetItor forward_begin = this->forward_flow_spec_set.begin ();
01786                forward_begin != end; ++forward_begin)
01787             {
01788               TAO_FlowSpec_Entry *entry = *forward_begin;
01789               if (ACE_OS::strcmp (entry->flowname (), flow_spec [i]) == 0)
01790                 {
01791                   //                  entry->protocol_object ()->start ();
01792                   if (entry->handler () != 0)
01793                   {
01794                     entry->handler ()->start (entry->role ());
01795                   }
01796                   if (entry->control_handler () != 0)
01797                   {
01798                     entry->control_handler ()->start (entry->role ());
01799                   }
01800                 }
01801             }
01802 
01803           end = this->reverse_flow_spec_set.end ();
01804           for (TAO_AV_FlowSpecSetItor reverse_begin = this->reverse_flow_spec_set.begin ();
01805                reverse_begin != end; ++reverse_begin)
01806             {
01807               TAO_FlowSpec_Entry *entry = *reverse_begin;
01808               if (ACE_OS::strcmp (entry->flowname (), flow_spec [i]) == 0)
01809                 {
01810                   //                  entry->protocol_object ()->start ();
01811                   if (entry->handler () != 0)
01812                   {
01813                     entry->handler ()->start (entry->role ());
01814                   }
01815                   if (entry->control_handler () != 0)
01816                   {
01817                     entry->control_handler ()->start (entry->role ());
01818                   }
01819                 }
01820             }
01821         }
01822     }
01823   else
01824     {
01825       TAO_AV_FlowSpecSetItor end = this->forward_flow_spec_set.end ();
01826       for (TAO_AV_FlowSpecSetItor forwardbegin = this->forward_flow_spec_set.begin ();
01827            forwardbegin != end; ++forwardbegin)
01828         {
01829           TAO_FlowSpec_Entry *entry = *forwardbegin;
01830           if (entry->handler () != 0)
01831             {
01832               entry->handler ()->start (entry->role ());
01833             }
01834           if (entry->control_handler () != 0)
01835             {
01836               entry->control_handler ()->start (entry->role ());
01837             }
01838         }
01839 
01840       end = this->reverse_flow_spec_set.end ();
01841       for (TAO_AV_FlowSpecSetItor reversebegin = this->reverse_flow_spec_set.begin ();
01842            reversebegin != end; ++reversebegin)
01843         {
01844           TAO_FlowSpec_Entry *entry = *reversebegin;
01845           //          entry->protocol_object ()->start ();
01846           if (entry->handler () != 0)
01847             {
01848               entry->handler ()->start (entry->role ());
01849             }
01850           if (entry->control_handler () != 0)
01851             {
01852               entry->control_handler ()->start (entry->role ());
01853             }
01854         }
01855     }
01856 }
01857 
01858 // Close the connection
01859 void
01860 TAO_StreamEndPoint::destroy (const AVStreams::flowSpec &flow_spec)
01861 {
01862   CORBA::Any_var vdev_any = this->get_property_value ("Related_VDev");
01863 
01864   AVStreams::VDev_ptr vdev;
01865 
01866   vdev_any.in() >>= vdev;
01867   CORBA::Any_var mc_any = vdev->get_property_value ("Related_MediaCtrl");
01868 
01869   // The Related_MediaCtrl property was inserted as a CORBA::Object, so we
01870   // must extract it as the same type.
01871   CORBA::Object_var obj;
01872   mc_any.in() >>= CORBA::Any::to_object( obj.out() );
01873 
01874   AVStreams::MediaControl_var media_ctrl =
01875           AVStreams::MediaControl::_narrow( obj.in() );
01876 
01877   // deactivate the associated vdev and media ctrl
01878 
01879   if ( !CORBA::is_nil( vdev ) )
01880   {
01881     PortableServer::ServantBase_var vdev_servant =
01882         TAO_AV_CORE::instance()->poa()->reference_to_servant ( vdev );
01883     TAO_AV_Core::deactivate_servant (vdev_servant.in());
01884   }
01885 
01886   if ( !CORBA::is_nil ( media_ctrl.in () ) )
01887   {
01888     PortableServer::ServantBase_var mc_servant =
01889         TAO_AV_CORE::instance()->poa()->reference_to_servant (media_ctrl.in());
01890     TAO_AV_Core::deactivate_servant (mc_servant.in());
01891   }
01892 
01893   int result = TAO_AV_Core::deactivate_servant (this);
01894   if (result < 0)
01895     if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_StreamEndPoint::destroy failed\n"));
01896 
01897   if (flow_spec.length () > 0)
01898     {
01899       for (u_int i=0;i<flow_spec.length ();i++)
01900         {
01901           {
01902             TAO_AV_FlowSpecSetItor end = this->forward_flow_spec_set.end ();
01903             for (TAO_AV_FlowSpecSetItor begin = this->forward_flow_spec_set.begin ();
01904                  begin != end; ++begin)
01905               {
01906                 TAO_FlowSpec_Entry *entry = *begin;
01907                 TAO_Tokenizer flow_name (flow_spec [i], '\\');
01908                 if (ACE_OS::strcmp (entry->flowname (), flow_name[0]) == 0)
01909                   {
01910                     if (entry->protocol_object ())
01911                       {
01912                         entry->protocol_object ()->destroy ();
01913                       }
01914                     break;
01915                   }
01916               }
01917           }
01918           {
01919             TAO_AV_FlowSpecSetItor end = this->reverse_flow_spec_set.end ();
01920             for (TAO_AV_FlowSpecSetItor begin = this->reverse_flow_spec_set.begin ();
01921                  begin != end; ++begin)
01922               {
01923                 TAO_FlowSpec_Entry *entry = *begin;
01924                 TAO_Tokenizer flow_name (flow_spec [i], '\\');
01925                 if (ACE_OS::strcmp (entry->flowname (), flow_name[0]) == 0)
01926                   {
01927                     if (entry->protocol_object ())
01928                       {
01929                         entry->protocol_object ()->destroy ();
01930                       }
01931                     break;
01932                   }
01933               }
01934           }
01935         }
01936     }
01937   else
01938     {
01939       {
01940         TAO_AV_FlowSpecSetItor end = this->forward_flow_spec_set.end ();
01941         for (TAO_AV_FlowSpecSetItor begin = this->forward_flow_spec_set.begin ();
01942              begin != end; ++begin)
01943           {
01944             TAO_FlowSpec_Entry *entry = *begin;
01945             if (entry->protocol_object ())
01946               {
01947                 entry->protocol_object ()->stop ();
01948 
01949                 ACE_CString control_flowname =
01950                     TAO_AV_Core::get_control_flowname (entry->flowname ());
01951                 TAO_AV_CORE::instance()->remove_acceptor(entry->flowname());
01952                 TAO_AV_CORE::instance()->remove_acceptor(control_flowname.c_str());
01953 
01954                 entry->protocol_object ()->destroy ();
01955               }
01956           }
01957       }
01958       {
01959         TAO_AV_FlowSpecSetItor end = this->reverse_flow_spec_set.end ();
01960         for (TAO_AV_FlowSpecSetItor begin = this->reverse_flow_spec_set.begin ();
01961              begin != end; ++begin)
01962           {
01963             TAO_FlowSpec_Entry *entry = *begin;
01964             if (entry->protocol_object ())
01965               {
01966                 entry->protocol_object ()->stop ();
01967 
01968                 ACE_CString control_flowname =
01969                     TAO_AV_Core::get_control_flowname (entry->flowname ());
01970                 TAO_AV_CORE::instance()->remove_connector(entry->flowname());
01971                 TAO_AV_CORE::instance()->remove_connector(control_flowname.c_str());
01972                 entry->protocol_object ()->destroy ();
01973 
01974               }
01975           }
01976       }
01977     }
01978 
01979   // Make the upcall into the app
01980   //  this->handle_destroy (the_spec);
01981   //
01982 }
01983 
01984 // Called by our peer endpoint, requesting us to establish
01985 // a connection
01986 CORBA::Boolean
01987 TAO_StreamEndPoint::request_connection (AVStreams::StreamEndPoint_ptr /*initiator*/,
01988                                         CORBA::Boolean /*is_mcast*/,
01989                                         AVStreams::streamQoS &qos,
01990                                         AVStreams::flowSpec &flow_spec)
01991 
01992 {
01993   if (TAO_debug_level > 0)
01994     ACE_DEBUG ((LM_DEBUG,
01995                 "\n(%P|%t) TAO_StreamEndPoint::request_connection called"));
01996 
01997   int result = 0;
01998   try
01999     {
02000       AVStreams::streamQoS network_qos;
02001       if (qos.length () > 0)
02002         {
02003          if (TAO_debug_level > 0)
02004           ACE_DEBUG ((LM_DEBUG,
02005                       "QoS is Specified\n"));
02006 
02007           int result = this->translate_qos (qos, network_qos);
02008           if (result != 0)
02009             if (TAO_debug_level > 0)
02010               ACE_DEBUG ((LM_DEBUG, "QoS translation failed\n"));
02011 
02012           this->qos ().set (network_qos);
02013         }
02014 
02015       if (TAO_debug_level > 0)
02016         ACE_DEBUG ((LM_DEBUG,
02017                     "\n(%P|%t) TAO_StreamEndPoint::request_connection: "
02018                     "flowspec has length = %d and the strings are:\n",
02019                     flow_spec.length ()));
02020       CORBA::ULong i;
02021 
02022       for (i=0;i<flow_spec.length ();i++)
02023         {
02024           TAO_Forward_FlowSpec_Entry *entry = 0;
02025           ACE_NEW_RETURN (entry,
02026                           TAO_Forward_FlowSpec_Entry,
02027                           0);
02028 
02029           CORBA::String_var string_entry = CORBA::string_dup (flow_spec[i]);
02030 
02031           if(TAO_debug_level > 0)
02032              ACE_DEBUG(( LM_DEBUG,
02033                          "%N:%l Parsing flow spec: [%s]\n",
02034                          string_entry.in ()));
02035 
02036           if (entry->parse (string_entry.in ()) == -1)
02037           {
02038             if (TAO_debug_level > 0)
02039               ACE_DEBUG ((LM_DEBUG,
02040                           "%N:%l Error parsing flow_spec: [%s]\n",
02041                           string_entry.in ()));
02042             return 0;
02043           }
02044           if (TAO_debug_level > 0)
02045             ACE_DEBUG ((LM_DEBUG,
02046                         "TAO_StreamEndPoint::request_connection flow spec [%s]\n",
02047                         entry->entry_to_string ()));
02048 
02049           this->forward_flow_spec_set.insert (entry);
02050         }
02051 
02052       result = TAO_AV_CORE::instance ()->init_forward_flows (this,
02053                                                              this->forward_flow_spec_set,
02054                                                              TAO_AV_Core::TAO_AV_ENDPOINT_B,
02055                                                              flow_spec);
02056 
02057       if (result < 0)
02058         return 0;
02059 
02060       // Make the upcall to the app
02061       result = this->handle_connection_requested (flow_spec);
02062     }
02063   catch (const CORBA::Exception& ex)
02064     {
02065       ex._tao_print_exception ("TAO_StreamEndpoint::request_connection");
02066       return 0;
02067     }
02068   return result;
02069 }
02070 
02071 int
02072 TAO_StreamEndPoint::change_qos (AVStreams::streamQoS &new_qos,
02073                                 const AVStreams::flowSpec &the_flows)
02074 {
02075   if (TAO_debug_level > 0)
02076     ACE_DEBUG ((LM_DEBUG,
02077                 "TAO_StreamEndPoint::change_qos\n"));
02078 
02079   TAO_AV_QoS qos (new_qos);
02080   for (int i = 0; (unsigned) i < the_flows.length (); i++)
02081     {
02082       TAO_Forward_FlowSpec_Entry entry;
02083       entry.parse (the_flows [i]);
02084       ACE_CString flow_name_key (entry.flowname ());
02085       Flow_Handler_Map_Entry *handler_entry;
02086       if (this->flow_handler_map_.find (flow_name_key,
02087                                         handler_entry) == 0)
02088         {
02089           AVStreams::QoS flow_qos;
02090           if (qos.get_flow_qos (entry.flowname (), flow_qos) != 0)
02091             ACE_DEBUG ((LM_DEBUG,
02092                         "New QoS for the flow %s is not specified\n",
02093                         entry.flowname ()));
02094           int result;
02095           result = handler_entry->int_id_->change_qos (flow_qos);
02096           if (result != 0)
02097             ACE_ERROR_RETURN ((LM_ERROR,
02098                                "Modifying QoS Failed\n"),
02099                               -1);
02100 
02101         }
02102     }
02103   return 0;
02104 }
02105 
02106 // Refers to modification of transport QoS.
02107 CORBA::Boolean
02108 TAO_StreamEndPoint::modify_QoS (AVStreams::streamQoS &new_qos,
02109                                 const AVStreams::flowSpec &the_flows)
02110 {
02111   if (TAO_debug_level > 0)
02112   ACE_DEBUG ((LM_DEBUG,
02113               "TAO_StreamEndPoint::modify_QoS\n"));
02114 
02115   int result =  this->change_qos (new_qos, the_flows);
02116 
02117   if (result != 0)
02118     return 0;
02119 
02120   return 1;
02121 
02122 }
02123 
02124 // Sets the list of protocols this streamendpoint can understand.
02125 
02126 CORBA::Boolean
02127 TAO_StreamEndPoint::set_protocol_restriction (const AVStreams::protocolSpec &protocols)
02128 {
02129   try
02130     {
02131       CORBA::Any protocol_restriction_any;
02132 
02133       protocol_restriction_any <<= protocols;
02134       this->define_property ("ProtocolRestriction",
02135                              protocol_restriction_any);
02136       this->protocols_ = protocols;
02137     }
02138   catch (const CORBA::Exception& ex)
02139     {
02140       ex._tao_print_exception (
02141         "TAO_StreamEndPoint::set_protocol_restriction");
02142       return 0;
02143     }
02144   return 1;
02145 }
02146 
02147 
02148 void
02149 TAO_StreamEndPoint::disconnect (const AVStreams::flowSpec &the_spec)
02150 {
02151   ACE_UNUSED_ARG (the_spec);
02152 }
02153 
02154 // Sets the status of the flow protocol.
02155 
02156 void
02157 TAO_StreamEndPoint::set_FPStatus (const AVStreams::flowSpec &/*the_spec*/,
02158                                   const char *fp_name,
02159                                   const CORBA::Any &fp_settings)
02160 {
02161   if (ACE_OS::strcmp (fp_name, "SFP1.0") != 0)
02162     return;
02163   fp_settings >>= this->sfp_status_;
02164   // @@Naga: We should call set_FPStatus on all the protocol objects.
02165 }
02166 
02167 
02168 CORBA::Object_ptr
02169 TAO_StreamEndPoint::get_fep (const char *flow_name)
02170 {
02171   ACE_CString fep_name_key (flow_name);
02172   AVStreams::FlowEndPoint_var fep_entry;
02173   if (this->fep_map_.find (fep_name_key, fep_entry) == 0)
02174     return fep_entry._retn();
02175   return 0;
02176 }
02177 
02178 char*
02179 TAO_StreamEndPoint::add_fep_i_add_property (AVStreams::FlowEndPoint_ptr fep)
02180 {
02181   ACE_CString flow_name;
02182 
02183   try
02184     {
02185       // exception implies the flow name is not defined and is system
02186       // generated.
02187       flow_name = "flow";
02188       char tmp[255];
02189       ACE_OS::sprintf (tmp, "%u", this->flow_num_++);
02190       flow_name += tmp;
02191 
02192       CORBA::Any flowname_any;
02193       flowname_any <<= flow_name.c_str ();
02194       fep->define_property ("Flow",
02195                             flowname_any);
02196     }
02197   catch (const CORBA::Exception& ex)
02198     {
02199       ex._tao_print_exception ("TAO_StreamEndPoint::add_fep");
02200       return 0;
02201     }
02202   return ACE_OS::strdup( flow_name.c_str () );
02203 }
02204 
02205 char*
02206 TAO_StreamEndPoint::add_fep_i (AVStreams::FlowEndPoint_ptr fep)
02207 {
02208   CORBA::String_var flow_name;
02209   try
02210     {
02211       CORBA::Any_var flow_name_any =
02212         fep->get_property_value ("FlowName");
02213 
02214       const char *tmp;
02215       flow_name_any >>= tmp;
02216       flow_name = CORBA::string_dup (tmp);
02217     }
02218   catch (const CORBA::Exception&)
02219     {
02220       flow_name =
02221         this->add_fep_i_add_property (fep);
02222     }
02223   return flow_name._retn ();
02224 }
02225 
02226 char *
02227 TAO_StreamEndPoint::add_fep (CORBA::Object_ptr fep_obj)
02228 {
02229   AVStreams::FlowEndPoint_var fep =
02230     AVStreams::FlowEndPoint::_narrow (fep_obj);
02231 
02232   CORBA::String_var flow_name =
02233     this->add_fep_i (fep.in ());
02234 
02235   try
02236     {
02237       fep->lock ();
02238       // Add it to the sequence of flowNames supported.
02239       // put the flowname and the flowendpoint in a hashtable.
02240       ACE_CString fep_name_key (CORBA::string_dup (flow_name.in ()));
02241       if (this->fep_map_.bind (fep_name_key, AVStreams::FlowEndPoint::_duplicate (fep.in ())) != 0)
02242         {
02243           throw AVStreams::streamOpFailed ();
02244         }
02245       // increment the flow count.
02246       this->flow_count_++;
02247       this->flows_.length (this->flow_count_);
02248       this->flows_[this->flow_count_-1] = flow_name;
02249       // define/modify the "Flows" property.
02250       CORBA::Any flows_any;
02251       flows_any <<= this->flows_;
02252       this->define_property ("Flows",
02253                              flows_any);
02254     }
02255   catch (const CORBA::Exception& ex)
02256     {
02257       ex._tao_print_exception ("TAO_StreamEndPoint::add_fep");
02258       return 0;
02259     }
02260   return flow_name._retn ();
02261 }
02262 
02263 
02264 void
02265 TAO_StreamEndPoint::remove_fep (const char *flow_name)
02266 {
02267   try
02268     {
02269       ACE_CString fep_name_key (flow_name);
02270       AVStreams::FlowEndPoint_var fep_entry;
02271       // Remove the fep from the hash table.
02272       if (this->fep_map_.unbind (fep_name_key, fep_entry)!= 0)
02273         throw AVStreams::streamOpFailed ();
02274       // redefine the "Flows" property
02275       AVStreams::flowSpec new_flows (this->flows_.length ());
02276       for (u_int i=0, j=0 ; i <this->flows_.length (); i++)
02277         if (ACE_OS::strcmp (flow_name, this->flows_[i]) != 0)
02278           new_flows[j++] = this->flows_[i];
02279 
02280       CORBA::Any flows;
02281       flows <<= new_flows;
02282       this->flows_ = new_flows;
02283       this->define_property ("Flows",
02284                              flows);
02285     }
02286   catch (const CORBA::Exception& ex)
02287     {
02288       ex._tao_print_exception ("TAO_StreamEndPoint::remove_fep");
02289     }
02290 }
02291 
02292 // Sets the negotiator object.
02293 void
02294 TAO_StreamEndPoint::set_negotiator (AVStreams::Negotiator_ptr new_negotiator)
02295 {
02296   try
02297     {
02298       CORBA::Any negotiator;
02299       negotiator <<= new_negotiator;
02300       this->define_property ("Negotiator",
02301                              negotiator);
02302       this->negotiator_ = AVStreams::Negotiator::_duplicate (new_negotiator);
02303     }
02304   catch (const CORBA::Exception& ex)
02305     {
02306       ex._tao_print_exception (
02307         "TAO_StreamEndPoint::set_negotiator");
02308     }
02309 }
02310 
02311 
02312 // Sets the public key used for this streamendpoint.
02313 void
02314 TAO_StreamEndPoint::set_key (const char *flow_name,
02315                              const AVStreams::key & the_key)
02316 {
02317   try
02318     {
02319       this->key_ = the_key;
02320       CORBA::Any PublicKey;
02321       PublicKey <<= the_key;
02322       char PublicKey_property [BUFSIZ];
02323       ACE_OS::sprintf (PublicKey_property, "%s_PublicKey", flow_name);
02324       this->define_property (PublicKey_property,
02325                              PublicKey);
02326     }
02327   catch (const CORBA::Exception& ex)
02328     {
02329       ex._tao_print_exception ("TAO_StreamEndPoint::set_key");
02330     }
02331 }
02332 
02333 // Set the source id.
02334 void
02335 TAO_StreamEndPoint::set_source_id (CORBA::Long source_id)
02336 {
02337   this->source_id_ = source_id;
02338 }
02339 
02340 CORBA::Boolean
02341 TAO_StreamEndPoint::multiconnect (AVStreams::streamQoS &/*the_qos*/,
02342                                   AVStreams::flowSpec &/*flow_spec*/)
02343 {
02344   if (TAO_debug_level > 0)
02345     ACE_DEBUG ((LM_DEBUG, "TAO_StreamEndPoint::multiconnect\n"));
02346   return 0;
02347 }
02348 
02349 TAO_StreamEndPoint::~TAO_StreamEndPoint (void)
02350 {
02351   //this->handle_close ();
02352   TAO_AV_FlowSpecSetItor begin = this->forward_flow_spec_set.begin ();
02353   TAO_AV_FlowSpecSetItor end = this->forward_flow_spec_set.end ();
02354 
02355   int i=0;
02356   // @@ Naga: Will the iterator always give the entries in the order of insertion.
02357   // or is it an implementation fact of ACE containers.
02358   for ( ; begin != end; ++begin, ++i)
02359     {
02360 //       if (i >= FLOWSPEC_MAX)
02361 //         {
02362           TAO_FlowSpec_Entry *entry = *begin;
02363           delete entry;
02364           //        }
02365     }
02366   begin = this->reverse_flow_spec_set.begin ();
02367   end = this->reverse_flow_spec_set.end ();
02368   i = 0;
02369   for (; begin != end; ++begin)
02370     {
02371 //       if (i >= FLOWSPEC_MAX)
02372 //         {
02373           TAO_FlowSpec_Entry *entry = *begin;
02374           delete entry;
02375           //        }
02376     }
02377 }
02378 
02379 
02380 // ----------------------------------------------------------------------
02381 // TAO_StreamEndPoint_A
02382 // ----------------------------------------------------------------------
02383 
02384 TAO_StreamEndPoint_A::TAO_StreamEndPoint_A (void)
02385 {
02386   if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "(%P|%t) TAO_StreamEndPoint_A::TAO_StreamEndPoint_A: created\n"));
02387 }
02388 
02389 // IP Multicast style connect.
02390 CORBA::Boolean
02391 TAO_StreamEndPoint_A::multiconnect (AVStreams::streamQoS &stream_qos,
02392                                     AVStreams::flowSpec &flow_spec)
02393 {
02394   if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_StreamEndPointA::multiconnect\n"));
02395   try
02396     {
02397       int result = 0;
02398       TAO_AV_QoS qos (stream_qos);
02399       for (u_int i=0;i< flow_spec.length ();i++)
02400         {
02401           TAO_Forward_FlowSpec_Entry *forward_entry = 0;
02402           ACE_NEW_RETURN (forward_entry,
02403                           TAO_Forward_FlowSpec_Entry,
02404                           0);
02405           forward_entry->parse (flow_spec[i]);
02406           ACE_CString mcast_key (forward_entry->flowname ());
02407           AVStreams::FlowEndPoint_var flow_endpoint;
02408 
02409           // @@Naga: There is a problem in the full profile case for multiconnect. Since
02410           // multiconnect on sep_a is called everytime a sink is added and if called for
02411           // the same flow twice, the following code will just call add producer on the flow connection.
02412           // It is however very hard to find out if the flow producer is already in the flow connection
02413           // since comparing object references will not work and the flowproducer reference is
02414           // generated by _narrow. Our only hope is that _narrow on the same fep will return the same
02415           // pointer for the flowproducer in which case we can find out if the flowproducer exists in
02416           // fep set for that flowconnection.
02417           if (this->fep_map_.find (mcast_key, flow_endpoint) == 0)
02418             {
02419               try
02420                 {
02421                   AVStreams::QoS flow_qos;
02422                   result = qos.get_flow_qos (forward_entry->flowname (), flow_qos);
02423                   if (result < 0)
02424                     if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "QoS not found for %s\n", forward_entry->flowname ()));
02425                   // Narrow it to FlowProducer.
02426                   AVStreams::FlowProducer_var producer;
02427                   producer = AVStreams::FlowProducer::_narrow (flow_endpoint.in());
02428                   //
02429                   // Else narrow succeeeded.
02430                   if (!CORBA::is_nil (producer.in ()))
02431                     {
02432                       AVStreams::FlowConnection_var flow_connection;
02433                       try
02434                         {
02435                           if (CORBA::is_nil (this->streamctrl_.in ()))
02436                               {
02437                                 CORBA::Any_var streamctrl_any;
02438                                 streamctrl_any = this->get_property_value ("Related_StreamCtrl");
02439                                 AVStreams::StreamCtrl_ptr streamctrl;
02440                                 streamctrl_any.in () >>= streamctrl;
02441                                 this->streamctrl_ = AVStreams::StreamCtrl::_duplicate (streamctrl);
02442                               }
02443 
02444                           CORBA::Object_var flow_connection_obj =
02445                             this->streamctrl_->get_flow_connection (forward_entry->flowname ());
02446                           flow_connection = AVStreams::FlowConnection::_narrow (flow_connection_obj.in ());
02447                         }
02448                       catch (const CORBA::Exception&)
02449                         {
02450                           TAO_FlowConnection *flowConnection;
02451                           ACE_NEW_RETURN (flowConnection,
02452                                           TAO_FlowConnection,
02453                                           0);
02454                           //@@ Strategize the multicast address allocation.
02455                           flowConnection->set_mcast_addr (this->mcast_addr_, this->mcast_port_);
02456                           this->mcast_port_++;
02457                           flowConnection->set_protocol (forward_entry->carrier_protocol_str ());
02458                           flow_connection = flowConnection->_this ();
02459                           this->streamctrl_->set_flow_connection (forward_entry->flowname (),
02460                                                                   flow_connection.in ());
02461                         }
02462                       if (ACE_OS::strcmp (forward_entry->flow_protocol_str (), "") != 0)
02463                         {
02464                           CORBA::Any fp_settings;
02465                           flow_connection->use_flow_protocol (forward_entry->flow_protocol_str (),
02466                                                               fp_settings);
02467                         }
02468                       result = flow_connection->add_producer (producer.in (),
02469                                                               flow_qos);
02470                       if (result == 0)
02471                         ACE_ERROR_RETURN ((LM_ERROR, "TAO_StreamEndPoint_A::multiconnect: add_producer failed\n"), 0);
02472                     }
02473                 }
02474               catch (const CORBA::Exception& ex)
02475                 {
02476                   // Narrow failed and since its not a flowproducer its an error.
02477                   ex._tao_print_exception (
02478                     "FlowProducer::_narrow");
02479                   ACE_ERROR_RETURN ((LM_ERROR, "sep_a doesn't contain a flowproducer"), 0);
02480                 }
02481             }
02482           else
02483             {
02484               ACE_INET_Addr *mcast_addr;
02485               TAO_FlowSpec_Entry *entry = 0;
02486               result = this->mcast_entry_map_.find (mcast_key, entry);
02487               if (result == 0)
02488                 {
02489                   mcast_addr = dynamic_cast<ACE_INET_Addr *> (entry->address ());
02490                   char str_addr [BUFSIZ];
02491                   result = mcast_addr->addr_to_string (str_addr, BUFSIZ);
02492                   if (result < 0)
02493                     ACE_ERROR_RETURN ((LM_ERROR, "TAO_StreamEndPointA::multiconnect ::addr_to_string failed\n"), 0);
02494                   if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_StreamEndPoint_A::multiconnect:%s\n", str_addr));
02495                   TAO_Forward_FlowSpec_Entry new_entry (entry->flowname (),
02496                                                         entry->direction_str (),
02497                                                         entry->format (),
02498                                                         entry->flow_protocol_str (),
02499                                                         entry->carrier_protocol_str (),
02500                                                         entry->address ());
02501                   flow_spec[i] = CORBA::string_dup (new_entry.entry_to_string ());
02502                 }
02503               else
02504                 {
02505 
02506                   switch (forward_entry->direction ())
02507                     {
02508                     case TAO_FlowSpec_Entry::TAO_AV_DIR_IN:
02509                       {
02510                         ACE_NEW_RETURN (mcast_addr,
02511                                         ACE_INET_Addr,
02512                                         0);
02513                         mcast_addr->set (this->mcast_port_, this->mcast_addr_.c_str ());
02514                         this->mcast_port_++;
02515                         char buf[BUFSIZ];
02516                         mcast_addr->addr_to_string (buf, BUFSIZ);
02517                         if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "%s\n", buf));
02518                         TAO_Forward_FlowSpec_Entry *new_entry;
02519                         ACE_NEW_RETURN (new_entry,
02520                                         TAO_Forward_FlowSpec_Entry (forward_entry->flowname (),
02521                                                                     forward_entry->direction_str (),
02522                                                                     forward_entry->format (),
02523                                                                     forward_entry->flow_protocol_str (),
02524                                                                     forward_entry->carrier_protocol_str (),
02525                                                                     mcast_addr),
02526                                         0);
02527                         flow_spec[i] = CORBA::string_dup (new_entry->entry_to_string ());
02528                         //new_entry->is_multicast (1);
02529 
02530                         this->forward_flow_spec_set.insert (new_entry);
02531                         TAO_AV_Acceptor_Registry *acceptor_registry = TAO_AV_CORE::instance ()->acceptor_registry ();
02532                         result = acceptor_registry->open (this,
02533                                                           TAO_AV_CORE::instance (),
02534                                                           this->forward_flow_spec_set);
02535                         if (result < 0)
02536                           ACE_ERROR_RETURN ((LM_ERROR, "Acceptor_Registry::open failed\n"), 0);
02537                         result = this->mcast_entry_map_.bind (mcast_key, new_entry);
02538                         if (result < 0)
02539                           ACE_ERROR_RETURN ((LM_ERROR, "mcast_entry::bind failed"), 0);
02540                       }
02541                       break;
02542                     case TAO_FlowSpec_Entry::TAO_AV_DIR_OUT:
02543                       // OUT implies we're the sink.
02544                       break;
02545                     default:
02546                       break;
02547                     }
02548                 }
02549             }
02550         }
02551     }
02552   catch (const CORBA::Exception& ex)
02553     {
02554       ex._tao_print_exception (
02555         "TAO_StreamEndPoint_A::multiconnect");
02556       return 0;
02557     }
02558   return 1;
02559 }
02560 
02561 // ATM style Multicast is not supported yet.
02562 CORBA::Boolean
02563 TAO_StreamEndPoint_A::connect_leaf (AVStreams::StreamEndPoint_B_ptr /* the_ep */,
02564                                     AVStreams::streamQoS & /* the_qos */,
02565                                     const AVStreams::flowSpec & /* the_flows */)
02566 {
02567   throw AVStreams::notSupported ();
02568 }
02569 
02570 // Multicast not supported yet.
02571 void
02572 TAO_StreamEndPoint_A::disconnect_leaf (AVStreams::StreamEndPoint_B_ptr /* the_ep */,
02573                                        const AVStreams::flowSpec & /* theSpec */)
02574 
02575 {
02576 
02577   throw AVStreams::notSupported ();
02578 
02579 }
02580 
02581 TAO_StreamEndPoint_A::~TAO_StreamEndPoint_A (void)
02582 {
02583 }
02584 
02585 // ----------------------------------------------------------------------
02586 // TAO_StreamEndPoint_B
02587 // ----------------------------------------------------------------------
02588 
02589 TAO_StreamEndPoint_B::TAO_StreamEndPoint_B (void)
02590 {
02591   if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,
02592                                        "\n(%P|%t) TAO_StreamEndPoint_B::TAO_StreamEndPoint_B: created"));
02593 }
02594 
02595 CORBA::Boolean
02596 TAO_StreamEndPoint_B::multiconnect (AVStreams::streamQoS &stream_qos,
02597                                     AVStreams::flowSpec &flow_spec)
02598 {
02599   if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_StreamEndPoint_B::multiconnect\n"));
02600   try
02601     {
02602       int result = 0;
02603       TAO_AV_QoS qos (stream_qos);
02604       for (u_int i=0;i< flow_spec.length ();i++)
02605         {
02606           TAO_Forward_FlowSpec_Entry *forward_entry;
02607           ACE_NEW_RETURN (forward_entry,
02608                           TAO_Forward_FlowSpec_Entry,
02609                           0);
02610           forward_entry->parse (flow_spec[i]);
02611           ACE_CString mcast_key (forward_entry->flowname ());
02612           AVStreams::FlowEndPoint_var flow_endpoint;
02613           if (this->fep_map_.find (mcast_key, flow_endpoint ) == 0)
02614             {
02615               AVStreams::FlowConsumer_var consumer;
02616               try
02617                 {
02618                   consumer = AVStreams::FlowConsumer::_narrow (flow_endpoint.in ());
02619                 }
02620               catch (const CORBA::Exception& ex)
02621                 {
02622                   ex._tao_print_exception (
02623                     "FlowConsumer::_narrow");
02624                   ACE_ERROR_RETURN ((LM_ERROR, "sep_b doesn't contain a flowconsumer"), 0);
02625                 }
02626               AVStreams::QoS flow_qos;
02627               result = qos.get_flow_qos (forward_entry->flowname (), flow_qos);
02628               if (result < 0)
02629                 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "QoS not found for %s", forward_entry->flowname ()));
02630               AVStreams::FlowConnection_var flow_connection;
02631               try
02632                 {
02633                   if (CORBA::is_nil (this->streamctrl_.in ()))
02634                     {
02635                       CORBA::Any_var streamctrl_any;
02636                       streamctrl_any = this->get_property_value ("Related_StreamCtrl");
02637                       AVStreams::StreamCtrl_ptr streamctrl;
02638                       streamctrl_any.in () >>= streamctrl;
02639                       this->streamctrl_ = AVStreams::StreamCtrl::_duplicate (streamctrl);
02640                     }
02641                   CORBA::Object_var flow_connection_obj =
02642                     this->streamctrl_->get_flow_connection (forward_entry->flowname ());
02643                   flow_connection = AVStreams::FlowConnection::_narrow (flow_connection_obj.in ());
02644                 }
02645               catch (const CORBA::Exception& ex)
02646                 {
02647                   ex._tao_print_exception (
02648                     "TAO_StreamEndPoint_B::multiconnect::get_flow_connection");
02649                   return 0;
02650                 }
02651               result = flow_connection->add_consumer (consumer.in (),
02652                                                       flow_qos);
02653               if (result == 0)
02654                 ACE_ERROR_RETURN ((LM_ERROR, "TAO_StreamEndPoint_B::multiconnect:add_consumer failed\n"), 0);
02655             }
02656           else
02657             {
02658               TAO_FlowSpec_Entry *mcast_entry = 0;
02659               ACE_INET_Addr *mcast_addr;
02660               mcast_addr = dynamic_cast<ACE_INET_Addr *> (forward_entry->address ());
02661               if (mcast_addr == 0)
02662                 ACE_ERROR_RETURN ((LM_ERROR, "TAO_StreamEndPoint_B::multiconnect::Address missing in flowspec_entry\n"), 0);
02663               result = this->mcast_entry_map_.find (mcast_key, mcast_entry);
02664               if (result == 0)
02665                 {
02666                   ACE_ERROR_RETURN ((LM_ERROR, "TAO_StreamEndPoint_B::multiconnect::handler already found\n"), 0);
02667                 }
02668               else
02669                 {
02670                   switch (forward_entry->direction ())
02671                     {
02672                     case TAO_FlowSpec_Entry::TAO_AV_DIR_IN:
02673                       {
02674                         // IN means we're the sink.
02675                         // @@ We have to take care of this.
02676                         //                 result = this->make_dgram_mcast_flow_handler (mcast_dgram);
02677                         //                 if (result < 0)
02678                         //                   return 0;
02679 
02680                         this->forward_flow_spec_set.insert (forward_entry);
02681                         TAO_AV_Connector_Registry *connector_registry = TAO_AV_CORE::instance ()->connector_registry ();
02682                         result = connector_registry->open (this,
02683                                                            TAO_AV_CORE::instance (),
02684                                                            this->forward_flow_spec_set);
02685                         if (result < 0)
02686                           ACE_ERROR_RETURN ((LM_ERROR, "connector_registry::open failed\n"), 0);
02687                         result = this->mcast_entry_map_.bind (mcast_key, forward_entry);
02688                         if (result < 0)
02689                           ACE_ERROR_RETURN ((LM_ERROR, "dgram_mcast_handler::bind failed"), 0);
02690                       }
02691                       break;
02692                     case TAO_FlowSpec_Entry::TAO_AV_DIR_OUT:
02693                       // OUT implies we're the source., which is an error.
02694                       break;
02695                     default:
02696                       break;
02697                     }
02698                 }
02699             }
02700         }
02701     }
02702   catch (const CORBA::Exception& ex)
02703     {
02704       ex._tao_print_exception (
02705         "TAO_StreamEndPoint_B::multiconnect");
02706       return 0;
02707     }
02708   return 1;
02709 }
02710 
02711 TAO_StreamEndPoint_B::~TAO_StreamEndPoint_B (void)
02712 {
02713 }
02714 
02715 // ----------------------------------------------------------------------
02716 // TAO_VDev
02717 // ----------------------------------------------------------------------
02718 
02719 TAO_VDev::TAO_VDev (void)
02720 {
02721   if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,
02722               "(%P|%t) TAO_VDev::TAO_VDev: created\n"));
02723 }
02724 
02725 // StreamCtrl will call this to give us a reference to itself, and to
02726 // our peer vdev..
02727 CORBA::Boolean
02728 TAO_VDev::set_peer (AVStreams::StreamCtrl_ptr the_ctrl,
02729                     AVStreams::VDev_ptr the_peer_dev,
02730                     AVStreams::streamQoS &the_qos,
02731                     const AVStreams::flowSpec &the_spec)
02732 {
02733   ACE_UNUSED_ARG (the_qos);
02734   ACE_UNUSED_ARG (the_spec);
02735 
02736   CORBA::Boolean result = 0;
02737   try
02738     {
02739       if (TAO_debug_level > 0)
02740         ACE_DEBUG ((LM_DEBUG, "(%P|%t) TAO_VDev::set_peer: called\n"));
02741 
02742 
02743       CORBA::Any anyval;
02744       anyval <<= the_peer_dev;
02745       this->define_property ("Related_VDev",
02746                              anyval);
02747 
02748 
02749       this->streamctrl_ = AVStreams::StreamCtrl::_duplicate (the_ctrl);
02750       this->peer_ = AVStreams::VDev::_duplicate (the_peer_dev);
02751 
02752       CORBA::Any_var anyptr;
02753       anyptr = this->peer_->get_property_value ("Related_MediaCtrl");
02754 
02755       CORBA::Object_ptr media_ctrl_obj = 0;
02756 
02757       anyptr.in () >>= CORBA::Any::to_object(media_ctrl_obj);
02758 
02759       result = this->set_media_ctrl (media_ctrl_obj);
02760     }
02761   catch (const CORBA::Exception& ex)
02762     {
02763       ex._tao_print_exception ("TAO_VDev::set_peer");
02764       return 0;
02765     }
02766   return result;
02767 }
02768 
02769 CORBA::Boolean
02770 TAO_VDev::set_media_ctrl (CORBA::Object_ptr media_ctrl)
02771 
02772 {
02773   //  since the media ctrl is not stored or used, delete it.
02774 
02775   CORBA::release( media_ctrl);
02776 
02777   return 1;
02778 }
02779 
02780 // Sets the multicast VDev peer.
02781 CORBA::Boolean
02782 TAO_VDev::set_Mcast_peer (AVStreams::StreamCtrl_ptr /* the_ctrl */,
02783                           AVStreams::MCastConfigIf_ptr mcast_peer,
02784                           AVStreams::streamQoS &/* the_qos */,
02785                           const AVStreams::flowSpec &/* the_spec */)
02786 {
02787   this->mcast_peer_ = AVStreams::MCastConfigIf::_duplicate (mcast_peer);
02788   return 1;
02789 }
02790 
02791 // applications should override this to handle configuration changes.
02792 void
02793 TAO_VDev::configure (const CosPropertyService::Property &/*the_config_mesg*/)
02794 {
02795 }
02796 
02797 // sets the media format used for the flowname as a property.
02798 void
02799 TAO_VDev::set_format (const char *flowName,
02800                       const char *format_name)
02801 {
02802   try
02803     {
02804       if (flowName == 0 || format_name == 0)
02805         ACE_ERROR ((LM_ERROR, "TAO_VDev::set_format: flowName or format_name is null\n"));
02806       char format_property [BUFSIZ];
02807       ACE_OS::sprintf (format_property, "%s_currFormat", flowName);
02808       CORBA::Any format;
02809       format <<= format_name;
02810       this->define_property (format_property,
02811                              format);
02812     }
02813   catch (const CORBA::Exception& ex)
02814     {
02815       ex._tao_print_exception ("TAO_VDev::set_format");
02816       return;
02817     }
02818   return;
02819 }
02820 
02821 // sets the device parameters for the flowname as a property.
02822 void
02823 TAO_VDev::set_dev_params (const char *flowName,
02824                           const CosPropertyService::Properties &new_params)
02825 {
02826   try
02827     {
02828       if (flowName == 0)
02829         ACE_ERROR ((LM_ERROR, "TAO_VDev::set_dev_params:flowName is null\n"));
02830       char devParams_property[BUFSIZ];
02831       ACE_OS::sprintf (devParams_property, "%s_devParams", flowName);
02832       CORBA::Any devParams;
02833       devParams <<= new_params;
02834       this->define_property (devParams_property,
02835                              devParams);
02836     }
02837   catch (const CORBA::Exception& ex)
02838     {
02839       ex._tao_print_exception ("TAO_VDev::set_dev_params");
02840       return;
02841     }
02842   return;
02843 }
02844 
02845 // QoS Modification should be handled by the application currently.
02846 CORBA::Boolean
02847 TAO_VDev::modify_QoS (AVStreams::streamQoS &the_qos,
02848                       const AVStreams::flowSpec &flowspec)
02849 {
02850   if (TAO_debug_level > 0)
02851   ACE_DEBUG ((LM_DEBUG,
02852              "TAO_VDev::modify_QoS\n"));
02853 
02854   if (flowspec.length () != 0)
02855     {
02856       TAO_Forward_FlowSpec_Entry entry;
02857       entry.parse (flowspec [0]);
02858       int direction = entry.direction ();
02859       if (direction == 0)
02860         {
02861           AVStreams::StreamEndPoint_A_ptr sep_a;
02862 
02863           CORBA::Any_ptr streamendpoint_a_any =
02864           this->get_property_value ("Related_StreamEndpoint");
02865 
02866           *streamendpoint_a_any >>= sep_a;
02867           if (sep_a != 0)
02868             {
02869               sep_a->modify_QoS (the_qos, flowspec);
02870             }
02871           else ACE_DEBUG ((LM_DEBUG,
02872                            "Stream EndPoint Not Found\n"));
02873         }
02874       else
02875         {
02876           AVStreams::StreamEndPoint_B_ptr sep_b;
02877 
02878           CORBA::Any_ptr streamendpoint_b_any =
02879           this->get_property_value ("Related_StreamEndpoint");
02880           *streamendpoint_b_any >>= sep_b;
02881           sep_b->modify_QoS (the_qos, flowspec);
02882         }
02883   }
02884   return 1;
02885 }
02886 
02887 TAO_VDev::~TAO_VDev (void)
02888 {
02889 }
02890 
02891 // ----------------------------------------------------------------------
02892 // TAO_MMDevice
02893 // ----------------------------------------------------------------------
02894 
02895 
02896 TAO_MMDevice::TAO_MMDevice (TAO_AV_Endpoint_Strategy *endpoint_strategy)
02897   : endpoint_strategy_ (endpoint_strategy),
02898     flow_count_ (0),
02899     flow_num_ (0),
02900     stream_ctrl_ (0)
02901 {
02902 }
02903 
02904 // create a streamctrl which is colocated with me, use that streamctrl
02905 // to bind the peer_device with me.
02906 AVStreams::StreamCtrl_ptr
02907 TAO_MMDevice::bind (AVStreams::MMDevice_ptr peer_device,
02908                     AVStreams::streamQoS &the_qos,
02909                     CORBA::Boolean_out is_met,
02910                     const AVStreams::flowSpec &the_spec)
02911 {
02912   AVStreams::StreamCtrl_ptr streamctrl (AVStreams::StreamCtrl::_nil ());
02913   try
02914     {
02915       ACE_UNUSED_ARG (is_met);
02916       ACE_NEW_RETURN (this->stream_ctrl_,
02917                       TAO_StreamCtrl,
02918                       0);
02919       AVStreams::MMDevice_var mmdevice = this->_this ();
02920       this->stream_ctrl_->bind_devs (peer_device,
02921                                      mmdevice.in (),
02922                                      the_qos,
02923                                      the_spec);
02924       streamctrl = this->stream_ctrl_->_this ();
02925     }
02926   catch (const CORBA::Exception& ex)
02927     {
02928       ex._tao_print_exception ("TAO_MMDevice::bind");
02929       return streamctrl;
02930     }
02931   return streamctrl;
02932 }
02933 
02934 // Multicast is not supported yet.
02935 AVStreams::StreamCtrl_ptr
02936 TAO_MMDevice::bind_mcast (AVStreams::MMDevice_ptr first_peer,
02937                           AVStreams::streamQoS &the_qos,
02938                           CORBA::Boolean_out is_met,
02939                           const AVStreams::flowSpec &the_spec)
02940 {
02941   ACE_UNUSED_ARG (first_peer);
02942   ACE_UNUSED_ARG (the_qos);
02943   ACE_UNUSED_ARG (is_met);
02944   ACE_UNUSED_ARG (the_spec);
02945 
02946   return 0;
02947 }
02948 
02949 AVStreams::StreamEndPoint_ptr
02950 TAO_MMDevice::create_A_B (MMDevice_Type type,
02951                           AVStreams::StreamCtrl_ptr streamctrl,
02952                           AVStreams::VDev_out the_vdev,
02953                           AVStreams::streamQoS &the_qos,
02954                           CORBA::Boolean_out met_qos,
02955                           char *&/*named_vdev*/,
02956                           const AVStreams::flowSpec &flow_spec)
02957 {
02958   AVStreams::StreamEndPoint_A_ptr sep_a (AVStreams::StreamEndPoint_A::_nil ());
02959   AVStreams::StreamEndPoint_B_ptr sep_b (AVStreams::StreamEndPoint_B::_nil ());
02960   AVStreams::StreamEndPoint_ptr sep (AVStreams::StreamEndPoint::_nil ());
02961   try
02962     {
02963       switch (type)
02964         {
02965         case MMDEVICE_A:
02966           {
02967             if (this->endpoint_strategy_->create_A (sep_a,
02968                                                     the_vdev) == -1)
02969               ACE_ERROR_RETURN ((LM_ERROR,
02970                                  "TAO_MMDevice::create_A_B (%P|%t) - "
02971                                  "error in create_A\n"),
02972                                 0);
02973             sep = sep_a;
02974           }
02975           break;
02976         case MMDEVICE_B:
02977           {
02978             if (this->endpoint_strategy_->create_B (sep_b,
02979                                                     the_vdev) == -1)
02980               ACE_ERROR_RETURN ((LM_ERROR,
02981                                  "TAO_MMDevice::create_A_B (%P|%t) - "
02982                                  "error in create_B\n"),
02983                                 0);
02984             sep = sep_b;
02985           }
02986           break;
02987         default:
02988           break;
02989         }
02990       if (this->fdev_map_.current_size () > 0)
02991         {
02992           TAO_AV_QoS qos (the_qos);
02993           // create flowendpoints from the FDevs.
02994           for (u_int i=0;i<flow_spec.length ();i++)
02995             {
02996               TAO_Forward_FlowSpec_Entry forward_entry;
02997               forward_entry.parse (flow_spec[i]);
02998               ACE_CString flow_key (forward_entry.flowname ());
02999               AVStreams::FDev_var flow_dev;
03000               AVStreams::FlowConnection_var flowconnection;
03001               try
03002                 {
03003                   // Get the flowconnection for this flow.
03004                   //static int blah = 0; if(blah == 1){blah=0; abort();}else{blah=1;}
03005                   CORBA::Object_var flowconnection_obj =
03006                     streamctrl->get_flow_connection (forward_entry.flowname ());
03007                   ACE_OS::printf("successfully called get_flow_connection\n");
03008                   if (!CORBA::is_nil (flowconnection_obj.in ()))
03009                     {
03010                       flowconnection = AVStreams::FlowConnection::_narrow (flowconnection_obj.in ());
03011                     }
03012                 }
03013               catch (const AVStreams::noSuchFlow&)
03014                 {
03015                           TAO_FlowConnection *flowConnection;
03016                           ACE_NEW_RETURN (flowConnection,
03017                                           TAO_FlowConnection,
03018                                           0);
03019                           flowconnection = flowConnection->_this ();
03020                           streamctrl->set_flow_connection (forward_entry.flowname(),
03021                                                      flowconnection.in ());
03022                 }
03023               catch (const CORBA::Exception& ex)
03024                 {
03025                   //if (TAO_debug_level >= 0)
03026                     ex._tao_print_exception (
03027                       "TAO_MMDevice::create_a::get_flow_connection");
03028                 }
03029 
03030               int result = this->fdev_map_.find (flow_key, flow_dev);
03031               if (result < 0)
03032                 ACE_ERROR_RETURN ((LM_ERROR, "(%N,%l) fdev_map::find failed\n"), 0);
03033 
03034               CORBA::String_var named_fdev;
03035               AVStreams::FlowEndPoint_var flow_endpoint;
03036               AVStreams::QoS flow_qos;
03037               result = qos.get_flow_qos (forward_entry.flowname (), flow_qos);
03038               if (result < 0)
03039                 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "(%N,%l) get_flow_qos failed for %s\n", forward_entry.flowname ()));
03040               switch (forward_entry.direction ())
03041                 {
03042                 case TAO_FlowSpec_Entry::TAO_AV_DIR_IN:
03043                   {
03044                     switch (type)
03045                       {
03046                       case MMDEVICE_A:
03047                         {
03048                           // In implies flow is from A to B and
03049                           // hence A is the producer for this flow and B is the consumer for this flow.
03050                           // We have to create a producer from the FDev for this flow.
03051                           flow_endpoint =
03052                             flow_dev->create_producer (flowconnection.in (),
03053                                                        flow_qos,
03054                                                        met_qos,
03055                                                        named_fdev.inout ());
03056                         }
03057                         break;
03058                       case MMDEVICE_B:
03059                         {
03060                           flow_endpoint =
03061                             flow_dev->create_consumer (flowconnection.in (),
03062                                                        flow_qos,
03063                                                        met_qos,
03064                                                        named_fdev.inout ());
03065                         }
03066                         break;
03067                       }
03068                   }
03069                   break;
03070                 case TAO_FlowSpec_Entry::TAO_AV_DIR_OUT:
03071                   {
03072                     switch (type)
03073                       {
03074                       case MMDEVICE_A:
03075                         {
03076                           // OUT implies flow is from B to A and
03077                           // hence B is the producer for this flow and A is the consumer for this flow.
03078                           // We have to create a consumer from the FDev for this flow.
03079                           flow_endpoint =
03080                             flow_dev->create_consumer (flowconnection.in (),
03081                                                        flow_qos,
03082                                                        met_qos,
03083                                                        named_fdev.inout ());
03084                         }
03085                         break;
03086                       case MMDEVICE_B:
03087                         {
03088                           // In implies flow is from A to B and
03089                           // hence A is the producer for this flow and B is the consumer for this flow.
03090                           // We have to create a producer from the FDev for this flow.
03091                           flow_endpoint =
03092                             flow_dev->create_producer (flowconnection.in (),
03093                                                        flow_qos,
03094                                                        met_qos,
03095                                                        named_fdev.inout ());
03096                         }
03097                         break;
03098                       }
03099                   }
03100                   break;
03101                 default:
03102                   break;
03103                 }
03104               CORBA::Any flowname_any;
03105               flowname_any <<= forward_entry.flowname ();
03106               flow_endpoint->define_property ("FlowName", flowname_any);
03107               sep->add_fep (flow_endpoint.in ());
03108             }
03109         }
03110     }
03111   catch (const CORBA::Exception& ex)
03112     {
03113       ex._tao_print_exception ("TAO_MMDevice::create_A");
03114       return sep;
03115     }
03116   return sep;
03117 }
03118 
03119 AVStreams::StreamEndPoint_A_ptr
03120 TAO_MMDevice::create_A (AVStreams::StreamCtrl_ptr streamctrl,
03121                         AVStreams::VDev_out the_vdev,
03122                         AVStreams::streamQoS &stream_qos,
03123                         CORBA::Boolean_out met_qos,
03124                         char *&named_vdev,
03125                         const AVStreams::flowSpec &flow_spec)
03126 {
03127   AVStreams::StreamEndPoint_A_ptr sep_a = 0;
03128   AVStreams::StreamEndPoint_var sep;
03129   try
03130     {
03131       sep = this->create_A_B (MMDEVICE_A, streamctrl, the_vdev, stream_qos, met_qos, named_vdev, flow_spec);
03132       sep_a = AVStreams::StreamEndPoint_A::_narrow (sep.in());
03133 
03134       ACE_ASSERT( !CORBA::is_nil( sep_a ) );
03135     }
03136   catch (const CORBA::Exception& ex)
03137     {
03138       ex._tao_print_exception ("TAO_MMDevice::create_A");
03139       return sep_a;
03140     }
03141 
03142   return sep_a;
03143 }
03144 
03145 
03146 AVStreams::StreamEndPoint_B_ptr
03147 TAO_MMDevice::create_B (AVStreams::StreamCtrl_ptr streamctrl,
03148                         AVStreams::VDev_out the_vdev,
03149                         AVStreams::streamQoS &stream_qos,
03150                         CORBA::Boolean_out met_qos,
03151                         char *&named_vdev,
03152                         const AVStreams::flowSpec &flow_spec)
03153 {
03154   AVStreams::StreamEndPoint_B_ptr sep_b = AVStreams::StreamEndPoint_B::_nil ();
03155   AVStreams::StreamEndPoint_var sep;
03156 
03157   try
03158     {
03159       sep = this->create_A_B (MMDEVICE_B, streamctrl, the_vdev, stream_qos, met_qos, named_vdev, flow_spec);
03160       sep_b = AVStreams::StreamEndPoint_B::_narrow (sep.in());
03161 
03162       ACE_ASSERT ( !CORBA::is_nil( sep_b ) );
03163     }
03164   catch (const CORBA::Exception& ex)
03165     {
03166       ex._tao_print_exception ("TAO_MMDevice::create_B");
03167       return sep_b;
03168     }
03169   return sep_b;
03170 }
03171 
03172 
03173 // destroys the streamendpoint and the Vdev.
03174 void
03175 TAO_MMDevice::destroy (AVStreams::StreamEndPoint_ptr /* the_ep */,
03176                        const char * /* vdev_name */)
03177 {
03178   // Remove self from POA.  Because of reference counting, the POA
03179   // will automatically delete the servant when all pending requests
03180   // on this servant are complete.
03181   int result = TAO_AV_Core::deactivate_servant (this);
03182   if (result < 0)
03183     if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_MMDevice::destroy failed\n"));
03184 }
03185 
03186 char *
03187 TAO_MMDevice::add_fdev_i (AVStreams::FDev_ptr fdev)
03188 {
03189   char* tmp;
03190   ACE_NEW_RETURN (tmp,
03191                   char[64],
03192                   0);
03193   CORBA::String_var flow_name = tmp;
03194 
03195   try
03196     {
03197       // exception implies the flow name is not defined and is system
03198       // generated.
03199       ACE_OS::sprintf (tmp, "flow%d", flow_num_++);
03200       CORBA::Any flowname_any;
03201       flowname_any <<= flow_name.in ();
03202       fdev->define_property ("Flow", flowname_any);
03203     }
03204   catch (const CORBA::Exception& ex)
03205     {
03206       ex._tao_print_exception ("TAO_MMDevice::add_fdev");
03207       return 0;
03208     }
03209   return flow_name._retn ();
03210 }
03211 
03212 // Adds the fdev object to the MMDevice.
03213 char *
03214 TAO_MMDevice::add_fdev (CORBA::Object_ptr fdev_obj)
03215 {
03216   CORBA::String_var flow_name;
03217   AVStreams::FDev_var fdev;
03218   try
03219     {
03220       CORBA::Any_ptr flow_name_any;
03221       fdev = AVStreams::FDev::_narrow (fdev_obj);
03222 
03223       if (CORBA::is_nil (fdev.in ()))
03224           return 0;
03225 
03226 
03227       flow_name_any = fdev->get_property_value ("Flow");
03228 
03229       const char *tmp;
03230       *flow_name_any >>= tmp;
03231       flow_name = CORBA::string_dup (tmp);
03232     }
03233   catch (const CORBA::Exception&)
03234     {
03235       flow_name =
03236         this->add_fdev_i (fdev.in ());
03237     }
03238 
03239 
03240   // Add it to the sequence of flowNames supported.
03241   // put the flowname and the fdev in a hashtable.
03242   ACE_CString fdev_name_key ( flow_name.in () );
03243 
03244 
03245   if ( (this->fdev_map_.bind (fdev_name_key, fdev )) != 0)
03246     throw AVStreams::streamOpFailed ();
03247   // increment the flow count.
03248   this->flow_count_++;
03249   this->flows_.length (this->flow_count_);
03250   this->flows_ [this->flow_count_-1] = flow_name;
03251   // define/modify the "Flows" property.
03252   CORBA::Any flows_any;
03253   flows_any <<= this->flows_;
03254   try
03255     {
03256       this->define_property ("Flows",
03257                              flows_any);
03258     }
03259   catch (const CORBA::Exception& ex)
03260     {
03261       ex._tao_print_exception ("TAO_MMDevice::add_fdev");
03262       return 0;
03263     }
03264   return flow_name._retn ();
03265 }
03266 
03267 // Gets the FDev object associated with this flow.
03268 CORBA::Object_ptr
03269 TAO_MMDevice::get_fdev (const char *flow_name)
03270 {
03271 
03272   ACE_CString fdev_name_key (flow_name);
03273   AVStreams::FDev_var fdev_entry;
03274   if (this->fdev_map_.find (fdev_name_key, fdev_entry) == 0)
03275     return fdev_entry._retn() ;
03276   return 0;
03277 }
03278 
03279 // Removes the fdev from this MMDevice.
03280 void
03281 TAO_MMDevice::remove_fdev (const char *flow_name)
03282 {
03283   try
03284     {
03285       ACE_CString fdev_name_key (flow_name);
03286       AVStreams::FDev_var fdev_entry;
03287       // Remove the fep from the hash table.
03288       if (this->fdev_map_.unbind (fdev_name_key, fdev_entry)!= 0)
03289         throw AVStreams::streamOpFailed ();
03290 
03291       AVStreams::flowSpec new_flows (this->flows_.length ());
03292       for (u_int i=0, j=0 ; i <this->flows_.length (); i++)
03293         if (ACE_OS::strcmp (flow_name, this->flows_[i]) != 0)
03294           new_flows[j++] = this->flows_[i];
03295 
03296       CORBA::Any flows;
03297       flows <<= new_flows;
03298       this->flows_ = new_flows;
03299       this->define_property ("Flows",
03300                              flows);
03301     }
03302   catch (const CORBA::Exception& ex)
03303     {
03304       ex._tao_print_exception ("TAO_MMDevice::remove_fdev");
03305     }
03306 }
03307 
03308 // destructor.
03309 TAO_MMDevice::~TAO_MMDevice (void)
03310 {
03311   delete this->stream_ctrl_;
03312 }
03313 
03314 //------------------------------------------------------------------
03315 // TAO_FlowConnection
03316 //------------------------------------------------------------------
03317 
03318 // default constructor.
03319 TAO_FlowConnection::TAO_FlowConnection (void)
03320   :fp_name_ (CORBA::string_dup ("")),
03321    ip_multicast_ (0)
03322 {
03323 }
03324 
03325 // int
03326 // TAO_FlowConnection::set_mcast_addr (ACE_UINT32 mcast_addr, u_short mcast_port)
03327 // {
03328 //   this->mcast_addr_ = mcast_addr;
03329 //   this->mcast_port_ = mcast_port;
03330 //   return 0;
03331 // }
03332 
03333 int
03334 TAO_FlowConnection::set_mcast_addr (ACE_CString mcast_addr, u_short mcast_port)
03335 {
03336   this->mcast_addr_ = mcast_addr;
03337   this->mcast_port_ = mcast_port;
03338   return 0;
03339 }
03340 
03341 void
03342 TAO_FlowConnection::set_protocol (const char *protocol)
03343 {
03344   this->protocol_ = protocol;
03345 }
03346 
03347 // stop this flow.
03348 void
03349 TAO_FlowConnection::stop (void)
03350 {
03351   try
03352     {
03353       FlowProducer_SetItor producer_begin = this->flow_producer_set_.begin
03354         ();
03355       for (FlowProducer_SetItor producer_end =
03356              this->flow_producer_set_.end ();
03357            producer_begin != producer_end; ++producer_begin)
03358         {
03359           (*producer_begin)->stop ();
03360         }
03361       FlowConsumer_SetItor consumer_begin = this->flow_consumer_set_.begin
03362         ();
03363       for (FlowConsumer_SetItor consumer_end =
03364              this->flow_consumer_set_.end ();
03365            consumer_begin != consumer_end; ++consumer_begin)
03366         {
03367           (*consumer_begin)->stop ();
03368         }
03369     }
03370   catch (const CORBA::Exception& ex)
03371     {
03372       ex._tao_print_exception ("TAO_FlowConnection::stop");
03373       return;
03374     }
03375 }
03376 
03377 // start this flow.
03378 void
03379 TAO_FlowConnection::start (void)
03380 {
03381   try
03382     {
03383       FlowConsumer_SetItor consumer_begin = this->flow_consumer_set_.begin
03384         ();
03385       for (FlowConsumer_SetItor consumer_end =
03386              this->flow_consumer_set_.end ();
03387            consumer_begin != consumer_end; ++consumer_begin)
03388         {
03389           (*consumer_begin)->start ();
03390         }
03391       FlowProducer_SetItor producer_begin = this->flow_producer_set_.begin
03392         ();
03393       for (FlowProducer_SetItor producer_end =
03394              this->flow_producer_set_.end ();
03395            producer_begin != producer_end; ++producer_begin)
03396         {
03397           (*producer_begin)->start ();
03398         }
03399     }
03400   catch (const CORBA::Exception& ex)
03401     {
03402       ex._tao_print_exception ("TAO_FlowConnection::start");
03403       return;
03404     }
03405 }
03406 
03407 // destroy this flow.
03408 void
03409 TAO_FlowConnection::destroy (void)
03410 {
03411   try
03412     {
03413       FlowProducer_SetItor producer_begin = this->flow_producer_set_.begin
03414         ();
03415       for (FlowProducer_SetItor producer_end =
03416              this->flow_producer_set_.end ();
03417            producer_begin != producer_end; ++producer_begin)
03418         {
03419           (*producer_begin)->destroy ();
03420         }
03421       FlowConsumer_SetItor consumer_begin = this->flow_consumer_set_.begin
03422         ();
03423       for (FlowConsumer_SetItor consumer_end =
03424              this->flow_consumer_set_.end ();
03425            consumer_begin != consumer_end; ++consumer_begin)
03426         {
03427           (*consumer_begin)->destroy ();
03428         }
03429     }
03430   catch (const CORBA::Exception& ex)
03431     {
03432       ex._tao_print_exception ("TAO_FlowConnection::destroy");
03433       return;
03434     }
03435   int result = TAO_AV_Core::deactivate_servant (this);
03436   if (result < 0)
03437     if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_FlowConnection::destroy failed\n"));
03438 }
03439 
03440 // modify the QoS for this flow.
03441 CORBA::Boolean
03442 TAO_FlowConnection::modify_QoS (AVStreams::QoS & new_qos)
03443 {
03444   ACE_UNUSED_ARG (new_qos);
03445   return 0;
03446 }
03447 
03448 // use the specified flow protocol for this flow.
03449 CORBA::Boolean
03450 TAO_FlowConnection::use_flow_protocol (const char * fp_name,
03451                                        const CORBA::Any & fp_settings)
03452 {
03453   this->fp_name_ = fp_name;
03454   this->fp_settings_ = fp_settings;
03455   FlowProducer_SetItor producer_begin = this->flow_producer_set_.begin
03456     ();
03457   for (FlowProducer_SetItor producer_end =
03458          this->flow_producer_set_.end ();
03459        producer_begin != producer_end; ++producer_begin)
03460     {
03461       (*producer_begin)->use_flow_protocol
03462         (fp_name, fp_settings);
03463     }
03464   FlowConsumer_SetItor consumer_begin = this->flow_consumer_set_.begin
03465     ();
03466   for (FlowConsumer_SetItor consumer_end =
03467          this->flow_consumer_set_.end ();
03468        consumer_begin != consumer_end; ++consumer_begin)
03469     {
03470       (*consumer_begin)->use_flow_protocol
03471         (fp_name, fp_settings);
03472     }
03473   return 1;
03474 }
03475 
03476 void
03477 TAO_FlowConnection::push_event (const AVStreams::streamEvent & the_event)
03478 {
03479   ACE_UNUSED_ARG (the_event);
03480 }
03481 
03482 CORBA::Boolean
03483 TAO_FlowConnection::connect_devs (AVStreams::FDev_ptr a_party,
03484                                   AVStreams::FDev_ptr b_party,
03485                                   AVStreams::QoS & flow_qos)
03486 {
03487   CORBA::Boolean result = 0;
03488   try
03489     {
03490       AVStreams::FlowConnection_var flowconnection = this->_this ();
03491       CORBA::Boolean met_qos;
03492       CORBA::String_var named_fdev ((const char *)"");
03493       AVStreams::FlowProducer_var producer =
03494         a_party->create_producer (flowconnection.in (),
03495                                   flow_qos,
03496                                   met_qos,
03497                                   named_fdev.inout ());
03498       AVStreams::FlowConsumer_var consumer =
03499         b_party->create_consumer (flowconnection.in (),
03500                                   flow_qos,
03501                                   met_qos,
03502                                   named_fdev.inout ());
03503       result = this->connect (producer.in (),
03504                               consumer.in (),
03505                               flow_qos);
03506     }
03507   catch (const CORBA::Exception& ex)
03508     {
03509       ex._tao_print_exception (
03510         "TAO_FlowConnection::connect_devs");
03511       return 0;
03512     }
03513   return result;
03514 }
03515 
03516 // connect the producer and the consumer
03517 CORBA::Boolean
03518 TAO_FlowConnection::connect (AVStreams::FlowProducer_ptr producer,
03519                              AVStreams::FlowConsumer_ptr consumer,
03520                              AVStreams::QoS & the_qos)
03521 {
03522   try
03523     {
03524 
03525       AVStreams::FlowProducer_ptr flow_producer =
03526         AVStreams::FlowProducer::_duplicate (producer);
03527       AVStreams::FlowConsumer_ptr flow_consumer =
03528         AVStreams::FlowConsumer::_duplicate (consumer);
03529 
03530       this->flow_producer_set_.insert (flow_producer);
03531       this->flow_consumer_set_.insert (flow_consumer);
03532       AVStreams::FlowConnection_var flowconnection =
03533         this->_this ();
03534 
03535       flow_producer->set_peer (flowconnection.in (),
03536                                flow_consumer,
03537                                the_qos);
03538 
03539       flow_consumer->set_peer (flowconnection.in (),
03540                                flow_producer,
03541                                the_qos);
03542 
03543       char *consumer_address =
03544         flow_consumer->go_to_listen (the_qos,
03545                                      0, // false for is_mcast
03546                                      flow_producer,
03547                                      this->fp_name_.inout ());
03548 
03549       if (ACE_OS::strcmp (consumer_address, "") == 0)
03550         {
03551           // Consumer is not willing to listen, so try the producer.
03552           consumer_address = flow_producer->go_to_listen (the_qos,
03553                                                           0, // false for is_mcast
03554                                                           flow_consumer,
03555                                                           this->fp_name_.inout ());
03556           flow_consumer->connect_to_peer (the_qos,
03557                                           consumer_address,
03558                                           this->fp_name_.inout ());
03559           // @@ Naga: We have to find means to set the reverse channel for the producer.
03560           // Its broken in the point-to_point case for UDP.
03561         }
03562       else
03563         {
03564           if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_FlowConnection::connect_to_peer addres: %s", consumer_address));
03565           flow_producer->connect_to_peer (the_qos,
03566                                           consumer_address,
03567                                           this->fp_name_.inout ());
03568         }
03569     }
03570   catch (const CORBA::Exception& ex)
03571     {
03572       ex._tao_print_exception ("TAO_FlowConnection::connect");
03573     }
03574   return 1;
03575 }
03576 
03577 
03578 CORBA::Boolean
03579 TAO_FlowConnection::disconnect (void)
03580 {
03581   return  0;
03582 }
03583 
03584 CORBA::Boolean
03585 TAO_FlowConnection::add_producer (AVStreams::FlowProducer_ptr producer,
03586                                   AVStreams::QoS & the_qos)
03587 {
03588   try
03589     {
03590           AVStreams::FlowProducer_ptr flow_producer =
03591             AVStreams::FlowProducer::_duplicate (producer);
03592           // @@Naga:Sometimes the same producer could be added with a different object reference.
03593           // There's no portable way of comparing obj refs. but we have to do this till we find
03594           // a permanent solution.For eg. 2 different flowproducers for the same flow in a
03595           // Multipoint to Multipoint binding will have the same flowname and hence cannot be
03596           // used for resolving ties.
03597           FlowProducer_SetItor begin = this->flow_producer_set_.begin ();
03598           FlowProducer_SetItor end = this->flow_producer_set_.end ();
03599           for (; begin != end; ++begin)
03600             {
03601               if ((*begin)->_is_equivalent (producer))
03602               // producer exists in the set, a duplicate.
03603               ACE_ERROR_RETURN ((LM_WARNING, "TAO_FlowConnection::add_producer: producer already exists\n"), 1);
03604             }
03605           // We need to check the return value of the insert into the flow producer
03606           // set, since multiconnect could be called many times which will lead to
03607           // a call to add_producer every time a sink is added. If the producer is already
03608           // present in our list we just return immediately.
03609           int result = this->flow_producer_set_.insert (flow_producer);
03610           if (result == 1)
03611             {
03612               // producer exists in the set, a duplicate.
03613               ACE_ERROR_RETURN ((LM_WARNING, "TAO_FlowConnection::add_producer: producer already exists\n"), 1);
03614             }
03615           CORBA::Boolean met_qos;
03616           char mcast_address[BUFSIZ];
03617           if (this->producer_address_.in () == 0)
03618             {
03619               ACE_INET_Addr mcast_addr;
03620               mcast_addr.set (this->mcast_port_,
03621                               this->mcast_addr_.c_str ()
03622                               );
03623 
03624               char buf [BUFSIZ];
03625               mcast_addr.addr_to_string (buf, BUFSIZ);
03626               ACE_OS::sprintf (mcast_address, "%s=%s", this->protocol_.in (), buf);
03627             }
03628           else
03629             {
03630               ACE_OS::strcpy (mcast_address, this->producer_address_.in ());
03631             }
03632           char *address = flow_producer->connect_mcast (the_qos,
03633                                                         met_qos,
03634                                                         mcast_address,
03635                                                         this->fp_name_.in ());
03636 
03637           if (this->producer_address_.in () == 0)
03638             {
03639               TAO_Forward_FlowSpec_Entry entry ("", "", "", "", address);
03640               if (entry.address () != 0)
03641                 {
03642                   // Internet multicasting is in use.
03643                   this->producer_address_ = address;
03644                 }
03645               else
03646                 {
03647                   // ATM Multicasting is in use.
03648                   this->ip_multicast_ = 0;
03649                 }
03650             }
03651           // set the multicast peer.
03652           if (CORBA::is_nil (this->mcastconfigif_.in ()))
03653             {
03654               ACE_NEW_RETURN (this->mcastconfigif_i_,
03655                               TAO_MCastConfigIf,
03656                               0);
03657               this->mcastconfigif_ = this->mcastconfigif_i_->_this ();
03658             }
03659           AVStreams::FlowConnection_var flowconnection = this->_this ();
03660           flow_producer->set_Mcast_peer (flowconnection.in (),
03661                                          this->mcastconfigif_.in (),
03662                                          the_qos);
03663     }
03664   catch (const CORBA::Exception& ex)
03665     {
03666       ex._tao_print_exception (
03667         "TAO_FlowConnection::add_producer");
03668       return 0;
03669     }
03670   return 1;
03671 }
03672 
03673 CORBA::Boolean
03674 TAO_FlowConnection::add_consumer (AVStreams::FlowConsumer_ptr consumer,
03675                                   AVStreams::QoS & the_qos)
03676 {
03677   try
03678     {
03679       AVStreams::FlowConsumer_ptr flow_consumer =
03680         AVStreams::FlowConsumer::_duplicate (consumer);
03681       FlowConsumer_SetItor begin = this->flow_consumer_set_.begin ();
03682       FlowConsumer_SetItor end = this->flow_consumer_set_.end ();
03683       for (; begin != end; ++begin)
03684         {
03685           if ((*begin)->_is_equivalent (consumer))
03686             // Consumer exists in the set, a duplicate.
03687             ACE_ERROR_RETURN ((LM_WARNING, "TAO_FlowConnection::add_Consumer: Consumer already exists\n"), 1);
03688         }
03689       int result = this->flow_consumer_set_.insert (flow_consumer);
03690       if (result == 1)
03691         {
03692           // consumer exists in the set, a duplicate.
03693           ACE_ERROR_RETURN ((LM_WARNING, "TAO_FlowConnection::add_consumer: consumer already exists\n"), 1);
03694         }
03695 
03696       FlowProducer_SetItor producer_begin = this->flow_producer_set_.begin ();
03697       // @@Lets take that the first entry as the only producer. We're
03698       // not sure if we can have multiple flow producers in a
03699       // flowconnection. We can have multiple producer in the MtM binding,
03700       // in which case the first producer that gets added is the leader.
03701       AVStreams::FlowProducer_ptr flow_producer = (*producer_begin);
03702 
03703       AVStreams::protocolSpec protocols (1);
03704       protocols.length (1);
03705       protocols [0] = CORBA::string_dup (this->producer_address_.in ());
03706 
03707       if (!this->ip_multicast_)
03708         {
03709           flow_consumer->set_protocol_restriction (protocols);
03710           char * address =
03711             flow_consumer->go_to_listen (the_qos,
03712                                          1,
03713                                          flow_producer,
03714                                          this->fp_name_.inout ());
03715           CORBA::Boolean is_met;
03716           flow_producer->connect_mcast (the_qos,
03717                                         is_met,
03718                                         address,
03719                                         this->fp_name_.inout ());
03720         }
03721        else
03722         {
03723           // The spec says go_to_listen is called with the multicast
03724           // address returned from the connect_mcast call called
03725           // during add_producer. But go_to_listen doesn't have a
03726           // address parameter. I guess it should be connect_to_peer.
03727           // IP Multicasting.
03728           flow_consumer->connect_to_peer (the_qos,
03729                                           this->producer_address_.in (),
03730                                           this->fp_name_.inout ());
03731 
03732           //  char * address =
03733           //             flow_consumer->go_to_listen (the_qos,
03734           //                                          1,
03735           //                                          flow_producer,
03736           //                                          this->fp_name_.inout ()
03737           //);
03738 
03739         }
03740       if (CORBA::is_nil (this->mcastconfigif_.in ()))
03741         ACE_ERROR_RETURN ((LM_ERROR, "TAO_FlowConnection::add_consumer: first add a producer and then a consumer\n"), 0);
03742       // @@ Is this the right place to do set_peer?
03743       AVStreams::flowSpec flow_spec;
03744       AVStreams::streamQoS stream_qos (1);
03745       stream_qos.length (1);
03746       stream_qos [0] = the_qos;
03747       this->mcastconfigif_->set_peer (flow_consumer,
03748                                       stream_qos,
03749                                       flow_spec);
03750     }
03751   catch (const CORBA::Exception& ex)
03752     {
03753       ex._tao_print_exception (
03754         "TAO_FlowConnection::add_consumer");
03755       return 0;
03756     }
03757   return 1;
03758 }
03759 
03760 CORBA::Boolean
03761 TAO_FlowConnection::drop (AVStreams::FlowEndPoint_ptr target)
03762 {
03763   ACE_UNUSED_ARG (target);
03764   return 0;
03765 }
03766 
03767 // -----------------------------------------------------------------
03768 // TAO_FlowEndPoint
03769 // -----------------------------------------------------------------
03770 
03771 //default constructor.
03772 TAO_FlowEndPoint::TAO_FlowEndPoint (void)
03773   :lock_ (0)
03774 {
03775 }
03776 
03777 TAO_FlowEndPoint::TAO_FlowEndPoint (const char *flowname,
03778                                     AVStreams::protocolSpec &protocols,
03779                                     const char *format)
03780 {
03781   this->open (flowname, protocols, format);
03782 }
03783 
03784 void
03785 TAO_FlowEndPoint::set_flow_handler (const char * /*flowname*/,
03786                                     TAO_AV_Flow_Handler * /*handler*/)
03787 {
03788 }
03789 
03790 int
03791 TAO_FlowEndPoint::open (const char *flowname,
03792                         AVStreams::protocolSpec &protocols,
03793                         const char *format)
03794 {
03795   this->flowname_ = flowname;
03796   this->format_ = format;
03797 
03798   if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_FlowEndPoint::open\n"));
03799   try
03800     {
03801       CORBA::Any flowname_any;
03802       flowname_any <<= flowname;
03803       this->define_property ("FlowName",
03804                              flowname_any);
03805       this->set_format (format);
03806       this->protocol_addresses_ = protocols;
03807       AVStreams::protocolSpec protocol_spec (protocols.length ());
03808       protocol_spec.length (protocols.length ());
03809       if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "%N:%l\n"));
03810       for (u_int i=0;i<protocols.length ();i++)
03811         {
03812           CORBA::String_var address = CORBA::string_dup (protocols [i]);
03813           TAO_Forward_FlowSpec_Entry entry ("", "", "", "", address.in ());
03814           protocol_spec [i] = CORBA::string_dup (entry.carrier_protocol_str ());
03815           if (TAO_debug_level > 0)
03816             ACE_DEBUG ((LM_DEBUG,
03817                         "[%s]\n",
03818                         static_cast<char const*>(protocol_spec[i])));
03819         }
03820       this->set_protocol_restriction (protocol_spec);
03821     }
03822   catch (const CORBA::Exception& ex)
03823     {
03824       ex._tao_print_exception ("TAO_FlowEndPoint::open");
03825       return -1;
03826     }
03827   return 0;
03828 }
03829 
03830 
03831 int
03832 TAO_FlowEndPoint::set_flowname (const char *flowname)
03833 {
03834   this->flowname_ = flowname;
03835   return 0;
03836 }
03837 
03838 // used by one flowconnection so that multiple connections cant use
03839 // the same flowendpoint.
03840 CORBA::Boolean
03841 TAO_FlowEndPoint::lock (void)
03842 {
03843   // lock the current flowendpoint
03844 
03845   if (this->lock_)
03846     return 0;
03847   this->lock_ = 1;
03848   return 1;
03849 }
03850 
03851 // unlocks the flowendpoint , becomes free to be used in another flow.
03852 void
03853 TAO_FlowEndPoint::unlock (void)
03854 {
03855   this->lock_ = 0;
03856 }
03857 
03858 
03859 void
03860 TAO_FlowEndPoint::destroy (void)
03861 {
03862   int result = TAO_AV_Core::deactivate_servant (this);
03863   if (result < 0)
03864     if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_StreamEndPoint::destroy failed\n"));
03865   TAO_AV_FlowSpecSetItor end = this->flow_spec_set_.end ();
03866   for (TAO_AV_FlowSpecSetItor begin = this->flow_spec_set_.begin ();
03867        begin != end; ++begin)
03868     (*begin)->protocol_object ()->destroy ();
03869 }
03870 
03871 AVStreams::StreamEndPoint_ptr
03872 TAO_FlowEndPoint::related_sep (void)
03873 {
03874 
03875   return AVStreams::StreamEndPoint::_duplicate (this->related_sep_.in ());
03876 }
03877 
03878 void
03879 TAO_FlowEndPoint::related_sep (AVStreams::StreamEndPoint_ptr related_sep)
03880 {
03881   this->related_sep_ = AVStreams::StreamEndPoint::_duplicate (related_sep);
03882 }
03883 
03884 AVStreams::FlowConnection_ptr
03885 TAO_FlowEndPoint::related_flow_connection (void)
03886 {
03887   return AVStreams::FlowConnection::_duplicate (this->related_flow_connection_.in ());
03888 }
03889 
03890 void
03891 TAO_FlowEndPoint::related_flow_connection (AVStreams::FlowConnection_ptr related_flow_connection)
03892 {
03893   this->related_flow_connection_ = AVStreams::FlowConnection::_duplicate (related_flow_connection);
03894 }
03895 
03896 // returns the connected peer for this flow
03897 AVStreams::FlowEndPoint_ptr
03898 TAO_FlowEndPoint::get_connected_fep (void)
03899 {
03900   return AVStreams::FlowEndPoint::_duplicate (this->peer_fep_.in ());
03901 }
03902 
03903 CORBA::Boolean
03904 TAO_FlowEndPoint::use_flow_protocol (const char * fp_name,
03905                                      const CORBA::Any &)
03906 {
03907   try
03908     {
03909       // Define the property called FlowProtocol
03910       CORBA::Any flowname_property;
03911       flowname_property <<= fp_name;
03912       this->define_property ("FlowProtocol",
03913                              flowname_property);
03914     }
03915   catch (const CORBA::Exception& ex)
03916     {
03917       ex._tao_print_exception (
03918         "TAO_FlowEndPoint::use_flow_protocol");
03919       return 0;
03920     }
03921   return 1;
03922 }
03923 
03924 void
03925 TAO_FlowEndPoint::set_format (const char * format)
03926 {
03927   this->format_ = format;
03928   try
03929     {
03930       // make this a property so that is_fep_compatible can query this and
03931       // check if 2 flowendpoints are compatible.
03932       CORBA::Any format_val;
03933       format_val <<= format;
03934       this->define_property ("Format",
03935                              format_val);
03936     }
03937   catch (const CORBA::Exception& ex)
03938     {
03939       ex._tao_print_exception ("TAO_FlowEndpoint::set_format");
03940     }
03941 }
03942 
03943 void
03944 TAO_FlowEndPoint::set_dev_params (const CosPropertyService::Properties & new_settings)
03945 {
03946   this->dev_params_ = new_settings;
03947   try
03948     {
03949       CORBA::Any DevParams_property;
03950       DevParams_property <<= new_settings;
03951       this->define_property ("DevParams",
03952                              DevParams_property);
03953     }
03954   catch (const CORBA::Exception& ex)
03955     {
03956       ex._tao_print_exception (
03957         "TAO_FlowEndPoint::set_dev_params");
03958     }
03959 }
03960 
03961 void
03962 TAO_FlowEndPoint::set_protocol_restriction (const AVStreams::protocolSpec & protocols)
03963 {
03964   try
03965     {
03966       u_int i = 0;
03967       if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "%N:%l\n"));
03968       for (i=0;i<protocols.length ();i++)
03969         {
03970           const char *protocol = (protocols)[i];
03971           if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "%s\n", protocol));
03972         }
03973       CORBA::Any AvailableProtocols_property;
03974       AvailableProtocols_property <<= protocols;
03975       this->define_property ("AvailableProtocols",
03976                              AvailableProtocols_property);
03977       AVStreams::protocolSpec *temp_spec;
03978       CORBA::Any_var temp_any = this->get_property_value ("AvailableProtocols");
03979       temp_any.in () >>= temp_spec;
03980       if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "%N:%l\n"));
03981       for (i=0;i<temp_spec->length ();i++)
03982         {
03983           const char *protocol = (*temp_spec)[i];
03984           if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "%s\n", protocol));
03985         }
03986       this->protocols_ = protocols;
03987     }
03988   catch (const CORBA::Exception& ex)
03989     {
03990       ex._tao_print_exception (
03991         "TAO_FlowEndpoint::set_protocol_restriction");
03992     }
03993 }
03994 
03995 CORBA::Boolean
03996 TAO_FlowEndPoint::is_fep_compatible (AVStreams::FlowEndPoint_ptr peer_fep)
03997 {
03998   const char *exception_message = "";
03999   try
04000     {
04001       // check whether the passed flowendpoint is compatible with this flowendpoint.
04002       // should we check for the availableFormats and choose one format.
04003       // get my format value
04004       CORBA::Any_var format_ptr;
04005       CORBA::String_var my_format, peer_format;
04006 
04007       exception_message = "TAO_FlowEndPoint::is_fep_compatible - Format";
04008       format_ptr = this->get_property_value ("Format");
04009 
04010       const char *temp_format;
04011       format_ptr.in () >>= temp_format;
04012       my_format = CORBA::string_dup (temp_format);
04013       // get my peer's format value
04014       exception_message = "TAO_FlowEndPoint::is_fep_compatible - Format[2]";
04015       format_ptr = peer_fep->get_property_value ("Format");
04016       format_ptr.in () >>= temp_format;
04017       peer_format = CORBA::string_dup (temp_format);
04018       if (ACE_OS::strcmp (my_format.in (),
04019                           peer_format.in ()) != 0)
04020         return 0;
04021 
04022       // since formats are same, check for a common protocol
04023       CORBA::Any_var AvailableProtocols_ptr;
04024       AVStreams::protocolSpec my_protocol_spec, peer_protocol_spec;
04025       AVStreams::protocolSpec *temp_protocols;;
04026 
04027       exception_message =
04028         "TAO_FlowEndPoint::is_fep_compatible - AvailableProtocols";
04029       AvailableProtocols_ptr = this->get_property_value ("AvailableProtocols");
04030       AvailableProtocols_ptr.in () >>= temp_protocols;
04031       my_protocol_spec = *temp_protocols;
04032 
04033       exception_message =
04034         "TAO_FlowEndPoint::is_fep_compatible - AvailableProtocols[2]";
04035       AvailableProtocols_ptr = peer_fep->get_property_value ("AvailableProtocols");
04036       AvailableProtocols_ptr.in () >>= temp_protocols;
04037       peer_protocol_spec = *temp_protocols;
04038 
04039       int protocol_match = 0;
04040       for (u_int i=0;i<my_protocol_spec.length ();i++)
04041         {
04042           CORBA::String_var my_protocol_string;
04043           for (u_int j=0;j<peer_protocol_spec.length ();j++)
04044             {
04045               CORBA::String_var peer_protocol_string;
04046               my_protocol_string = CORBA::string_dup (my_protocol_spec[i]);
04047               peer_protocol_string = CORBA::string_dup (peer_protocol_spec[j]);
04048               if (ACE_OS::strcmp (my_protocol_string.in (), peer_protocol_string.in ()) == 0)
04049                 {
04050                   protocol_match = 1;
04051                   break;
04052                 }
04053             }
04054           if (protocol_match)
04055             break;
04056         }
04057       if (!protocol_match)
04058         return 0;
04059     }
04060   catch (const CosPropertyService::PropertyNotFound& nf)
04061     {
04062       nf._tao_print_exception (exception_message);
04063     }
04064   catch (const CORBA::Exception& ex)
04065     {
04066       ex._tao_print_exception ("TAO_FlowEndPoint::is_fep_compatible");
04067       return 0;
04068     }
04069   return 1;
04070 }
04071 
04072 CORBA::Boolean
04073 TAO_FlowEndPoint::set_peer (AVStreams::FlowConnection_ptr /* the_fc */,
04074                             AVStreams::FlowEndPoint_ptr the_peer_fep,
04075                             AVStreams::QoS & /* the_qos */)
04076 {
04077   this->peer_fep_ =
04078     AVStreams::FlowEndPoint::_duplicate (the_peer_fep);
04079   return 1;
04080 }
04081 
04082 CORBA::Boolean
04083 TAO_FlowEndPoint::set_Mcast_peer (AVStreams::FlowConnection_ptr /* the_fc */,
04084                                   AVStreams::MCastConfigIf_ptr mcast_peer,
04085                                   AVStreams::QoS & /* the_qos */)
04086 {
04087   this->mcast_peer_ = AVStreams::MCastConfigIf::_duplicate (mcast_peer);
04088   return 0;
04089 }
04090 
04091 char *
04092 TAO_FlowEndPoint::go_to_listen_i (TAO_FlowSpec_Entry::Role role,
04093                                   AVStreams::QoS & /*the_qos*/,
04094                                   CORBA::Boolean /*is_mcast*/,
04095                                   AVStreams::FlowEndPoint_ptr peer_fep,
04096                                   char *& flowProtocol)
04097 {
04098   char direction [BUFSIZ];
04099   switch (role)
04100     {
04101     case TAO_FlowSpec_Entry::TAO_AV_PRODUCER:
04102       ACE_OS::strcpy (direction, "IN");
04103       break;
04104     case TAO_FlowSpec_Entry::TAO_AV_CONSUMER:
04105       ACE_OS::strcpy (direction, "OUT");
04106       break;
04107     default:
04108       break;
04109     }
04110   AVStreams::protocolSpec my_protocol_spec, peer_protocol_spec;
04111   AVStreams::protocolSpec *temp_protocols;
04112   CORBA::Any_var AvailableProtocols_ptr =
04113     peer_fep->get_property_value ("AvailableProtocols");
04114   AvailableProtocols_ptr.in () >>= temp_protocols;
04115   peer_protocol_spec = *temp_protocols;
04116   AvailableProtocols_ptr =
04117     this->get_property_value ("AvailableProtocols");
04118   AvailableProtocols_ptr.in () >>= temp_protocols;
04119   my_protocol_spec = *temp_protocols;
04120   int protocol_match = 0;
04121   CORBA::String_var listen_protocol;
04122   u_int i =0;
04123   for (i=0;i<my_protocol_spec.length ();i++)
04124     {
04125       CORBA::String_var my_protocol_string;
04126       for (u_int j=0;j<peer_protocol_spec.length ();j++)
04127         {
04128           CORBA::String_var peer_protocol_string;
04129           my_protocol_string = CORBA::string_dup (my_protocol_spec[i]);
04130           peer_protocol_string = CORBA::string_dup (peer_protocol_spec[j]);
04131           if (ACE_OS::strcmp (my_protocol_string.in (), peer_protocol_string.in ()) == 0)
04132             {
04133               listen_protocol = my_protocol_string;
04134               protocol_match = 1;
04135               break;
04136             }
04137         }
04138       if (protocol_match)
04139         break;
04140     }
04141   if (!protocol_match)
04142     ACE_ERROR_RETURN ((LM_ERROR, "TAO_FlowEndPoint::go_to_listen failed: no protoocol match\n"), 0);
04143 
04144   for (u_int j=0;j<this->protocol_addresses_.length ();j++)
04145     if (ACE_OS::strncmp (this->protocol_addresses_ [j], listen_protocol.in (), ACE_OS::strlen (listen_protocol.in ())) == 0)
04146       {
04147         // Now listen on that protocol.
04148         TAO_Forward_FlowSpec_Entry *entry;
04149         ACE_NEW_RETURN (entry,
04150                         TAO_Forward_FlowSpec_Entry (this->flowname_.in (),
04151                                                     direction,
04152                                                     this->format_.in (),
04153                                                     flowProtocol,
04154                                                     this->protocol_addresses_ [j]),
04155                         0);
04156 
04157         TAO_AV_Acceptor_Registry *acceptor_registry = TAO_AV_CORE::instance ()->acceptor_registry ();
04158         this->flow_spec_set_.insert (entry);
04159         int result = acceptor_registry->open (this,
04160                                               TAO_AV_CORE::instance (),
04161                                               this->flow_spec_set_);
04162         if (result < 0)
04163           return 0;
04164         char *listen_address = entry->get_local_addr_str ();
04165         char *address;
04166         ACE_NEW_RETURN (address,
04167                         char [BUFSIZ],
04168                         0);
04169         ACE_OS::sprintf (address, "%s=%s", listen_protocol.in (), listen_address);
04170         return address;
04171       }
04172   return 0;
04173 }
04174 
04175 
04176 CORBA::Boolean
04177 TAO_FlowEndPoint::connect_to_peer_i (TAO_FlowSpec_Entry::Role role,
04178                                      AVStreams::QoS & /*the_qos*/,
04179                                      const char * address,
04180                                      const char * use_flow_protocol)
04181 {
04182   char direction [BUFSIZ];
04183   switch (role)
04184     {
04185     case TAO_FlowSpec_Entry::TAO_AV_PRODUCER:
04186       ACE_OS::strcpy (direction, "IN");
04187       break;
04188     case TAO_FlowSpec_Entry::TAO_AV_CONSUMER:
04189       ACE_OS::strcpy (direction, "OUT");
04190       break;
04191     default:
04192       break;
04193     }
04194   TAO_Forward_FlowSpec_Entry *entry;
04195   ACE_NEW_RETURN (entry,
04196                   TAO_Forward_FlowSpec_Entry (this->flowname_.in (),
04197                                               direction,
04198                                               this->format_.in (),
04199                                               use_flow_protocol,
04200                                               address),
04201                   0);
04202   this->flow_spec_set_.insert (entry);
04203   TAO_AV_Connector_Registry *connector_registry = TAO_AV_CORE::instance ()->connector_registry ();
04204   int result = connector_registry->open (this,
04205                                          TAO_AV_CORE::instance (),
04206                                          this->flow_spec_set_);
04207   if (result < 0)
04208     ACE_ERROR_RETURN ((LM_ERROR, "TAO_FlowEndPoint::connector_registry::open failed\n"), 0);
04209   this->reverse_channel_ = entry->get_local_addr_str ();
04210   return 1;
04211 }
04212 
04213 int
04214 TAO_FlowEndPoint::set_protocol_object (const char * /*flowname*/,
04215                                        TAO_AV_Protocol_Object * /*object*/)
04216 {
04217   return 0;
04218 }
04219 
04220 
04221 // ------------------------------------------------------------
04222 // TAO_FlowProducer class
04223 // ------------------------------------------------------------
04224 
04225 //default constructor
04226 TAO_FlowProducer::TAO_FlowProducer (void)
04227 {
04228 }
04229 
04230 TAO_FlowProducer::TAO_FlowProducer (const char *flowname,
04231                                     AVStreams::protocolSpec protocols,
04232                                     const char *format)
04233 {
04234   this->open (flowname, protocols, format);
04235 }
04236 
04237 // gets the reverse channel for feedback.
04238 char *
04239 TAO_FlowProducer::get_rev_channel (const char * /*pcol_name*/)
04240 {
04241   return 0;
04242 }
04243 
04244 // The start, stop and destroy are to be handled by the application.
04245 void
04246 TAO_FlowProducer::stop (void)
04247 {
04248   TAO_AV_FlowSpecSetItor end = this->flow_spec_set_.end ();
04249   for (TAO_AV_FlowSpecSetItor begin = this->flow_spec_set_.begin ();
04250        begin != end; ++begin)
04251     {
04252       TAO_FlowSpec_Entry *entry = (*begin);
04253       entry->handler ()->stop (TAO_FlowSpec_Entry::TAO_AV_PRODUCER);
04254     }
04255 }
04256 
04257 void
04258 TAO_FlowProducer::start (void)
04259 {
04260   TAO_AV_FlowSpecSetItor end = this->flow_spec_set_.end ();
04261   for (TAO_AV_FlowSpecSetItor begin = this->flow_spec_set_.begin ();
04262        begin != end; ++begin)
04263     {
04264       TAO_FlowSpec_Entry *entry = (*begin);
04265       if (entry->handler () != 0)
04266         {
04267           entry->handler ()->start (TAO_FlowSpec_Entry::TAO_AV_PRODUCER);
04268         }
04269       if (entry->control_handler () != 0)
04270         {
04271           entry->control_handler ()->start (TAO_FlowSpec_Entry::TAO_AV_PRODUCER);
04272         }
04273     }
04274 }
04275 
04276 char *
04277 TAO_FlowProducer::go_to_listen (AVStreams::QoS & the_qos,
04278                                 CORBA::Boolean is_mcast,
04279                                 AVStreams::FlowEndPoint_ptr peer_fep,
04280                                 char *& flowProtocol)
04281 {
04282   return this->go_to_listen_i (TAO_FlowSpec_Entry::TAO_AV_PRODUCER,
04283                                the_qos,
04284                                is_mcast,
04285                                peer_fep,
04286                                flowProtocol);
04287 }
04288 
04289 CORBA::Boolean
04290 TAO_FlowProducer::connect_to_peer (AVStreams::QoS & the_qos,
04291                                    const char * address,
04292                                    const char * use_flow_protocol)
04293 {
04294   return this->connect_to_peer_i (TAO_FlowSpec_Entry::TAO_AV_PRODUCER,
04295                                   the_qos,
04296                                   address,
04297                                   use_flow_protocol);
04298 }
04299 //  Connect to a IP multicast address.
04300 char *
04301 TAO_FlowProducer::connect_mcast (AVStreams::QoS & /* the_qos */,
04302                                  CORBA::Boolean_out /* is_met */,
04303                                  const char *address,
04304                                  const char * use_flow_protocol)
04305 {
04306   // The address variable gives the multicast address to subscribe to.
04307   for (u_int i=0;i<this->protocols_.length ();i++)
04308     {
04309       // choose the protocol which supports multicast.
04310     }
04311 
04312   if (address == 0)
04313     if (TAO_debug_level > 0)
04314       ACE_DEBUG ((LM_DEBUG, "TAO_FlowProducer::connect_mcast address is 0\n"));
04315   TAO_Forward_FlowSpec_Entry  *entry;
04316   ACE_NEW_RETURN (entry,
04317                   TAO_Forward_FlowSpec_Entry(this->flowname_.in (),
04318                                              "IN",
04319                                              this->format_.in (),
04320                                              use_flow_protocol,
04321                                              address),
04322                   0);
04323 
04324   this->flow_spec_set_.insert (entry);
04325   TAO_AV_Acceptor_Registry *acceptor_registry =
04326     TAO_AV_CORE::instance ()->acceptor_registry ();
04327   int result = acceptor_registry->open (this,
04328                                         TAO_AV_CORE::instance (),
04329                                         this->flow_spec_set_);
04330   if (result < 0)
04331     ACE_ERROR_RETURN ((LM_ERROR, "TAO_FlowProducer::connect_mcast:acceptor_registry open failed\n"), 0);
04332   // Now remove our handler from the reactor since we're a producer and dont want to get called for
04333   // multicast packets.
04334   ACE_Event_Handler *event_handler = entry->handler ()->event_handler ();
04335   event_handler->reactor ()->remove_handler (event_handler,
04336                                              ACE_Event_Handler::READ_MASK);
04337   return CORBA::string_dup (address);
04338 }
04339 
04340 // sets the key for this flow.
04341 void
04342 TAO_FlowProducer::set_key (const AVStreams::key & the_key)
04343 {
04344   try
04345     {
04346       CORBA::Any anyval;
04347       anyval <<= the_key;
04348       this->define_property ("PublicKey",
04349                              anyval);
04350     }
04351   catch (const CORBA::Exception& ex)
04352     {
04353       ex._tao_print_exception ("TAO_FlowProducer::set_key");
04354     }
04355 }
04356 
04357 // source id to be used to distinguish this source from others.
04358 void
04359 TAO_FlowProducer::set_source_id (CORBA::Long source_id)
04360 {
04361   this->source_id_ = source_id;
04362 }
04363 
04364 // ------------------------------------------------------------
04365 // TAO_FlowConsumer
04366 // ------------------------------------------------------------
04367 
04368 
04369 // default constructor.
04370 TAO_FlowConsumer::TAO_FlowConsumer (void)
04371 {
04372 }
04373 
04374 TAO_FlowConsumer::TAO_FlowConsumer (const char *flowname,
04375                                     AVStreams::protocolSpec protocols,
04376                                     const char *format)
04377 {
04378   this->open (flowname, protocols, format);
04379 }
04380 
04381 // The start, stop and destroy are to be handled by the application.
04382 void
04383 TAO_FlowConsumer::stop (void)
04384 {
04385   TAO_AV_FlowSpecSetItor end = this->flow_spec_set_.end ();
04386   for (TAO_AV_FlowSpecSetItor begin = this->flow_spec_set_.begin ();
04387        begin != end; ++begin)
04388     (*begin)->handler ()->stop (TAO_FlowSpec_Entry::TAO_AV_CONSUMER);
04389 }
04390 
04391 void
04392 TAO_FlowConsumer::start (void)
04393 {
04394   TAO_AV_FlowSpecSetItor end = this->flow_spec_set_.end ();
04395   for (TAO_AV_FlowSpecSetItor begin = this->flow_spec_set_.begin ();
04396        begin != end; ++begin)
04397     {
04398       (*begin)->handler ()->start (TAO_FlowSpec_Entry::TAO_AV_CONSUMER);
04399     }
04400 }
04401 
04402 char *
04403 TAO_FlowConsumer::go_to_listen (AVStreams::QoS & the_qos,
04404                                 CORBA::Boolean is_mcast,
04405                                 AVStreams::FlowEndPoint_ptr peer_fep,
04406                                 char *& flowProtocol)
04407 {
04408   return this->go_to_listen_i (TAO_FlowSpec_Entry::TAO_AV_CONSUMER,
04409                                the_qos,
04410                                is_mcast,
04411                                peer_fep,
04412                                flowProtocol);
04413 }
04414 
04415 CORBA::Boolean
04416 TAO_FlowConsumer::connect_to_peer (AVStreams::QoS & the_qos,
04417                                    const char * address,
04418                                    const char * use_flow_protocol)
04419 {
04420   return this->connect_to_peer_i (TAO_FlowSpec_Entry::TAO_AV_CONSUMER,
04421                                   the_qos,
04422                                   address,
04423                                   use_flow_protocol);
04424 }
04425 
04426 //------------------------------------------------------------
04427 // TAO_Tokenizer
04428 //------------------------------------------------------------
04429 TAO_Tokenizer::TAO_Tokenizer (const char *string, char delimiter)
04430   :token_array_ (10),
04431    count_ (0)
04432 {
04433   this->parse (string, delimiter);
04434 }
04435 
04436 TAO_Tokenizer::~TAO_Tokenizer ()
04437 {
04438   for (unsigned int i=0; i<this->num_tokens_; i++)
04439     CORBA::string_free (this->token_array_[i]);
04440 }
04441 
04442 
04443 int
04444 TAO_Tokenizer::parse (const char *string, char delimiter)
04445 {
04446   ACE_CString new_string (string);
04447   u_int pos =0;
04448   ACE_CString::size_type slash_pos = 0;
04449   u_int count = 0;
04450   int result;
04451   while (pos < new_string.length ())
04452     {
04453       slash_pos = new_string.find (delimiter, pos);
04454       ACE_CString substring;
04455       if (slash_pos != new_string.npos)
04456         {
04457           substring = new_string.substring (pos,
04458                                             slash_pos - pos);
04459           pos = slash_pos + 1;
04460         }
04461       else
04462         {
04463           substring = new_string.substring (pos);
04464           pos = static_cast<int> (new_string.length ());
04465         }
04466       char *token = CORBA::string_dup (substring.c_str ());
04467       result = this->token_array_.set (token, count);
04468       if (result == -1)
04469         {
04470           this->token_array_.size (this->token_array_.size ()*2);
04471           result = this->token_array_.set (token, count);
04472           if (result == -1)
04473             ACE_ERROR_RETURN ((LM_ERROR, "TAO_Tokenizer::parse error"), -1);
04474         }
04475       count++;
04476     }
04477 
04478   /*
04479   ACE_OS::strcpy (this->string_ , string);
04480   char delimiter_str [2] = {0, 0};
04481   delimiter_str [0] = delimiter;
04482   char *token = ACE_OS::strtok (this->string_, delimiter_str);
04483 
04484   while (token != 0)
04485     {
04486       result = this->token_array_.set (token, count);
04487       if (result == -1)
04488         {
04489           this->token_array_.size (this->token_array_.size ()*2);
04490           result = this->token_array_.set (token, count);
04491           if (result == -1)
04492             ACE_ERROR_RETURN ((LM_ERROR, "TAO_Tokenizer::parse error"), -1);
04493         }
04494       token = ACE_OS::strtok (0, delimiter_str);
04495       count++;
04496     }
04497   */
04498   this->num_tokens_ = count;
04499   return 0;
04500 }
04501 
04502 char*
04503 TAO_Tokenizer::token (void)
04504 {
04505   if (count_ < num_tokens_)
04506     return CORBA::string_dup (this->token_array_[this->count_++]);
04507   else
04508     return 0;
04509 }
04510 
04511 int
04512 TAO_Tokenizer::num_tokens (void)
04513 {
04514   return static_cast<int> (this->num_tokens_);
04515 }
04516 
04517 const char *
04518 TAO_Tokenizer::operator [] (size_t index) const
04519 {
04520   if (index >= this->num_tokens_)
04521     return 0;
04522 
04523   return this->token_array_[index];
04524 }
04525 
04526 TAO_END_VERSIONED_NAMESPACE_DECL

Generated on Sun Jan 27 16:05:16 2008 for TAO_AV by doxygen 1.3.6