AVStreams_i.cpp

Go to the documentation of this file.
00001 // AVStreams_i.cpp,v 5.138 2006/04/24 13:53:27 jwillemsen Exp
00002 
00003 // ============================================================================
00004 //
00005 // = LIBRARY
00006 //    cos
00007 //
00008 // = FILENAME
00009 //   AVStreams_i.cpp
00010 //
00011 // = AUTHOR
00012 //    Sumedh Mungee <sumedh@cs.wustl.edu>
00013 //    Nagarajan Surendran <naga@cs.wustl.edu>
00014 //
00015 // ============================================================================
00016 
00017 #include "orbsvcs/AV/AVStreams_i.h"
00018 #include "orbsvcs/AV/sfp.h"
00019 #include "orbsvcs/AV/MCast.h"
00020 #include "orbsvcs/AV/RTCP.h"
00021 
00022 #include "tao/debug.h"
00023 #include "tao/ORB_Core.h"
00024 #include "tao/AnyTypeCode/Any.h"
00025 #include "ace/OS_NS_arpa_inet.h"
00026 
00027 #if !defined (__ACE_INLINE__)
00028 #include "orbsvcs/AV/AVStreams_i.i"
00029 #endif /* __ACE_INLINE__ */
00030 
00031 ACE_RCSID (AV,
00032            AVStreams_i,
00033            "AVStreams_i.cpp,v 5.138 2006/04/24 13:53:27 jwillemsen Exp")
00034 
00035 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00036 
00037 //------------------------------------------------------------
00038 // TAO_AV_Qos
00039 //------------------------------------------------------------
00040 
00041 TAO_AV_QoS::TAO_AV_QoS (void)
00042 {
00043 }
00044 
00045 TAO_AV_QoS::TAO_AV_QoS (AVStreams::streamQoS &stream_qos)
00046 {
00047   this->set (stream_qos);
00048 }
00049 
00050 int
00051 TAO_AV_QoS::convert (AVStreams::streamQoS &/*network_qos*/)
00052 {
00053   return -1;
00054 }
00055 
00056 
00057 // ----------------------------------------------------------------------
00058 // AV_Null_MediaCtrl
00059 // ----------------------------------------------------------------------
00060 AV_Null_MediaCtrl::AV_Null_MediaCtrl (void)
00061 {
00062 }
00063 
00064 AV_Null_MediaCtrl::~AV_Null_MediaCtrl (void)
00065 {
00066 }
00067 
00068 
00069 // ----------------------------------------------------------------------
00070 // TAO_Basic_StreamCtrl
00071 // ----------------------------------------------------------------------
00072 
00073 // Constructor
00074 TAO_Basic_StreamCtrl::TAO_Basic_StreamCtrl (void)
00075   :flow_count_ (0)
00076 {
00077 }
00078 
00079 
00080 // Stop the transfer of data of the stream
00081 // Empty the_spec means apply operation to all flows
00082 void
00083 TAO_Basic_StreamCtrl::stop (const AVStreams::flowSpec &flow_spec
00084                             ACE_ENV_ARG_DECL)
00085   ACE_THROW_SPEC ((CORBA::SystemException,
00086                    AVStreams::noSuchFlow))
00087 {
00088   ACE_TRY
00089     {
00090       // @@Call stop on the Related MediaCtrl.  call stop on the flow
00091       // connections.
00092       if (this->flow_connection_map_.current_size () > 0)
00093         {
00094           if (flow_spec.length () > 0)
00095             for (u_int i=0;i<flow_spec.length ();i++)
00096               {
00097                 char *flowname = TAO_AV_Core::get_flowname (flow_spec[i]);
00098                 ACE_CString flow_name_key (flowname);
00099                 AVStreams::FlowConnection_var flow_connection_entry;
00100                 if (this->flow_connection_map_.find (flow_name_key,
00101                                                      flow_connection_entry) == 0)
00102                   {
00103                     flow_connection_entry->stop (ACE_ENV_SINGLE_ARG_PARAMETER);
00104                     ACE_TRY_CHECK;
00105                   }
00106               }
00107           else
00108             {
00109               // call stop on all the flows.
00110               FlowConnection_Map_Iterator iterator (this->flow_connection_map_);
00111               FlowConnection_Map_Entry *entry;
00112               for (;iterator.next (entry) !=  0;iterator.advance ())
00113                 {
00114                   entry->int_id_->stop (ACE_ENV_SINGLE_ARG_PARAMETER);
00115                   ACE_TRY_CHECK;
00116                 }
00117             }
00118         }
00119     }
00120   ACE_CATCHANY
00121     {
00122       ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_Basic_StreamCtrl::stop");
00123       return;
00124     }
00125   ACE_ENDTRY;
00126   ACE_CHECK;
00127 }
00128 
00129 // Start the transfer of data in the stream.
00130 // Empty the_spec means apply operation to all flows
00131 void
00132 TAO_Basic_StreamCtrl::start (const AVStreams::flowSpec &flow_spec
00133                              ACE_ENV_ARG_DECL)
00134   ACE_THROW_SPEC ((CORBA::SystemException,
00135                    AVStreams::noSuchFlow))
00136 {
00137   ACE_TRY
00138     {
00139       // @@Call start on the Related MediaCtrl.
00140 
00141       // call start on the flow connections.
00142       if (this->flow_connection_map_.current_size () > 0)
00143         {
00144           if (flow_spec.length () > 0)
00145             for (u_int i = 0; i < flow_spec.length (); i++)
00146               {
00147                 char *flowname = TAO_AV_Core::get_flowname (flow_spec[i]);
00148                 ACE_CString flow_name_key (flowname);
00149                 FlowConnection_Map::ENTRY *flow_connection_entry = 0;
00150                 if (this->flow_connection_map_.find (flow_name_key,
00151                                                      flow_connection_entry) == 0)
00152                   {
00153                     flow_connection_entry->int_id_->start (ACE_ENV_SINGLE_ARG_PARAMETER);
00154                     ACE_TRY_CHECK;
00155                   }
00156               }
00157           else
00158             {
00159               // call start on all the flows.
00160               FlowConnection_Map_Iterator iterator (this->flow_connection_map_);
00161               FlowConnection_Map_Entry *entry = 0;
00162               for (;iterator.next (entry) !=  0;iterator.advance ())
00163                 {
00164                   entry->int_id_->start (ACE_ENV_SINGLE_ARG_PARAMETER);
00165                   ACE_TRY_CHECK;
00166                 }
00167             }
00168         }
00169     }
00170   ACE_CATCHANY
00171     {
00172       ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_Basic_StreamCtrl::start");
00173       return;
00174     }
00175   ACE_ENDTRY;
00176   ACE_CHECK;
00177 }
00178 
00179 // Tears down the stream. This will close the connection, and delete
00180 // the streamendpoint and vdev associated with this stream Empty
00181 // the_spec means apply operation to all flows
00182 
00183 void
00184 TAO_Basic_StreamCtrl::destroy (const AVStreams::flowSpec &flow_spec
00185                                ACE_ENV_ARG_DECL)
00186     ACE_THROW_SPEC ((CORBA::SystemException,
00187                      AVStreams::noSuchFlow))
00188 {
00189   ACE_TRY
00190     {
00191       // call stop on the flow connections.
00192       if (this->flow_connection_map_.current_size () > 0)
00193         {
00194           if (flow_spec.length () > 0)
00195           {
00196             for (u_int i=0;i<flow_spec.length ();i++)
00197               {
00198                 char *flowname = TAO_AV_Core::get_flowname (flow_spec[i]);
00199                 ACE_CString flow_name_key (flowname);
00200                 FlowConnection_Map::ENTRY *flow_connection_entry = 0;
00201                 if (this->flow_connection_map_.find (flow_name_key, flow_connection_entry) == 0)
00202                   {
00203                     flow_connection_entry->int_id_->destroy (ACE_ENV_SINGLE_ARG_PARAMETER);
00204                     ACE_TRY_CHECK;
00205                   }
00206               }
00207           }
00208           else
00209             {
00210               // call destroy on all the flows.
00211               FlowConnection_Map_Iterator iterator (this->flow_connection_map_);
00212               FlowConnection_Map_Entry *entry = 0;
00213               for (;iterator.next (entry) !=  0;iterator.advance ())
00214                 {
00215                   entry->int_id_->destroy (ACE_ENV_SINGLE_ARG_PARAMETER);
00216                   ACE_TRY_CHECK;
00217                 }
00218             }
00219         }
00220     }
00221   ACE_CATCHANY
00222     {
00223       ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_Basic_StreamCtrl::destroy");
00224       return;
00225     }
00226   ACE_ENDTRY;
00227   ACE_CHECK;
00228 }
00229 
00230 // Changes the QoS associated with the stream
00231 // Empty the_spec means apply operation to all flows
00232 CORBA::Boolean
00233 
00234 TAO_Basic_StreamCtrl::modify_QoS (AVStreams::streamQoS & /*new_qos*/,
00235                                   const AVStreams::flowSpec &/*flowspec*/
00236                                   ACE_ENV_ARG_DECL_NOT_USED)
00237     ACE_THROW_SPEC ((CORBA::SystemException,
00238                      AVStreams::noSuchFlow,
00239                      AVStreams::QoSRequestFailed))
00240 {
00241   return 1;
00242 }
00243 
00244 // Used by StreamEndPoint and VDev to inform StreamCtrl of events.
00245 // E.g., loss of flow, reestablishment of flow, etc..
00246 void
00247 TAO_Basic_StreamCtrl::push_event (const struct CosPropertyService::Property &/*the_event*/
00248                                   ACE_ENV_ARG_DECL_NOT_USED)
00249   ACE_THROW_SPEC ((CORBA::SystemException))
00250 {
00251   if (TAO_debug_level > 0)
00252     ACE_DEBUG ((LM_DEBUG, "\n(%P|%t) Recieved event \""));
00253 }
00254 
00255 // Sets the flow protocol status.
00256 void
00257 TAO_Basic_StreamCtrl::set_FPStatus (const AVStreams::flowSpec &flow_spec,
00258                                     const char  *fp_name,
00259                                     const CORBA::Any &fp_settings
00260                                     ACE_ENV_ARG_DECL)
00261   ACE_THROW_SPEC ((CORBA::SystemException,
00262                    AVStreams::noSuchFlow,
00263                    AVStreams::FPError))
00264 
00265 {
00266   if (!CORBA::is_nil (this->sep_a_.in ()))
00267     {
00268       this->sep_a_->set_FPStatus (flow_spec, fp_name, fp_settings ACE_ENV_ARG_PARAMETER);
00269       ACE_CHECK;
00270     }
00271 }
00272 
00273 // Gets the flow connection.
00274 CORBA::Object_ptr
00275 TAO_Basic_StreamCtrl::get_flow_connection (const char *flow_name
00276                                            ACE_ENV_ARG_DECL)
00277     ACE_THROW_SPEC ((CORBA::SystemException,
00278                      AVStreams::noSuchFlow,
00279                      AVStreams::notSupported))
00280 {
00281   ACE_CString flow_name_key (flow_name);
00282   AVStreams::FlowConnection_var flow_connection_entry;
00283 
00284   if (this->flow_connection_map_.find (flow_name_key, flow_connection_entry) == 0){
00285     return flow_connection_entry._retn();
00286   }
00287   else{
00288     if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "(%N,%l) Cannot find flow: %s\n", flow_name ));
00289     ACE_THROW_RETURN (AVStreams::noSuchFlow (), CORBA::Object::_nil ());
00290   }
00291 }
00292 
00293 // Sets the flow connection.
00294 void
00295 TAO_Basic_StreamCtrl::set_flow_connection (const char *flow_name,
00296                                            CORBA::Object_ptr flow_connection_obj
00297                                            ACE_ENV_ARG_DECL)
00298   ACE_THROW_SPEC ((CORBA::SystemException,
00299                    AVStreams::noSuchFlow,
00300                    AVStreams::notSupported))
00301 {
00302   AVStreams::FlowConnection_var flow_connection;
00303   ACE_TRY
00304     {
00305       flow_connection = AVStreams::FlowConnection::_narrow (flow_connection_obj ACE_ENV_ARG_PARAMETER);
00306       ACE_TRY_CHECK;
00307     }
00308   ACE_CATCHANY
00309     {
00310       ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_Basic_StreamCtrl::set_flow_connection");
00311       return;
00312     }
00313   ACE_ENDTRY;
00314   ACE_CHECK;
00315   // add the flowname and the flowconnection to the hashtable.
00316   this->flows_.length (this->flow_count_ + 1);
00317   this->flows_ [this->flow_count_++] = CORBA::string_dup (flow_name);
00318   ACE_CString flow_name_key (flow_name);
00319   if (this->flow_connection_map_.bind (flow_name_key, flow_connection) != 0)
00320   {
00321     if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "(%N,%l) Cannot find flow: %s\n", flow_name ));
00322     ACE_THROW (AVStreams::noSuchFlow ());// is this right?
00323   }
00324 }
00325 
00326 TAO_Basic_StreamCtrl::~TAO_Basic_StreamCtrl (void)
00327 {
00328 }
00329 
00330 // ----------------------------------------------------------------------
00331 // TAO_Negotiator
00332 // ----------------------------------------------------------------------
00333 
00334 CORBA::Boolean
00335 TAO_Negotiator::negotiate (AVStreams::Negotiator_ptr /* remote_negotiator */,
00336                            const AVStreams::streamQoS &/* qos_spec */
00337                            ACE_ENV_ARG_DECL_NOT_USED)
00338   ACE_THROW_SPEC ((CORBA::SystemException))
00339 {
00340   ACE_DEBUG ((LM_DEBUG,
00341               "TAO_Negotiator::negotiate\n"));
00342   return 0;
00343 }
00344 
00345 // ----------------------------------------------------------------------
00346 // MMDevice_Map_Hash_Key
00347 // ----------------------------------------------------------------------
00348 
00349 const int MMDevice_Map_Hash_Key::hash_maximum_ = 10000;
00350 
00351 //default constructor.
00352 MMDevice_Map_Hash_Key::MMDevice_Map_Hash_Key (void)
00353 {
00354   this->mmdevice_ = AVStreams::MMDevice::_nil ();
00355 }
00356 
00357 // constructor.
00358 MMDevice_Map_Hash_Key::MMDevice_Map_Hash_Key (AVStreams::MMDevice_ptr mmdevice)
00359 {
00360   this->mmdevice_ = AVStreams::MMDevice::_duplicate (mmdevice);
00361 }
00362 
00363 // copy constructor.
00364 MMDevice_Map_Hash_Key::MMDevice_Map_Hash_Key (const MMDevice_Map_Hash_Key& hash_key)
00365 {
00366   this->mmdevice_ = AVStreams::MMDevice::_duplicate (hash_key.mmdevice_);
00367 }
00368 
00369 // destructor.
00370 MMDevice_Map_Hash_Key::~MMDevice_Map_Hash_Key (void)
00371 {
00372   CORBA::release (this->mmdevice_);
00373 }
00374 
00375 bool
00376 MMDevice_Map_Hash_Key::operator == (const MMDevice_Map_Hash_Key &hash_key) const
00377 {
00378   CORBA::Boolean result = 0;
00379   ACE_DECLARE_NEW_CORBA_ENV;
00380   ACE_TRY
00381     {
00382       result =
00383         this->mmdevice_->_is_equivalent (hash_key.mmdevice_
00384                                          ACE_ENV_ARG_PARAMETER);
00385       ACE_TRY_CHECK;
00386     }
00387   ACE_CATCHANY
00388     {
00389       ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "MMDevice_Map_Hash_Key::operator == ");
00390       return false;
00391     }
00392   ACE_ENDTRY;
00393   ACE_CHECK_RETURN (false);
00394 
00395   return result;
00396 }
00397 
00398 bool
00399 operator < (const MMDevice_Map_Hash_Key &left,
00400             const MMDevice_Map_Hash_Key &right)
00401 {
00402   bool result = false;
00403 
00404   ACE_DECLARE_NEW_CORBA_ENV;
00405   ACE_TRY
00406     {
00407       const CORBA::ULong left_hash =
00408         left.mmdevice_->_hash (left.hash_maximum_
00409                                ACE_ENV_ARG_PARAMETER);
00410       ACE_TRY_CHECK;
00411 
00412       const CORBA::ULong right_hash =
00413         right.mmdevice_->_hash (right.hash_maximum_
00414                                 ACE_ENV_ARG_PARAMETER);
00415       ACE_TRY_CHECK;
00416 
00417       result = left_hash < right_hash;
00418     }
00419   ACE_CATCHANY
00420     {
00421       ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
00422                            "operator < for MMDevice_Map_Hash_Key");
00423       return false;
00424     }
00425   ACE_ENDTRY;
00426   ACE_CHECK_RETURN (false);
00427 
00428   return result;
00429 }
00430 
00431 u_long
00432 MMDevice_Map_Hash_Key::hash (void)  const
00433 {
00434   u_long result = 0;
00435   ACE_DECLARE_NEW_CORBA_ENV;
00436   ACE_TRY
00437     {
00438       result = this->mmdevice_->_hash (this->hash_maximum_
00439                                        ACE_ENV_ARG_PARAMETER);
00440       ACE_TRY_CHECK;
00441     }
00442   ACE_CATCHANY
00443     {
00444       ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "MMDevice_Map_Hash_Key::hash");
00445       return 0;
00446     }
00447   ACE_ENDTRY;
00448   ACE_CHECK_RETURN (0);
00449   return result;
00450 }
00451 
00452 // ----------------------------------------------------------------------
00453 // TAO_StreamCtrl
00454 // ----------------------------------------------------------------------
00455 
00456 TAO_StreamCtrl::TAO_StreamCtrl (void)
00457   :mcastconfigif_ (0)
00458 {
00459   ACE_DECLARE_NEW_CORBA_ENV;
00460   ACE_TRY
00461     {
00462       this->streamctrl_ = this->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
00463       ACE_TRY_CHECK;
00464       char buf [BUFSIZ];
00465       int result = ACE_OS::hostname (buf, BUFSIZ);
00466       unsigned long ipaddr = 0;
00467       if (result == 0)
00468         ipaddr = ACE_OS::inet_addr (buf);
00469       this->source_id_ = TAO_AV_RTCP::alloc_srcid (ipaddr);
00470     }
00471   ACE_CATCHANY
00472     {
00473       ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_StreamCtrl::TAO_StreamCtrl");
00474     }
00475   ACE_ENDTRY;
00476   ACE_CHECK;
00477 }
00478 
00479 TAO_StreamCtrl::~TAO_StreamCtrl (void)
00480 {
00481   delete this->mcastconfigif_;
00482 }
00483 
00484 
00485 // Stop the transfer of data of the stream
00486 // Empty the_spec means apply operation to all flows
00487 void
00488 TAO_StreamCtrl::stop (const AVStreams::flowSpec &flow_spec
00489                       ACE_ENV_ARG_DECL)
00490   ACE_THROW_SPEC ((CORBA::SystemException,
00491                    AVStreams::noSuchFlow))
00492 {
00493   ACE_TRY
00494     {
00495       TAO_Basic_StreamCtrl::stop (flow_spec ACE_ENV_ARG_PARAMETER);
00496       ACE_TRY_CHECK;
00497       if (this->flow_connection_map_.current_size () > 0)
00498         return;
00499       MMDevice_Map_Iterator a_iterator (this->mmdevice_a_map_);
00500       MMDevice_Map::ENTRY *entry = 0;
00501       for (;a_iterator.next (entry)!= 0;a_iterator.advance ())
00502         {
00503           entry->int_id_.sep_->stop (flow_spec ACE_ENV_ARG_PARAMETER);
00504           ACE_TRY_CHECK;
00505         }
00506       MMDevice_Map_Iterator b_iterator (this->mmdevice_b_map_);
00507       for (;b_iterator.next (entry)!= 0;b_iterator.advance ())
00508         {
00509           entry->int_id_.sep_->stop (flow_spec ACE_ENV_ARG_PARAMETER);
00510           ACE_TRY_CHECK;
00511         }
00512     }
00513   ACE_CATCHANY
00514     {
00515       ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_Basic_StreamCtrl::stop");
00516       return;
00517     }
00518   ACE_ENDTRY;
00519   ACE_CHECK;
00520 }
00521 
00522 // Start the transfer of data in the stream.
00523 // Empty the_spec means apply operation to all flows
00524 void
00525 TAO_StreamCtrl::start (const AVStreams::flowSpec &flow_spec
00526                        ACE_ENV_ARG_DECL)
00527   ACE_THROW_SPEC ((CORBA::SystemException,
00528                    AVStreams::noSuchFlow))
00529 {
00530   ACE_TRY
00531     {
00532       TAO_Basic_StreamCtrl::start (flow_spec ACE_ENV_ARG_PARAMETER);
00533       ACE_TRY_CHECK;
00534       if (this->flow_connection_map_.current_size () > 0)
00535         return;
00536 
00537       MMDevice_Map_Iterator a_iterator (this->mmdevice_a_map_);
00538       MMDevice_Map::ENTRY *entry = 0;
00539       for (;a_iterator.next (entry)!= 0;a_iterator.advance ())
00540         {
00541           entry->int_id_.sep_->start (flow_spec ACE_ENV_ARG_PARAMETER);
00542           ACE_TRY_CHECK;
00543         }
00544       MMDevice_Map_Iterator b_iterator (this->mmdevice_b_map_);
00545       for (;b_iterator.next (entry)!= 0;b_iterator.advance ())
00546         {
00547           entry->int_id_.sep_->start (flow_spec ACE_ENV_ARG_PARAMETER);
00548           ACE_TRY_CHECK;
00549         }
00550     }
00551   ACE_CATCHANY
00552     {
00553       ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_StreamCtrl::start");
00554       return;
00555     }
00556   ACE_ENDTRY;
00557   ACE_CHECK;
00558 }
00559 
00560 // Tears down the stream. This will close the connection, and delete
00561 // the streamendpoint and vdev associated with this stream
00562 // Empty the_spec means apply operation to all flows
00563 void
00564 TAO_StreamCtrl::destroy (const AVStreams::flowSpec &flow_spec
00565                          ACE_ENV_ARG_DECL)
00566     ACE_THROW_SPEC ((CORBA::SystemException,
00567                      AVStreams::noSuchFlow))
00568 {
00569   ACE_TRY
00570     {
00571       TAO_Basic_StreamCtrl::destroy (flow_spec ACE_ENV_ARG_PARAMETER);
00572       ACE_TRY_CHECK;
00573       if (this->flow_connection_map_.current_size () > 0)
00574         return;
00575 
00576       MMDevice_Map_Iterator a_iterator (this->mmdevice_a_map_);
00577       MMDevice_Map::ENTRY *entry = 0;
00578       for (;a_iterator.next (entry)!= 0;a_iterator.advance ())
00579         {
00580           entry->int_id_.sep_->destroy (flow_spec ACE_ENV_ARG_PARAMETER);
00581           ACE_TRY_CHECK;
00582         }
00583       MMDevice_Map_Iterator b_iterator (this->mmdevice_b_map_);
00584       for (;b_iterator.next (entry)!= 0;b_iterator.advance ())
00585         {
00586           entry->int_id_.sep_->destroy (flow_spec ACE_ENV_ARG_PARAMETER);
00587           ACE_TRY_CHECK;
00588         }
00589     }
00590   ACE_CATCHANY
00591     {
00592       ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_StreamCtrl::destroy");
00593       return;
00594     }
00595   ACE_ENDTRY;
00596   ACE_CHECK;
00597 
00598   int result = TAO_AV_Core::deactivate_servant (this);
00599   if (result < 0)
00600     if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_StreamCtrl::destroy failed\n"));
00601 }
00602 
00603 // request the two MMDevices to create vdev and stream endpoints. save
00604 // the references returned.
00605 
00606 // The interaction diagram for this method is on page 13 of the spec
00607 CORBA::Boolean
00608 TAO_StreamCtrl::bind_devs (AVStreams::MMDevice_ptr a_party,
00609                            AVStreams::MMDevice_ptr b_party,
00610                            AVStreams::streamQoS &the_qos,
00611                            const AVStreams::flowSpec &the_flows
00612                            ACE_ENV_ARG_DECL)
00613   ACE_THROW_SPEC ((CORBA::SystemException,
00614                    AVStreams::streamOpFailed,
00615                    AVStreams::noSuchFlow,
00616                    AVStreams::QoSRequestFailed))
00617 {
00618   ACE_TRY
00619     {
00620       if (CORBA::is_nil (a_party) && CORBA::is_nil (b_party))
00621         ACE_ERROR_RETURN ((LM_ERROR, "Both parties are nil\n"), 0);
00622       // Check to see if we have non-nil parties to bind!
00623       if (TAO_debug_level > 0)
00624         if (CORBA::is_nil (a_party) ||
00625             CORBA::is_nil (b_party))
00626           if (TAO_debug_level > 0)
00627             ACE_DEBUG ((LM_DEBUG,
00628                         "(%P|%t) TAO_StreamCtrl::bind_devs: "
00629                         "a_party or b_party is null"
00630                         "Multicast mode\n"));
00631 
00632       // Request a_party to create the endpoint and vdev
00633       CORBA::Boolean met_qos;
00634       CORBA::String_var named_vdev;
00635 
00636       if (!CORBA::is_nil (a_party))
00637         {
00638           MMDevice_Map_Hash_Key find_key (a_party);
00639           MMDevice_Map_Entry find_entry;
00640           int result =
00641             this->mmdevice_a_map_.find (find_key, find_entry);
00642           if (result == 0)
00643             {
00644               if (TAO_debug_level > 0)
00645                 {
00646                   // Already in the map.
00647                   if (TAO_debug_level > 0)
00648                     ACE_DEBUG ((LM_DEBUG, "mmdevice a_party is already bound\n"));
00649                 }
00650               return 1;
00651             }
00652           else
00653             {
00654               this->sep_a_ =
00655                 a_party-> create_A (this->streamctrl_.in (),
00656                                     this->vdev_a_.out (),
00657                                     the_qos,
00658                                     met_qos,
00659                                     named_vdev.inout (),
00660                                     the_flows
00661                                     ACE_ENV_ARG_PARAMETER);
00662               ACE_TRY_CHECK;
00663 
00664               if (TAO_debug_level > 0)
00665                 ACE_DEBUG ((LM_DEBUG, "(%P|%t) TAO_StreamCtrl::create_A: succeeded\n"));
00666               // Define ourselves as the related_streamctrl property of the sep.
00667               CORBA::Any streamctrl_any;
00668               streamctrl_any <<= this->streamctrl_.in ();
00669               this->sep_a_->define_property ("Related_StreamCtrl",
00670                                              streamctrl_any
00671                                              ACE_ENV_ARG_PARAMETER);
00672               ACE_TRY_CHECK;
00673 
00674               CORBA::Any vdev_a_any;
00675               vdev_a_any <<= this->vdev_a_.in ();
00676               this->sep_a_->define_property ("Related_VDev",
00677                                              vdev_a_any
00678                                              ACE_ENV_ARG_PARAMETER);
00679               ACE_TRY_CHECK;
00680 
00681               CORBA::Any streamendpoint_a_any;
00682               streamendpoint_a_any <<= this->sep_a_.in ();
00683               this->vdev_a_->define_property ("Related_StreamEndpoint",
00684                                               streamendpoint_a_any
00685                                               ACE_ENV_ARG_PARAMETER);
00686 
00687               ACE_TRY_CHECK;
00688 
00689               CORBA::Any mmdevice_a_any;
00690               mmdevice_a_any <<= a_party;
00691               this->vdev_a_->define_property ("Related_MMDevice",
00692                                               mmdevice_a_any
00693                                               ACE_ENV_ARG_PARAMETER);
00694               ACE_TRY_CHECK;
00695 
00696               // add the mmdevice, sep and vdev to the map.
00697               MMDevice_Map_Entry map_entry;
00698               MMDevice_Map_Hash_Key key (a_party);
00699               map_entry.sep_ = AVStreams::StreamEndPoint_A::_duplicate (this->sep_a_.in ());
00700               map_entry.vdev_ = AVStreams::VDev::_duplicate (this->vdev_a_.in ());
00701               map_entry.flowspec_ = the_flows;
00702               map_entry.qos_ = the_qos;
00703               result =
00704                 this->mmdevice_a_map_.bind (key, map_entry);
00705               if (result < 0)
00706                 if (TAO_debug_level > 0)
00707                   ACE_DEBUG ((LM_DEBUG, "Error binding mmdevice entry in the a_map"));
00708             }
00709         }
00710       // Request b_party to create the endpoint and vdev
00711       if (!CORBA::is_nil (b_party))
00712         {
00713           MMDevice_Map_Hash_Key find_key (b_party);
00714           MMDevice_Map_Entry find_entry;
00715           int result =
00716             this->mmdevice_b_map_.find (find_key, find_entry);
00717           if (result == 0)
00718             {
00719               // Already in the map.
00720               if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "mmdevice b_party is already bound\n"));
00721               return 1;
00722             }
00723           else
00724             {
00725               this->sep_b_ =
00726                 b_party-> create_B (this->streamctrl_.in (),
00727                                     this->vdev_b_.out (),
00728                                     the_qos,
00729                                     met_qos,
00730                                     named_vdev.inout (),
00731                                     the_flows
00732                                     ACE_ENV_ARG_PARAMETER);
00733               ACE_TRY_CHECK;
00734 
00735               if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "(%P|%t) TAO_StreamCtrl::create_B: succeeded\n"));
00736 
00737               if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,
00738                           "\n(%P|%t)stream_endpoint_b_ = %s",
00739                           TAO_ORB_Core_instance ()->orb ()->object_to_string (this->sep_b_.in ()
00740                                                                               ACE_ENV_ARG_PARAMETER)));
00741               ACE_TRY_CHECK;
00742               // Define ourselves as the related_streamctrl property of the sep.
00743               CORBA::Any streamctrl_any;
00744               streamctrl_any <<= this->streamctrl_.in ();
00745               this->sep_b_->define_property ("Related_StreamCtrl",
00746                                              streamctrl_any
00747                                              ACE_ENV_ARG_PARAMETER);
00748               ACE_TRY_CHECK;
00749 
00750               CORBA::Any vdev_b_any;
00751               vdev_b_any <<= this->vdev_b_.in ();
00752               this->sep_b_->define_property ("Related_VDev",
00753                                              vdev_b_any
00754                                              ACE_ENV_ARG_PARAMETER);
00755               ACE_TRY_CHECK;
00756 
00757               CORBA::Any streamendpoint_b_any;
00758               streamendpoint_b_any <<= this->sep_b_.in ();
00759               this->vdev_b_->define_property ("Related_StreamEndpoint",
00760                                               streamendpoint_b_any
00761                                               ACE_ENV_ARG_PARAMETER);
00762 
00763               ACE_TRY_CHECK;
00764 
00765               CORBA::Any mmdevice_b_any;
00766               mmdevice_b_any <<= b_party;
00767               this->vdev_b_->define_property ("Related_MMDevice",
00768                                               mmdevice_b_any
00769                                               ACE_ENV_ARG_PARAMETER);
00770               ACE_TRY_CHECK;
00771               // add the mmdevice, sep and vdev to the map.
00772               MMDevice_Map_Entry map_entry;
00773               MMDevice_Map_Hash_Key key (b_party);
00774               map_entry.sep_ = AVStreams::StreamEndPoint::_duplicate (this->sep_b_.in ());
00775               map_entry.vdev_ = AVStreams::VDev::_duplicate(this->vdev_b_.in ());
00776               map_entry.flowspec_ = the_flows;
00777               map_entry.qos_ = the_qos;
00778               int result =
00779                 this->mmdevice_b_map_.bind (key, map_entry);
00780               if (result < 0)
00781                 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "Error binding mmdevice entry in the b_map"));
00782             }
00783         }
00784 
00785       // Tell the endpoints about each other.
00786       if ((!CORBA::is_nil (a_party)) && (!CORBA::is_nil (b_party)))
00787         {
00788           CORBA::Any sep_a_peer_any;
00789           CORBA::Any sep_b_peer_any;
00790 
00791           sep_a_peer_any <<= this->sep_b_.in();
00792           sep_b_peer_any <<= this->sep_a_.in();
00793           this->sep_a_->define_property ("PeerAdapter",
00794                                           sep_a_peer_any
00795                                           ACE_ENV_ARG_PARAMETER);
00796           ACE_TRY_CHECK;
00797 
00798           this->sep_b_->define_property ("PeerAdapter",
00799                                          sep_b_peer_any
00800                                          ACE_ENV_ARG_PARAMETER);
00801           ACE_TRY_CHECK;
00802         }
00803 
00804       // In the full profile case there's no VDev.
00805       if (CORBA::is_nil (b_party) && (!CORBA::is_nil (this->vdev_a_.in ())))
00806         {
00807           // Now set the source id for this A endpoint.
00808           // If the sep contains flow producers then set the source ids for those
00809           // instead.
00810           ACE_TRY_EX (set_source_id)
00811             {
00812               CORBA::Any_ptr flows_any = this->sep_a_->get_property_value ("Flows"
00813                                                                            ACE_ENV_ARG_PARAMETER);
00814               ACE_TRY_CHECK_EX (set_source_id);
00815               AVStreams::flowSpec_var flows;
00816               *flows_any >>= flows.out ();
00817               for (CORBA::ULong i=0; i< flows->length ();++i)
00818                 {
00819                   CORBA::Object_var fep_obj =
00820                     this->sep_a_->get_fep (flows [i] ACE_ENV_ARG_PARAMETER);
00821                   ACE_TRY_CHECK_EX (set_source_id);
00822                   ACE_TRY_EX (producer_check)
00823                     {
00824                       AVStreams::FlowProducer_var producer =
00825                         AVStreams::FlowProducer::_narrow (fep_obj.in ()
00826                                                           ACE_ENV_ARG_PARAMETER);
00827                       ACE_TRY_CHECK_EX (producer_check);
00828                       producer->set_source_id (this->source_id_++);
00829                     }
00830                   ACE_CATCHANY
00831                     {
00832                       if (TAO_debug_level > 0)
00833                         ACE_DEBUG ((LM_DEBUG, " %s ", static_cast<char const*>(flows[i])));
00834 
00835                       ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "producer_check: not a producer");
00836 
00837                     }
00838                   ACE_ENDTRY;
00839                   ACE_CHECK_RETURN (0);
00840                 }
00841             }
00842           ACE_CATCHANY
00843             {
00844               // Since the full profile failed try setting the source id
00845               // for the sep instead.
00846               // @@Naga: What do we do if in the light profile the sep has
00847               // many producers who do not have flow interfaces. Then
00848               // the streamctrl has to give an array of source ids to
00849               // the sep.
00850               this->sep_a_->set_source_id (this->source_id_++
00851                                            ACE_ENV_ARG_PARAMETER);
00852               ACE_TRY_CHECK;
00853             }
00854           ACE_ENDTRY;
00855           ACE_CHECK_RETURN (0);
00856           if (!this->mcastconfigif_)
00857             {
00858               ACE_NEW_RETURN (this->mcastconfigif_,
00859                               TAO_MCastConfigIf,
00860                               0);
00861               // @@: Deactivating the object thru poa means calling remove_ref after _this.
00862               this->mcastconfigif_ptr_ = this->mcastconfigif_->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
00863               ACE_TRY_CHECK;
00864             }
00865           // Multicast source being added.
00866           CORBA::Boolean result = this->vdev_a_->set_Mcast_peer (this->streamctrl_.in (),
00867                                                                  this->mcastconfigif_ptr_.in (),
00868                                                                  the_qos,
00869                                                                  the_flows
00870                                                                  ACE_ENV_ARG_PARAMETER);
00871           if (!result)
00872             ACE_ERROR_RETURN ((LM_ERROR, "set_Mcast_peer failed\n"), 0);
00873         }
00874 
00875       if (CORBA::is_nil (a_party))
00876         {
00877           if (!CORBA::is_nil (this->vdev_b_.in ()))
00878             {
00879               // Multicast sink being added.
00880               if (!this->mcastconfigif_)
00881                 ACE_ERROR_RETURN ((LM_ERROR, "first add a source and then a sink\n"), 0);
00882               this->mcastconfigif_->set_peer (this->vdev_b_.in (),
00883                                               the_qos,
00884                                               the_flows
00885                                               ACE_ENV_ARG_PARAMETER);
00886               ACE_TRY_CHECK;
00887             }
00888 
00889           int connect_leaf_success = 0;
00890           ACE_TRY_EX (connect_leaf)
00891             {
00892               // @@: define null interfaces for Atm so that they can be implemented once
00893               //  ACE adds support for ATM multicast.
00894               connect_leaf_success = this->sep_a_->connect_leaf (this->sep_b_.in (),
00895                                                                  the_qos,
00896                                                                  the_flows
00897                                                                  ACE_ENV_ARG_PARAMETER);
00898               ACE_TRY_CHECK_EX (connect_leaf);
00899               connect_leaf_success = 1;
00900             }
00901           ACE_CATCH (AVStreams::notSupported, ex)
00902             {
00903               if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "connect_leaf failed\n"));
00904               connect_leaf_success = 0;
00905             }
00906           ACE_CATCHANY
00907             {
00908               ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_StreamCtrl::bind_devs");
00909             }
00910           ACE_ENDTRY;
00911           ACE_CHECK_RETURN (0);
00912           if (!connect_leaf_success)
00913             {
00914               if (TAO_debug_level > 0)
00915                 ACE_DEBUG ((LM_DEBUG,"TAO_StreamCtrl::bind_devs Multiconnect\n"));
00916               AVStreams::flowSpec connect_flows = the_flows;
00917               this->sep_a_->multiconnect (the_qos, connect_flows ACE_ENV_ARG_PARAMETER);
00918               ACE_TRY_CHECK;
00919               this->sep_b_->multiconnect (the_qos, connect_flows ACE_ENV_ARG_PARAMETER);
00920               ACE_TRY_CHECK;
00921             }
00922         }
00923 
00924       if (!CORBA::is_nil (a_party) && !CORBA::is_nil (b_party))
00925         {
00926           // Check to see if the MMDevice contains FDev objects
00927           // If it contains FDev objects, then we are using the
00928           // Full profile, and we want to call bind() instead
00929           // of connect() on the the streamctrl
00930           if( a_party->is_property_defined("Flows") &&
00931               b_party->is_property_defined("Flows") )
00932           {
00933               if (TAO_debug_level > 0) {
00934                 ACE_DEBUG ((LM_DEBUG, "(%N,%l) Full profile, invoking bind()\n"));
00935               }
00936 
00937               // It is full profile
00938               // we have feps in the sep then dont call connect
00939               // instead call bind on the streamctrl.
00940               this->bind (this->sep_a_.in (),
00941                           this->sep_b_.in (),
00942                           the_qos,
00943                           the_flows
00944                           ACE_ENV_ARG_PARAMETER);
00945               ACE_TRY_CHECK;
00946 
00947 
00948 
00949           }
00950           // This is the light profile, call connect()
00951           else  if (!CORBA::is_nil (this->vdev_a_.in ()) && !CORBA::is_nil (this->vdev_b_.in ()))
00952           {
00953               if (TAO_debug_level > 0) {
00954                 ACE_DEBUG ((LM_DEBUG, "(%N,%l) Light profile, invoking connect()\n"));
00955               }
00956 
00957               // Tell the 2 VDev's about one another
00958               this->vdev_a_->set_peer (this->streamctrl_.in (),
00959                                        this->vdev_b_.in (),
00960                                        the_qos,
00961                                        the_flows
00962                                        ACE_ENV_ARG_PARAMETER);
00963 
00964               ACE_TRY_CHECK;
00965               this->vdev_b_->set_peer (this->streamctrl_.in (),
00966                                        this->vdev_a_.in (),
00967                                        the_qos,
00968                                        the_flows
00969                                        ACE_ENV_ARG_PARAMETER);
00970 
00971               ACE_TRY_CHECK;
00972 
00973               // Now connect the streams together. This will
00974               // establish the connection
00975               CORBA::Boolean result  =
00976                 this->sep_a_->connect (this->sep_b_.in (),
00977                                        the_qos,
00978                                        the_flows
00979                                        ACE_ENV_ARG_PARAMETER);
00980               ACE_TRY_CHECK;
00981               if (result == 0)
00982                 ACE_ERROR_RETURN ((LM_ERROR, "sep_a->connect (sep_b) failed\n"), 0);
00983           }
00984         }
00985     }
00986   ACE_CATCHANY
00987     {
00988       ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_StreamCtrl::bind_devs");
00989       return 0;
00990     }
00991   ACE_ENDTRY;
00992   ACE_CHECK_RETURN (0);
00993   return 1;
00994 }
00995 
00996 // Used to establish a connection between two endpoints
00997 // directly, i.e. without a MMDevice
00998 CORBA::Boolean
00999 TAO_StreamCtrl::bind (AVStreams::StreamEndPoint_A_ptr sep_a,
01000                       AVStreams::StreamEndPoint_B_ptr sep_b,
01001                       AVStreams::streamQoS &stream_qos,
01002                       const AVStreams::flowSpec &flow_spec
01003                       ACE_ENV_ARG_DECL)
01004   ACE_THROW_SPEC ((CORBA::SystemException,
01005                    AVStreams::streamOpFailed,
01006                    AVStreams::noSuchFlow,
01007                    AVStreams::QoSRequestFailed))
01008 {
01009   this->sep_a_ = AVStreams::StreamEndPoint_A::_duplicate(sep_a);
01010   this->sep_b_ = AVStreams::StreamEndPoint_B::_duplicate(sep_b);
01011 
01012   int result = 0;
01013   ACE_TRY
01014     {
01015       if (CORBA::is_nil (sep_a_.in() ) ||
01016           CORBA::is_nil (sep_b_.in() ))
01017         ACE_ERROR_RETURN ((LM_ERROR,
01018                            "(%P|%t) TAO_StreamCtrl::bind:"
01019                            "a_party or b_party null!"),
01020                           0);
01021 
01022       // Define each other as their peers.
01023       CORBA::Any sep_any;
01024       sep_any <<= sep_b;
01025       sep_a_->define_property ("PeerAdapter",
01026                               sep_any
01027                               ACE_ENV_ARG_PARAMETER);
01028       ACE_TRY_CHECK;
01029       sep_any <<= sep_a;
01030       sep_b_->define_property ("PeerAdapter",
01031                               sep_any
01032                               ACE_ENV_ARG_PARAMETER);
01033       ACE_TRY_CHECK;
01034       // since its full profile we do the viable stream setup algorithm.
01035       // get the flows for the A streamendpoint.
01036       // the flows spec is empty and hence we do a exhaustive match.
01037       AVStreams::flowSpec a_flows, b_flows;
01038       CORBA::Any_var flows_any;
01039       flows_any = sep_a_->get_property_value ("Flows" ACE_ENV_ARG_PARAMETER);
01040       ACE_TRY_CHECK;
01041       AVStreams::flowSpec *temp_flows;
01042       flows_any.in () >>= temp_flows;
01043       a_flows = *temp_flows;
01044       flows_any = sep_b_->get_property_value ("Flows" ACE_ENV_ARG_PARAMETER);
01045       ACE_TRY_CHECK;
01046       flows_any.in () >>= temp_flows;
01047       b_flows = *temp_flows;
01048       u_int i;
01049       FlowEndPoint_Map *a_fep_map;
01050       FlowEndPoint_Map *b_fep_map;
01051       ACE_NEW_RETURN (a_fep_map,
01052                       FlowEndPoint_Map,
01053                       0);
01054       ACE_NEW_RETURN (b_fep_map,
01055                       FlowEndPoint_Map,
01056                       0);
01057       for (i=0;i<a_flows.length ();i++)
01058         {
01059           const char *flowname = a_flows[i];
01060           // get the flowendpoint references.
01061           CORBA::Object_var fep_obj =
01062             sep_a_->get_fep (flowname
01063                             ACE_ENV_ARG_PARAMETER);
01064           ACE_TRY_CHECK;
01065           AVStreams::FlowEndPoint_var fep =
01066             AVStreams::FlowEndPoint::_narrow (fep_obj.in ()
01067                                               ACE_ENV_ARG_PARAMETER);
01068           ACE_TRY_CHECK;
01069           ACE_CString fep_key (flowname);
01070           result = a_fep_map->bind (fep_key, fep);
01071           if (result == -1)
01072             if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_StreamCtrl::bind failed for %s\n", flowname));
01073         }
01074       // get the flowendpoints for streamendpoint_b
01075       for (i=0;i<b_flows.length ();i++)
01076         {
01077           const char *flowname = b_flows[i];
01078           // get the flowendpoint references.
01079           CORBA::Object_var fep_obj =
01080             sep_b->get_fep (flowname
01081                             ACE_ENV_ARG_PARAMETER);
01082           ACE_TRY_CHECK;
01083           AVStreams::FlowEndPoint_var fep =
01084             AVStreams::FlowEndPoint::_narrow (fep_obj.in ()
01085                                               ACE_ENV_ARG_PARAMETER);
01086           ACE_TRY_CHECK;
01087           ACE_CString fep_key (flowname);
01088           result = b_fep_map->bind (fep_key, fep);
01089           if (result == -1)
01090             if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_StreamCtrl::bind failed for %s\n", flowname));
01091         }
01092       FlowEndPoint_Map *map_a = 0, *map_b = 0;
01093       if (flow_spec.length () == 0)
01094         {
01095           map_a = a_fep_map;
01096           map_b = b_fep_map;
01097         }
01098       else
01099         {
01100           FlowEndPoint_Map *spec_fep_map_a, *spec_fep_map_b;
01101           ACE_NEW_RETURN (spec_fep_map_a,
01102                           FlowEndPoint_Map,
01103                           0);
01104           ACE_NEW_RETURN (spec_fep_map_b,
01105                           FlowEndPoint_Map,
01106                           0);
01107           for (i=0; i< flow_spec.length ();i++)
01108             {
01109               TAO_Forward_FlowSpec_Entry *entry = 0;
01110               ACE_NEW_RETURN (entry,
01111                               TAO_Forward_FlowSpec_Entry,
01112                               0);
01113               entry->parse (flow_spec[i]);
01114               ACE_CString fep_key (entry->flowname ());
01115               AVStreams::FlowEndPoint_var fep;
01116               result = a_fep_map->find (fep_key, fep);
01117               if (result == -1)
01118                 ACE_ERROR_RETURN ((LM_ERROR, "Fep not found on A side for flowname: %s\n", flow_spec[i]), 0);
01119 
01120               result = spec_fep_map_a->bind (fep_key, fep);
01121               if (result == -1)
01122                 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "Bind failed for %s\n", flow_spec[i]));
01123 
01124               result = b_fep_map->find (fep_key, fep);
01125               if (result == -1)
01126                 ACE_ERROR_RETURN ((LM_ERROR, "Fep not found on B side for flowname: %s\n", flow_spec[i]), 0);
01127 
01128               result = spec_fep_map_b->bind (fep_key, fep);
01129               if (result == -1)
01130                 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "Bind failed for %s\n", flow_spec[i]));
01131             }
01132           map_a = spec_fep_map_a;
01133           map_b = spec_fep_map_b;
01134         }
01135 
01136       TAO_AV_QoS qos (stream_qos);
01137       // Now go thru the list of flow endpoint and match them.
01138       // uses the first match policy.
01139       FlowEndPoint_Map_Iterator a_feps_iterator (*map_a);
01140       FlowEndPoint_Map_Entry *a_feps_entry, *b_feps_entry;
01141       ACE_TRY_EX (flow_connect)
01142         {
01143 
01144           for (;a_feps_iterator.next (a_feps_entry) != 0;
01145                a_feps_iterator.advance ())
01146             {
01147               AVStreams::FlowEndPoint_var fep_a = a_feps_entry->int_id_;
01148               AVStreams::FlowEndPoint_var connected_to =
01149                 fep_a->get_connected_fep (ACE_ENV_SINGLE_ARG_PARAMETER);
01150               ACE_TRY_CHECK_EX (flow_connect);
01151 
01152               if (!CORBA::is_nil (connected_to.in ()))
01153                 {
01154                   // Skip this one, it is already connected...
01155                   continue;
01156                 }
01157 
01158               FlowEndPoint_Map_Iterator b_feps_iterator (*map_b);
01159               for (;b_feps_iterator.next (b_feps_entry) != 0;
01160                    b_feps_iterator.advance ())
01161                 {
01162                   AVStreams::FlowEndPoint_var fep_b = b_feps_entry->int_id_;
01163                   AVStreams::FlowConnection_var flow_connection;
01164 
01165                   AVStreams::FlowEndPoint_var connected_to =
01166                     fep_b->get_connected_fep (ACE_ENV_SINGLE_ARG_PARAMETER);
01167                   ACE_TRY_CHECK_EX (flow_connect);
01168 
01169                   if (!CORBA::is_nil (connected_to.in ()))
01170                     {
01171                       // Skip this one, it is already connected...
01172                       continue;
01173                     }
01174 
01175                   if (fep_a->is_fep_compatible (fep_b.in()
01176                                                 ACE_ENV_ARG_PARAMETER) == 1)
01177                     {
01178                       ACE_TRY_CHECK_EX (flow_connect);
01179                       // assume that flow names are same so that we
01180                       // can use either of them.
01181                       CORBA::Object_var flow_connection_obj;
01182                       CORBA::Any_var flowname_any =
01183                         fep_a->get_property_value ("FlowName"
01184                                                    ACE_ENV_ARG_PARAMETER);
01185                       ACE_TRY_CHECK_EX (flow_connect);
01186                       const char *flowname = 0;
01187                       flowname_any.in () >>= flowname;
01188                       ACE_TRY_EX (flow_connection)
01189                         {
01190                           flow_connection_obj =
01191                             this->get_flow_connection (flowname
01192                                                        ACE_ENV_ARG_PARAMETER);
01193                           ACE_TRY_CHECK_EX (flow_connection);
01194                           flow_connection =
01195                             AVStreams::FlowConnection::_narrow (flow_connection_obj.in ()
01196                                                                 ACE_ENV_ARG_PARAMETER);
01197                           ACE_TRY_CHECK_EX (flow_connection);
01198                         }
01199                       ACE_CATCHANY
01200                         {
01201                           TAO_FlowConnection *flowConnection;
01202                           ACE_NEW_RETURN (flowConnection,
01203                                           TAO_FlowConnection,
01204                                           0);
01205                           flow_connection = flowConnection->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
01206                           ACE_TRY_CHECK_EX (flow_connect);
01207                           this->set_flow_connection (flowname,
01208                                                      flow_connection.in ()
01209                                                      ACE_ENV_ARG_PARAMETER);
01210                           ACE_TRY_CHECK_EX (flow_connect);
01211                         }
01212                       ACE_ENDTRY;
01213                       ACE_CHECK_RETURN (0);
01214 
01215                       // make sure that a_feps is flow_producer
01216                       // and b_feps is flow_consumer
01217                       // There should be a way to find which flow
01218                       // endpoint is producer and which is
01219                       // consumer.
01220 
01221                       AVStreams::FlowProducer_var producer;
01222                       AVStreams::FlowConsumer_var consumer;
01223 
01224                       ACE_TRY_EX (producer_check)
01225                         {
01226                           producer =
01227                             AVStreams::FlowProducer::_narrow (fep_a.in()
01228                                                               ACE_ENV_ARG_PARAMETER);
01229                           ACE_TRY_CHECK_EX (producer_check);
01230                           consumer =
01231                             AVStreams::FlowConsumer::_narrow (fep_b.in()
01232                                                               ACE_ENV_ARG_PARAMETER);
01233                           ACE_TRY_CHECK_EX (producer_check);
01234 
01235                           // If the types don't match then try in
01236                           // the opposite order
01237                           if (CORBA::is_nil (producer.in ()))
01238                             {
01239                               producer =
01240                                 AVStreams::FlowProducer::_narrow (fep_b.in()
01241                                                                   ACE_ENV_ARG_PARAMETER);
01242                               ACE_TRY_CHECK_EX (producer_check);
01243                               consumer =
01244                                 AVStreams::FlowConsumer::_narrow (fep_a.in()
01245                                                                   ACE_ENV_ARG_PARAMETER);
01246                               ACE_TRY_CHECK_EX (producer_check);
01247                             }
01248                           // At this point they should both be
01249                           // non-nil
01250                           // @@ raise an exception (which one?) if
01251                           // this is not true...
01252                           ACE_ASSERT (!CORBA::is_nil (producer.in ()));
01253                           ACE_ASSERT (!CORBA::is_nil (consumer.in ()));
01254                         }
01255                       ACE_CATCHANY
01256                         {
01257                           //Yamuna : Recheck this
01258                           ACE_RE_THROW;//_EX (producer_check);
01259                         }
01260                       ACE_ENDTRY;
01261                       ACE_CHECK_RETURN (0);
01262                       CORBA::String_var fep_a_name, fep_b_name;
01263                       flowname_any = fep_a->get_property_value ("FlowName"
01264                                                                 ACE_ENV_ARG_PARAMETER);
01265                       const char *temp_name;
01266                       flowname_any.in () >>= temp_name;
01267                       fep_a_name = CORBA::string_dup (temp_name);
01268                       flowname_any = fep_b->get_property_value ("FlowName"
01269                                                                 ACE_ENV_ARG_PARAMETER);
01270                       flowname_any.in () >>= temp_name;
01271                       fep_b_name = CORBA::string_dup (temp_name);
01272                       AVStreams::QoS flow_qos;
01273                       flow_qos.QoSType = fep_a_name;
01274                       flow_qos.QoSParams.length (0);
01275                       result = qos.get_flow_qos (fep_a_name.in (), flow_qos);
01276                       if (result == -1)
01277                         {
01278                           flow_qos.QoSType = fep_b_name;
01279                           result = qos.get_flow_qos (fep_b_name.in (),
01280                                                      flow_qos);
01281                           if (result == -1 && TAO_debug_level > 0)
01282                             ACE_DEBUG ((LM_DEBUG,
01283                                         "No QoS Specified for this flow <%s>\n", flowname));
01284                         }
01285                       flow_connection->connect (producer.in (),
01286                                                 consumer.in (),
01287                                                 flow_qos
01288                                                 ACE_ENV_ARG_PARAMETER);
01289                       ACE_TRY_CHECK_EX (flow_connect);
01290                     }
01291                 }
01292             }
01293         }
01294       ACE_CATCHANY
01295         {
01296           ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
01297                                "TAO_StreamCtrl::bind:flow_connect block");
01298           return 0;
01299         }
01300       ACE_ENDTRY;
01301       ACE_CHECK_RETURN (0);
01302     }
01303   ACE_CATCHANY
01304     {
01305       // error was thrown because one of the streamendpoints is light profile.
01306       // Now connect the streams together
01307       this->sep_a_->connect (this->sep_b_.in (),
01308                              stream_qos,
01309                              flow_spec
01310                              ACE_ENV_ARG_PARAMETER);
01311       ACE_TRY_CHECK;
01312     }
01313   ACE_ENDTRY;
01314   ACE_CHECK_RETURN (0);
01315   return 1;
01316 }
01317 
01318 void
01319 TAO_StreamCtrl::unbind (ACE_ENV_SINGLE_ARG_DECL)
01320   ACE_THROW_SPEC ((CORBA::SystemException,
01321                    AVStreams::streamOpFailed))
01322 {
01323   ACE_TRY
01324     {
01325       if (this->flow_connection_map_.current_size () > 0)
01326         return;
01327 
01328       AVStreams::flowSpec flow_spec;
01329       flow_spec.length(0);
01330 
01331       MMDevice_Map_Iterator a_iterator (this->mmdevice_a_map_);
01332       MMDevice_Map::ENTRY *entry = 0;
01333       for (;a_iterator.next (entry)!= 0;a_iterator.advance ())
01334         {
01335           entry->int_id_.sep_->destroy (flow_spec ACE_ENV_ARG_PARAMETER);
01336           ACE_TRY_CHECK;
01337         }
01338       MMDevice_Map_Iterator b_iterator (this->mmdevice_b_map_);
01339       for (;b_iterator.next (entry)!= 0;b_iterator.advance ())
01340         {
01341           entry->int_id_.sep_->destroy (flow_spec ACE_ENV_ARG_PARAMETER);
01342           ACE_TRY_CHECK;
01343         }
01344     }
01345   ACE_CATCHANY
01346     {
01347       ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_StreamCtrl::unbind");
01348       return;
01349     }
01350   ACE_ENDTRY;
01351   ACE_CHECK;
01352 }
01353 
01354 void
01355 TAO_StreamCtrl::unbind_party (AVStreams::StreamEndPoint_ptr /* the_ep */,
01356                               const AVStreams::flowSpec &/* the_spec */
01357                               ACE_ENV_ARG_DECL_NOT_USED)
01358   ACE_THROW_SPEC ((CORBA::SystemException,
01359                    AVStreams::streamOpFailed,
01360                    AVStreams::noSuchFlow))
01361 {
01362 }
01363 
01364 void
01365 TAO_StreamCtrl::unbind_dev (AVStreams::MMDevice_ptr /* dev */,
01366                             const AVStreams::flowSpec & /* the_spec */
01367                             ACE_ENV_ARG_DECL_NOT_USED)
01368   ACE_THROW_SPEC ((CORBA::SystemException,
01369                    AVStreams::streamOpFailed,
01370                    AVStreams::noSuchFlow))
01371 {
01372 }
01373 
01374 AVStreams::VDev_ptr
01375 TAO_StreamCtrl::get_related_vdev (AVStreams::MMDevice_ptr adev,
01376                                   AVStreams::StreamEndPoint_out sep
01377                                   ACE_ENV_ARG_DECL_NOT_USED)
01378   ACE_THROW_SPEC ((CORBA::SystemException,
01379                    AVStreams::streamOpFailed))
01380 {
01381   MMDevice_Map_Hash_Key key (adev);
01382   MMDevice_Map_Entry entry;
01383   int result = -1;
01384   result = this->mmdevice_a_map_.find (key, entry);
01385   if (result < 0)
01386     {
01387       result = this->mmdevice_a_map_.find (key, entry);
01388       if (result < 0)
01389         return AVStreams::VDev::_nil ();
01390     }
01391   sep = AVStreams::StreamEndPoint::_duplicate (entry.sep_.in ());
01392   return AVStreams::VDev::_duplicate (entry.vdev_.in ());
01393 }
01394 
01395 CORBA::Boolean
01396 
01397 TAO_StreamCtrl::modify_QoS (AVStreams::streamQoS &new_qos,
01398                             const AVStreams::flowSpec &the_spec
01399                             ACE_ENV_ARG_DECL)
01400   ACE_THROW_SPEC ((CORBA::SystemException,
01401                    AVStreams::noSuchFlow,
01402                    AVStreams::QoSRequestFailed))
01403 {
01404   if (TAO_debug_level > 0)
01405   ACE_DEBUG ((LM_DEBUG,
01406               "TAO_StreamCtrl::modify_QoS\n"));
01407 
01408 
01409   if (this->mcastconfigif_ != 0)
01410     {
01411       // call modify_Qos on the root VDev which is the mcast configif.
01412       if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "Cannot Modify the Qos for multipoint streams\n"));
01413     }
01414   else
01415     {
01416       ACE_TRY
01417         {
01418           AVStreams::flowSpec in_flowspec;
01419           AVStreams::flowSpec out_flowspec;
01420 
01421           in_flowspec.length (0);
01422           out_flowspec.length (0);
01423 
01424           int in_index = 0;
01425           int out_index = 0;
01426 
01427           AVStreams::flowSpec flowspec;
01428           if (the_spec.length () == 0)
01429             {
01430               // Apply modify_qos to all the flows
01431               flowspec = this->flows_;
01432               MMDevice_Map_Iterator iterator (this->mmdevice_a_map_);
01433               MMDevice_Map::ENTRY *entry = 0;
01434               for (;iterator.next (entry) !=  0;iterator.advance ())
01435                 {
01436                   flowspec = entry->int_id_.flowspec_;
01437                 }
01438             }
01439           else
01440             {
01441               flowspec = the_spec;
01442             }
01443 
01444           if (TAO_debug_level > 0)
01445             ACE_DEBUG ((LM_DEBUG,
01446                         "TAO_StreamCtrl::modify_QoS\n"));
01447 
01448 
01449           for (u_int i=0;i < flowspec.length ();i++)
01450             {
01451               TAO_Forward_FlowSpec_Entry entry;
01452               entry.parse (flowspec [i]);
01453               int direction = entry.direction ();
01454               if (direction == 0)
01455                 {
01456                   in_flowspec.length (in_index + 1);
01457                   in_flowspec [in_index++] = CORBA::string_dup (entry.entry_to_string ());
01458                 }
01459               else
01460                 {
01461                   out_flowspec.length (out_index + 1);
01462                   out_flowspec [out_index++] = CORBA::string_dup (entry.entry_to_string ());
01463                 }
01464             }
01465 
01466           if (in_flowspec.length () != 0)
01467             {
01468               this->vdev_a_->modify_QoS (new_qos, in_flowspec ACE_ENV_ARG_PARAMETER);
01469               ACE_TRY_CHECK;
01470             }
01471 
01472           if (out_flowspec.length () != 0)
01473             {
01474               this->vdev_b_->modify_QoS (new_qos, out_flowspec ACE_ENV_ARG_PARAMETER);
01475               ACE_TRY_CHECK;
01476             }
01477         }
01478       ACE_CATCHANY
01479         {
01480           ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_StreamCtrl::modify_QoS");
01481           return 0;
01482         }
01483       ACE_ENDTRY;
01484       ACE_CHECK_RETURN (0);
01485 
01486     }
01487   return 1;
01488 }
01489 
01490 // ----------------------------------------------------------------------
01491 // TAO_MCastConfigIf
01492 // ----------------------------------------------------------------------
01493 
01494 TAO_MCastConfigIf::TAO_MCastConfigIf (void)
01495   :peer_list_iterator_ (peer_list_)
01496 {
01497 }
01498 
01499 TAO_MCastConfigIf::~TAO_MCastConfigIf (void)
01500 {
01501   //no-op
01502 }
01503 
01504 // In future this should be a multicast message instead of point-to-point unicasts.
01505 CORBA::Boolean
01506 TAO_MCastConfigIf::set_peer (CORBA::Object_ptr peer,
01507                              AVStreams::streamQoS & qos,
01508                              const AVStreams::flowSpec & flow_spec
01509                              ACE_ENV_ARG_DECL)
01510   ACE_THROW_SPEC ((CORBA::SystemException,
01511                    AVStreams::QoSRequestFailed,
01512                    AVStreams::streamOpFailed))
01513 {
01514   ACE_TRY
01515     {
01516       Peer_Info *info;
01517       ACE_NEW_RETURN (info,
01518                       Peer_Info,
01519                       0);
01520       info->peer_ = AVStreams::VDev::_narrow (peer ACE_ENV_ARG_PARAMETER);
01521       ACE_TRY_CHECK;
01522       info->qos_ = qos;
01523       info->flow_spec_ = flow_spec;
01524       this->peer_list_.insert_tail (info);
01525     }
01526   ACE_CATCHANY
01527     {
01528       ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_MCastConfigIf::set_peer");
01529       return 0;
01530     }
01531   ACE_ENDTRY;
01532   ACE_CHECK_RETURN (0);
01533   return 1;
01534 }
01535 
01536 // In future this should be a multicast message instead of point-to-point unicasts.
01537 void
01538 TAO_MCastConfigIf::configure (const CosPropertyService::Property & a_configuration
01539                               ACE_ENV_ARG_DECL)
01540   ACE_THROW_SPEC ((CORBA::SystemException))
01541 {
01542   Peer_Info *info;
01543   ACE_TRY
01544     {
01545       for (this->peer_list_iterator_.first ();
01546            (info = this->peer_list_iterator_.next ()) != 0;
01547            this->peer_list_iterator_.advance ())
01548         {
01549           info->peer_->configure (a_configuration ACE_ENV_ARG_PARAMETER);
01550           ACE_TRY_CHECK;
01551         }
01552     }
01553   ACE_CATCHANY
01554     {
01555       ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_MCastConfigIf::set_configure");
01556       return;
01557     }
01558   ACE_ENDTRY;
01559   ACE_CHECK;
01560 }
01561 
01562 
01563 void
01564 TAO_MCastConfigIf::set_initial_configuration (const CosPropertyService::Properties &initial
01565                                               ACE_ENV_ARG_DECL_NOT_USED)
01566   ACE_THROW_SPEC ((CORBA::SystemException))
01567 {
01568   this->initial_configuration_ = initial;
01569 }
01570 
01571 // In future this should be a multicast message instead of point-to-point unicasts.
01572 void
01573 TAO_MCastConfigIf::set_format (const char * flowName,
01574                                const char * format_name
01575                                ACE_ENV_ARG_DECL)
01576   ACE_THROW_SPEC ((CORBA::SystemException,
01577                    AVStreams::notSupported))
01578 {
01579   Peer_Info *info;
01580   ACE_TRY
01581     {
01582       for (this->peer_list_iterator_.first ();
01583            (info = this->peer_list_iterator_.next ()) != 0;
01584            this->peer_list_iterator_.advance ())
01585         {
01586           if (this->in_flowSpec (info->flow_spec_, flowName))
01587             {
01588               info->peer_->set_format (flowName, format_name ACE_ENV_ARG_PARAMETER);
01589               ACE_TRY_CHECK;
01590             }
01591         }
01592     }
01593   ACE_CATCHANY
01594     {
01595       ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_MCastConfigIf::set_format");
01596       return;
01597     }
01598   ACE_ENDTRY;
01599   ACE_CHECK;
01600 }
01601 
01602 // In future this should be a multicast message instead of point-to-point unicasts.
01603 void
01604 TAO_MCastConfigIf::set_dev_params (const char * flowName,
01605                                    const CosPropertyService::Properties & new_params
01606                                    ACE_ENV_ARG_DECL)
01607   ACE_THROW_SPEC ((CORBA::SystemException,
01608                    AVStreams::PropertyException,
01609                    AVStreams::streamOpFailed))
01610 {
01611   Peer_Info *info;
01612   ACE_TRY
01613     {
01614 
01615       for (this->peer_list_iterator_.first ();
01616            (info = this->peer_list_iterator_.next ()) != 0;
01617            this->peer_list_iterator_.advance ())
01618         {
01619           if (this->in_flowSpec (info->flow_spec_, flowName))
01620             {
01621               info->peer_->set_dev_params (flowName, new_params ACE_ENV_ARG_PARAMETER);
01622               ACE_TRY_CHECK;
01623             }
01624         }
01625     }
01626   ACE_CATCHANY
01627     {
01628       ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_MCastConfigIf::set_dev_params");
01629       return;
01630     }
01631   ACE_ENDTRY;
01632   ACE_CHECK;
01633 }
01634 
01635 int
01636 TAO_MCastConfigIf::in_flowSpec (const AVStreams::flowSpec& flow_spec, const char *flow_name)
01637 {
01638   size_t len = ACE_OS::strlen (flow_name);
01639   for (CORBA::ULong i = 0; i < flow_spec.length (); i++)
01640     if (ACE_OS::strncmp (flow_spec[i], flow_name, len) == 0)
01641       {
01642         return 1;
01643       }
01644   return 0;
01645 }
01646 
01647 // ----------------------------------------------------------------------
01648 // TAO_Base_StreamEndPoint
01649 // ----------------------------------------------------------------------
01650 
01651 TAO_Base_StreamEndPoint::TAO_Base_StreamEndPoint (void)
01652   : protocol_object_set_ (0)
01653 {
01654 }
01655 
01656 TAO_Base_StreamEndPoint::~TAO_Base_StreamEndPoint (void)
01657 {
01658 }
01659 
01660 int
01661 TAO_Base_StreamEndPoint::handle_close (void)
01662 {
01663   // This method should not be defined, but EGCS complains endlessly
01664   // about it.
01665   return -1;
01666 }
01667 
01668 int
01669 TAO_Base_StreamEndPoint::handle_open (void)
01670 {
01671   return 0;
01672 }
01673 
01674 int
01675 TAO_Base_StreamEndPoint::handle_stop (const AVStreams::flowSpec &
01676                                       ACE_ENV_ARG_DECL_NOT_USED)
01677 {
01678   return 0;
01679 }
01680 
01681 int
01682 TAO_Base_StreamEndPoint::handle_start (const AVStreams::flowSpec &
01683                                        ACE_ENV_ARG_DECL_NOT_USED)
01684 {
01685   return 0;
01686 }
01687 
01688 int
01689 TAO_Base_StreamEndPoint::handle_destroy (const AVStreams::flowSpec &
01690                                          ACE_ENV_ARG_DECL_NOT_USED)
01691 {
01692   return 0;
01693 }
01694 
01695 // The following function is for backward compatibility.
01696 CORBA::Boolean
01697 TAO_Base_StreamEndPoint::handle_preconnect (AVStreams::flowSpec &)
01698 {
01699   return 1;
01700 }
01701 
01702 // The following function is for backward compatibility.
01703 CORBA::Boolean
01704 TAO_Base_StreamEndPoint::handle_postconnect (AVStreams::flowSpec &)
01705 {
01706 
01707   while (!this->is_protocol_object_set ())
01708     TAO_AV_CORE::instance ()->orb ()->perform_work ();
01709   return 1;
01710 }
01711 
01712 // The following function is for backward compatibility.
01713 CORBA::Boolean
01714 TAO_Base_StreamEndPoint::handle_connection_requested (AVStreams::flowSpec &
01715                                                  ACE_ENV_ARG_DECL_NOT_USED)
01716 {
01717   return 1;
01718 }
01719 
01720 int
01721 TAO_Base_StreamEndPoint::set_protocol_object (const char * /*flowname*/,
01722                                               TAO_AV_Protocol_Object * /*sfp_object*/)
01723 {
01724   return -1;
01725 }
01726 
01727 void
01728 TAO_Base_StreamEndPoint::protocol_object_set (void)
01729 {
01730   this->protocol_object_set_ = 1;
01731 }
01732 
01733 
01734 int
01735 TAO_Base_StreamEndPoint::is_protocol_object_set (void)
01736 {
01737   return this->protocol_object_set_;
01738 }
01739 
01740 int
01741 TAO_Base_StreamEndPoint::get_callback (const char * /*flowname*/,
01742                                        TAO_AV_Callback *&/*sfp_callback*/)
01743 {
01744   return -1;
01745 }
01746 
01747 int
01748 TAO_Base_StreamEndPoint::get_control_callback (const char * /*flowname*/,
01749                                                TAO_AV_Callback *&/*sfp_callback*/)
01750 {
01751   return -1;
01752 }
01753 
01754 void
01755 TAO_Base_StreamEndPoint::set_flow_handler (const char *flowname,
01756                                            TAO_AV_Flow_Handler *handler)
01757 {
01758   if(TAO_debug_level > 1)
01759   {
01760      ACE_DEBUG ((LM_DEBUG, "(%N,%l) TAO_Base_StreamEndPoint::set_flow_handler(), flowname: %s\n", flowname));
01761   }
01762   ACE_CString flow_name_key (flowname);
01763   if (this->flow_handler_map_.bind (flow_name_key, handler) != 0)
01764     ACE_ERROR ((LM_ERROR,
01765                 "Error in storing flow handler\n"));
01766 }
01767 
01768 void
01769 TAO_Base_StreamEndPoint::set_control_flow_handler (const char *flowname,
01770                                                    TAO_AV_Flow_Handler *handler)
01771 {
01772   ACE_CString flow_name_key (flowname);
01773   if (this->control_flow_handler_map_.bind (flow_name_key, handler) != 0)
01774     ACE_ERROR ((LM_ERROR,
01775                 "Error in storing control flow handler\n"));
01776 }
01777 
01778 // ----------------------------------------------------------------------
01779 // TAO_StreamEndPoint
01780 // ----------------------------------------------------------------------
01781 
01782 // constructor.
01783 
01784 TAO_StreamEndPoint::TAO_StreamEndPoint (void)
01785   :flow_count_ (0),
01786    flow_num_ (0),
01787    mcast_port_ (ACE_DEFAULT_MULTICAST_PORT+1)
01788 {
01789   //is->mcast_addr_ = ACE_OS::inet_addr (ACE_DEFAULT_MULTICAST_ADDR);
01790   this->mcast_addr_.set (ACE_DEFAULT_MULTICAST_ADDR);
01791   if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_StreamEndPoint::TAO_StreamEndPoint::mcast_addr = %s", this->mcast_addr_.c_str ()));
01792   //  this->handle_open ();
01793 }
01794 
01795 
01796 CORBA::Boolean
01797 TAO_StreamEndPoint::connect (AVStreams::StreamEndPoint_ptr responder,
01798                              AVStreams::streamQoS &qos,
01799                              const AVStreams::flowSpec &the_spec
01800                              ACE_ENV_ARG_DECL)
01801   ACE_THROW_SPEC ((CORBA::SystemException,
01802                    AVStreams::noSuchFlow,
01803                    AVStreams::QoSRequestFailed,
01804                    AVStreams::streamOpFailed))
01805 {
01806   if (TAO_debug_level > 0)
01807     ACE_DEBUG ((LM_DEBUG, "TAO_StreamEndPoint::Connect ()\n"));
01808   CORBA::Boolean retv = 0;
01809   this->peer_sep_ = AVStreams::StreamEndPoint::_duplicate (responder);
01810   ACE_TRY_EX (negotiate)
01811     {
01812       if (!CORBA::is_nil (this->negotiator_.in ()))
01813         {
01814           ACE_DEBUG ((LM_DEBUG,
01815                       "NEGOTIATOR AVIALABLE\n"));
01816 
01817           CORBA::Any_var negotiator_any = responder->get_property_value ("Negotiator");
01818 
01819           AVStreams::Negotiator_ptr peer_negotiator;
01820           negotiator_any.in () >>= peer_negotiator;
01821           if (!CORBA::is_nil (peer_negotiator))
01822             {
01823               CORBA::Boolean result =
01824                 this->negotiator_->negotiate (peer_negotiator,
01825                                               qos
01826                                               ACE_ENV_ARG_PARAMETER);
01827               ACE_TRY_CHECK_EX (negotiate);
01828               if (!result)
01829                 if (TAO_debug_level > 0)
01830                   ACE_DEBUG ((LM_DEBUG, "TAO_StreamEndPoint::Connect (): negotiate failed\n"));
01831             }
01832         }
01833     }
01834   ACE_CATCHANY
01835     {
01836       ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_StreamEndPoint::negotiate");
01837     }
01838   ACE_ENDTRY;
01839   ACE_CHECK_RETURN (0);
01840 
01841   ACE_TRY_EX (available_protocols)
01842     {
01843       if (this->protocols_.length () > 0)
01844         {
01845           // choose protocols based on what the remote endpoint can support.
01846           CORBA::Any_var protocols_any =
01847             responder->get_property_value ("AvailableProtocols" ACE_ENV_ARG_PARAMETER);
01848           ACE_TRY_CHECK_EX (available_protocols);
01849           AVStreams::protocolSpec peer_protocols;
01850           AVStreams::protocolSpec *temp_protocols;
01851           protocols_any.in () >>= temp_protocols;
01852           peer_protocols = *temp_protocols;
01853           for (u_int i=0;i<peer_protocols.length ();i++)
01854             {
01855               for (u_int j=0;j<this->protocols_.length ();j++)
01856                 if (ACE_OS::strcmp (peer_protocols [i],
01857                                     this->protocols_[j]) == 0)
01858                   {
01859                     // we'll agree upon the first protocol that matches.
01860                     this->protocol_ = CORBA::string_dup (peer_protocols [i]);
01861                     break;
01862                   }
01863             }
01864         }
01865     }
01866   ACE_CATCHANY
01867     {
01868       if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "Availableprotocols property not defined\n"));
01869     }
01870   ACE_ENDTRY;
01871   ACE_CHECK_RETURN (0);
01872   ACE_TRY
01873     {
01874       AVStreams::streamQoS network_qos;
01875       if (qos.length () > 0)
01876         {
01877           if (TAO_debug_level > 0)
01878             ACE_DEBUG ((LM_DEBUG,
01879                         "QoS is Specified\n"));
01880 
01881           int result = this->translate_qos (qos,
01882                                             network_qos);
01883           if (result != 0)
01884             if (TAO_debug_level > 0)
01885               ACE_DEBUG ((LM_DEBUG,
01886                           "QoS translation failed\n"));
01887 
01888           this->qos ().set (network_qos);
01889         }
01890 
01891 
01892       AVStreams::flowSpec flow_spec (the_spec);
01893       this->handle_preconnect (flow_spec);
01894 
01895       if (TAO_debug_level > 0)
01896         ACE_DEBUG ((LM_DEBUG,
01897                     "TAO_StreamEndPoint::connect: flow_spec_length = %d\n",
01898                     flow_spec.length ()));
01899       u_int i;
01900       for (i=0;i<flow_spec.length ();i++)
01901         {
01902           TAO_Forward_FlowSpec_Entry *entry = 0;
01903           ACE_NEW_RETURN (entry,
01904                           TAO_Forward_FlowSpec_Entry,
01905                           0);
01906 
01907           if (entry->parse (flow_spec[i]) == -1)
01908             return 0;
01909 
01910           if (TAO_debug_level > 0)
01911             ACE_DEBUG ((LM_DEBUG, "TAO_StreamEndPoint::Connect: %s\n",  entry->entry_to_string ()));
01912 
01913           this->forward_flow_spec_set.insert (entry);
01914         }
01915 
01916       int result =TAO_AV_CORE::instance ()->init_forward_flows (this,
01917                                                                 this->forward_flow_spec_set,
01918                                                                 TAO_AV_Core::TAO_AV_ENDPOINT_A,
01919                                                                 flow_spec);
01920 
01921 
01922       if (result < 0)
01923         ACE_ERROR_RETURN ((LM_ERROR, "%N:%l TAO_AV_Core::init_forward_flows failed\n"), 0);
01924 
01925 
01926       AVStreams::StreamEndPoint_var streamendpoint = this->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
01927       ACE_TRY_CHECK;
01928 
01929       retv = responder->request_connection (streamendpoint.in (),
01930                                             0,
01931                                             network_qos,
01932                                             flow_spec
01933                                             ACE_ENV_ARG_PARAMETER);
01934       ACE_TRY_CHECK;
01935 
01936       if (TAO_debug_level > 0)
01937          ACE_DEBUG ((LM_DEBUG, "%N:%l request_connection returned %d\n", retv));
01938 
01939       if (retv == 0)
01940         return retv;
01941       for (i=0;i<flow_spec.length ();i++)
01942         {
01943           TAO_Reverse_FlowSpec_Entry *entry = 0;
01944           ACE_NEW_RETURN (entry,
01945                           TAO_Reverse_FlowSpec_Entry,
01946                           0);
01947           if (entry->parse (flow_spec[i]) == -1)
01948             ACE_ERROR_RETURN ((LM_ERROR,
01949                                "Reverse_Flow_Spec_Set::parse failed\n"),
01950                               0);
01951 
01952           if (TAO_debug_level > 0)
01953             ACE_DEBUG ((LM_DEBUG,
01954                         "TAO_StreamEndPoint::Connect: Reverse Flow Spec %s\n",
01955                         entry->entry_to_string ()));
01956 
01957           this->reverse_flow_spec_set.insert (entry);
01958         }
01959 
01960       result = TAO_AV_CORE::instance ()->init_reverse_flows (this,
01961                                                              this->forward_flow_spec_set,
01962                                                              this->reverse_flow_spec_set,
01963                                                              TAO_AV_Core::TAO_AV_ENDPOINT_A);
01964       if (result < 0)
01965         ACE_ERROR_RETURN ((LM_ERROR,
01966                            "TAO_AV_Core::init_reverse_flows failed\n"),
01967                           0);
01968 
01969       // Make the upcall to the app
01970       retv = this->handle_postconnect (flow_spec);
01971     }
01972   ACE_CATCHANY
01973     {
01974       ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_StreamEndPoint::connect");
01975       return 0;
01976     }
01977   ACE_ENDTRY;
01978   ACE_CHECK_RETURN (0);
01979   return retv;
01980 }
01981 
01982 int
01983 TAO_StreamEndPoint::translate_qos (const AVStreams::streamQoS& application_qos,
01984                                    AVStreams::streamQoS& network_qos)
01985 {
01986   u_int len = application_qos.length ();
01987   network_qos.length (len);
01988   for (u_int i=0;i<len;i++)
01989     {
01990       network_qos [i].QoSType = application_qos [i].QoSType;
01991       network_qos [i].QoSParams = application_qos [i].QoSParams;
01992     }
01993   return 0;
01994 }
01995 
01996 // Stop the physical flow of data on the stream
01997 // Empty the_spec --> apply to all flows
01998 
01999 void
02000 TAO_StreamEndPoint::stop (const AVStreams::flowSpec &flow_spec
02001                           ACE_ENV_ARG_DECL)
02002   ACE_THROW_SPEC ((CORBA::SystemException,
02003                    AVStreams::noSuchFlow))
02004 {
02005   // Make the upcall into the app
02006   this->handle_stop (flow_spec ACE_ENV_ARG_PARAMETER);
02007   ACE_CHECK;
02008 
02009   if (flow_spec.length () > 0)
02010     {
02011 
02012       for (u_int i=0;i<flow_spec.length ();i++)
02013         {
02014           TAO_AV_FlowSpecSetItor end = this->forward_flow_spec_set.end ();
02015           for (TAO_AV_FlowSpecSetItor begin = this->forward_flow_spec_set.begin ();
02016                begin != end; ++begin)
02017             {
02018               TAO_Forward_FlowSpec_Entry entry;
02019               entry.parse (flow_spec[i]);
02020               if (ACE_OS::strcmp ((*begin)->flowname (), entry.flowname ()) == 0)
02021                {
02022                  TAO_FlowSpec_Entry *entry = *begin;
02023                  //                  (*begin)->protocol_object ()->stop ();
02024                  if (entry->handler() != 0)
02025                    entry->handler ()->stop (entry->role ());
02026                  if (entry->control_handler () != 0)
02027                    entry->control_handler ()->stop (entry->role ());
02028                  break;
02029                }
02030             }
02031         }
02032     }
02033   else
02034     {
02035       TAO_AV_FlowSpecSetItor end = this->forward_flow_spec_set.end ();
02036       for (TAO_AV_FlowSpecSetItor begin = this->forward_flow_spec_set.begin ();
02037            begin != end; ++begin)
02038         {
02039           TAO_FlowSpec_Entry *entry = *begin;
02040           //          entry->protocol_object ()->stop ();
02041           if (entry->handler() != 0)
02042             entry->handler ()->stop (entry->role ());
02043           if (entry->control_handler () != 0)
02044             entry->control_handler ()->stop (entry->role ());
02045         }
02046     }
02047 }
02048 
02049 // Start the physical flow of data on the stream
02050 // Empty the_spec --> apply to all flows
02051 void
02052 TAO_StreamEndPoint::start (const AVStreams::flowSpec &flow_spec
02053                            ACE_ENV_ARG_DECL)
02054   ACE_THROW_SPEC ((CORBA::SystemException,
02055                    AVStreams::noSuchFlow))
02056 {
02057   if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_StreamEndPoint::start\n"));
02058   // Make the upcall into the app
02059   this->handle_start (flow_spec ACE_ENV_ARG_PARAMETER);
02060   ACE_CHECK;
02061 
02062   if (flow_spec.length () > 0)
02063     {
02064       // Now call start on all the flow handlers.
02065       for (u_int i=0;i<flow_spec.length ();i++)
02066         {
02067           TAO_AV_FlowSpecSetItor end = this->forward_flow_spec_set.end ();
02068           for (TAO_AV_FlowSpecSetItor forward_begin = this->forward_flow_spec_set.begin ();
02069                forward_begin != end; ++forward_begin)
02070             {
02071               TAO_FlowSpec_Entry *entry = *forward_begin;
02072               if (ACE_OS::strcmp (entry->flowname (), flow_spec [i]) == 0)
02073                 {
02074                   //                  entry->protocol_object ()->start ();
02075                   if (entry->handler () != 0)
02076                   {
02077                     entry->handler ()->start (entry->role ());
02078                   }
02079                   if (entry->control_handler () != 0)
02080                   {
02081                     entry->control_handler ()->start (entry->role ());
02082                   }
02083                 }
02084             }
02085 
02086           end = this->reverse_flow_spec_set.end ();
02087           for (TAO_AV_FlowSpecSetItor reverse_begin = this->reverse_flow_spec_set.begin ();
02088                reverse_begin != end; ++reverse_begin)
02089             {
02090               TAO_FlowSpec_Entry *entry = *reverse_begin;
02091               if (ACE_OS::strcmp (entry->flowname (), flow_spec [i]) == 0)
02092                 {
02093                   //                  entry->protocol_object ()->start ();
02094                   if (entry->handler () != 0)
02095                   {
02096                     entry->handler ()->start (entry->role ());
02097                   }
02098                   if (entry->control_handler () != 0)
02099                   {
02100                     entry->control_handler ()->start (entry->role ());
02101                   }
02102                 }
02103             }
02104         }
02105     }
02106   else
02107     {
02108       TAO_AV_FlowSpecSetItor end = this->forward_flow_spec_set.end ();
02109       for (TAO_AV_FlowSpecSetItor forwardbegin = this->forward_flow_spec_set.begin ();
02110            forwardbegin != end; ++forwardbegin)
02111         {
02112           TAO_FlowSpec_Entry *entry = *forwardbegin;
02113           if (entry->handler () != 0)
02114             {
02115               entry->handler ()->start (entry->role ());
02116             }
02117           if (entry->control_handler () != 0)
02118             {
02119               entry->control_handler ()->start (entry->role ());
02120             }
02121         }
02122 
02123       end = this->reverse_flow_spec_set.end ();
02124       for (TAO_AV_FlowSpecSetItor reversebegin = this->reverse_flow_spec_set.begin ();
02125            reversebegin != end; ++reversebegin)
02126         {
02127           TAO_FlowSpec_Entry *entry = *reversebegin;
02128           //          entry->protocol_object ()->start ();
02129           if (entry->handler () != 0)
02130             {
02131               entry->handler ()->start (entry->role ());
02132             }
02133           if (entry->control_handler () != 0)
02134             {
02135               entry->control_handler ()->start (entry->role ());
02136             }
02137         }
02138     }
02139 }
02140 
02141 // Close the connection
02142 void
02143 TAO_StreamEndPoint::destroy (const AVStreams::flowSpec &flow_spec
02144                              ACE_ENV_ARG_DECL)
02145   ACE_THROW_SPEC ((CORBA::SystemException,
02146                    AVStreams::noSuchFlow))
02147 {
02148   CORBA::Any_var vdev_any = this->get_property_value ("Related_VDev"
02149                                                       ACE_ENV_ARG_PARAMETER);
02150 
02151   AVStreams::VDev_ptr vdev;
02152 
02153   vdev_any.in() >>= vdev;
02154   CORBA::Any_var mc_any = vdev->get_property_value ("Related_MediaCtrl"
02155                                                     ACE_ENV_ARG_PARAMETER);
02156 
02157   // The Related_MediaCtrl property was inserted as a CORBA::Object, so we
02158   // must extract it as the same type.
02159   CORBA::Object_var obj;
02160   mc_any.in() >>= CORBA::Any::to_object( obj.out() );
02161 
02162   AVStreams::MediaControl_var media_ctrl =
02163           AVStreams::MediaControl::_narrow( obj.in() );
02164 
02165   // deactivate the associated vdev and media ctrl
02166 
02167   if ( !CORBA::is_nil( vdev ) )
02168   {
02169     PortableServer::ServantBase_var vdev_servant =
02170         TAO_AV_CORE::instance()->poa()->reference_to_servant ( vdev );
02171     TAO_AV_Core::deactivate_servant (vdev_servant.in());
02172   }
02173 
02174   if ( !CORBA::is_nil ( media_ctrl.in () ) )
02175   {
02176     PortableServer::ServantBase_var mc_servant =
02177         TAO_AV_CORE::instance()->poa()->reference_to_servant (media_ctrl.in());
02178     TAO_AV_Core::deactivate_servant (mc_servant.in());
02179   }
02180 
02181   int result = TAO_AV_Core::deactivate_servant (this);
02182   if (result < 0)
02183     if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_StreamEndPoint::destroy failed\n"));
02184 
02185   if (flow_spec.length () > 0)
02186     {
02187       for (u_int i=0;i<flow_spec.length ();i++)
02188         {
02189           {
02190             TAO_AV_FlowSpecSetItor end = this->forward_flow_spec_set.end ();
02191             for (TAO_AV_FlowSpecSetItor begin = this->forward_flow_spec_set.begin ();
02192                  begin != end; ++begin)
02193               {
02194                 TAO_FlowSpec_Entry *entry = *begin;
02195                 TAO_Tokenizer flow_name (flow_spec [i], '\\');
02196                 if (ACE_OS::strcmp (entry->flowname (), flow_name[0]) == 0)
02197                   {
02198                     if (entry->protocol_object ())
02199                       {
02200                         entry->protocol_object ()->destroy ();
02201                       }
02202                     break;
02203                   }
02204               }
02205           }
02206           {
02207             TAO_AV_FlowSpecSetItor end = this->reverse_flow_spec_set.end ();
02208             for (TAO_AV_FlowSpecSetItor begin = this->reverse_flow_spec_set.begin ();
02209                  begin != end; ++begin)
02210               {
02211                 TAO_FlowSpec_Entry *entry = *begin;
02212                 TAO_Tokenizer flow_name (flow_spec [i], '\\');
02213                 if (ACE_OS::strcmp (entry->flowname (), flow_name[0]) == 0)
02214                   {
02215                     if (entry->protocol_object ())
02216                       {
02217                         entry->protocol_object ()->destroy ();
02218                       }
02219                     break;
02220                   }
02221               }
02222           }
02223         }
02224     }
02225   else
02226     {
02227       {
02228         TAO_AV_FlowSpecSetItor end = this->forward_flow_spec_set.end ();
02229         for (TAO_AV_FlowSpecSetItor begin = this->forward_flow_spec_set.begin ();
02230              begin != end; ++begin)
02231           {
02232             TAO_FlowSpec_Entry *entry = *begin;
02233             if (entry->protocol_object ())
02234               {
02235                 entry->protocol_object ()->stop ();
02236 
02237                 ACE_CString control_flowname =
02238                     TAO_AV_Core::get_control_flowname (entry->flowname ());
02239                 TAO_AV_CORE::instance()->remove_acceptor(entry->flowname());
02240                 TAO_AV_CORE::instance()->remove_acceptor(control_flowname.c_str());
02241 
02242                 entry->protocol_object ()->destroy ();
02243               }
02244           }
02245       }
02246       {
02247         TAO_AV_FlowSpecSetItor end = this->reverse_flow_spec_set.end ();
02248         for (TAO_AV_FlowSpecSetItor begin = this->reverse_flow_spec_set.begin ();
02249              begin != end; ++begin)
02250           {
02251             TAO_FlowSpec_Entry *entry = *begin;
02252             if (entry->protocol_object ())
02253               {
02254                 entry->protocol_object ()->stop ();
02255 
02256                 ACE_CString control_flowname =
02257                     TAO_AV_Core::get_control_flowname (entry->flowname ());
02258                 TAO_AV_CORE::instance()->remove_connector(entry->flowname());
02259                 TAO_AV_CORE::instance()->remove_connector(control_flowname.c_str());
02260                 entry->protocol_object ()->destroy ();
02261 
02262               }
02263           }
02264       }
02265     }
02266 
02267   // Make the upcall into the app
02268   //  this->handle_destroy (the_spec ACE_ENV_ARG_PARAMETER);
02269   //
02270 }
02271 
02272 // Called by our peer endpoint, requesting us to establish
02273 // a connection
02274 CORBA::Boolean
02275 TAO_StreamEndPoint::request_connection (AVStreams::StreamEndPoint_ptr /*initiator*/,
02276                                         CORBA::Boolean /*is_mcast*/,
02277                                         AVStreams::streamQoS &qos,
02278                                         AVStreams::flowSpec &flow_spec
02279                                         ACE_ENV_ARG_DECL)
02280   ACE_THROW_SPEC ((CORBA::SystemException,
02281                    AVStreams::streamOpDenied,
02282                    AVStreams::noSuchFlow,
02283                    AVStreams::QoSRequestFailed,
02284                    AVStreams::FPError))
02285 
02286 {
02287   if (TAO_debug_level > 0)
02288     ACE_DEBUG ((LM_DEBUG,
02289                 "\n(%P|%t) TAO_StreamEndPoint::request_connection called"));
02290 
02291   int result = 0;
02292   ACE_TRY
02293     {
02294       AVStreams::streamQoS network_qos;
02295       if (qos.length () > 0)
02296         {
02297          if (TAO_debug_level > 0)
02298           ACE_DEBUG ((LM_DEBUG,
02299                       "QoS is Specified\n"));
02300 
02301           int result = this->translate_qos (qos, network_qos);
02302           if (result != 0)
02303             if (TAO_debug_level > 0)
02304               ACE_DEBUG ((LM_DEBUG, "QoS translation failed\n"));
02305 
02306           this->qos ().set (network_qos);
02307         }
02308 
02309       if (TAO_debug_level > 0)
02310         ACE_DEBUG ((LM_DEBUG,
02311                     "\n(%P|%t) TAO_StreamEndPoint::request_connection: "
02312                     "flowspec has length = %d and the strings are:\n",
02313                     flow_spec.length ()));
02314       CORBA::ULong i;
02315 
02316       for (i=0;i<flow_spec.length ();i++)
02317         {
02318           TAO_Forward_FlowSpec_Entry *entry = 0;
02319           ACE_NEW_RETURN (entry,
02320                           TAO_Forward_FlowSpec_Entry,
02321                           0);
02322 
02323           CORBA::String_var string_entry = CORBA::string_dup (flow_spec[i]);
02324 
02325           if(TAO_debug_level > 0)
02326              ACE_DEBUG(( LM_DEBUG,
02327                          "%N:%l Parsing flow spec: [%s]\n",
02328                          string_entry.in ()));
02329 
02330           if (entry->parse (string_entry.in ()) == -1)
02331           {
02332             if (TAO_debug_level > 0)
02333               ACE_DEBUG ((LM_DEBUG,
02334                           "%N:%l Error parsing flow_spec: [%s]\n",
02335                           string_entry.in ()));
02336             return 0;
02337           }
02338           if (TAO_debug_level > 0)
02339             ACE_DEBUG ((LM_DEBUG,
02340                         "TAO_StreamEndPoint::request_connection flow spec [%s]\n",
02341                         entry->entry_to_string ()));
02342 
02343           this->forward_flow_spec_set.insert (entry);
02344         }
02345 
02346       result = TAO_AV_CORE::instance ()->init_forward_flows (this,
02347                                                              this->forward_flow_spec_set,
02348                                                              TAO_AV_Core::TAO_AV_ENDPOINT_B,
02349                                                              flow_spec);
02350 
02351       if (result < 0)
02352         return 0;
02353 
02354       // Make the upcall to the app
02355       result = this->handle_connection_requested (flow_spec ACE_ENV_ARG_PARAMETER);
02356       ACE_TRY_CHECK;
02357     }
02358   ACE_CATCHANY
02359     {
02360       ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
02361                            "TAO_StreamEndpoint::request_connection");
02362       return 0;
02363     }
02364   ACE_ENDTRY;
02365   ACE_CHECK_RETURN (0);
02366   return result;
02367 }
02368 
02369 int
02370 TAO_StreamEndPoint::change_qos (AVStreams::streamQoS &new_qos,
02371                                 const AVStreams::flowSpec &the_flows
02372                                 ACE_ENV_ARG_DECL_NOT_USED)
02373 {
02374   if (TAO_debug_level > 0)
02375     ACE_DEBUG ((LM_DEBUG,
02376                 "TAO_StreamEndPoint::change_qos\n"));
02377 
02378   TAO_AV_QoS qos (new_qos);
02379   for (int i = 0; (unsigned) i < the_flows.length (); i++)
02380     {
02381       TAO_Forward_FlowSpec_Entry entry;
02382       entry.parse (the_flows [i]);
02383       ACE_CString flow_name_key (entry.flowname ());
02384       Flow_Handler_Map_Entry *handler_entry;
02385       if (this->flow_handler_map_.find (flow_name_key,
02386                                         handler_entry) == 0)
02387         {
02388           AVStreams::QoS flow_qos;
02389           if (qos.get_flow_qos (entry.flowname (), flow_qos) != 0)
02390             ACE_DEBUG ((LM_DEBUG,
02391                         "New QoS for the flow %s is not specified\n",
02392                         entry.flowname ()));
02393           int result;
02394           result = handler_entry->int_id_->change_qos (flow_qos);
02395           if (result != 0)
02396             ACE_ERROR_RETURN ((LM_ERROR,
02397                                "Modifying QoS Failed\n"),
02398                               -1);
02399 
02400         }
02401     }
02402   return 0;
02403 }
02404 
02405 // Refers to modification of transport QoS.
02406 CORBA::Boolean
02407 TAO_StreamEndPoint::modify_QoS (AVStreams::streamQoS &new_qos,
02408                                 const AVStreams::flowSpec &the_flows
02409                                 ACE_ENV_ARG_DECL)
02410   ACE_THROW_SPEC ((CORBA::SystemException,
02411                    AVStreams::noSuchFlow,
02412                    AVStreams::QoSRequestFailed))
02413 {
02414   if (TAO_debug_level > 0)
02415   ACE_DEBUG ((LM_DEBUG,
02416               "TAO_StreamEndPoint::modify_QoS\n"));
02417 
02418   int result =  this->change_qos (new_qos, the_flows ACE_ENV_ARG_PARAMETER);
02419   ACE_CHECK_RETURN (0);
02420 
02421   if (result != 0)
02422     return 0;
02423 
02424   return 1;
02425 
02426 }
02427 
02428 // Sets the list of protocols this streamendpoint can understand.
02429 
02430 CORBA::Boolean
02431 TAO_StreamEndPoint::set_protocol_restriction (const AVStreams::protocolSpec &protocols
02432                                               ACE_ENV_ARG_DECL)
02433   ACE_THROW_SPEC ((CORBA::SystemException))
02434 {
02435   ACE_TRY
02436     {
02437       CORBA::Any protocol_restriction_any;
02438 
02439       protocol_restriction_any <<= protocols;
02440       this->define_property ("ProtocolRestriction",
02441                              protocol_restriction_any
02442                              ACE_ENV_ARG_PARAMETER);
02443       ACE_TRY_CHECK;
02444       this->protocols_ = protocols;
02445     }
02446   ACE_CATCHANY
02447     {
02448       ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_StreamEndPoint::set_protocol_restriction");
02449       return 0;
02450     }
02451   ACE_ENDTRY;
02452   ACE_CHECK_RETURN (0);
02453   return 1;
02454 }
02455 
02456 
02457 void
02458 TAO_StreamEndPoint::disconnect (const AVStreams::flowSpec &the_spec
02459                                 ACE_ENV_ARG_DECL_NOT_USED)
02460   ACE_THROW_SPEC ((CORBA::SystemException,
02461                    AVStreams::noSuchFlow,
02462                    AVStreams::streamOpFailed))
02463 {
02464   ACE_UNUSED_ARG (the_spec);
02465 }
02466 
02467 // Sets the status of the flow protocol.
02468 
02469 void
02470 TAO_StreamEndPoint::set_FPStatus (const AVStreams::flowSpec &/*the_spec*/,
02471                                   const char *fp_name,
02472                                   const CORBA::Any &fp_settings
02473                                   ACE_ENV_ARG_DECL_NOT_USED)
02474   ACE_THROW_SPEC ((CORBA::SystemException,
02475                    AVStreams::noSuchFlow,
02476                    AVStreams::FPError))
02477 {
02478   if (ACE_OS::strcmp (fp_name, "SFP1.0") != 0)
02479     return;
02480   fp_settings >>= this->sfp_status_;
02481   // @@Naga: We should call set_FPStatus on all the protocol objects.
02482 }
02483 
02484 
02485 CORBA::Object_ptr
02486 TAO_StreamEndPoint::get_fep (const char *flow_name
02487                              ACE_ENV_ARG_DECL_NOT_USED)
02488   ACE_THROW_SPEC ((CORBA::SystemException,
02489                    AVStreams::notSupported,
02490                    AVStreams::noSuchFlow))
02491 {
02492   ACE_CString fep_name_key (flow_name);
02493   AVStreams::FlowEndPoint_var fep_entry;
02494   if (this->fep_map_.find (fep_name_key, fep_entry) == 0)
02495     return fep_entry._retn();
02496   return 0;
02497 }
02498 
02499 char*
02500 TAO_StreamEndPoint::add_fep_i_add_property (AVStreams::FlowEndPoint_ptr fep
02501                                             ACE_ENV_ARG_DECL)
02502     ACE_THROW_SPEC ((CORBA::SystemException,
02503                      AVStreams::notSupported,
02504                      AVStreams::streamOpFailed))
02505 {
02506   ACE_CString flow_name;
02507 
02508   ACE_TRY
02509     {
02510       // exception implies the flow name is not defined and is system
02511       // generated.
02512       flow_name = "flow";
02513       char tmp[255];
02514       ACE_OS::sprintf (tmp, "%u", this->flow_num_++);
02515       flow_name += tmp;
02516 
02517       CORBA::Any flowname_any;
02518       flowname_any <<= flow_name.c_str ();
02519       fep->define_property ("Flow",
02520                             flowname_any
02521                             ACE_ENV_ARG_PARAMETER);
02522       ACE_TRY_CHECK;
02523     }
02524   ACE_CATCHANY
02525     {
02526       ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
02527                            "TAO_StreamEndPoint::add_fep");
02528       return 0;
02529     }
02530   ACE_ENDTRY;
02531   return ACE_OS::strdup( flow_name.c_str () );
02532 }
02533 
02534 char*
02535 TAO_StreamEndPoint::add_fep_i (AVStreams::FlowEndPoint_ptr fep
02536                                ACE_ENV_ARG_DECL)
02537     ACE_THROW_SPEC ((CORBA::SystemException,
02538                      AVStreams::notSupported,
02539                      AVStreams::streamOpFailed))
02540 {
02541   CORBA::String_var flow_name;
02542   ACE_TRY
02543     {
02544       CORBA::Any_var flow_name_any =
02545         fep->get_property_value ("FlowName" ACE_ENV_ARG_PARAMETER);
02546       ACE_TRY_CHECK;
02547 
02548       const char *tmp;
02549       flow_name_any >>= tmp;
02550       flow_name = CORBA::string_dup (tmp);
02551     }
02552   ACE_CATCHANY
02553     {
02554       flow_name =
02555         this->add_fep_i_add_property (fep ACE_ENV_ARG_PARAMETER);
02556       ACE_CHECK_RETURN (0);
02557     }
02558   ACE_ENDTRY;
02559   return flow_name._retn ();
02560 }
02561 
02562 char *
02563 TAO_StreamEndPoint::add_fep (CORBA::Object_ptr fep_obj
02564                              ACE_ENV_ARG_DECL)
02565     ACE_THROW_SPEC ((CORBA::SystemException,
02566                      AVStreams::notSupported,
02567                      AVStreams::streamOpFailed))
02568 {
02569   AVStreams::FlowEndPoint_var fep =
02570     AVStreams::FlowEndPoint::_narrow (fep_obj ACE_ENV_ARG_PARAMETER);
02571   ACE_CHECK_RETURN (0);
02572 
02573   CORBA::String_var flow_name =
02574     this->add_fep_i (fep.in () ACE_ENV_ARG_PARAMETER);
02575   ACE_CHECK_RETURN (0);
02576 
02577   ACE_TRY
02578     {
02579       fep->lock (ACE_ENV_SINGLE_ARG_PARAMETER);
02580       ACE_TRY_CHECK;
02581       // Add it to the sequence of flowNames supported.
02582       // put the flowname and the flowendpoint in a hashtable.
02583       ACE_CString fep_name_key (CORBA::string_dup (flow_name.in ()));
02584       if (this->fep_map_.bind (fep_name_key, AVStreams::FlowEndPoint::_duplicate (fep.in ())) != 0)
02585         {
02586           ACE_THROW_RETURN (AVStreams::streamOpFailed (), 0);
02587         }
02588       // increment the flow count.
02589       this->flow_count_++;
02590       this->flows_.length (this->flow_count_);
02591       this->flows_[this->flow_count_-1] = flow_name;
02592       // define/modify the "Flows" property.
02593       CORBA::Any flows_any;
02594       flows_any <<= this->flows_;
02595       this->define_property ("Flows",
02596                              flows_any
02597                              ACE_ENV_ARG_PARAMETER);
02598       ACE_TRY_CHECK;
02599     }
02600   ACE_CATCHANY
02601     {
02602       ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_StreamEndPoint::add_fep");
02603       return 0;
02604     }
02605   ACE_ENDTRY;
02606   return flow_name._retn ();
02607 }
02608 
02609 
02610 void
02611 TAO_StreamEndPoint::remove_fep (const char *flow_name
02612                                 ACE_ENV_ARG_DECL)
02613     ACE_THROW_SPEC ((CORBA::SystemException,
02614                      AVStreams::notSupported,
02615                      AVStreams::streamOpFailed))
02616 {
02617   ACE_TRY
02618     {
02619       ACE_CString fep_name_key (flow_name);
02620       AVStreams::FlowEndPoint_var fep_entry;
02621       // Remove the fep from the hash table.
02622       if (this->fep_map_.unbind (fep_name_key, fep_entry)!= 0)
02623         ACE_THROW (AVStreams::streamOpFailed ());
02624       // redefine the "Flows" property
02625       AVStreams::flowSpec new_flows (this->flows_.length ());
02626       for (u_int i=0, j=0 ; i <this->flows_.length (); i++)
02627         if (ACE_OS::strcmp (flow_name, this->flows_[i]) != 0)
02628           new_flows[j++] = this->flows_[i];
02629 
02630       CORBA::Any flows;
02631       flows <<= new_flows;
02632       this->flows_ = new_flows;
02633       this->define_property ("Flows",
02634                              flows
02635                              ACE_ENV_ARG_PARAMETER);
02636       ACE_TRY_CHECK;
02637     }
02638   ACE_CATCHANY
02639     {
02640       ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_StreamEndPoint::remove_fep");
02641     }
02642   ACE_ENDTRY;
02643   ACE_CHECK;
02644 }
02645 
02646 // Sets the negotiator object.
02647 void
02648 TAO_StreamEndPoint::set_negotiator (AVStreams::Negotiator_ptr new_negotiator
02649                                     ACE_ENV_ARG_DECL)
02650   ACE_THROW_SPEC ((CORBA::SystemException))
02651 {
02652   ACE_TRY
02653     {
02654       CORBA::Any negotiator;
02655       negotiator <<= new_negotiator;
02656       this->define_property ("Negotiator",
02657                              negotiator
02658                              ACE_ENV_ARG_PARAMETER);
02659       ACE_TRY_CHECK;
02660       this->negotiator_ = AVStreams::Negotiator::_duplicate (new_negotiator);
02661     }
02662   ACE_CATCHANY
02663     {
02664       ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_StreamEndPoint::set_negotiator");
02665     }
02666   ACE_ENDTRY;
02667   ACE_CHECK;
02668 }
02669 
02670 
02671 // Sets the public key used for this streamendpoint.
02672 void
02673 TAO_StreamEndPoint::set_key (const char *flow_name,
02674                              const AVStreams::key & the_key
02675                              ACE_ENV_ARG_DECL)
02676   ACE_THROW_SPEC ((CORBA::SystemException))
02677 {
02678   ACE_TRY
02679     {
02680       this->key_ = the_key;
02681       CORBA::Any PublicKey;
02682       PublicKey <<= the_key;
02683       char PublicKey_property [BUFSIZ];
02684       ACE_OS::sprintf (PublicKey_property, "%s_PublicKey", flow_name);
02685       this->define_property (PublicKey_property,
02686                              PublicKey
02687                              ACE_ENV_ARG_PARAMETER);
02688       ACE_TRY_CHECK;
02689     }
02690   ACE_CATCHANY
02691     {
02692       ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_StreamEndPoint::set_key");
02693     }
02694   ACE_ENDTRY;
02695   ACE_CHECK;
02696 }
02697 
02698 // Set the source id.
02699 void
02700 TAO_StreamEndPoint::set_source_id (CORBA::Long source_id
02701                                    ACE_ENV_ARG_DECL_NOT_USED)
02702   ACE_THROW_SPEC ((CORBA::SystemException))
02703 {
02704   this->source_id_ = source_id;
02705 }
02706 
02707 CORBA::Boolean
02708 TAO_StreamEndPoint::multiconnect (AVStreams::streamQoS &/*the_qos*/,
02709                                   AVStreams::flowSpec &/*flow_spec*/
02710                                   ACE_ENV_ARG_DECL_NOT_USED)
02711 {
02712   if (TAO_debug_level > 0)
02713     ACE_DEBUG ((LM_DEBUG, "TAO_StreamEndPoint::multiconnect\n"));
02714   return 0;
02715 }
02716 
02717 TAO_StreamEndPoint::~TAO_StreamEndPoint (void)
02718 {
02719   //this->handle_close ();
02720   TAO_AV_FlowSpecSetItor begin = this->forward_flow_spec_set.begin ();
02721   TAO_AV_FlowSpecSetItor end = this->forward_flow_spec_set.end ();
02722 
02723   int i=0;
02724   // @@ Naga: Will the iterator always give the entries in the order of insertion.
02725   // or is it an implementation fact of ACE containers.
02726   for ( ; begin != end; ++begin, ++i)
02727     {
02728 //       if (i >= FLOWSPEC_MAX)
02729 //         {
02730           TAO_FlowSpec_Entry *entry = *begin;
02731           delete entry;
02732           //        }
02733     }
02734   begin = this->reverse_flow_spec_set.begin ();
02735   end = this->reverse_flow_spec_set.end ();
02736   i = 0;
02737   for (; begin != end; ++begin)
02738     {
02739 //       if (i >= FLOWSPEC_MAX)
02740 //         {
02741           TAO_FlowSpec_Entry *entry = *begin;
02742           delete entry;
02743           //        }
02744     }
02745 }
02746 
02747 
02748 // ----------------------------------------------------------------------
02749 // TAO_StreamEndPoint_A
02750 // ----------------------------------------------------------------------
02751 
02752 TAO_StreamEndPoint_A::TAO_StreamEndPoint_A (void)
02753 {
02754   if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "(%P|%t) TAO_StreamEndPoint_A::TAO_StreamEndPoint_A: created\n"));
02755 }
02756 
02757 // IP Multicast style connect.
02758 CORBA::Boolean
02759 TAO_StreamEndPoint_A::multiconnect (AVStreams::streamQoS &stream_qos,
02760                                     AVStreams::flowSpec &flow_spec
02761                                     ACE_ENV_ARG_DECL)
02762   ACE_THROW_SPEC ((CORBA::SystemException,
02763                    AVStreams::noSuchFlow,
02764                    AVStreams::QoSRequestFailed,
02765                    AVStreams::streamOpFailed))
02766 {
02767   if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_StreamEndPointA::multiconnect\n"));
02768   ACE_TRY
02769     {
02770       int result = 0;
02771       TAO_AV_QoS qos (stream_qos);
02772       for (u_int i=0;i< flow_spec.length ();i++)
02773         {
02774           TAO_Forward_FlowSpec_Entry *forward_entry = 0;
02775           ACE_NEW_RETURN (forward_entry,
02776                           TAO_Forward_FlowSpec_Entry,
02777                           0);
02778           forward_entry->parse (flow_spec[i]);
02779           ACE_CString mcast_key (forward_entry->flowname ());
02780           AVStreams::FlowEndPoint_var flow_endpoint;
02781 
02782           // @@Naga: There is a problem in the full profile case for multiconnect. Since
02783           // multiconnect on sep_a is called everytime a sink is added and if called for
02784           // the same flow twice, the following code will just call add producer on the flow connection.
02785           // It is however very hard to find out if the flow producer is already in the flow connection
02786           // since comparing object references will not work and the flowproducer reference is
02787           // generated by _narrow. Our only hope is that _narrow on the same fep will return the same
02788           // pointer for the flowproducer in which case we can find out if the flowproducer exists in
02789           // fep set for that flowconnection.
02790           if (this->fep_map_.find (mcast_key, flow_endpoint) == 0)
02791             {
02792               ACE_TRY_EX (narrow)
02793                 {
02794                   AVStreams::QoS flow_qos;
02795                   result = qos.get_flow_qos (forward_entry->flowname (), flow_qos);
02796                   if (result < 0)
02797                     if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "QoS not found for %s\n", forward_entry->flowname ()));
02798                   // Narrow it to FlowProducer.
02799                   AVStreams::FlowProducer_var producer;
02800                   producer = AVStreams::FlowProducer::_narrow (flow_endpoint.in() ACE_ENV_ARG_PARAMETER);
02801                   //
02802                   ACE_TRY_CHECK_EX (narrow);
02803                   // Else narrow succeeeded.
02804                   if (!CORBA::is_nil (producer.in ()))
02805                     {
02806                       AVStreams::FlowConnection_var flow_connection;
02807                       ACE_TRY_EX (flow_connection)
02808                         {
02809                           if (CORBA::is_nil (this->streamctrl_.in ()))
02810                               {
02811                                 CORBA::Any_var streamctrl_any;
02812                                 streamctrl_any = this->get_property_value ("Related_StreamCtrl"
02813                                                                            ACE_ENV_ARG_PARAMETER);
02814                                 ACE_TRY_CHECK;
02815                                 AVStreams::StreamCtrl_ptr streamctrl;
02816                                 streamctrl_any.in () >>= streamctrl;
02817                                 this->streamctrl_ = AVStreams::StreamCtrl::_duplicate (streamctrl);
02818                               }
02819 
02820                           CORBA::Object_var flow_connection_obj =
02821                             this->streamctrl_->get_flow_connection (forward_entry->flowname ()
02822                                                                     ACE_ENV_ARG_PARAMETER);
02823                           ACE_TRY_CHECK_EX (flow_connection);
02824                           flow_connection = AVStreams::FlowConnection::_narrow (flow_connection_obj.in ()
02825                                                                                 ACE_ENV_ARG_PARAMETER);
02826                           ACE_TRY_CHECK_EX (flow_connection);
02827                         }
02828                       ACE_CATCHANY
02829                         {
02830                           TAO_FlowConnection *flowConnection;
02831                           ACE_NEW_RETURN (flowConnection,
02832                                           TAO_FlowConnection,
02833                                           0);
02834                           //@@ Strategize the multicast address allocation.
02835                           flowConnection->set_mcast_addr (this->mcast_addr_, this->mcast_port_);
02836                           this->mcast_port_++;
02837                           flowConnection->set_protocol (forward_entry->carrier_protocol_str ());
02838                           flow_connection = flowConnection->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
02839                           ACE_TRY_CHECK;
02840                           this->streamctrl_->set_flow_connection (forward_entry->flowname (),
02841                                                                   flow_connection.in ()
02842                                                                   ACE_ENV_ARG_PARAMETER);
02843                           ACE_TRY_CHECK;
02844                         }
02845                       ACE_ENDTRY;
02846                       ACE_CHECK_RETURN (0);
02847                       if (ACE_OS::strcmp (forward_entry->flow_protocol_str (), "") != 0)
02848                         {
02849                           CORBA::Any fp_settings;
02850                           flow_connection->use_flow_protocol (forward_entry->flow_protocol_str (),
02851                                                               fp_settings
02852                                                               ACE_ENV_ARG_PARAMETER);
02853                           ACE_TRY_CHECK;
02854                         }
02855                       result = flow_connection->add_producer (producer.in (),
02856                                                               flow_qos
02857                                                               ACE_ENV_ARG_PARAMETER);
02858                       ACE_TRY_CHECK;
02859                       if (result == 0)
02860                         ACE_ERROR_RETURN ((LM_ERROR, "TAO_StreamEndPoint_A::multiconnect: add_producer failed\n"), 0);
02861                     }
02862                 }
02863               ACE_CATCHANY
02864                 {
02865                   // Narrow failed and since its not a flowproducer its an error.
02866                   ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "FlowProducer::_narrow");
02867                   ACE_ERROR_RETURN ((LM_ERROR, "sep_a doesn't contain a flowproducer"), 0);
02868                 }
02869               ACE_ENDTRY;
02870               ACE_CHECK_RETURN (0);
02871             }
02872           else
02873             {
02874               ACE_INET_Addr *mcast_addr;
02875               TAO_FlowSpec_Entry *entry = 0;
02876               result = this->mcast_entry_map_.find (mcast_key, entry);
02877               if (result == 0)
02878                 {
02879                   mcast_addr = dynamic_cast<ACE_INET_Addr *> (entry->address ());
02880                   char str_addr [BUFSIZ];
02881                   result = mcast_addr->addr_to_string (str_addr, BUFSIZ);
02882                   if (result < 0)
02883                     ACE_ERROR_RETURN ((LM_ERROR, "TAO_StreamEndPointA::multiconnect ::addr_to_string failed\n"), 0);
02884                   if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_StreamEndPoint_A::multiconnect:%s\n", str_addr));
02885                   TAO_Forward_FlowSpec_Entry new_entry (entry->flowname (),
02886                                                         entry->direction_str (),
02887                                                         entry->format (),
02888                                                         entry->flow_protocol_str (),
02889                                                         entry->carrier_protocol_str (),
02890                                                         entry->address ());
02891                   flow_spec[i] = CORBA::string_dup (new_entry.entry_to_string ());
02892                 }
02893               else
02894                 {
02895 
02896                   switch (forward_entry->direction ())
02897                     {
02898                     case TAO_FlowSpec_Entry::TAO_AV_DIR_IN:
02899                       {
02900                         ACE_NEW_RETURN (mcast_addr,
02901                                         ACE_INET_Addr,
02902                                         0);
02903                         mcast_addr->set (this->mcast_port_, this->mcast_addr_.c_str ());
02904                         this->mcast_port_++;
02905                         char buf[BUFSIZ];
02906                         mcast_addr->addr_to_string (buf, BUFSIZ);
02907                         if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "%s\n", buf));
02908                         TAO_Forward_FlowSpec_Entry *new_entry;
02909                         ACE_NEW_RETURN (new_entry,
02910                                         TAO_Forward_FlowSpec_Entry (forward_entry->flowname (),
02911                                                                     forward_entry->direction_str (),
02912                                                                     forward_entry->format (),
02913                                                                     forward_entry->flow_protocol_str (),
02914                                                                     forward_entry->carrier_protocol_str (),
02915                                                                     mcast_addr),
02916                                         0);
02917                         flow_spec[i] = CORBA::string_dup (new_entry->entry_to_string ());
02918                         //new_entry->is_multicast (1);
02919 
02920                         this->forward_flow_spec_set.insert (new_entry);
02921                         TAO_AV_Acceptor_Registry *acceptor_registry = TAO_AV_CORE::instance ()->acceptor_registry ();
02922                         result = acceptor_registry->open (this,
02923                                                           TAO_AV_CORE::instance (),
02924                                                           this->forward_flow_spec_set);
02925                         if (result < 0)
02926                           ACE_ERROR_RETURN ((LM_ERROR, "Acceptor_Registry::open failed\n"), 0);
02927                         result = this->mcast_entry_map_.bind (mcast_key, new_entry);
02928                         if (result < 0)
02929                           ACE_ERROR_RETURN ((LM_ERROR, "mcast_entry::bind failed"), 0);
02930                       }
02931                       break;
02932                     case TAO_FlowSpec_Entry::TAO_AV_DIR_OUT:
02933                       // OUT implies we're the sink.
02934                       break;
02935                     default:
02936                       break;
02937                     }
02938                 }
02939             }
02940         }
02941     }
02942   ACE_CATCHANY
02943     {
02944       ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_StreamEndPoint_A::multiconnect");
02945       return 0;
02946     }
02947   ACE_ENDTRY;
02948   ACE_CHECK_RETURN (0);
02949   return 1;
02950 }
02951 
02952 // ATM style Multicast is not supported yet.
02953 CORBA::Boolean
02954 TAO_StreamEndPoint_A::connect_leaf (AVStreams::StreamEndPoint_B_ptr /* the_ep */,
02955                                     AVStreams::streamQoS & /* the_qos */,
02956                                     const AVStreams::flowSpec & /* the_flows */
02957                                     ACE_ENV_ARG_DECL)
02958   ACE_THROW_SPEC ((CORBA::SystemException,
02959                    AVStreams::streamOpFailed,
02960                    AVStreams::noSuchFlow,
02961                    AVStreams::QoSRequestFailed,
02962                    AVStreams::notSupported))
02963 {
02964   ACE_THROW_RETURN (AVStreams::notSupported (), 0);
02965 }
02966 
02967 // Multicast not supported yet.
02968 void
02969 TAO_StreamEndPoint_A::disconnect_leaf (AVStreams::StreamEndPoint_B_ptr /* the_ep */,
02970                                        const AVStreams::flowSpec & /* theSpec */
02971                                        ACE_ENV_ARG_DECL)
02972 
02973   ACE_THROW_SPEC ((CORBA::SystemException,
02974                    AVStreams::streamOpFailed,
02975                    AVStreams::noSuchFlow,
02976                    AVStreams::notSupported))
02977 {
02978 
02979   ACE_THROW (AVStreams::notSupported ());
02980 
02981 }
02982 
02983 TAO_StreamEndPoint_A::~TAO_StreamEndPoint_A (void)
02984 {
02985 }
02986 
02987 // ----------------------------------------------------------------------
02988 // TAO_StreamEndPoint_B
02989 // ----------------------------------------------------------------------
02990 
02991 TAO_StreamEndPoint_B::TAO_StreamEndPoint_B (void)
02992 {
02993   if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,
02994                                        "\n(%P|%t) TAO_StreamEndPoint_B::TAO_StreamEndPoint_B: created"));
02995 }
02996 
02997 CORBA::Boolean
02998 TAO_StreamEndPoint_B::multiconnect (AVStreams::streamQoS &stream_qos,
02999                                     AVStreams::flowSpec &flow_spec
03000                                     ACE_ENV_ARG_DECL)
03001      ACE_THROW_SPEC ((CORBA::SystemException,
03002                      AVStreams::streamOpFailed,
03003                      AVStreams::noSuchFlow,
03004                      AVStreams::QoSRequestFailed,
03005                      AVStreams::FPError))
03006 {
03007   if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_StreamEndPoint_B::multiconnect\n"));
03008   ACE_TRY
03009     {
03010       int result = 0;
03011       TAO_AV_QoS qos (stream_qos);
03012       for (u_int i=0;i< flow_spec.length ();i++)
03013         {
03014           TAO_Forward_FlowSpec_Entry *forward_entry;
03015           ACE_NEW_RETURN (forward_entry,
03016                           TAO_Forward_FlowSpec_Entry,
03017                           0);
03018           forward_entry->parse (flow_spec[i]);
03019           ACE_CString mcast_key (forward_entry->flowname ());
03020           AVStreams::FlowEndPoint_var flow_endpoint;
03021           if (this->fep_map_.find (mcast_key, flow_endpoint ) == 0)
03022             {
03023               AVStreams::FlowConsumer_var consumer;
03024               ACE_TRY_EX (narrow)
03025                 {
03026                   consumer = AVStreams::FlowConsumer::_narrow (flow_endpoint.in () ACE_ENV_ARG_PARAMETER);
03027                   ACE_TRY_CHECK_EX (narrow);
03028                 }
03029               ACE_CATCHANY
03030                 {
03031                   ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "FlowConsumer::_narrow");
03032                   ACE_ERROR_RETURN ((LM_ERROR, "sep_b doesn't contain a flowconsumer"), 0);
03033                 }
03034               ACE_ENDTRY;
03035               ACE_CHECK_RETURN (0);
03036               AVStreams::QoS flow_qos;
03037               result = qos.get_flow_qos (forward_entry->flowname (), flow_qos);
03038               if (result < 0)
03039                 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "QoS not found for %s", forward_entry->flowname ()));
03040               AVStreams::FlowConnection_var flow_connection;
03041               ACE_TRY_EX (flow_connection)
03042                 {
03043                   if (CORBA::is_nil (this->streamctrl_.in ()))
03044                     {
03045                       CORBA::Any_var streamctrl_any;
03046                       streamctrl_any = this->get_property_value ("Related_StreamCtrl"
03047                                                                  ACE_ENV_ARG_PARAMETER);
03048                       ACE_TRY_CHECK;
03049                       AVStreams::StreamCtrl_ptr streamctrl;
03050                       streamctrl_any.in () >>= streamctrl;
03051                       this->streamctrl_ = AVStreams::StreamCtrl::_duplicate (streamctrl);
03052                     }
03053                   CORBA::Object_var flow_connection_obj =
03054                     this->streamctrl_->get_flow_connection (forward_entry->flowname ()
03055                                                             ACE_ENV_ARG_PARAMETER);
03056                   ACE_TRY_CHECK_EX (flow_connection);
03057                   flow_connection = AVStreams::FlowConnection::_narrow (flow_connection_obj.in ()
03058                                                                         ACE_ENV_ARG_PARAMETER);
03059                   ACE_TRY_CHECK_EX (flow_connection);
03060                 }
03061               ACE_CATCHANY
03062                 {
03063                   ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_StreamEndPoint_B::multiconnect::get_flow_connection");
03064                   return 0;
03065                 }
03066               ACE_ENDTRY;
03067               ACE_CHECK_RETURN (0);
03068               result = flow_connection->add_consumer (consumer.in (),
03069                                                       flow_qos
03070                                                       ACE_ENV_ARG_PARAMETER);
03071               ACE_TRY_CHECK;
03072               if (result == 0)
03073                 ACE_ERROR_RETURN ((LM_ERROR, "TAO_StreamEndPoint_B::multiconnect:add_consumer failed\n"), 0);
03074             }
03075           else
03076             {
03077               TAO_FlowSpec_Entry *mcast_entry = 0;
03078               ACE_INET_Addr *mcast_addr;
03079               mcast_addr = dynamic_cast<ACE_INET_Addr *> (forward_entry->address ());
03080               if (mcast_addr == 0)
03081                 ACE_ERROR_RETURN ((LM_ERROR, "TAO_StreamEndPoint_B::multiconnect::Address missing in flowspec_entry\n"), 0);
03082               result = this->mcast_entry_map_.find (mcast_key, mcast_entry);
03083               if (result == 0)
03084                 {
03085                   ACE_ERROR_RETURN ((LM_ERROR, "TAO_StreamEndPoint_B::multiconnect::handler already found\n"), 0);
03086                 }
03087               else
03088                 {
03089                   switch (forward_entry->direction ())
03090                     {
03091                     case TAO_FlowSpec_Entry::TAO_AV_DIR_IN:
03092                       {
03093                         // IN means we're the sink.
03094                         // @@ We have to take care of this.
03095                         //                 result = this->make_dgram_mcast_flow_handler (mcast_dgram);
03096                         //                 if (result < 0)
03097                         //                   return 0;
03098 
03099                         this->forward_flow_spec_set.insert (forward_entry);
03100                         TAO_AV_Connector_Registry *connector_registry = TAO_AV_CORE::instance ()->connector_registry ();
03101                         result = connector_registry->open (this,
03102                                                            TAO_AV_CORE::instance (),
03103                                                            this->forward_flow_spec_set);
03104                         if (result < 0)
03105                           ACE_ERROR_RETURN ((LM_ERROR, "connector_registry::open failed\n"), 0);
03106                         result = this->mcast_entry_map_.bind (mcast_key, forward_entry);
03107                         if (result < 0)
03108                           ACE_ERROR_RETURN ((LM_ERROR, "dgram_mcast_handler::bind failed"), 0);
03109                       }
03110                       break;
03111                     case TAO_FlowSpec_Entry::TAO_AV_DIR_OUT:
03112                       // OUT implies we're the source., which is an error.
03113                       break;
03114                     default:
03115                       break;
03116                     }
03117                 }
03118             }
03119         }
03120     }
03121   ACE_CATCHANY
03122     {
03123       ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_StreamEndPoint_B::multiconnect");
03124       return 0;
03125     }
03126   ACE_ENDTRY;
03127   ACE_CHECK_RETURN (0);
03128   return 1;
03129 }
03130 
03131 TAO_StreamEndPoint_B::~TAO_StreamEndPoint_B (void)
03132 {
03133 }
03134 
03135 // ----------------------------------------------------------------------
03136 // TAO_VDev
03137 // ----------------------------------------------------------------------
03138 
03139 TAO_VDev::TAO_VDev (void)
03140 {
03141   if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,
03142               "(%P|%t) TAO_VDev::TAO_VDev: created\n"));
03143 }
03144 
03145 // StreamCtrl will call this to give us a reference to itself, and to
03146 // our peer vdev..
03147 CORBA::Boolean
03148 TAO_VDev::set_peer (AVStreams::StreamCtrl_ptr the_ctrl,
03149                     AVStreams::VDev_ptr the_peer_dev,
03150                     AVStreams::streamQoS &the_qos,
03151                     const AVStreams::flowSpec &the_spec
03152                     ACE_ENV_ARG_DECL)
03153   ACE_THROW_SPEC ((CORBA::SystemException,
03154                    AVStreams::noSuchFlow,
03155                    AVStreams::QoSRequestFailed,
03156                    AVStreams::streamOpFailed))
03157 {
03158   ACE_UNUSED_ARG (the_qos);
03159   ACE_UNUSED_ARG (the_spec);
03160 
03161   CORBA::Boolean result = 0;
03162   ACE_TRY
03163     {
03164       if (TAO_debug_level > 0)
03165         ACE_DEBUG ((LM_DEBUG, "(%P|%t) TAO_VDev::set_peer: called\n"));
03166 
03167 
03168       CORBA::Any anyval;
03169       anyval <<= the_peer_dev;
03170       this->define_property ("Related_VDev",
03171                              anyval
03172                              ACE_ENV_ARG_PARAMETER);
03173 
03174       ACE_TRY_CHECK;
03175 
03176       this->streamctrl_ = AVStreams::StreamCtrl::_duplicate (the_ctrl);
03177       this->peer_ = AVStreams::VDev::_duplicate (the_peer_dev);
03178 
03179       CORBA::Any_var anyptr;
03180       anyptr = this->peer_->get_property_value ("Related_MediaCtrl"
03181                                                 ACE_ENV_ARG_PARAMETER);
03182       ACE_TRY_CHECK;
03183 
03184       CORBA::Object_ptr media_ctrl_obj = 0;
03185 
03186       anyptr.in () >>= CORBA::Any::to_object(media_ctrl_obj);
03187       ACE_TRY_CHECK;
03188 
03189       result = this->set_media_ctrl (media_ctrl_obj ACE_ENV_ARG_PARAMETER);
03190       ACE_TRY_CHECK;
03191     }
03192   ACE_CATCHANY
03193     {
03194       ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_VDev::set_peer");
03195       return 0;
03196     }
03197   ACE_ENDTRY;
03198   ACE_CHECK_RETURN (0);
03199   return result;
03200 }
03201 
03202 CORBA::Boolean
03203 TAO_VDev::set_media_ctrl (CORBA::Object_ptr media_ctrl
03204                           ACE_ENV_ARG_DECL_NOT_USED)
03205 
03206 {
03207   //  since the media ctrl is not stored or used, delete it.
03208 
03209   CORBA::release( media_ctrl);
03210 
03211   return 1;
03212 }
03213 
03214 // Sets the multicast VDev peer.
03215 CORBA::Boolean
03216 TAO_VDev::set_Mcast_peer (AVStreams::StreamCtrl_ptr /* the_ctrl */,
03217                           AVStreams::MCastConfigIf_ptr mcast_peer,
03218                           AVStreams::streamQoS &/* the_qos */,
03219                           const AVStreams::flowSpec &/* the_spec */
03220                           ACE_ENV_ARG_DECL_NOT_USED)
03221   ACE_THROW_SPEC ((CORBA::SystemException,
03222                    AVStreams::noSuchFlow,
03223                    AVStreams::QoSRequestFailed,
03224                    AVStreams::streamOpFailed))
03225 {
03226   this->mcast_peer_ = AVStreams::MCastConfigIf::_duplicate (mcast_peer);
03227   return 1;
03228 }
03229 
03230 // applications should override this to handle configuration changes.
03231 void
03232 TAO_VDev::configure (const CosPropertyService::Property &/*the_config_mesg*/
03233                      ACE_ENV_ARG_DECL_NOT_USED)
03234   ACE_THROW_SPEC ((CORBA::SystemException,
03235                    AVStreams::PropertyException,
03236                    AVStreams::streamOpFailed))
03237 {
03238 }
03239 
03240 // sets the media format used for the flowname as a property.
03241 void
03242 TAO_VDev::set_format (const char *flowName,
03243                       const char *format_name
03244                       ACE_ENV_ARG_DECL)
03245   ACE_THROW_SPEC ((CORBA::SystemException,
03246                    AVStreams::notSupported))
03247 {
03248   ACE_TRY
03249     {
03250       if (flowName == 0 || format_name == 0)
03251         ACE_ERROR ((LM_ERROR, "TAO_VDev::set_format: flowName or format_name is null\n"));
03252       char format_property [BUFSIZ];
03253       ACE_OS::sprintf (format_property, "%s_currFormat", flowName);
03254       CORBA::Any format;
03255       format <<= format_name;
03256       this->define_property (format_property,
03257                              format
03258                              ACE_ENV_ARG_PARAMETER);
03259       ACE_TRY_CHECK;
03260     }
03261   ACE_CATCHANY
03262     {
03263       ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_VDev::set_format");
03264       return;
03265     }
03266   ACE_ENDTRY;
03267   ACE_CHECK;
03268   return;
03269 }
03270 
03271 // sets the device parameters for the flowname as a property.
03272 void
03273 TAO_VDev::set_dev_params (const char *flowName,
03274                           const CosPropertyService::Properties &new_params
03275                           ACE_ENV_ARG_DECL)
03276   ACE_THROW_SPEC ((CORBA::SystemException,
03277                    AVStreams::PropertyException,
03278                    AVStreams::streamOpFailed))
03279 {
03280   ACE_TRY
03281     {
03282       if (flowName == 0)
03283         ACE_ERROR ((LM_ERROR, "TAO_VDev::set_dev_params:flowName is null\n"));
03284       char devParams_property[BUFSIZ];
03285       ACE_OS::sprintf (devParams_property, "%s_devParams", flowName);
03286       CORBA::Any devParams;
03287       devParams <<= new_params;
03288       this->define_property (devParams_property,
03289                              devParams
03290                              ACE_ENV_ARG_PARAMETER);
03291       ACE_TRY_CHECK;
03292     }
03293   ACE_CATCHANY
03294     {
03295       ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_VDev::set_dev_params");
03296       return;
03297     }
03298   ACE_ENDTRY;
03299   ACE_CHECK;
03300   return;
03301 }
03302 
03303 // QoS Modification should be handled by the application currently.
03304 CORBA::Boolean
03305 TAO_VDev::modify_QoS (AVStreams::streamQoS &the_qos,
03306                       const AVStreams::flowSpec &flowspec
03307                       ACE_ENV_ARG_DECL)
03308   ACE_THROW_SPEC ((CORBA::SystemException,
03309                    AVStreams::noSuchFlow,
03310                    AVStreams::QoSRequestFailed))
03311 {
03312   if (TAO_debug_level > 0)
03313   ACE_DEBUG ((LM_DEBUG,
03314              "TAO_VDev::modify_QoS\n"));
03315 
03316   if (flowspec.length () != 0)
03317     {
03318       TAO_Forward_FlowSpec_Entry entry;
03319       entry.parse (flowspec [0]);
03320       int direction = entry.direction ();
03321       if (direction == 0)
03322         {
03323           AVStreams::StreamEndPoint_A_ptr sep_a;
03324 
03325           CORBA::Any_ptr streamendpoint_a_any =
03326           this->get_property_value ("Related_StreamEndpoint"
03327                                     ACE_ENV_ARG_PARAMETER);
03328           ACE_CHECK_RETURN (0);
03329 
03330           *streamendpoint_a_any >>= sep_a;
03331           if (sep_a != 0)
03332             {
03333               sep_a->modify_QoS (the_qos, flowspec ACE_ENV_ARG_PARAMETER);
03334               ACE_CHECK_RETURN (0);
03335             }
03336           else ACE_DEBUG ((LM_DEBUG,
03337                            "Stream EndPoint Not Found\n"));
03338         }
03339       else
03340         {
03341           AVStreams::StreamEndPoint_B_ptr sep_b;
03342 
03343           CORBA::Any_ptr streamendpoint_b_any =
03344           this->get_property_value ("Related_StreamEndpoint"
03345                                     ACE_ENV_ARG_PARAMETER);
03346           ACE_CHECK_RETURN (0);
03347           *streamendpoint_b_any >>= sep_b;
03348           sep_b->modify_QoS (the_qos, flowspec ACE_ENV_ARG_PARAMETER);
03349           ACE_CHECK_RETURN (0);
03350         }
03351   }
03352   return 1;
03353 }
03354 
03355 TAO_VDev::~TAO_VDev (void)
03356 {
03357 }
03358 
03359 // ----------------------------------------------------------------------
03360 // TAO_MMDevice
03361 // ----------------------------------------------------------------------
03362 
03363 
03364 TAO_MMDevice::TAO_MMDevice (TAO_AV_Endpoint_Strategy *endpoint_strategy)
03365   : endpoint_strategy_ (endpoint_strategy),
03366     flow_count_ (0),
03367     flow_num_ (0),
03368     stream_ctrl_ (0)
03369 {
03370 }
03371 
03372 // create a streamctrl which is colocated with me, use that streamctrl
03373 // to bind the peer_device with me.
03374 AVStreams::StreamCtrl_ptr
03375 TAO_MMDevice::bind (AVStreams::MMDevice_ptr peer_device,
03376                     AVStreams::streamQoS &the_qos,
03377                     CORBA::Boolean_out is_met,
03378                     const AVStreams::flowSpec &the_spec
03379                     ACE_ENV_ARG_DECL)
03380   ACE_THROW_SPEC ((CORBA::SystemException,
03381                    AVStreams::streamOpFailed,
03382                    AVStreams::noSuchFlow,
03383                    AVStreams::QoSRequestFailed))
03384 {
03385   AVStreams::StreamCtrl_ptr streamctrl (AVStreams::StreamCtrl::_nil ());
03386   ACE_TRY
03387     {
03388       ACE_UNUSED_ARG (is_met);
03389       ACE_NEW_RETURN (this->stream_ctrl_,
03390                       TAO_StreamCtrl,
03391                       0);
03392       AVStreams::MMDevice_var mmdevice = this->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
03393       ACE_TRY_CHECK;
03394       this->stream_ctrl_->bind_devs (peer_device,
03395                                      mmdevice.in (),
03396                                      the_qos,
03397                                      the_spec
03398                                      ACE_ENV_ARG_PARAMETER);
03399       ACE_TRY_CHECK;
03400       streamctrl = this->stream_ctrl_->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
03401       ACE_TRY_CHECK;
03402     }
03403   ACE_CATCHANY
03404     {
03405       ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_MMDevice::bind");
03406       return streamctrl;
03407     }
03408   ACE_ENDTRY;
03409   ACE_CHECK_RETURN (streamctrl);
03410   return streamctrl;
03411 }
03412 
03413 // Multicast is not supported yet.
03414 AVStreams::StreamCtrl_ptr
03415 TAO_MMDevice::bind_mcast (AVStreams::MMDevice_ptr first_peer,
03416                           AVStreams::streamQoS &the_qos,
03417                           CORBA::Boolean_out is_met,
03418                           const AVStreams::flowSpec &the_spec
03419                           ACE_ENV_ARG_DECL_NOT_USED)
03420   ACE_THROW_SPEC ((CORBA::SystemException,
03421                    AVStreams::streamOpFailed,
03422                    AVStreams::noSuchFlow,
03423                    AVStreams::QoSRequestFailed))
03424 {
03425   ACE_UNUSED_ARG (first_peer);
03426   ACE_UNUSED_ARG (the_qos);
03427   ACE_UNUSED_ARG (is_met);
03428   ACE_UNUSED_ARG (the_spec);
03429 
03430   return 0;
03431 }
03432 
03433 AVStreams::StreamEndPoint_ptr
03434 TAO_MMDevice::create_A_B (MMDevice_Type type,
03435                           AVStreams::StreamCtrl_ptr streamctrl,
03436                           AVStreams::VDev_out the_vdev,
03437                           AVStreams::streamQoS &the_qos,
03438                           CORBA::Boolean_out met_qos,
03439                           char *&/*named_vdev*/,
03440                           const AVStreams::flowSpec &flow_spec
03441                           ACE_ENV_ARG_DECL)
03442 {
03443   AVStreams::StreamEndPoint_A_ptr sep_a (AVStreams::StreamEndPoint_A::_nil ());
03444   AVStreams::StreamEndPoint_B_ptr sep_b (AVStreams::StreamEndPoint_B::_nil ());
03445   AVStreams::StreamEndPoint_ptr sep (AVStreams::StreamEndPoint::_nil ());
03446   ACE_TRY
03447     {
03448       switch (type)
03449         {
03450         case MMDEVICE_A:
03451           {
03452             if (this->endpoint_strategy_->create_A (sep_a,
03453                                                     the_vdev
03454                                                     ACE_ENV_ARG_PARAMETER) == -1)
03455               ACE_ERROR_RETURN ((LM_ERROR,
03456                                  "TAO_MMDevice::create_A_B (%P|%t) - "
03457                                  "error in create_A\n"),
03458                                 0);
03459             sep = sep_a;
03460           }
03461           break;
03462         case MMDEVICE_B:
03463           {
03464             if (this->endpoint_strategy_->create_B (sep_b,
03465                                                     the_vdev
03466                                                     ACE_ENV_ARG_PARAMETER) == -1)
03467               ACE_ERROR_RETURN ((LM_ERROR,
03468                                  "TAO_MMDevice::create_A_B (%P|%t) - "
03469                                  "error in create_B\n"),
03470                                 0);
03471             sep = sep_b;
03472           }
03473           break;
03474         default:
03475           break;
03476         }
03477       ACE_TRY_CHECK;
03478       if (this->fdev_map_.current_size () > 0)
03479         {
03480           TAO_AV_QoS qos (the_qos);
03481           // create flowendpoints from the FDevs.
03482           for (u_int i=0;i<flow_spec.length ();i++)
03483             {
03484               TAO_Forward_FlowSpec_Entry forward_entry;
03485               forward_entry.parse (flow_spec[i]);
03486               ACE_CString flow_key (forward_entry.flowname ());
03487               AVStreams::FDev_var flow_dev;
03488               AVStreams::FlowConnection_var flowconnection;
03489               ACE_TRY_EX (flowconnection)
03490                 {
03491                   // Get the flowconnection for this flow.
03492                   //static int blah = 0; if(blah == 1){blah=0; abort();}else{blah=1;}
03493                   CORBA::Object_var flowconnection_obj =
03494                     streamctrl->get_flow_connection (forward_entry.flowname () ACE_ENV_ARG_PARAMETER);
03495                   ACE_TRY_CHECK_EX (flowconnection);
03496                         printf("successfully called get_flow_connection\n");
03497                   if (!CORBA::is_nil (flowconnection_obj.in ()))
03498                     {
03499                       flowconnection = AVStreams::FlowConnection::_narrow (flowconnection_obj.in ()
03500                                                                            ACE_ENV_ARG_PARAMETER);
03501                       ACE_TRY_CHECK_EX (flowconnection);
03502                     }
03503                 }
03504               ACE_CATCH(AVStreams::noSuchFlow, nsf)
03505                 {
03506                           TAO_FlowConnection *flowConnection;
03507                           ACE_NEW_RETURN (flowConnection,
03508                                           TAO_FlowConnection,
03509                                           0);
03510                           flowconnection = flowConnection->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
03511                           ACE_TRY_CHECK;
03512                           streamctrl->set_flow_connection (forward_entry.flowname(),
03513                                                      flowconnection.in ()
03514                                                      ACE_ENV_ARG_PARAMETER);
03515                           ACE_TRY_CHECK;
03516                 }
03517               ACE_CATCHANY
03518                 {
03519                   //if (TAO_debug_level >= 0)
03520                     ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_MMDevice::create_a::get_flow_connection");
03521                 }
03522               ACE_ENDTRY;
03523               ACE_CHECK_RETURN (0);
03524 
03525               int result = this->fdev_map_.find (flow_key, flow_dev);
03526               if (result < 0)
03527                 ACE_ERROR_RETURN ((LM_ERROR, "(%N,%l) fdev_map::find failed\n"), 0);
03528 
03529               CORBA::String_var named_fdev;
03530               AVStreams::FlowEndPoint_var flow_endpoint;
03531               AVStreams::QoS flow_qos;
03532               result = qos.get_flow_qos (forward_entry.flowname (), flow_qos);
03533               if (result < 0)
03534                 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "(%N,%l) get_flow_qos failed for %s\n", forward_entry.flowname ()));
03535               switch (forward_entry.direction ())
03536                 {
03537                 case TAO_FlowSpec_Entry::TAO_AV_DIR_IN:
03538                   {
03539                     switch (type)
03540                       {
03541                       case MMDEVICE_A:
03542                         {
03543                           // In implies flow is from A to B and
03544                           // hence A is the producer for this flow and B is the consumer for this flow.
03545                           // We have to create a producer from the FDev for this flow.
03546                           flow_endpoint =
03547                             flow_dev->create_producer (flowconnection.in (),
03548                                                        flow_qos,
03549                                                        met_qos,
03550                                                        named_fdev.inout ()
03551                                                        ACE_ENV_ARG_PARAMETER);
03552                         }
03553                         break;
03554                       case MMDEVICE_B:
03555                         {
03556                           flow_endpoint =
03557                             flow_dev->create_consumer (flowconnection.in (),
03558                                                        flow_qos,
03559                                                        met_qos,
03560                                                        named_fdev.inout ()
03561                                                        ACE_ENV_ARG_PARAMETER);
03562                         }
03563                         break;
03564                       }
03565                     ACE_TRY_CHECK;
03566                   }
03567                   break;
03568                 case TAO_FlowSpec_Entry::TAO_AV_DIR_OUT:
03569                   {
03570                     switch (type)
03571                       {
03572                       case MMDEVICE_A:
03573                         {
03574                           // OUT implies flow is from B to A and
03575                           // hence B is the producer for this flow and A is the consumer for this flow.
03576                           // We have to create a consumer from the FDev for this flow.
03577                           flow_endpoint =
03578                             flow_dev->create_consumer (flowconnection.in (),
03579                                                        flow_qos,
03580                                                        met_qos,
03581                                                        named_fdev.inout ()
03582                                                        ACE_ENV_ARG_PARAMETER);
03583                         }
03584                         break;
03585                       case MMDEVICE_B:
03586                         {
03587                           // In implies flow is from A to B and
03588                           // hence A is the producer for this flow and B is the consumer for this flow.
03589                           // We have to create a producer from the FDev for this flow.
03590                           flow_endpoint =
03591                             flow_dev->create_producer (flowconnection.in (),
03592                                                        flow_qos,
03593                                                        met_qos,
03594                                                        named_fdev.inout ()
03595                                                        ACE_ENV_ARG_PARAMETER);
03596                         }
03597                         break;
03598                       }
03599                     ACE_TRY_CHECK;
03600                   }
03601                   break;
03602                 default:
03603                   break;
03604                 }
03605               CORBA::Any flowname_any;
03606               flowname_any <<= forward_entry.flowname ();
03607               flow_endpoint->define_property ("FlowName", flowname_any ACE_ENV_ARG_PARAMETER);
03608               ACE_TRY_CHECK;
03609               sep->add_fep (flow_endpoint.in ()
03610                             ACE_ENV_ARG_PARAMETER);
03611               ACE_TRY_CHECK;
03612             }
03613         }
03614     }
03615   ACE_CATCHANY
03616     {
03617       ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_MMDevice::create_A");
03618       return sep;
03619     }
03620   ACE_ENDTRY;
03621   ACE_CHECK_RETURN (sep);
03622   return sep;
03623 }
03624 
03625 AVStreams::StreamEndPoint_A_ptr
03626 TAO_MMDevice::create_A (AVStreams::StreamCtrl_ptr streamctrl,
03627                         AVStreams::VDev_out the_vdev,
03628                         AVStreams::streamQoS &stream_qos,
03629                         CORBA::Boolean_out met_qos,
03630                         char *&named_vdev,
03631                         const AVStreams::flowSpec &flow_spec
03632                         ACE_ENV_ARG_DECL)
03633   ACE_THROW_SPEC ((CORBA::SystemException,
03634                    AVStreams::streamOpFailed,
03635                    AVStreams::streamOpDenied,
03636                    AVStreams::notSupported,
03637                    AVStreams::QoSRequestFailed,
03638                    AVStreams::noSuchFlow))
03639 {
03640   AVStreams::StreamEndPoint_A_ptr sep_a = 0;
03641   AVStreams::StreamEndPoint_var sep;
03642   ACE_TRY
03643     {
03644       sep = this->create_A_B (MMDEVICE_A, streamctrl, the_vdev, stream_qos, met_qos, named_vdev, flow_spec ACE_ENV_ARG_PARAMETER);
03645       ACE_TRY_CHECK;
03646       sep_a = AVStreams::StreamEndPoint_A::_narrow (sep.in() ACE_ENV_ARG_PARAMETER);
03647       ACE_TRY_CHECK;
03648 
03649       ACE_ASSERT( !CORBA::is_nil( sep_a ) );
03650     }
03651   ACE_CATCHANY
03652     {
03653       ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_MMDevice::create_A");
03654       return sep_a;
03655     }
03656   ACE_ENDTRY;
03657 
03658   return sep_a;
03659 }
03660 
03661 
03662 AVStreams::StreamEndPoint_B_ptr
03663 TAO_MMDevice::create_B (AVStreams::StreamCtrl_ptr streamctrl,
03664                         AVStreams::VDev_out the_vdev,
03665                         AVStreams::streamQoS &stream_qos,
03666                         CORBA::Boolean_out met_qos,
03667                         char *&named_vdev,
03668                         const AVStreams::flowSpec &flow_spec
03669                         ACE_ENV_ARG_DECL)
03670   ACE_THROW_SPEC ((CORBA::SystemException,
03671                    AVStreams::streamOpFailed,
03672                    AVStreams::streamOpDenied,
03673                    AVStreams::notSupported,
03674                    AVStreams::QoSRequestFailed,
03675                    AVStreams::noSuchFlow))
03676 {
03677   AVStreams::StreamEndPoint_B_ptr sep_b = AVStreams::StreamEndPoint_B::_nil ();
03678   AVStreams::StreamEndPoint_var sep;
03679 
03680   ACE_TRY
03681     {
03682       sep = this->create_A_B (MMDEVICE_B, streamctrl, the_vdev, stream_qos, met_qos, named_vdev, flow_spec ACE_ENV_ARG_PARAMETER);
03683       ACE_TRY_CHECK;
03684       sep_b = AVStreams::StreamEndPoint_B::_narrow (sep.in() ACE_ENV_ARG_PARAMETER);
03685       ACE_TRY_CHECK;
03686 
03687       ACE_ASSERT ( !CORBA::is_nil( sep_b ) );
03688     }
03689   ACE_CATCHANY
03690     {
03691       ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_MMDevice::create_B");
03692       return sep_b;
03693     }
03694   ACE_ENDTRY;
03695   ACE_CHECK_RETURN (sep_b);
03696   return sep_b;
03697 }
03698 
03699 
03700 // destroys the streamendpoint and the Vdev.
03701 void
03702 TAO_MMDevice::destroy (AVStreams::StreamEndPoint_ptr /* the_ep */,
03703                        const char * /* vdev_name */
03704                        ACE_ENV_ARG_DECL_NOT_USED)
03705   ACE_THROW_SPEC ((CORBA::SystemException,
03706                    AVStreams::notSupported))
03707 {
03708   // Remove self from POA.  Because of reference counting, the POA
03709   // will automatically delete the servant when all pending requests
03710   // on this servant are complete.
03711   int result = TAO_AV_Core::deactivate_servant (this);
03712   if (result < 0)
03713     if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_MMDevice::destroy failed\n"));
03714 }
03715 
03716 char *
03717 TAO_MMDevice::add_fdev_i (AVStreams::FDev_ptr fdev
03718                           ACE_ENV_ARG_DECL)
03719   ACE_THROW_SPEC ((CORBA::SystemException,
03720                    AVStreams::notSupported,
03721                    AVStreams::streamOpFailed))
03722 {
03723   char* tmp;
03724   ACE_NEW_RETURN (tmp,
03725                   char[64],
03726                   0);
03727   CORBA::String_var flow_name = tmp;
03728 
03729   ACE_TRY
03730     {
03731       // exception implies the flow name is not defined and is system
03732       // generated.
03733       ACE_OS::sprintf (tmp, "flow%d", flow_num_++);
03734       CORBA::Any flowname_any;
03735       flowname_any <<= flow_name.in ();
03736       fdev->define_property ("Flow", flowname_any ACE_ENV_ARG_PARAMETER);
03737       ACE_TRY_CHECK;
03738     }
03739   ACE_CATCHANY
03740     {
03741       ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_MMDevice::add_fdev");
03742       return 0;
03743     }
03744   ACE_ENDTRY;
03745   return flow_name._retn ();
03746 }
03747 
03748 // Adds the fdev object to the MMDevice.
03749 char *
03750 TAO_MMDevice::add_fdev (CORBA::Object_ptr fdev_obj
03751                         ACE_ENV_ARG_DECL)
03752   ACE_THROW_SPEC ((CORBA::SystemException,
03753                    AVStreams::notSupported,
03754                    AVStreams::streamOpFailed))
03755 {
03756   CORBA::String_var flow_name;
03757   AVStreams::FDev_var fdev;
03758   ACE_TRY_EX (flow_name)
03759     {
03760       CORBA::Any_ptr flow_name_any;
03761       fdev = AVStreams::FDev::_narrow (fdev_obj ACE_ENV_ARG_PARAMETER);
03762       ACE_TRY_CHECK_EX (flow_name);
03763 
03764       if (CORBA::is_nil (fdev.in ()))
03765           return 0;
03766 
03767 
03768       flow_name_any = fdev->get_property_value ("Flow" ACE_ENV_ARG_PARAMETER);
03769       ACE_TRY_CHECK_EX (flow_name);
03770 
03771       const char *tmp;
03772       *flow_name_any >>= tmp;
03773       flow_name = CORBA::string_dup (tmp);
03774     }
03775   ACE_CATCHANY
03776     {
03777       flow_name =
03778         this->add_fdev_i (fdev.in () ACE_ENV_ARG_PARAMETER);
03779       ACE_CHECK_RETURN (0);
03780     }
03781   ACE_ENDTRY;
03782   ACE_CHECK_RETURN (0);
03783 
03784 
03785   // Add it to the sequence of flowNames supported.
03786   // put the flowname and the fdev in a hashtable.
03787   ACE_CString fdev_name_key ( flow_name.in () );
03788 
03789 
03790   if ( (this->fdev_map_.bind (fdev_name_key, fdev )) != 0)
03791     ACE_THROW_RETURN (AVStreams::streamOpFailed (), 0);
03792   // increment the flow count.
03793   this->flow_count_++;
03794   this->flows_.length (this->flow_count_);
03795   this->flows_ [this->flow_count_-1] = flow_name;
03796   // define/modify the "Flows" property.
03797   CORBA::Any flows_any;
03798   flows_any <<= this->flows_;
03799   ACE_TRY_EX (flows)
03800     {
03801       this->define_property ("Flows",
03802                              flows_any
03803                              ACE_ENV_ARG_PARAMETER);
03804       ACE_TRY_CHECK_EX (flows);
03805     }
03806   ACE_CATCHANY
03807     {
03808       ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_MMDevice::add_fdev");
03809       return 0;
03810     }
03811   ACE_ENDTRY;
03812   ACE_CHECK_RETURN (0);
03813   return flow_name._retn ();
03814 }
03815 
03816 // Gets the FDev object associated with this flow.
03817 CORBA::Object_ptr
03818 TAO_MMDevice::get_fdev (const char *flow_name
03819                         ACE_ENV_ARG_DECL_NOT_USED)
03820   ACE_THROW_SPEC ((CORBA::SystemException,
03821                    AVStreams::notSupported,
03822                    AVStreams::noSuchFlow))
03823 {
03824 
03825   ACE_CString fdev_name_key (flow_name);
03826   AVStreams::FDev_var fdev_entry;
03827   if (this->fdev_map_.find (fdev_name_key, fdev_entry) == 0)
03828     return fdev_entry._retn() ;
03829   return 0;
03830 }
03831 
03832 // Removes the fdev from this MMDevice.
03833 void
03834 TAO_MMDevice::remove_fdev (const char *flow_name
03835                            ACE_ENV_ARG_DECL)
03836   ACE_THROW_SPEC ((CORBA::SystemException,
03837                    AVStreams::notSupported,
03838                    AVStreams::noSuchFlow,
03839                    AVStreams::streamOpFailed))
03840 {
03841   ACE_TRY
03842     {
03843       ACE_CString fdev_name_key (flow_name);
03844       AVStreams::FDev_var fdev_entry;
03845       // Remove the fep from the hash table.
03846       if (this->fdev_map_.unbind (fdev_name_key, fdev_entry)!= 0)
03847         ACE_THROW (AVStreams::streamOpFailed ());
03848 
03849       AVStreams::flowSpec new_flows (this->flows_.length ());
03850       for (u_int i=0, j=0 ; i <this->flows_.length (); i++)
03851         if (ACE_OS::strcmp (flow_name, this->flows_[i]) != 0)
03852           new_flows[j++] = this->flows_[i];
03853 
03854       CORBA::Any flows;
03855       flows <<= new_flows;
03856       this->flows_ = new_flows;
03857       this->define_property ("Flows",
03858                              flows
03859                              ACE_ENV_ARG_PARAMETER);
03860       ACE_TRY_CHECK;
03861     }
03862   ACE_CATCHANY
03863     {
03864       ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_MMDevice::remove_fdev");
03865     }
03866   ACE_ENDTRY;
03867   ACE_CHECK;
03868 }
03869 
03870 // destructor.
03871 TAO_MMDevice::~TAO_MMDevice (void)
03872 {
03873   delete this->stream_ctrl_;
03874 }
03875 
03876 //------------------------------------------------------------------
03877 // TAO_FlowConnection
03878 //------------------------------------------------------------------
03879 
03880 // default constructor.
03881 TAO_FlowConnection::TAO_FlowConnection (void)
03882   :fp_name_ (CORBA::string_dup ("")),
03883    ip_multicast_ (0)
03884 {
03885 }
03886 
03887 // int
03888 // TAO_FlowConnection::set_mcast_addr (ACE_UINT32 mcast_addr, u_short mcast_port)
03889 // {
03890 //   this->mcast_addr_ = mcast_addr;
03891 //   this->mcast_port_ = mcast_port;
03892 //   return 0;
03893 // }
03894 
03895 int
03896 TAO_FlowConnection::set_mcast_addr (ACE_CString mcast_addr, u_short mcast_port)
03897 {
03898   this->mcast_addr_ = mcast_addr;
03899   this->mcast_port_ = mcast_port;
03900   return 0;
03901 }
03902 
03903 void
03904 TAO_FlowConnection::set_protocol (const char *protocol)
03905 {
03906   this->protocol_ = protocol;
03907 }
03908 
03909 // stop this flow.
03910 void
03911 TAO_FlowConnection::stop (ACE_ENV_SINGLE_ARG_DECL)
03912   ACE_THROW_SPEC ((CORBA::SystemException))
03913 {
03914   ACE_TRY
03915     {
03916       FlowProducer_SetItor producer_begin = this->flow_producer_set_.begin
03917         ();
03918       for (FlowProducer_SetItor producer_end =
03919              this->flow_producer_set_.end ();
03920            producer_begin != producer_end; ++producer_begin)
03921         {
03922           (*producer_begin)->stop (ACE_ENV_SINGLE_ARG_PARAMETER);
03923           ACE_TRY_CHECK;
03924         }
03925       FlowConsumer_SetItor consumer_begin = this->flow_consumer_set_.begin
03926         ();
03927       for (FlowConsumer_SetItor consumer_end =
03928              this->flow_consumer_set_.end ();
03929            consumer_begin != consumer_end; ++consumer_begin)
03930         {
03931           (*consumer_begin)->stop (ACE_ENV_SINGLE_ARG_PARAMETER);
03932           ACE_TRY_CHECK;
03933         }
03934     }
03935   ACE_CATCHANY
03936     {
03937       ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_FlowConnection::stop");
03938       return;
03939     }
03940   ACE_ENDTRY;
03941   ACE_CHECK;
03942 }
03943 
03944 // start this flow.
03945 void
03946 TAO_FlowConnection::start (ACE_ENV_SINGLE_ARG_DECL)
03947   ACE_THROW_SPEC ((CORBA::SystemException))
03948 {
03949   ACE_TRY
03950     {
03951       FlowConsumer_SetItor consumer_begin = this->flow_consumer_set_.begin
03952         ();
03953       for (FlowConsumer_SetItor consumer_end =
03954              this->flow_consumer_set_.end ();
03955            consumer_begin != consumer_end; ++consumer_begin)
03956         {
03957           (*consumer_begin)->start (ACE_ENV_SINGLE_ARG_PARAMETER);
03958           ACE_TRY_CHECK;
03959         }
03960       FlowProducer_SetItor producer_begin = this->flow_producer_set_.begin
03961         ();
03962       for (FlowProducer_SetItor producer_end =
03963              this->flow_producer_set_.end ();
03964            producer_begin != producer_end; ++producer_begin)
03965         {
03966           (*producer_begin)->start (ACE_ENV_SINGLE_ARG_PARAMETER);
03967           ACE_TRY_CHECK;
03968         }
03969     }
03970   ACE_CATCHANY
03971     {
03972       ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_FlowConnection::start");
03973       return;
03974     }
03975   ACE_ENDTRY;
03976   ACE_CHECK;
03977 }
03978 
03979 // destroy this flow.
03980 void
03981 TAO_FlowConnection::destroy (ACE_ENV_SINGLE_ARG_DECL)
03982   ACE_THROW_SPEC ((CORBA::SystemException))
03983 {
03984   ACE_TRY
03985     {
03986       FlowProducer_SetItor producer_begin = this->flow_producer_set_.begin
03987         ();
03988       for (FlowProducer_SetItor producer_end =
03989              this->flow_producer_set_.end ();
03990            producer_begin != producer_end; ++producer_begin)
03991         {
03992           (*producer_begin)->destroy (ACE_ENV_SINGLE_ARG_PARAMETER);
03993           ACE_TRY_CHECK;
03994         }
03995       FlowConsumer_SetItor consumer_begin = this->flow_consumer_set_.begin
03996         ();
03997       for (FlowConsumer_SetItor consumer_end =
03998              this->flow_consumer_set_.end ();
03999            consumer_begin != consumer_end; ++consumer_begin)
04000         {
04001           (*consumer_begin)->destroy (ACE_ENV_SINGLE_ARG_PARAMETER);
04002           ACE_TRY_CHECK;
04003         }
04004     }
04005   ACE_CATCHANY
04006     {
04007       ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_FlowConnection::destroy");
04008       return;
04009     }
04010   ACE_ENDTRY;
04011   ACE_CHECK;
04012   int result = TAO_AV_Core::deactivate_servant (this);
04013   if (result < 0)
04014     if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_FlowConnection::destroy failed\n"));
04015 }
04016 
04017 // modify the QoS for this flow.
04018 CORBA::Boolean
04019 TAO_FlowConnection::modify_QoS (AVStreams::QoS & new_qos
04020                                 ACE_ENV_ARG_DECL_NOT_USED)
04021   ACE_THROW_SPEC ((CORBA::SystemException,
04022                    AVStreams::QoSRequestFailed))
04023 {
04024   ACE_UNUSED_ARG (new_qos);
04025   return 0;
04026 }
04027 
04028 // use the specified flow protocol for this flow.
04029 CORBA::Boolean
04030 TAO_FlowConnection::use_flow_protocol (const char * fp_name,
04031                                        const CORBA::Any & fp_settings
04032                                        ACE_ENV_ARG_DECL)
04033   ACE_THROW_SPEC ((CORBA::SystemException,
04034                    AVStreams::FPError,
04035                    AVStreams::notSupported))
04036 {
04037   this->fp_name_ = fp_name;
04038   this->fp_settings_ = fp_settings;
04039   FlowProducer_SetItor producer_begin = this->flow_producer_set_.begin
04040     ();
04041   for (FlowProducer_SetItor producer_end =
04042          this->flow_producer_set_.end ();
04043        producer_begin != producer_end; ++producer_begin)
04044     {
04045       (*producer_begin)->use_flow_protocol
04046         (fp_name, fp_settings ACE_ENV_ARG_PARAMETER);
04047       ACE_CHECK_RETURN (0);
04048     }
04049   FlowConsumer_SetItor consumer_begin = this->flow_consumer_set_.begin
04050     ();
04051   for (FlowConsumer_SetItor consumer_end =
04052          this->flow_consumer_set_.end ();
04053        consumer_begin != consumer_end; ++consumer_begin)
04054     {
04055       (*consumer_begin)->use_flow_protocol
04056         (fp_name, fp_settings ACE_ENV_ARG_PARAMETER);
04057       ACE_CHECK_RETURN (0);
04058     }
04059   return 1;
04060 }
04061 
04062 void
04063 TAO_FlowConnection::push_event (const AVStreams::streamEvent & the_event
04064                                 ACE_ENV_ARG_DECL_NOT_USED)
04065   ACE_THROW_SPEC ((CORBA::SystemException))
04066 {
04067   ACE_UNUSED_ARG (the_event);
04068 }
04069 
04070 CORBA::Boolean
04071 TAO_FlowConnection::connect_devs (AVStreams::FDev_ptr a_party,
04072                                   AVStreams::FDev_ptr b_party,
04073                                   AVStreams::QoS & flow_qos
04074                                   ACE_ENV_ARG_DECL)
04075   ACE_THROW_SPEC ((CORBA::SystemException,
04076                    AVStreams::streamOpFailed,
04077                    AVStreams::streamOpDenied,
04078                    AVStreams::QoSRequestFailed))
04079 {
04080   CORBA::Boolean result = 0;
04081   ACE_TRY
04082     {
04083       AVStreams::FlowConnection_var flowconnection = this->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
04084       ACE_TRY_CHECK;
04085       CORBA::Boolean met_qos;
04086       CORBA::String_var named_fdev ((const char *)"");
04087       AVStreams::FlowProducer_var producer =
04088         a_party->create_producer (flowconnection.in (),
04089                                   flow_qos,
04090                                   met_qos,
04091                                   named_fdev.inout ()
04092                                   ACE_ENV_ARG_PARAMETER);
04093       ACE_TRY_CHECK;
04094       AVStreams::FlowConsumer_var consumer =
04095         b_party->create_consumer (flowconnection.in (),
04096                                   flow_qos,
04097                                   met_qos,
04098                                   named_fdev.inout ()
04099                                   ACE_ENV_ARG_PARAMETER);
04100       ACE_TRY_CHECK;
04101       result = this->connect (producer.in (),
04102                               consumer.in (),
04103                               flow_qos
04104                               ACE_ENV_ARG_PARAMETER);
04105       ACE_TRY_CHECK;
04106     }
04107   ACE_CATCHANY
04108     {
04109       ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_FlowConnection::connect_devs");
04110       return 0;
04111     }
04112   ACE_ENDTRY;
04113   ACE_CHECK_RETURN (0);
04114   return result;
04115 }
04116 
04117 // connect the producer and the consumer
04118 CORBA::Boolean
04119 TAO_FlowConnection::connect (AVStreams::FlowProducer_ptr producer,
04120                              AVStreams::FlowConsumer_ptr consumer,
04121                              AVStreams::QoS & the_qos
04122                              ACE_ENV_ARG_DECL)
04123   ACE_THROW_SPEC ((CORBA::SystemException,
04124                    AVStreams::formatMismatch,
04125                    AVStreams::FEPMismatch,
04126                    AVStreams::alreadyConnected))
04127 {
04128   ACE_TRY
04129     {
04130 
04131       AVStreams::FlowProducer_ptr flow_producer =
04132         AVStreams::FlowProducer::_duplicate (producer);
04133       AVStreams::FlowConsumer_ptr flow_consumer =
04134         AVStreams::FlowConsumer::_duplicate (consumer);
04135 
04136       this->flow_producer_set_.insert (flow_producer);
04137       this->flow_consumer_set_.insert (flow_consumer);
04138       AVStreams::FlowConnection_var flowconnection =
04139         this->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
04140       ACE_TRY_CHECK;
04141 
04142       flow_producer->set_peer (flowconnection.in (),
04143                                flow_consumer,
04144                                the_qos
04145                                ACE_ENV_ARG_PARAMETER);
04146       ACE_TRY_CHECK;
04147 
04148       flow_consumer->set_peer (flowconnection.in (),
04149                                flow_producer,
04150                                the_qos
04151                                ACE_ENV_ARG_PARAMETER);
04152       ACE_TRY_CHECK;
04153 
04154       char *consumer_address =
04155         flow_consumer->go_to_listen (the_qos,
04156                                      0, // false for is_mcast
04157                                      flow_producer,
04158                                      this->fp_name_.inout ()
04159                                      ACE_ENV_ARG_PARAMETER);
04160       ACE_TRY_CHECK;
04161 
04162       if (ACE_OS::strcmp (consumer_address, "") == 0)
04163         {
04164           // Consumer is not willing to listen, so try the producer.
04165           consumer_address = flow_producer->go_to_listen (the_qos,
04166                                                           0, // false for is_mcast
04167                                                           flow_consumer,
04168                                                           this->fp_name_.inout ()
04169                                                           ACE_ENV_ARG_PARAMETER);
04170           ACE_TRY_CHECK;
04171           flow_consumer->connect_to_peer (the_qos,
04172                                           consumer_address,
04173                                           this->fp_name_.inout ()
04174                                           ACE_ENV_ARG_PARAMETER);
04175           ACE_TRY_CHECK;
04176           // @@ Naga: We have to find means to set the reverse channel for the producer.
04177           // Its broken in the point-to_point case for UDP.
04178         }
04179       else
04180         {
04181           if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_FlowConnection::connect_to_peer addres: %s", consumer_address));
04182           flow_producer->connect_to_peer (the_qos,
04183                                           consumer_address,
04184                                           this->fp_name_.inout ()
04185                                           ACE_ENV_ARG_PARAMETER);
04186           ACE_TRY_CHECK;
04187         }
04188     }
04189   ACE_CATCHANY
04190     {
04191       ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_FlowConnection::connect");
04192     }
04193   ACE_ENDTRY;
04194   ACE_CHECK_RETURN (0);
04195   return 1;
04196 }
04197 
04198 
04199 CORBA::Boolean
04200 TAO_FlowConnection::disconnect (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
04201   ACE_THROW_SPEC ((CORBA::SystemException))
04202 {
04203   return  0;
04204 }
04205 
04206 CORBA::Boolean
04207 TAO_FlowConnection::add_producer (AVStreams::FlowProducer_ptr producer,
04208                                   AVStreams::QoS & the_qos
04209                                   ACE_ENV_ARG_DECL)
04210   ACE_THROW_SPEC ((CORBA::SystemException,
04211                    AVStreams::alreadyConnected,
04212                    AVStreams::notSupported))
04213 {
04214   ACE_TRY
04215     {
04216           AVStreams::FlowProducer_ptr flow_producer =
04217             AVStreams::FlowProducer::_duplicate (producer);
04218           // @@Naga:Sometimes the same producer could be added with a different object reference.
04219           // There's no portable way of comparing obj refs. but we have to do this till we find
04220           // a permanent solution.For eg. 2 different flowproducers for the same flow in a
04221           // Multipoint to Multipoint binding will have the same flowname and hence cannot be
04222           // used for resolving ties.
04223           FlowProducer_SetItor begin = this->flow_producer_set_.begin ();
04224           FlowProducer_SetItor end = this->flow_producer_set_.end ();
04225           for (; begin != end; ++begin)
04226             {
04227               if ((*begin)->_is_equivalent (producer
04228                                             ACE_ENV_ARG_PARAMETER))
04229               // producer exists in the set, a duplicate.
04230               ACE_ERROR_RETURN ((LM_WARNING, "TAO_FlowConnection::add_producer: producer already exists\n"), 1);
04231             }
04232           // We need to check the return value of the insert into the flow producer
04233           // set, since multiconnect could be called many times which will lead to
04234           // a call to add_producer every time a sink is added. If the producer is already
04235           // present in our list we just return immediately.
04236           int result = this->flow_producer_set_.insert (flow_producer);
04237           if (result == 1)
04238             {
04239               // producer exists in the set, a duplicate.
04240               ACE_ERROR_RETURN ((LM_WARNING, "TAO_FlowConnection::add_producer: producer already exists\n"), 1);
04241             }
04242           CORBA::Boolean met_qos;
04243           char mcast_address[BUFSIZ];
04244           if (this->producer_address_.in () == 0)
04245             {
04246               ACE_INET_Addr mcast_addr;
04247               mcast_addr.set (this->mcast_port_,
04248                               this->mcast_addr_.c_str ()
04249                               );
04250 
04251               char buf [BUFSIZ];
04252               mcast_addr.addr_to_string (buf, BUFSIZ);
04253               ACE_OS::sprintf (mcast_address, "%s=%s", this->protocol_.in (), buf);
04254             }
04255           else
04256             {
04257               ACE_OS::strcpy (mcast_address, this->producer_address_.in ());
04258             }
04259           char *address = flow_producer->connect_mcast (the_qos,
04260                                                         met_qos,
04261                                                         mcast_address,
04262                                                         this->fp_name_.in ()
04263                                                         ACE_ENV_ARG_PARAMETER);
04264 
04265           ACE_TRY_CHECK;
04266           if (this->producer_address_.in () == 0)
04267             {
04268               TAO_Forward_FlowSpec_Entry entry ("", "", "", "", address);
04269               if (entry.address () != 0)
04270                 {
04271                   // Internet multicasting is in use.
04272                   this->producer_address_ = address;
04273                 }
04274               else
04275                 {
04276                   // ATM Multicasting is in use.
04277                   this->ip_multicast_ = 0;
04278                 }
04279             }
04280           // set the multicast peer.
04281           if (CORBA::is_nil (this->mcastconfigif_.in ()))
04282             {
04283               ACE_NEW_RETURN (this->mcastconfigif_i_,
04284                               TAO_MCastConfigIf,
04285                               0);
04286               this->mcastconfigif_ = this->mcastconfigif_i_->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
04287               ACE_TRY_CHECK;
04288             }
04289           AVStreams::FlowConnection_var flowconnection = this->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
04290           ACE_TRY_CHECK;
04291           flow_producer->set_Mcast_peer (flowconnection.in (),
04292                                          this->mcastconfigif_.in (),
04293                                          the_qos
04294                                          ACE_ENV_ARG_PARAMETER);
04295           ACE_TRY_CHECK;
04296     }
04297   ACE_CATCHANY
04298     {
04299       ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_FlowConnection::add_producer");
04300       return 0;
04301     }
04302   ACE_ENDTRY;
04303   ACE_CHECK_RETURN (0);
04304   return 1;
04305 }
04306 
04307 CORBA::Boolean
04308 TAO_FlowConnection::add_consumer (AVStreams::FlowConsumer_ptr consumer,
04309                                   AVStreams::QoS & the_qos
04310                                   ACE_ENV_ARG_DECL)
04311   ACE_THROW_SPEC ((CORBA::SystemException,
04312                    AVStreams::alreadyConnected))
04313 {
04314   ACE_TRY
04315     {
04316       AVStreams::FlowConsumer_ptr flow_consumer =
04317         AVStreams::FlowConsumer::_duplicate (consumer);
04318       FlowConsumer_SetItor begin = this->flow_consumer_set_.begin ();
04319       FlowConsumer_SetItor end = this->flow_consumer_set_.end ();
04320       for (; begin != end; ++begin)
04321         {
04322           if ((*begin)->_is_equivalent (consumer
04323                                         ACE_ENV_ARG_PARAMETER))
04324             // Consumer exists in the set, a duplicate.
04325             ACE_ERROR_RETURN ((LM_WARNING, "TAO_FlowConnection::add_Consumer: Consumer already exists\n"), 1);
04326         }
04327       int result = this->flow_consumer_set_.insert (flow_consumer);
04328       if (result == 1)
04329         {
04330           // consumer exists in the set, a duplicate.
04331           ACE_ERROR_RETURN ((LM_WARNING, "TAO_FlowConnection::add_consumer: consumer already exists\n"), 1);
04332         }
04333 
04334       FlowProducer_SetItor producer_begin = this->flow_producer_set_.begin ();
04335       // @@Lets take that the first entry as the only producer. We're
04336       // not sure if we can have multiple flow producers in a
04337       // flowconnection. We can have multiple producer in the MtM binding,
04338       // in which case the first producer that gets added is the leader.
04339       AVStreams::FlowProducer_ptr flow_producer = (*producer_begin);
04340 
04341       AVStreams::protocolSpec protocols (1);
04342       protocols.length (1);
04343       protocols [0] = CORBA::string_dup (this->producer_address_.in ());
04344 
04345       if (!this->ip_multicast_)
04346         {
04347           flow_consumer->set_protocol_restriction (protocols
04348                                                    ACE_ENV_ARG_PARAMETER);
04349           ACE_TRY_CHECK;
04350           char * address =
04351             flow_consumer->go_to_listen (the_qos,
04352                                          1,
04353                                          flow_producer,
04354                                          this->fp_name_.inout ()
04355                                          ACE_ENV_ARG_PARAMETER);
04356           ACE_TRY_CHECK;
04357           CORBA::Boolean is_met;
04358           flow_producer->connect_mcast (the_qos,
04359                                         is_met,
04360                                         address,
04361                                         this->fp_name_.inout ()
04362                                         ACE_ENV_ARG_PARAMETER);
04363           ACE_TRY_CHECK;
04364         }
04365        else
04366         {
04367           // The spec says go_to_listen is called with the multicast
04368           // address returned from the connect_mcast call called
04369           // during add_producer. But go_to_listen doesn't have a
04370           // address parameter. I guess it should be connect_to_peer.
04371           // IP Multicasting.
04372           flow_consumer->connect_to_peer (the_qos,
04373                                           this->producer_address_.in (),
04374                                           this->fp_name_.inout ()
04375                                           ACE_ENV_ARG_PARAMETER);
04376 
04377           //  char * address =
04378           //             flow_consumer->go_to_listen (the_qos,
04379           //                                          1,
04380           //                                          flow_producer,
04381           //                                          this->fp_name_.inout ()
04382           //                                          ACE_ENV_ARG_PARAMETER);
04383 
04384           //            ACE_TRY_CHECK;
04385         }
04386       if (CORBA::is_nil (this->mcastconfigif_.in ()))
04387         ACE_ERROR_RETURN ((LM_ERROR, "TAO_FlowConnection::add_consumer: first add a producer and then a consumer\n"), 0);
04388       // @@ Is this the right place to do set_peer?
04389       AVStreams::flowSpec flow_spec;
04390       AVStreams::streamQoS stream_qos (1);
04391       stream_qos.length (1);
04392       stream_qos [0] = the_qos;
04393       this->mcastconfigif_->set_peer (flow_consumer,
04394                                       stream_qos,
04395                                       flow_spec
04396                                       ACE_ENV_ARG_PARAMETER);
04397       ACE_TRY_CHECK;
04398     }
04399   ACE_CATCHANY
04400     {
04401       ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_FlowConnection::add_consumer");
04402       return 0;
04403     }
04404   ACE_ENDTRY;
04405   ACE_CHECK_RETURN (0);
04406   return 1;
04407 }
04408 
04409 CORBA::Boolean
04410 TAO_FlowConnection::drop (AVStreams::FlowEndPoint_ptr target
04411                           ACE_ENV_ARG_DECL_NOT_USED)
04412   ACE_THROW_SPEC ((CORBA::SystemException,
04413                    AVStreams::notConnected))
04414 {
04415   ACE_UNUSED_ARG (target);
04416   return 0;
04417 }
04418 
04419 // -----------------------------------------------------------------
04420 // TAO_FlowEndPoint
04421 // -----------------------------------------------------------------
04422 
04423 //default constructor.
04424 TAO_FlowEndPoint::TAO_FlowEndPoint (void)
04425   :lock_ (0)
04426 {
04427 }
04428 
04429 TAO_FlowEndPoint::TAO_FlowEndPoint (const char *flowname,
04430                                     AVStreams::protocolSpec &protocols,
04431                                     const char *format)
04432 {
04433   this->open (flowname, protocols, format);
04434 }
04435 
04436 void
04437 TAO_FlowEndPoint::set_flow_handler (const char * /*flowname*/,
04438                                     TAO_AV_Flow_Handler * /*handler*/)
04439 {
04440 }
04441 
04442 int
04443 TAO_FlowEndPoint::open (const char *flowname,
04444                         AVStreams::protocolSpec &protocols,
04445                         const char *format)
04446 {
04447   this->flowname_ = flowname;
04448   this->format_ = format;
04449 
04450   if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_FlowEndPoint::open\n"));
04451   ACE_DECLARE_NEW_CORBA_ENV;
04452   ACE_TRY
04453     {
04454       CORBA::Any flowname_any;
04455       flowname_any <<= flowname;
04456       this->define_property ("FlowName",
04457                              flowname_any
04458                              ACE_ENV_ARG_PARAMETER);
04459       ACE_TRY_CHECK;
04460       this->set_format (format
04461                         ACE_ENV_ARG_PARAMETER);
04462       ACE_TRY_CHECK;
04463       this->protocol_addresses_ = protocols;
04464       AVStreams::protocolSpec protocol_spec (protocols.length ());
04465       protocol_spec.length (protocols.length ());
04466       if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "%N:%l\n"));
04467       for (u_int i=0;i<protocols.length ();i++)
04468         {
04469           CORBA::String_var address = CORBA::string_dup (protocols [i]);
04470           TAO_Forward_FlowSpec_Entry entry ("", "", "", "", address.in ());
04471           protocol_spec [i] = CORBA::string_dup (entry.carrier_protocol_str ());
04472           if (TAO_debug_level > 0)
04473             ACE_DEBUG ((LM_DEBUG,
04474                         "[%s]\n",
04475                         static_cast<char const*>(protocol_spec[i])));
04476         }
04477       this->set_protocol_restriction (protocol_spec
04478                                       ACE_ENV_ARG_PARAMETER);
04479       ACE_TRY_CHECK;
04480     }
04481   ACE_CATCHANY
04482     {
04483       ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_FlowEndPoint::open");
04484       return -1;
04485     }
04486   ACE_ENDTRY;
04487   ACE_CHECK_RETURN (-1);
04488   return 0;
04489 }
04490 
04491 
04492 int
04493 TAO_FlowEndPoint::set_flowname (const char *flowname)
04494 {
04495   this->flowname_ = flowname;
04496   return 0;
04497 }
04498 
04499 // used by one flowconnection so that multiple connections cant use
04500 // the same flowendpoint.
04501 CORBA::Boolean
04502 TAO_FlowEndPoint::lock (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
04503   ACE_THROW_SPEC ((CORBA::SystemException))
04504 {
04505   // lock the current flowendpoint
04506 
04507   if (this->lock_)
04508     return 0;
04509   this->lock_ = 1;
04510   return 1;
04511 }
04512 
04513 // unlocks the flowendpoint , becomes free to be used in another flow.
04514 void
04515 TAO_FlowEndPoint::unlock (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
04516   ACE_THROW_SPEC ((CORBA::SystemException))
04517 {
04518   this->lock_ = 0;
04519 }
04520 
04521 
04522 void
04523 TAO_FlowEndPoint::destroy (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
04524   ACE_THROW_SPEC ((CORBA::SystemException))
04525 {
04526   int result = TAO_AV_Core::deactivate_servant (this);
04527   if (result < 0)
04528     if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_StreamEndPoint::destroy failed\n"));
04529   TAO_AV_FlowSpecSetItor end = this->flow_spec_set_.end ();
04530   for (TAO_AV_FlowSpecSetItor begin = this->flow_spec_set_.begin ();
04531        begin != end; ++begin)
04532     (*begin)->protocol_object ()->destroy ();
04533 }
04534 
04535 AVStreams::StreamEndPoint_ptr
04536 TAO_FlowEndPoint::related_sep (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
04537   ACE_THROW_SPEC ((CORBA::SystemException))
04538 {
04539 
04540   return AVStreams::StreamEndPoint::_duplicate (this->related_sep_.in ());
04541 }
04542 
04543 void
04544 TAO_FlowEndPoint::related_sep (AVStreams::StreamEndPoint_ptr related_sep
04545                                ACE_ENV_ARG_DECL_NOT_USED)
04546   ACE_THROW_SPEC ((CORBA::SystemException))
04547 {
04548   this->related_sep_ = AVStreams::StreamEndPoint::_duplicate (related_sep);
04549 }
04550 
04551 AVStreams::FlowConnection_ptr
04552 TAO_FlowEndPoint::related_flow_connection (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
04553   ACE_THROW_SPEC ((CORBA::SystemException))
04554 {
04555   return AVStreams::FlowConnection::_duplicate (this->related_flow_connection_.in ());
04556 }
04557 
04558 void
04559 TAO_FlowEndPoint::related_flow_connection (AVStreams::FlowConnection_ptr related_flow_connection
04560                                            ACE_ENV_ARG_DECL_NOT_USED)
04561   ACE_THROW_SPEC ((CORBA::SystemException))
04562 {
04563   this->related_flow_connection_ = AVStreams::FlowConnection::_duplicate (related_flow_connection);
04564 }
04565 
04566 // returns the connected peer for this flow
04567 AVStreams::FlowEndPoint_ptr
04568 TAO_FlowEndPoint::get_connected_fep (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
04569   ACE_THROW_SPEC ((CORBA::SystemException,
04570                    AVStreams::notConnected,
04571                    AVStreams::notSupported))
04572 {
04573   return AVStreams::FlowEndPoint::_duplicate (this->peer_fep_.in ());
04574 }
04575 
04576 CORBA::Boolean
04577 TAO_FlowEndPoint::use_flow_protocol (const char * fp_name,
04578                                      const CORBA::Any &
04579                                      ACE_ENV_ARG_DECL)
04580   ACE_THROW_SPEC ((CORBA::SystemException,
04581                    AVStreams::FPError,
04582                    AVStreams::notSupported))
04583 {
04584   ACE_TRY
04585     {
04586       // Define the property called FlowProtocol
04587       CORBA::Any flowname_property;
04588       flowname_property <<= fp_name;
04589       this->define_property ("FlowProtocol",
04590                              flowname_property
04591                              ACE_ENV_ARG_PARAMETER);
04592       ACE_TRY_CHECK;
04593     }
04594   ACE_CATCHANY
04595     {
04596       ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_FlowEndPoint::use_flow_protocol");
04597       return 0;
04598     }
04599   ACE_ENDTRY;
04600   ACE_CHECK_RETURN (0);
04601   return 1;
04602 }
04603 
04604 void
04605 TAO_FlowEndPoint::set_format (const char * format
04606                               ACE_ENV_ARG_DECL)
04607   ACE_THROW_SPEC ((CORBA::SystemException,
04608                    AVStreams::notSupported))
04609 {
04610   this->format_ = format;
04611   ACE_TRY
04612     {
04613       // make this a property so that is_fep_compatible can query this and
04614       // check if 2 flowendpoints are compatible.
04615       CORBA::Any format_val;
04616       format_val <<= format;
04617       this->define_property ("Format",
04618                              format_val
04619                              ACE_ENV_ARG_PARAMETER);
04620       ACE_TRY_CHECK;
04621     }
04622   ACE_CATCHANY
04623     {
04624       ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_FlowEndpoint::set_format");
04625     }
04626   ACE_ENDTRY;
04627   ACE_CHECK;
04628 }
04629 
04630 void
04631 TAO_FlowEndPoint::set_dev_params (const CosPropertyService::Properties & new_settings
04632                                   ACE_ENV_ARG_DECL)
04633   ACE_THROW_SPEC ((CORBA::SystemException,
04634                    AVStreams::PropertyException,
04635                    AVStreams::streamOpFailed))
04636 {
04637   this->dev_params_ = new_settings;
04638   ACE_TRY
04639     {
04640       CORBA::Any DevParams_property;
04641       DevParams_property <<= new_settings;
04642       this->define_property ("DevParams",
04643                              DevParams_property
04644                              ACE_ENV_ARG_PARAMETER);
04645       ACE_TRY_CHECK;
04646     }
04647   ACE_CATCHANY
04648     {
04649       ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_FlowEndPoint::set_dev_params");
04650     }
04651   ACE_ENDTRY;
04652   ACE_CHECK;
04653 }
04654 
04655 void
04656 TAO_FlowEndPoint::set_protocol_restriction (const AVStreams::protocolSpec & protocols
04657                                             ACE_ENV_ARG_DECL)
04658   ACE_THROW_SPEC ((CORBA::SystemException,
04659                    AVStreams::notSupported))
04660 {
04661   ACE_TRY
04662     {
04663       u_int i = 0;
04664       if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "%N:%l\n"));
04665       for (i=0;i<protocols.length ();i++)
04666         {
04667           const char *protocol = (protocols)[i];
04668           if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "%s\n", protocol));
04669         }
04670       CORBA::Any AvailableProtocols_property;
04671       AvailableProtocols_property <<= protocols;
04672       this->define_property ("AvailableProtocols",
04673                              AvailableProtocols_property
04674                              ACE_ENV_ARG_PARAMETER);
04675       ACE_TRY_CHECK;
04676       AVStreams::protocolSpec *temp_spec;
04677       CORBA::Any_var temp_any = this->get_property_value ("AvailableProtocols"
04678                                                           ACE_ENV_ARG_PARAMETER);
04679       ACE_TRY_CHECK;
04680       temp_any.in () >>= temp_spec;
04681       if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "%N:%l\n"));
04682       for (i=0;i<temp_spec->length ();i++)
04683         {
04684           const char *protocol = (*temp_spec)[i];
04685           if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "%s\n", protocol));
04686         }
04687       this->protocols_ = protocols;
04688     }
04689   ACE_CATCHANY
04690     {
04691       ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_FlowEndpoint::set_protocol_restriction");
04692     }
04693   ACE_ENDTRY;
04694   ACE_CHECK;
04695 }
04696 
04697 CORBA::Boolean
04698 TAO_FlowEndPoint::is_fep_compatible (AVStreams::FlowEndPoint_ptr peer_fep
04699                                      ACE_ENV_ARG_DECL)
04700   ACE_THROW_SPEC ((CORBA::SystemException,
04701                    AVStreams::formatMismatch,
04702                    AVStreams::deviceQosMismatch))
04703 {
04704   const char *exception_message = "";
04705   ACE_TRY
04706     {
04707       // check whether the passed flowendpoint is compatible with this flowendpoint.
04708       // should we check for the availableFormats and choose one format.
04709       // get my format value
04710       CORBA::Any_var format_ptr;
04711       CORBA::String_var my_format, peer_format;
04712 
04713       exception_message = "TAO_FlowEndPoint::is_fep_compatible - Format";
04714       format_ptr = this->get_property_value ("Format"
04715                                              ACE_ENV_ARG_PARAMETER);
04716       ACE_TRY_CHECK;
04717 
04718       const char *temp_format;
04719       format_ptr.in () >>= temp_format;
04720       my_format = CORBA::string_dup (temp_format);
04721       // get my peer's format value
04722       exception_message = "TAO_FlowEndPoint::is_fep_compatible - Format[2]";
04723       format_ptr = peer_fep->get_property_value ("Format"
04724                                                  ACE_ENV_ARG_PARAMETER);
04725       ACE_TRY_CHECK;
04726       format_ptr.in () >>= temp_format;
04727       peer_format = CORBA::string_dup (temp_format);
04728       if (ACE_OS::strcmp (my_format.in (),
04729                           peer_format.in ()) != 0)
04730         return 0;
04731 
04732       // since formats are same, check for a common protocol
04733       CORBA::Any_var AvailableProtocols_ptr;
04734       AVStreams::protocolSpec my_protocol_spec, peer_protocol_spec;
04735       AVStreams::protocolSpec *temp_protocols;;
04736 
04737       exception_message =
04738         "TAO_FlowEndPoint::is_fep_compatible - AvailableProtocols";
04739       AvailableProtocols_ptr = this->get_property_value ("AvailableProtocols"
04740                                                          ACE_ENV_ARG_PARAMETER);
04741       ACE_TRY_CHECK;
04742       AvailableProtocols_ptr.in () >>= temp_protocols;
04743       my_protocol_spec = *temp_protocols;
04744 
04745       exception_message =
04746         "TAO_FlowEndPoint::is_fep_compatible - AvailableProtocols[2]";
04747       AvailableProtocols_ptr = peer_fep->get_property_value ("AvailableProtocols"
04748                                                              ACE_ENV_ARG_PARAMETER);
04749       ACE_TRY_CHECK;
04750       AvailableProtocols_ptr.in () >>= temp_protocols;
04751       peer_protocol_spec = *temp_protocols;
04752 
04753       int protocol_match = 0;
04754       for (u_int i=0;i<my_protocol_spec.length ();i++)
04755         {
04756           CORBA::String_var my_protocol_string;
04757           for (u_int j=0;j<peer_protocol_spec.length ();j++)
04758             {
04759               CORBA::String_var peer_protocol_string;
04760               my_protocol_string = CORBA::string_dup (my_protocol_spec[i]);
04761               peer_protocol_string = CORBA::string_dup (peer_protocol_spec[j]);
04762               if (ACE_OS::strcmp (my_protocol_string.in (), peer_protocol_string.in ()) == 0)
04763                 {
04764                   protocol_match = 1;
04765                   break;
04766                 }
04767             }
04768           if (protocol_match)
04769             break;
04770         }
04771       if (!protocol_match)
04772         return 0;
04773     }
04774   ACE_CATCH (CosPropertyService::PropertyNotFound, nf)
04775     {
04776       ACE_PRINT_EXCEPTION (nf,
04777                            exception_message);
04778     }
04779   ACE_CATCHANY
04780     {
04781       ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
04782                            "TAO_FlowEndPoint::is_fep_compatible");
04783       return 0;
04784     }
04785   ACE_ENDTRY;
04786   ACE_CHECK_RETURN (0);
04787   return 1;
04788 }
04789 
04790 CORBA::Boolean
04791 TAO_FlowEndPoint::set_peer (AVStreams::FlowConnection_ptr /* the_fc */,
04792                             AVStreams::FlowEndPoint_ptr the_peer_fep,
04793                             AVStreams::QoS & /* the_qos */
04794                             ACE_ENV_ARG_DECL_NOT_USED)
04795   ACE_THROW_SPEC ((CORBA::SystemException,
04796                    AVStreams::QoSRequestFailed,
04797                    AVStreams::streamOpFailed))
04798 {
04799   this->peer_fep_ =
04800     AVStreams::FlowEndPoint::_duplicate (the_peer_fep);
04801   return 1;
04802 }
04803 
04804 CORBA::Boolean
04805 TAO_FlowEndPoint::set_Mcast_peer (AVStreams::FlowConnection_ptr /* the_fc */,
04806                                   AVStreams::MCastConfigIf_ptr mcast_peer,
04807                                   AVStreams::QoS & /* the_qos */
04808                                   ACE_ENV_ARG_DECL_NOT_USED)
04809   ACE_THROW_SPEC ((CORBA::SystemException,
04810                    AVStreams::QoSRequestFailed))
04811 {
04812   this->mcast_peer_ = AVStreams::MCastConfigIf::_duplicate (mcast_peer);
04813   return 0;
04814 }
04815 
04816 char *
04817 TAO_FlowEndPoint::go_to_listen_i (TAO_FlowSpec_Entry::Role role,
04818                                   AVStreams::QoS & /*the_qos*/,
04819                                   CORBA::Boolean /*is_mcast*/,
04820                                   AVStreams::FlowEndPoint_ptr peer_fep,
04821                                   char *& flowProtocol
04822                                   ACE_ENV_ARG_DECL)
04823   ACE_THROW_SPEC ((CORBA::SystemException,
04824                    AVStreams::failedToListen,
04825                    AVStreams::FPError,
04826                    AVStreams::QoSRequestFailed))
04827 {
04828   char direction [BUFSIZ];
04829   switch (role)
04830     {
04831     case TAO_FlowSpec_Entry::TAO_AV_PRODUCER:
04832       ACE_OS::strcpy (direction, "IN");
04833       break;
04834     case TAO_FlowSpec_Entry::TAO_AV_CONSUMER:
04835       ACE_OS::strcpy (direction, "OUT");
04836       break;
04837     default:
04838       break;
04839     }
04840   AVStreams::protocolSpec my_protocol_spec, peer_protocol_spec;
04841   AVStreams::protocolSpec *temp_protocols;
04842   CORBA::Any_var AvailableProtocols_ptr =
04843     peer_fep->get_property_value ("AvailableProtocols"
04844                                   ACE_ENV_ARG_PARAMETER);
04845   ACE_CHECK_RETURN (0);
04846   AvailableProtocols_ptr.in () >>= temp_protocols;
04847   peer_protocol_spec = *temp_protocols;
04848   AvailableProtocols_ptr =
04849     this->get_property_value ("AvailableProtocols"
04850                               ACE_ENV_ARG_PARAMETER);
04851   ACE_CHECK_RETURN (0);
04852   AvailableProtocols_ptr.in () >>= temp_protocols;
04853   my_protocol_spec = *temp_protocols;
04854   int protocol_match = 0;
04855   CORBA::String_var listen_protocol;
04856   u_int i =0;
04857   for (i=0;i<my_protocol_spec.length ();i++)
04858     {
04859       CORBA::String_var my_protocol_string;
04860       for (u_int j=0;j<peer_protocol_spec.length ();j++)
04861         {
04862           CORBA::String_var peer_protocol_string;
04863           my_protocol_string = CORBA::string_dup (my_protocol_spec[i]);
04864           peer_protocol_string = CORBA::string_dup (peer_protocol_spec[j]);
04865           if (ACE_OS::strcmp (my_protocol_string.in (), peer_protocol_string.in ()) == 0)
04866             {
04867               listen_protocol = my_protocol_string;
04868               protocol_match = 1;
04869               break;
04870             }
04871         }
04872       if (protocol_match)
04873         break;
04874     }
04875   if (!protocol_match)
04876     ACE_ERROR_RETURN ((LM_ERROR, "TAO_FlowEndPoint::go_to_listen failed: no protoocol match\n"), 0);
04877 
04878   for (u_int j=0;j<this->protocol_addresses_.length ();j++)
04879     if (ACE_OS::strncmp (this->protocol_addresses_ [j], listen_protocol.in (), ACE_OS::strlen (listen_protocol.in ())) == 0)
04880       {
04881         // Now listen on that protocol.
04882         TAO_Forward_FlowSpec_Entry *entry;
04883         ACE_NEW_RETURN (entry,
04884                         TAO_Forward_FlowSpec_Entry (this->flowname_.in (),
04885                                                     direction,
04886                                                     this->format_.in (),
04887                                                     flowProtocol,
04888                                                     this->protocol_addresses_ [j]),
04889                         0);
04890 
04891         TAO_AV_Acceptor_Registry *acceptor_registry = TAO_AV_CORE::instance ()->acceptor_registry ();
04892         this->flow_spec_set_.insert (entry);
04893         int result = acceptor_registry->open (this,
04894                                               TAO_AV_CORE::instance (),
04895                                               this->flow_spec_set_);
04896         if (result < 0)
04897           return 0;
04898         char *listen_address = entry->get_local_addr_str ();
04899         char *address;
04900         ACE_NEW_RETURN (address,
04901                         char [BUFSIZ],
04902                         0);
04903         ACE_OS::sprintf (address, "%s=%s", listen_protocol.in (), listen_address);
04904         return address;
04905       }
04906   return 0;
04907 }
04908 
04909 
04910 CORBA::Boolean
04911 TAO_FlowEndPoint::connect_to_peer_i (TAO_FlowSpec_Entry::Role role,
04912                                      AVStreams::QoS & /*the_qos*/,
04913                                      const char * address,
04914                                      const char * use_flow_protocol
04915                                      ACE_ENV_ARG_DECL_NOT_USED)
04916   ACE_THROW_SPEC ((CORBA::SystemException,
04917                    AVStreams::failedToConnect,
04918                    AVStreams::FPError,
04919                    AVStreams::QoSRequestFailed))
04920 {
04921   char direction [BUFSIZ];
04922   switch (role)
04923     {
04924     case TAO_FlowSpec_Entry::TAO_AV_PRODUCER:
04925       ACE_OS::strcpy (direction, "IN");
04926       break;
04927     case TAO_FlowSpec_Entry::TAO_AV_CONSUMER:
04928       ACE_OS::strcpy (direction, "OUT");
04929       break;
04930     default:
04931       break;
04932     }
04933   TAO_Forward_FlowSpec_Entry *entry;
04934   ACE_NEW_RETURN (entry,
04935                   TAO_Forward_FlowSpec_Entry (this->flowname_.in (),
04936                                               direction,
04937                                               this->format_.in (),
04938                                               use_flow_protocol,
04939                                               address),
04940                   0);
04941   this->flow_spec_set_.insert (entry);
04942   TAO_AV_Connector_Registry *connector_registry = TAO_AV_CORE::instance ()->connector_registry ();
04943   int result = connector_registry->open (this,
04944                                          TAO_AV_CORE::instance (),
04945                                          this->flow_spec_set_);
04946   if (result < 0)
04947     ACE_ERROR_RETURN ((LM_ERROR, "TAO_FlowEndPoint::connector_registry::open failed\n"), 0);
04948   this->reverse_channel_ = entry->get_local_addr_str ();
04949   return 1;
04950 }
04951 
04952 int
04953 TAO_FlowEndPoint::set_protocol_object (const char * /*flowname*/,
04954                                        TAO_AV_Protocol_Object * /*object*/)
04955 {
04956   return 0;
04957 }
04958 
04959 
04960 // ------------------------------------------------------------
04961 // TAO_FlowProducer class
04962 // ------------------------------------------------------------
04963 
04964 //default constructor
04965 TAO_FlowProducer::TAO_FlowProducer (void)
04966 {
04967 }
04968 
04969 TAO_FlowProducer::TAO_FlowProducer (const char *flowname,
04970                                     AVStreams::protocolSpec protocols,
04971                                     const char *format)
04972 {
04973   this->open (flowname, protocols, format);
04974 }
04975 
04976 // gets the reverse channel for feedback.
04977 char *
04978 TAO_FlowProducer::get_rev_channel (const char * /*pcol_name*/
04979                                    ACE_ENV_ARG_DECL_NOT_USED)
04980   ACE_THROW_SPEC ((CORBA::SystemException))
04981 {
04982   return 0;
04983 }
04984 
04985 // The start, stop and destroy are to be handled by the application.
04986 void
04987 TAO_FlowProducer::stop (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
04988   ACE_THROW_SPEC ((CORBA::SystemException))
04989 {
04990   TAO_AV_FlowSpecSetItor end = this->flow_spec_set_.end ();
04991   for (TAO_AV_FlowSpecSetItor begin = this->flow_spec_set_.begin ();
04992        begin != end; ++begin)
04993     {
04994       TAO_FlowSpec_Entry *entry = (*begin);
04995       entry->handler ()->stop (TAO_FlowSpec_Entry::TAO_AV_PRODUCER);
04996     }
04997 }
04998 
04999 void
05000 TAO_FlowProducer::start (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
05001   ACE_THROW_SPEC ((CORBA::SystemException))
05002 {
05003   TAO_AV_FlowSpecSetItor end = this->flow_spec_set_.end ();
05004   for (TAO_AV_FlowSpecSetItor begin = this->flow_spec_set_.begin ();
05005        begin != end; ++begin)
05006     {
05007       TAO_FlowSpec_Entry *entry = (*begin);
05008       if (entry->handler () != 0)
05009         {
05010           entry->handler ()->start (TAO_FlowSpec_Entry::TAO_AV_PRODUCER);
05011         }
05012       if (entry->control_handler () != 0)
05013         {
05014           entry->control_handler ()->start (TAO_FlowSpec_Entry::TAO_AV_PRODUCER);
05015         }
05016     }
05017 }
05018 
05019 char *
05020 TAO_FlowProducer::go_to_listen (AVStreams::QoS & the_qos,
05021                                 CORBA::Boolean is_mcast,
05022                                 AVStreams::FlowEndPoint_ptr peer_fep,
05023                                 char *& flowProtocol
05024                                 ACE_ENV_ARG_DECL)
05025   ACE_THROW_SPEC ((CORBA::SystemException,
05026                    AVStreams::failedToListen,
05027                    AVStreams::FPError,
05028                    AVStreams::QoSRequestFailed))
05029 {
05030   return this->go_to_listen_i (TAO_FlowSpec_Entry::TAO_AV_PRODUCER,
05031                                the_qos,
05032                                is_mcast,
05033                                peer_fep,
05034                                flowProtocol
05035                                ACE_ENV_ARG_PARAMETER);
05036 }
05037 
05038 CORBA::Boolean
05039 TAO_FlowProducer::connect_to_peer (AVStreams::QoS & the_qos,
05040                                    const char * address,
05041                                    const char * use_flow_protocol
05042                                    ACE_ENV_ARG_DECL)
05043   ACE_THROW_SPEC ((CORBA::SystemException,
05044                    AVStreams::failedToConnect,
05045                    AVStreams::FPError,
05046                    AVStreams::QoSRequestFailed))
05047 {
05048   return this->connect_to_peer_i (TAO_FlowSpec_Entry::TAO_AV_PRODUCER,
05049                                   the_qos,
05050                                   address,
05051                                   use_flow_protocol
05052                                   ACE_ENV_ARG_PARAMETER);
05053 }
05054 //  Connect to a IP multicast address.
05055 char *
05056 TAO_FlowProducer::connect_mcast (AVStreams::QoS & /* the_qos */,
05057                                  CORBA::Boolean_out /* is_met */,
05058                                  const char *address,
05059                                  const char * use_flow_protocol
05060                                  ACE_ENV_ARG_DECL_NOT_USED)
05061   ACE_THROW_SPEC ((CORBA::SystemException,
05062                    AVStreams::failedToConnect,
05063                    AVStreams::notSupported,
05064                    AVStreams::FPError,
05065                    AVStreams::QoSRequestFailed))
05066 {
05067   // The address variable gives the multicast address to subscribe to.
05068   for (u_int i=0;i<this->protocols_.length ();i++)
05069     {
05070       // choose the protocol which supports multicast.
05071     }
05072 
05073   if (address == 0)
05074     if (TAO_debug_level > 0)
05075       ACE_DEBUG ((LM_DEBUG, "TAO_FlowProducer::connect_mcast address is 0\n"));
05076   TAO_Forward_FlowSpec_Entry  *entry;
05077   ACE_NEW_RETURN (entry,
05078                   TAO_Forward_FlowSpec_Entry(this->flowname_.in (),
05079                                              "IN",
05080                                              this->format_.in (),
05081                                              use_flow_protocol,
05082                                              address),
05083                   0);
05084 
05085   this->flow_spec_set_.insert (entry);
05086   TAO_AV_Acceptor_Registry *acceptor_registry =
05087     TAO_AV_CORE::instance ()->acceptor_registry ();
05088   int result = acceptor_registry->open (this,
05089                                         TAO_AV_CORE::instance (),
05090                                         this->flow_spec_set_);
05091   if (result < 0)
05092     ACE_ERROR_RETURN ((LM_ERROR, "TAO_FlowProducer::connect_mcast:acceptor_registry open failed\n"), 0);
05093   // Now remove our handler from the reactor since we're a producer and dont want to get called for
05094   // multicast packets.
05095   ACE_Event_Handler *event_handler = entry->handler ()->event_handler ();
05096   event_handler->reactor ()->remove_handler (event_handler,
05097                                              ACE_Event_Handler::READ_MASK);
05098   return CORBA::string_dup (address);
05099 }
05100 
05101 // sets the key for this flow.
05102 void
05103 TAO_FlowProducer::set_key (const AVStreams::key & the_key
05104                            ACE_ENV_ARG_DECL)
05105   ACE_THROW_SPEC ((CORBA::SystemException))
05106 {
05107   ACE_TRY
05108     {
05109       CORBA::Any anyval;
05110       anyval <<= the_key;
05111       this->define_property ("PublicKey",
05112                              anyval
05113                              ACE_ENV_ARG_PARAMETER);
05114       ACE_TRY_CHECK;
05115     }
05116   ACE_CATCHANY
05117     {
05118       ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_FlowProducer::set_key");
05119     }
05120   ACE_ENDTRY;
05121   ACE_CHECK;
05122 }
05123 
05124 // source id to be used to distinguish this source from others.
05125 void
05126 TAO_FlowProducer::set_source_id (CORBA::Long source_id
05127                                  ACE_ENV_ARG_DECL_NOT_USED)
05128   ACE_THROW_SPEC ((CORBA::SystemException))
05129 {
05130   this->source_id_ = source_id;
05131 }
05132 
05133 // ------------------------------------------------------------
05134 // TAO_FlowConsumer
05135 // ------------------------------------------------------------
05136 
05137 
05138 // default constructor.
05139 TAO_FlowConsumer::TAO_FlowConsumer (void)
05140 {
05141 }
05142 
05143 TAO_FlowConsumer::TAO_FlowConsumer (const char *flowname,
05144                                     AVStreams::protocolSpec protocols,
05145                                     const char *format)
05146 {
05147   this->open (flowname, protocols, format);
05148 }
05149 
05150 // The start, stop and destroy are to be handled by the application.
05151 void
05152 TAO_FlowConsumer::stop (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
05153   ACE_THROW_SPEC ((CORBA::SystemException))
05154 {
05155   TAO_AV_FlowSpecSetItor end = this->flow_spec_set_.end ();
05156   for (TAO_AV_FlowSpecSetItor begin = this->flow_spec_set_.begin ();
05157        begin != end; ++begin)
05158     (*begin)->handler ()->stop (TAO_FlowSpec_Entry::TAO_AV_CONSUMER);
05159 }
05160 
05161 void
05162 TAO_FlowConsumer::start (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
05163   ACE_THROW_SPEC ((CORBA::SystemException))
05164 {
05165   TAO_AV_FlowSpecSetItor end = this->flow_spec_set_.end ();
05166   for (TAO_AV_FlowSpecSetItor begin = this->flow_spec_set_.begin ();
05167        begin != end; ++begin)
05168     {
05169       (*begin)->handler ()->start (TAO_FlowSpec_Entry::TAO_AV_CONSUMER);
05170     }
05171 }
05172 
05173 char *
05174 TAO_FlowConsumer::go_to_listen (AVStreams::QoS & the_qos,
05175                                 CORBA::Boolean is_mcast,
05176                                 AVStreams::FlowEndPoint_ptr peer_fep,
05177                                 char *& flowProtocol
05178                                 ACE_ENV_ARG_DECL)
05179   ACE_THROW_SPEC ((CORBA::SystemException,
05180                    AVStreams::failedToListen,
05181                    AVStreams::FPError,
05182                    AVStreams::QoSRequestFailed))
05183 {
05184   return this->go_to_listen_i (TAO_FlowSpec_Entry::TAO_AV_CONSUMER,
05185                                the_qos,
05186                                is_mcast,
05187                                peer_fep,
05188                                flowProtocol
05189                                ACE_ENV_ARG_PARAMETER);
05190 }
05191 
05192 CORBA::Boolean
05193 TAO_FlowConsumer::connect_to_peer (AVStreams::QoS & the_qos,
05194                                    const char * address,
05195                                    const char * use_flow_protocol
05196                                    ACE_ENV_ARG_DECL)
05197   ACE_THROW_SPEC ((CORBA::SystemException,
05198                    AVStreams::failedToConnect,
05199                    AVStreams::FPError,
05200                    AVStreams::QoSRequestFailed))
05201 {
05202   return this->connect_to_peer_i (TAO_FlowSpec_Entry::TAO_AV_CONSUMER,
05203                                   the_qos,
05204                                   address,
05205                                   use_flow_protocol
05206                                   ACE_ENV_ARG_PARAMETER);
05207 }
05208 
05209 //------------------------------------------------------------
05210 // TAO_Tokenizer
05211 //------------------------------------------------------------
05212 TAO_Tokenizer::TAO_Tokenizer (const char *string, char delimiter)
05213   :token_array_ (10),
05214    count_ (0)
05215 {
05216   this->parse (string, delimiter);
05217 }
05218 
05219 TAO_Tokenizer::~TAO_Tokenizer ()
05220 {
05221   for (unsigned int i=0; i<this->num_tokens_; i++)
05222     CORBA::string_free (this->token_array_[i]);
05223 }
05224 
05225 
05226 int
05227 TAO_Tokenizer::parse (const char *string, char delimiter)
05228 {
05229   ACE_CString new_string (string);
05230   u_int pos =0;
05231   int slash_pos = 0;
05232   u_int count = 0;
05233   int result;
05234   while (pos < new_string.length ())
05235     {
05236       slash_pos = new_string.find (delimiter, pos);
05237       ACE_CString substring;
05238       if (slash_pos != new_string.npos)
05239         {
05240           substring = new_string.substring (pos,
05241                                             slash_pos - pos);
05242           pos = slash_pos + 1;
05243         }
05244       else
05245         {
05246           substring = new_string.substring (pos);
05247           pos = static_cast<int> (new_string.length ());
05248         }
05249       char *token = CORBA::string_dup (substring.c_str ());
05250       result = this->token_array_.set (token, count);
05251       if (result == -1)
05252         {
05253           this->token_array_.size (this->token_array_.size ()*2);
05254           result = this->token_array_.set (token, count);
05255           if (result == -1)
05256             ACE_ERROR_RETURN ((LM_ERROR, "TAO_Tokenizer::parse error"), -1);
05257         }
05258       count++;
05259     }
05260 
05261   /*
05262   ACE_OS::strcpy (this->string_ , string);
05263   char delimiter_str [2] = {0, 0};
05264   delimiter_str [0] = delimiter;
05265   char *token = ACE_OS::strtok (this->string_, delimiter_str);
05266 
05267   while (token != 0)
05268     {
05269       result = this->token_array_.set (token, count);
05270       if (result == -1)
05271         {
05272           this->token_array_.size (this->token_array_.size ()*2);
05273           result = this->token_array_.set (token, count);
05274           if (result == -1)
05275             ACE_ERROR_RETURN ((LM_ERROR, "TAO_Tokenizer::parse error"), -1);
05276         }
05277       token = ACE_OS::strtok (0, delimiter_str);
05278       count++;
05279     }
05280   */
05281   this->num_tokens_ = count;
05282   return 0;
05283 }
05284 
05285 char*
05286 TAO_Tokenizer::token (void)
05287 {
05288   if (count_ < num_tokens_)
05289     return CORBA::string_dup (this->token_array_[this->count_++]);
05290   else
05291     return 0;
05292 }
05293 
05294 int
05295 TAO_Tokenizer::num_tokens (void)
05296 {
05297   return static_cast<int> (this->num_tokens_);
05298 }
05299 
05300 const char *
05301 TAO_Tokenizer::operator [] (size_t index) const
05302 {
05303   if (index >= this->num_tokens_)
05304     return 0;
05305 
05306   return this->token_array_[index];
05307 }
05308 
05309 TAO_END_VERSIONED_NAMESPACE_DECL

Generated on Thu Nov 9 13:44:40 2006 for TAO_AV by doxygen 1.3.6