00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017 #include "orbsvcs/AV/AVStreams_i.h"
00018 #include "orbsvcs/AV/sfp.h"
00019 #include "orbsvcs/AV/MCast.h"
00020 #include "orbsvcs/AV/RTCP.h"
00021
00022 #include "tao/debug.h"
00023 #include "tao/ORB_Core.h"
00024 #include "tao/AnyTypeCode/Any.h"
00025 #include "ace/OS_NS_arpa_inet.h"
00026
00027 #if !defined (__ACE_INLINE__)
00028 #include "orbsvcs/AV/AVStreams_i.i"
00029 #endif
00030
00031 ACE_RCSID (AV,
00032 AVStreams_i,
00033 "AVStreams_i.cpp,v 5.138 2006/04/24 13:53:27 jwillemsen Exp")
00034
00035 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00036
00037
00038
00039
00040
00041 TAO_AV_QoS::TAO_AV_QoS (void)
00042 {
00043 }
00044
00045 TAO_AV_QoS::TAO_AV_QoS (AVStreams::streamQoS &stream_qos)
00046 {
00047 this->set (stream_qos);
00048 }
00049
00050 int
00051 TAO_AV_QoS::convert (AVStreams::streamQoS &)
00052 {
00053 return -1;
00054 }
00055
00056
00057
00058
00059
00060 AV_Null_MediaCtrl::AV_Null_MediaCtrl (void)
00061 {
00062 }
00063
00064 AV_Null_MediaCtrl::~AV_Null_MediaCtrl (void)
00065 {
00066 }
00067
00068
00069
00070
00071
00072
00073
00074 TAO_Basic_StreamCtrl::TAO_Basic_StreamCtrl (void)
00075 :flow_count_ (0)
00076 {
00077 }
00078
00079
00080
00081
00082 void
00083 TAO_Basic_StreamCtrl::stop (const AVStreams::flowSpec &flow_spec
00084 ACE_ENV_ARG_DECL)
00085 ACE_THROW_SPEC ((CORBA::SystemException,
00086 AVStreams::noSuchFlow))
00087 {
00088 ACE_TRY
00089 {
00090
00091
00092 if (this->flow_connection_map_.current_size () > 0)
00093 {
00094 if (flow_spec.length () > 0)
00095 for (u_int i=0;i<flow_spec.length ();i++)
00096 {
00097 char *flowname = TAO_AV_Core::get_flowname (flow_spec[i]);
00098 ACE_CString flow_name_key (flowname);
00099 AVStreams::FlowConnection_var flow_connection_entry;
00100 if (this->flow_connection_map_.find (flow_name_key,
00101 flow_connection_entry) == 0)
00102 {
00103 flow_connection_entry->stop (ACE_ENV_SINGLE_ARG_PARAMETER);
00104 ACE_TRY_CHECK;
00105 }
00106 }
00107 else
00108 {
00109
00110 FlowConnection_Map_Iterator iterator (this->flow_connection_map_);
00111 FlowConnection_Map_Entry *entry;
00112 for (;iterator.next (entry) != 0;iterator.advance ())
00113 {
00114 entry->int_id_->stop (ACE_ENV_SINGLE_ARG_PARAMETER);
00115 ACE_TRY_CHECK;
00116 }
00117 }
00118 }
00119 }
00120 ACE_CATCHANY
00121 {
00122 ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_Basic_StreamCtrl::stop");
00123 return;
00124 }
00125 ACE_ENDTRY;
00126 ACE_CHECK;
00127 }
00128
00129
00130
00131 void
00132 TAO_Basic_StreamCtrl::start (const AVStreams::flowSpec &flow_spec
00133 ACE_ENV_ARG_DECL)
00134 ACE_THROW_SPEC ((CORBA::SystemException,
00135 AVStreams::noSuchFlow))
00136 {
00137 ACE_TRY
00138 {
00139
00140
00141
00142 if (this->flow_connection_map_.current_size () > 0)
00143 {
00144 if (flow_spec.length () > 0)
00145 for (u_int i = 0; i < flow_spec.length (); i++)
00146 {
00147 char *flowname = TAO_AV_Core::get_flowname (flow_spec[i]);
00148 ACE_CString flow_name_key (flowname);
00149 FlowConnection_Map::ENTRY *flow_connection_entry = 0;
00150 if (this->flow_connection_map_.find (flow_name_key,
00151 flow_connection_entry) == 0)
00152 {
00153 flow_connection_entry->int_id_->start (ACE_ENV_SINGLE_ARG_PARAMETER);
00154 ACE_TRY_CHECK;
00155 }
00156 }
00157 else
00158 {
00159
00160 FlowConnection_Map_Iterator iterator (this->flow_connection_map_);
00161 FlowConnection_Map_Entry *entry = 0;
00162 for (;iterator.next (entry) != 0;iterator.advance ())
00163 {
00164 entry->int_id_->start (ACE_ENV_SINGLE_ARG_PARAMETER);
00165 ACE_TRY_CHECK;
00166 }
00167 }
00168 }
00169 }
00170 ACE_CATCHANY
00171 {
00172 ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_Basic_StreamCtrl::start");
00173 return;
00174 }
00175 ACE_ENDTRY;
00176 ACE_CHECK;
00177 }
00178
00179
00180
00181
00182
00183 void
00184 TAO_Basic_StreamCtrl::destroy (const AVStreams::flowSpec &flow_spec
00185 ACE_ENV_ARG_DECL)
00186 ACE_THROW_SPEC ((CORBA::SystemException,
00187 AVStreams::noSuchFlow))
00188 {
00189 ACE_TRY
00190 {
00191
00192 if (this->flow_connection_map_.current_size () > 0)
00193 {
00194 if (flow_spec.length () > 0)
00195 {
00196 for (u_int i=0;i<flow_spec.length ();i++)
00197 {
00198 char *flowname = TAO_AV_Core::get_flowname (flow_spec[i]);
00199 ACE_CString flow_name_key (flowname);
00200 FlowConnection_Map::ENTRY *flow_connection_entry = 0;
00201 if (this->flow_connection_map_.find (flow_name_key, flow_connection_entry) == 0)
00202 {
00203 flow_connection_entry->int_id_->destroy (ACE_ENV_SINGLE_ARG_PARAMETER);
00204 ACE_TRY_CHECK;
00205 }
00206 }
00207 }
00208 else
00209 {
00210
00211 FlowConnection_Map_Iterator iterator (this->flow_connection_map_);
00212 FlowConnection_Map_Entry *entry = 0;
00213 for (;iterator.next (entry) != 0;iterator.advance ())
00214 {
00215 entry->int_id_->destroy (ACE_ENV_SINGLE_ARG_PARAMETER);
00216 ACE_TRY_CHECK;
00217 }
00218 }
00219 }
00220 }
00221 ACE_CATCHANY
00222 {
00223 ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_Basic_StreamCtrl::destroy");
00224 return;
00225 }
00226 ACE_ENDTRY;
00227 ACE_CHECK;
00228 }
00229
00230
00231
00232 CORBA::Boolean
00233
00234 TAO_Basic_StreamCtrl::modify_QoS (AVStreams::streamQoS & ,
00235 const AVStreams::flowSpec &
00236 ACE_ENV_ARG_DECL_NOT_USED)
00237 ACE_THROW_SPEC ((CORBA::SystemException,
00238 AVStreams::noSuchFlow,
00239 AVStreams::QoSRequestFailed))
00240 {
00241 return 1;
00242 }
00243
00244
00245
00246 void
00247 TAO_Basic_StreamCtrl::push_event (const struct CosPropertyService::Property &
00248 ACE_ENV_ARG_DECL_NOT_USED)
00249 ACE_THROW_SPEC ((CORBA::SystemException))
00250 {
00251 if (TAO_debug_level > 0)
00252 ACE_DEBUG ((LM_DEBUG, "\n(%P|%t) Recieved event \""));
00253 }
00254
00255
00256 void
00257 TAO_Basic_StreamCtrl::set_FPStatus (const AVStreams::flowSpec &flow_spec,
00258 const char *fp_name,
00259 const CORBA::Any &fp_settings
00260 ACE_ENV_ARG_DECL)
00261 ACE_THROW_SPEC ((CORBA::SystemException,
00262 AVStreams::noSuchFlow,
00263 AVStreams::FPError))
00264
00265 {
00266 if (!CORBA::is_nil (this->sep_a_.in ()))
00267 {
00268 this->sep_a_->set_FPStatus (flow_spec, fp_name, fp_settings ACE_ENV_ARG_PARAMETER);
00269 ACE_CHECK;
00270 }
00271 }
00272
00273
00274 CORBA::Object_ptr
00275 TAO_Basic_StreamCtrl::get_flow_connection (const char *flow_name
00276 ACE_ENV_ARG_DECL)
00277 ACE_THROW_SPEC ((CORBA::SystemException,
00278 AVStreams::noSuchFlow,
00279 AVStreams::notSupported))
00280 {
00281 ACE_CString flow_name_key (flow_name);
00282 AVStreams::FlowConnection_var flow_connection_entry;
00283
00284 if (this->flow_connection_map_.find (flow_name_key, flow_connection_entry) == 0){
00285 return flow_connection_entry._retn();
00286 }
00287 else{
00288 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "(%N,%l) Cannot find flow: %s\n", flow_name ));
00289 ACE_THROW_RETURN (AVStreams::noSuchFlow (), CORBA::Object::_nil ());
00290 }
00291 }
00292
00293
00294 void
00295 TAO_Basic_StreamCtrl::set_flow_connection (const char *flow_name,
00296 CORBA::Object_ptr flow_connection_obj
00297 ACE_ENV_ARG_DECL)
00298 ACE_THROW_SPEC ((CORBA::SystemException,
00299 AVStreams::noSuchFlow,
00300 AVStreams::notSupported))
00301 {
00302 AVStreams::FlowConnection_var flow_connection;
00303 ACE_TRY
00304 {
00305 flow_connection = AVStreams::FlowConnection::_narrow (flow_connection_obj ACE_ENV_ARG_PARAMETER);
00306 ACE_TRY_CHECK;
00307 }
00308 ACE_CATCHANY
00309 {
00310 ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_Basic_StreamCtrl::set_flow_connection");
00311 return;
00312 }
00313 ACE_ENDTRY;
00314 ACE_CHECK;
00315
00316 this->flows_.length (this->flow_count_ + 1);
00317 this->flows_ [this->flow_count_++] = CORBA::string_dup (flow_name);
00318 ACE_CString flow_name_key (flow_name);
00319 if (this->flow_connection_map_.bind (flow_name_key, flow_connection) != 0)
00320 {
00321 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "(%N,%l) Cannot find flow: %s\n", flow_name ));
00322 ACE_THROW (AVStreams::noSuchFlow ());
00323 }
00324 }
00325
00326 TAO_Basic_StreamCtrl::~TAO_Basic_StreamCtrl (void)
00327 {
00328 }
00329
00330
00331
00332
00333
00334 CORBA::Boolean
00335 TAO_Negotiator::negotiate (AVStreams::Negotiator_ptr ,
00336 const AVStreams::streamQoS &
00337 ACE_ENV_ARG_DECL_NOT_USED)
00338 ACE_THROW_SPEC ((CORBA::SystemException))
00339 {
00340 ACE_DEBUG ((LM_DEBUG,
00341 "TAO_Negotiator::negotiate\n"));
00342 return 0;
00343 }
00344
00345
00346
00347
00348
00349 const int MMDevice_Map_Hash_Key::hash_maximum_ = 10000;
00350
00351
00352 MMDevice_Map_Hash_Key::MMDevice_Map_Hash_Key (void)
00353 {
00354 this->mmdevice_ = AVStreams::MMDevice::_nil ();
00355 }
00356
00357
00358 MMDevice_Map_Hash_Key::MMDevice_Map_Hash_Key (AVStreams::MMDevice_ptr mmdevice)
00359 {
00360 this->mmdevice_ = AVStreams::MMDevice::_duplicate (mmdevice);
00361 }
00362
00363
00364 MMDevice_Map_Hash_Key::MMDevice_Map_Hash_Key (const MMDevice_Map_Hash_Key& hash_key)
00365 {
00366 this->mmdevice_ = AVStreams::MMDevice::_duplicate (hash_key.mmdevice_);
00367 }
00368
00369
00370 MMDevice_Map_Hash_Key::~MMDevice_Map_Hash_Key (void)
00371 {
00372 CORBA::release (this->mmdevice_);
00373 }
00374
00375 bool
00376 MMDevice_Map_Hash_Key::operator == (const MMDevice_Map_Hash_Key &hash_key) const
00377 {
00378 CORBA::Boolean result = 0;
00379 ACE_DECLARE_NEW_CORBA_ENV;
00380 ACE_TRY
00381 {
00382 result =
00383 this->mmdevice_->_is_equivalent (hash_key.mmdevice_
00384 ACE_ENV_ARG_PARAMETER);
00385 ACE_TRY_CHECK;
00386 }
00387 ACE_CATCHANY
00388 {
00389 ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "MMDevice_Map_Hash_Key::operator == ");
00390 return false;
00391 }
00392 ACE_ENDTRY;
00393 ACE_CHECK_RETURN (false);
00394
00395 return result;
00396 }
00397
00398 bool
00399 operator < (const MMDevice_Map_Hash_Key &left,
00400 const MMDevice_Map_Hash_Key &right)
00401 {
00402 bool result = false;
00403
00404 ACE_DECLARE_NEW_CORBA_ENV;
00405 ACE_TRY
00406 {
00407 const CORBA::ULong left_hash =
00408 left.mmdevice_->_hash (left.hash_maximum_
00409 ACE_ENV_ARG_PARAMETER);
00410 ACE_TRY_CHECK;
00411
00412 const CORBA::ULong right_hash =
00413 right.mmdevice_->_hash (right.hash_maximum_
00414 ACE_ENV_ARG_PARAMETER);
00415 ACE_TRY_CHECK;
00416
00417 result = left_hash < right_hash;
00418 }
00419 ACE_CATCHANY
00420 {
00421 ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
00422 "operator < for MMDevice_Map_Hash_Key");
00423 return false;
00424 }
00425 ACE_ENDTRY;
00426 ACE_CHECK_RETURN (false);
00427
00428 return result;
00429 }
00430
00431 u_long
00432 MMDevice_Map_Hash_Key::hash (void) const
00433 {
00434 u_long result = 0;
00435 ACE_DECLARE_NEW_CORBA_ENV;
00436 ACE_TRY
00437 {
00438 result = this->mmdevice_->_hash (this->hash_maximum_
00439 ACE_ENV_ARG_PARAMETER);
00440 ACE_TRY_CHECK;
00441 }
00442 ACE_CATCHANY
00443 {
00444 ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "MMDevice_Map_Hash_Key::hash");
00445 return 0;
00446 }
00447 ACE_ENDTRY;
00448 ACE_CHECK_RETURN (0);
00449 return result;
00450 }
00451
00452
00453
00454
00455
00456 TAO_StreamCtrl::TAO_StreamCtrl (void)
00457 :mcastconfigif_ (0)
00458 {
00459 ACE_DECLARE_NEW_CORBA_ENV;
00460 ACE_TRY
00461 {
00462 this->streamctrl_ = this->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
00463 ACE_TRY_CHECK;
00464 char buf [BUFSIZ];
00465 int result = ACE_OS::hostname (buf, BUFSIZ);
00466 unsigned long ipaddr = 0;
00467 if (result == 0)
00468 ipaddr = ACE_OS::inet_addr (buf);
00469 this->source_id_ = TAO_AV_RTCP::alloc_srcid (ipaddr);
00470 }
00471 ACE_CATCHANY
00472 {
00473 ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_StreamCtrl::TAO_StreamCtrl");
00474 }
00475 ACE_ENDTRY;
00476 ACE_CHECK;
00477 }
00478
00479 TAO_StreamCtrl::~TAO_StreamCtrl (void)
00480 {
00481 delete this->mcastconfigif_;
00482 }
00483
00484
00485
00486
00487 void
00488 TAO_StreamCtrl::stop (const AVStreams::flowSpec &flow_spec
00489 ACE_ENV_ARG_DECL)
00490 ACE_THROW_SPEC ((CORBA::SystemException,
00491 AVStreams::noSuchFlow))
00492 {
00493 ACE_TRY
00494 {
00495 TAO_Basic_StreamCtrl::stop (flow_spec ACE_ENV_ARG_PARAMETER);
00496 ACE_TRY_CHECK;
00497 if (this->flow_connection_map_.current_size () > 0)
00498 return;
00499 MMDevice_Map_Iterator a_iterator (this->mmdevice_a_map_);
00500 MMDevice_Map::ENTRY *entry = 0;
00501 for (;a_iterator.next (entry)!= 0;a_iterator.advance ())
00502 {
00503 entry->int_id_.sep_->stop (flow_spec ACE_ENV_ARG_PARAMETER);
00504 ACE_TRY_CHECK;
00505 }
00506 MMDevice_Map_Iterator b_iterator (this->mmdevice_b_map_);
00507 for (;b_iterator.next (entry)!= 0;b_iterator.advance ())
00508 {
00509 entry->int_id_.sep_->stop (flow_spec ACE_ENV_ARG_PARAMETER);
00510 ACE_TRY_CHECK;
00511 }
00512 }
00513 ACE_CATCHANY
00514 {
00515 ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_Basic_StreamCtrl::stop");
00516 return;
00517 }
00518 ACE_ENDTRY;
00519 ACE_CHECK;
00520 }
00521
00522
00523
00524 void
00525 TAO_StreamCtrl::start (const AVStreams::flowSpec &flow_spec
00526 ACE_ENV_ARG_DECL)
00527 ACE_THROW_SPEC ((CORBA::SystemException,
00528 AVStreams::noSuchFlow))
00529 {
00530 ACE_TRY
00531 {
00532 TAO_Basic_StreamCtrl::start (flow_spec ACE_ENV_ARG_PARAMETER);
00533 ACE_TRY_CHECK;
00534 if (this->flow_connection_map_.current_size () > 0)
00535 return;
00536
00537 MMDevice_Map_Iterator a_iterator (this->mmdevice_a_map_);
00538 MMDevice_Map::ENTRY *entry = 0;
00539 for (;a_iterator.next (entry)!= 0;a_iterator.advance ())
00540 {
00541 entry->int_id_.sep_->start (flow_spec ACE_ENV_ARG_PARAMETER);
00542 ACE_TRY_CHECK;
00543 }
00544 MMDevice_Map_Iterator b_iterator (this->mmdevice_b_map_);
00545 for (;b_iterator.next (entry)!= 0;b_iterator.advance ())
00546 {
00547 entry->int_id_.sep_->start (flow_spec ACE_ENV_ARG_PARAMETER);
00548 ACE_TRY_CHECK;
00549 }
00550 }
00551 ACE_CATCHANY
00552 {
00553 ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_StreamCtrl::start");
00554 return;
00555 }
00556 ACE_ENDTRY;
00557 ACE_CHECK;
00558 }
00559
00560
00561
00562
00563 void
00564 TAO_StreamCtrl::destroy (const AVStreams::flowSpec &flow_spec
00565 ACE_ENV_ARG_DECL)
00566 ACE_THROW_SPEC ((CORBA::SystemException,
00567 AVStreams::noSuchFlow))
00568 {
00569 ACE_TRY
00570 {
00571 TAO_Basic_StreamCtrl::destroy (flow_spec ACE_ENV_ARG_PARAMETER);
00572 ACE_TRY_CHECK;
00573 if (this->flow_connection_map_.current_size () > 0)
00574 return;
00575
00576 MMDevice_Map_Iterator a_iterator (this->mmdevice_a_map_);
00577 MMDevice_Map::ENTRY *entry = 0;
00578 for (;a_iterator.next (entry)!= 0;a_iterator.advance ())
00579 {
00580 entry->int_id_.sep_->destroy (flow_spec ACE_ENV_ARG_PARAMETER);
00581 ACE_TRY_CHECK;
00582 }
00583 MMDevice_Map_Iterator b_iterator (this->mmdevice_b_map_);
00584 for (;b_iterator.next (entry)!= 0;b_iterator.advance ())
00585 {
00586 entry->int_id_.sep_->destroy (flow_spec ACE_ENV_ARG_PARAMETER);
00587 ACE_TRY_CHECK;
00588 }
00589 }
00590 ACE_CATCHANY
00591 {
00592 ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_StreamCtrl::destroy");
00593 return;
00594 }
00595 ACE_ENDTRY;
00596 ACE_CHECK;
00597
00598 int result = TAO_AV_Core::deactivate_servant (this);
00599 if (result < 0)
00600 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_StreamCtrl::destroy failed\n"));
00601 }
00602
00603
00604
00605
00606
00607 CORBA::Boolean
00608 TAO_StreamCtrl::bind_devs (AVStreams::MMDevice_ptr a_party,
00609 AVStreams::MMDevice_ptr b_party,
00610 AVStreams::streamQoS &the_qos,
00611 const AVStreams::flowSpec &the_flows
00612 ACE_ENV_ARG_DECL)
00613 ACE_THROW_SPEC ((CORBA::SystemException,
00614 AVStreams::streamOpFailed,
00615 AVStreams::noSuchFlow,
00616 AVStreams::QoSRequestFailed))
00617 {
00618 ACE_TRY
00619 {
00620 if (CORBA::is_nil (a_party) && CORBA::is_nil (b_party))
00621 ACE_ERROR_RETURN ((LM_ERROR, "Both parties are nil\n"), 0);
00622
00623 if (TAO_debug_level > 0)
00624 if (CORBA::is_nil (a_party) ||
00625 CORBA::is_nil (b_party))
00626 if (TAO_debug_level > 0)
00627 ACE_DEBUG ((LM_DEBUG,
00628 "(%P|%t) TAO_StreamCtrl::bind_devs: "
00629 "a_party or b_party is null"
00630 "Multicast mode\n"));
00631
00632
00633 CORBA::Boolean met_qos;
00634 CORBA::String_var named_vdev;
00635
00636 if (!CORBA::is_nil (a_party))
00637 {
00638 MMDevice_Map_Hash_Key find_key (a_party);
00639 MMDevice_Map_Entry find_entry;
00640 int result =
00641 this->mmdevice_a_map_.find (find_key, find_entry);
00642 if (result == 0)
00643 {
00644 if (TAO_debug_level > 0)
00645 {
00646
00647 if (TAO_debug_level > 0)
00648 ACE_DEBUG ((LM_DEBUG, "mmdevice a_party is already bound\n"));
00649 }
00650 return 1;
00651 }
00652 else
00653 {
00654 this->sep_a_ =
00655 a_party-> create_A (this->streamctrl_.in (),
00656 this->vdev_a_.out (),
00657 the_qos,
00658 met_qos,
00659 named_vdev.inout (),
00660 the_flows
00661 ACE_ENV_ARG_PARAMETER);
00662 ACE_TRY_CHECK;
00663
00664 if (TAO_debug_level > 0)
00665 ACE_DEBUG ((LM_DEBUG, "(%P|%t) TAO_StreamCtrl::create_A: succeeded\n"));
00666
00667 CORBA::Any streamctrl_any;
00668 streamctrl_any <<= this->streamctrl_.in ();
00669 this->sep_a_->define_property ("Related_StreamCtrl",
00670 streamctrl_any
00671 ACE_ENV_ARG_PARAMETER);
00672 ACE_TRY_CHECK;
00673
00674 CORBA::Any vdev_a_any;
00675 vdev_a_any <<= this->vdev_a_.in ();
00676 this->sep_a_->define_property ("Related_VDev",
00677 vdev_a_any
00678 ACE_ENV_ARG_PARAMETER);
00679 ACE_TRY_CHECK;
00680
00681 CORBA::Any streamendpoint_a_any;
00682 streamendpoint_a_any <<= this->sep_a_.in ();
00683 this->vdev_a_->define_property ("Related_StreamEndpoint",
00684 streamendpoint_a_any
00685 ACE_ENV_ARG_PARAMETER);
00686
00687 ACE_TRY_CHECK;
00688
00689 CORBA::Any mmdevice_a_any;
00690 mmdevice_a_any <<= a_party;
00691 this->vdev_a_->define_property ("Related_MMDevice",
00692 mmdevice_a_any
00693 ACE_ENV_ARG_PARAMETER);
00694 ACE_TRY_CHECK;
00695
00696
00697 MMDevice_Map_Entry map_entry;
00698 MMDevice_Map_Hash_Key key (a_party);
00699 map_entry.sep_ = AVStreams::StreamEndPoint_A::_duplicate (this->sep_a_.in ());
00700 map_entry.vdev_ = AVStreams::VDev::_duplicate (this->vdev_a_.in ());
00701 map_entry.flowspec_ = the_flows;
00702 map_entry.qos_ = the_qos;
00703 result =
00704 this->mmdevice_a_map_.bind (key, map_entry);
00705 if (result < 0)
00706 if (TAO_debug_level > 0)
00707 ACE_DEBUG ((LM_DEBUG, "Error binding mmdevice entry in the a_map"));
00708 }
00709 }
00710
00711 if (!CORBA::is_nil (b_party))
00712 {
00713 MMDevice_Map_Hash_Key find_key (b_party);
00714 MMDevice_Map_Entry find_entry;
00715 int result =
00716 this->mmdevice_b_map_.find (find_key, find_entry);
00717 if (result == 0)
00718 {
00719
00720 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "mmdevice b_party is already bound\n"));
00721 return 1;
00722 }
00723 else
00724 {
00725 this->sep_b_ =
00726 b_party-> create_B (this->streamctrl_.in (),
00727 this->vdev_b_.out (),
00728 the_qos,
00729 met_qos,
00730 named_vdev.inout (),
00731 the_flows
00732 ACE_ENV_ARG_PARAMETER);
00733 ACE_TRY_CHECK;
00734
00735 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "(%P|%t) TAO_StreamCtrl::create_B: succeeded\n"));
00736
00737 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,
00738 "\n(%P|%t)stream_endpoint_b_ = %s",
00739 TAO_ORB_Core_instance ()->orb ()->object_to_string (this->sep_b_.in ()
00740 ACE_ENV_ARG_PARAMETER)));
00741 ACE_TRY_CHECK;
00742
00743 CORBA::Any streamctrl_any;
00744 streamctrl_any <<= this->streamctrl_.in ();
00745 this->sep_b_->define_property ("Related_StreamCtrl",
00746 streamctrl_any
00747 ACE_ENV_ARG_PARAMETER);
00748 ACE_TRY_CHECK;
00749
00750 CORBA::Any vdev_b_any;
00751 vdev_b_any <<= this->vdev_b_.in ();
00752 this->sep_b_->define_property ("Related_VDev",
00753 vdev_b_any
00754 ACE_ENV_ARG_PARAMETER);
00755 ACE_TRY_CHECK;
00756
00757 CORBA::Any streamendpoint_b_any;
00758 streamendpoint_b_any <<= this->sep_b_.in ();
00759 this->vdev_b_->define_property ("Related_StreamEndpoint",
00760 streamendpoint_b_any
00761 ACE_ENV_ARG_PARAMETER);
00762
00763 ACE_TRY_CHECK;
00764
00765 CORBA::Any mmdevice_b_any;
00766 mmdevice_b_any <<= b_party;
00767 this->vdev_b_->define_property ("Related_MMDevice",
00768 mmdevice_b_any
00769 ACE_ENV_ARG_PARAMETER);
00770 ACE_TRY_CHECK;
00771
00772 MMDevice_Map_Entry map_entry;
00773 MMDevice_Map_Hash_Key key (b_party);
00774 map_entry.sep_ = AVStreams::StreamEndPoint::_duplicate (this->sep_b_.in ());
00775 map_entry.vdev_ = AVStreams::VDev::_duplicate(this->vdev_b_.in ());
00776 map_entry.flowspec_ = the_flows;
00777 map_entry.qos_ = the_qos;
00778 int result =
00779 this->mmdevice_b_map_.bind (key, map_entry);
00780 if (result < 0)
00781 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "Error binding mmdevice entry in the b_map"));
00782 }
00783 }
00784
00785
00786 if ((!CORBA::is_nil (a_party)) && (!CORBA::is_nil (b_party)))
00787 {
00788 CORBA::Any sep_a_peer_any;
00789 CORBA::Any sep_b_peer_any;
00790
00791 sep_a_peer_any <<= this->sep_b_.in();
00792 sep_b_peer_any <<= this->sep_a_.in();
00793 this->sep_a_->define_property ("PeerAdapter",
00794 sep_a_peer_any
00795 ACE_ENV_ARG_PARAMETER);
00796 ACE_TRY_CHECK;
00797
00798 this->sep_b_->define_property ("PeerAdapter",
00799 sep_b_peer_any
00800 ACE_ENV_ARG_PARAMETER);
00801 ACE_TRY_CHECK;
00802 }
00803
00804
00805 if (CORBA::is_nil (b_party) && (!CORBA::is_nil (this->vdev_a_.in ())))
00806 {
00807
00808
00809
00810 ACE_TRY_EX (set_source_id)
00811 {
00812 CORBA::Any_ptr flows_any = this->sep_a_->get_property_value ("Flows"
00813 ACE_ENV_ARG_PARAMETER);
00814 ACE_TRY_CHECK_EX (set_source_id);
00815 AVStreams::flowSpec_var flows;
00816 *flows_any >>= flows.out ();
00817 for (CORBA::ULong i=0; i< flows->length ();++i)
00818 {
00819 CORBA::Object_var fep_obj =
00820 this->sep_a_->get_fep (flows [i] ACE_ENV_ARG_PARAMETER);
00821 ACE_TRY_CHECK_EX (set_source_id);
00822 ACE_TRY_EX (producer_check)
00823 {
00824 AVStreams::FlowProducer_var producer =
00825 AVStreams::FlowProducer::_narrow (fep_obj.in ()
00826 ACE_ENV_ARG_PARAMETER);
00827 ACE_TRY_CHECK_EX (producer_check);
00828 producer->set_source_id (this->source_id_++);
00829 }
00830 ACE_CATCHANY
00831 {
00832 if (TAO_debug_level > 0)
00833 ACE_DEBUG ((LM_DEBUG, " %s ", static_cast<char const*>(flows[i])));
00834
00835 ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "producer_check: not a producer");
00836
00837 }
00838 ACE_ENDTRY;
00839 ACE_CHECK_RETURN (0);
00840 }
00841 }
00842 ACE_CATCHANY
00843 {
00844
00845
00846
00847
00848
00849
00850 this->sep_a_->set_source_id (this->source_id_++
00851 ACE_ENV_ARG_PARAMETER);
00852 ACE_TRY_CHECK;
00853 }
00854 ACE_ENDTRY;
00855 ACE_CHECK_RETURN (0);
00856 if (!this->mcastconfigif_)
00857 {
00858 ACE_NEW_RETURN (this->mcastconfigif_,
00859 TAO_MCastConfigIf,
00860 0);
00861
00862 this->mcastconfigif_ptr_ = this->mcastconfigif_->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
00863 ACE_TRY_CHECK;
00864 }
00865
00866 CORBA::Boolean result = this->vdev_a_->set_Mcast_peer (this->streamctrl_.in (),
00867 this->mcastconfigif_ptr_.in (),
00868 the_qos,
00869 the_flows
00870 ACE_ENV_ARG_PARAMETER);
00871 if (!result)
00872 ACE_ERROR_RETURN ((LM_ERROR, "set_Mcast_peer failed\n"), 0);
00873 }
00874
00875 if (CORBA::is_nil (a_party))
00876 {
00877 if (!CORBA::is_nil (this->vdev_b_.in ()))
00878 {
00879
00880 if (!this->mcastconfigif_)
00881 ACE_ERROR_RETURN ((LM_ERROR, "first add a source and then a sink\n"), 0);
00882 this->mcastconfigif_->set_peer (this->vdev_b_.in (),
00883 the_qos,
00884 the_flows
00885 ACE_ENV_ARG_PARAMETER);
00886 ACE_TRY_CHECK;
00887 }
00888
00889 int connect_leaf_success = 0;
00890 ACE_TRY_EX (connect_leaf)
00891 {
00892
00893
00894 connect_leaf_success = this->sep_a_->connect_leaf (this->sep_b_.in (),
00895 the_qos,
00896 the_flows
00897 ACE_ENV_ARG_PARAMETER);
00898 ACE_TRY_CHECK_EX (connect_leaf);
00899 connect_leaf_success = 1;
00900 }
00901 ACE_CATCH (AVStreams::notSupported, ex)
00902 {
00903 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "connect_leaf failed\n"));
00904 connect_leaf_success = 0;
00905 }
00906 ACE_CATCHANY
00907 {
00908 ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_StreamCtrl::bind_devs");
00909 }
00910 ACE_ENDTRY;
00911 ACE_CHECK_RETURN (0);
00912 if (!connect_leaf_success)
00913 {
00914 if (TAO_debug_level > 0)
00915 ACE_DEBUG ((LM_DEBUG,"TAO_StreamCtrl::bind_devs Multiconnect\n"));
00916 AVStreams::flowSpec connect_flows = the_flows;
00917 this->sep_a_->multiconnect (the_qos, connect_flows ACE_ENV_ARG_PARAMETER);
00918 ACE_TRY_CHECK;
00919 this->sep_b_->multiconnect (the_qos, connect_flows ACE_ENV_ARG_PARAMETER);
00920 ACE_TRY_CHECK;
00921 }
00922 }
00923
00924 if (!CORBA::is_nil (a_party) && !CORBA::is_nil (b_party))
00925 {
00926
00927
00928
00929
00930 if( a_party->is_property_defined("Flows") &&
00931 b_party->is_property_defined("Flows") )
00932 {
00933 if (TAO_debug_level > 0) {
00934 ACE_DEBUG ((LM_DEBUG, "(%N,%l) Full profile, invoking bind()\n"));
00935 }
00936
00937
00938
00939
00940 this->bind (this->sep_a_.in (),
00941 this->sep_b_.in (),
00942 the_qos,
00943 the_flows
00944 ACE_ENV_ARG_PARAMETER);
00945 ACE_TRY_CHECK;
00946
00947
00948
00949 }
00950
00951 else if (!CORBA::is_nil (this->vdev_a_.in ()) && !CORBA::is_nil (this->vdev_b_.in ()))
00952 {
00953 if (TAO_debug_level > 0) {
00954 ACE_DEBUG ((LM_DEBUG, "(%N,%l) Light profile, invoking connect()\n"));
00955 }
00956
00957
00958 this->vdev_a_->set_peer (this->streamctrl_.in (),
00959 this->vdev_b_.in (),
00960 the_qos,
00961 the_flows
00962 ACE_ENV_ARG_PARAMETER);
00963
00964 ACE_TRY_CHECK;
00965 this->vdev_b_->set_peer (this->streamctrl_.in (),
00966 this->vdev_a_.in (),
00967 the_qos,
00968 the_flows
00969 ACE_ENV_ARG_PARAMETER);
00970
00971 ACE_TRY_CHECK;
00972
00973
00974
00975 CORBA::Boolean result =
00976 this->sep_a_->connect (this->sep_b_.in (),
00977 the_qos,
00978 the_flows
00979 ACE_ENV_ARG_PARAMETER);
00980 ACE_TRY_CHECK;
00981 if (result == 0)
00982 ACE_ERROR_RETURN ((LM_ERROR, "sep_a->connect (sep_b) failed\n"), 0);
00983 }
00984 }
00985 }
00986 ACE_CATCHANY
00987 {
00988 ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_StreamCtrl::bind_devs");
00989 return 0;
00990 }
00991 ACE_ENDTRY;
00992 ACE_CHECK_RETURN (0);
00993 return 1;
00994 }
00995
00996
00997
00998 CORBA::Boolean
00999 TAO_StreamCtrl::bind (AVStreams::StreamEndPoint_A_ptr sep_a,
01000 AVStreams::StreamEndPoint_B_ptr sep_b,
01001 AVStreams::streamQoS &stream_qos,
01002 const AVStreams::flowSpec &flow_spec
01003 ACE_ENV_ARG_DECL)
01004 ACE_THROW_SPEC ((CORBA::SystemException,
01005 AVStreams::streamOpFailed,
01006 AVStreams::noSuchFlow,
01007 AVStreams::QoSRequestFailed))
01008 {
01009 this->sep_a_ = AVStreams::StreamEndPoint_A::_duplicate(sep_a);
01010 this->sep_b_ = AVStreams::StreamEndPoint_B::_duplicate(sep_b);
01011
01012 int result = 0;
01013 ACE_TRY
01014 {
01015 if (CORBA::is_nil (sep_a_.in() ) ||
01016 CORBA::is_nil (sep_b_.in() ))
01017 ACE_ERROR_RETURN ((LM_ERROR,
01018 "(%P|%t) TAO_StreamCtrl::bind:"
01019 "a_party or b_party null!"),
01020 0);
01021
01022
01023 CORBA::Any sep_any;
01024 sep_any <<= sep_b;
01025 sep_a_->define_property ("PeerAdapter",
01026 sep_any
01027 ACE_ENV_ARG_PARAMETER);
01028 ACE_TRY_CHECK;
01029 sep_any <<= sep_a;
01030 sep_b_->define_property ("PeerAdapter",
01031 sep_any
01032 ACE_ENV_ARG_PARAMETER);
01033 ACE_TRY_CHECK;
01034
01035
01036
01037 AVStreams::flowSpec a_flows, b_flows;
01038 CORBA::Any_var flows_any;
01039 flows_any = sep_a_->get_property_value ("Flows" ACE_ENV_ARG_PARAMETER);
01040 ACE_TRY_CHECK;
01041 AVStreams::flowSpec *temp_flows;
01042 flows_any.in () >>= temp_flows;
01043 a_flows = *temp_flows;
01044 flows_any = sep_b_->get_property_value ("Flows" ACE_ENV_ARG_PARAMETER);
01045 ACE_TRY_CHECK;
01046 flows_any.in () >>= temp_flows;
01047 b_flows = *temp_flows;
01048 u_int i;
01049 FlowEndPoint_Map *a_fep_map;
01050 FlowEndPoint_Map *b_fep_map;
01051 ACE_NEW_RETURN (a_fep_map,
01052 FlowEndPoint_Map,
01053 0);
01054 ACE_NEW_RETURN (b_fep_map,
01055 FlowEndPoint_Map,
01056 0);
01057 for (i=0;i<a_flows.length ();i++)
01058 {
01059 const char *flowname = a_flows[i];
01060
01061 CORBA::Object_var fep_obj =
01062 sep_a_->get_fep (flowname
01063 ACE_ENV_ARG_PARAMETER);
01064 ACE_TRY_CHECK;
01065 AVStreams::FlowEndPoint_var fep =
01066 AVStreams::FlowEndPoint::_narrow (fep_obj.in ()
01067 ACE_ENV_ARG_PARAMETER);
01068 ACE_TRY_CHECK;
01069 ACE_CString fep_key (flowname);
01070 result = a_fep_map->bind (fep_key, fep);
01071 if (result == -1)
01072 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_StreamCtrl::bind failed for %s\n", flowname));
01073 }
01074
01075 for (i=0;i<b_flows.length ();i++)
01076 {
01077 const char *flowname = b_flows[i];
01078
01079 CORBA::Object_var fep_obj =
01080 sep_b->get_fep (flowname
01081 ACE_ENV_ARG_PARAMETER);
01082 ACE_TRY_CHECK;
01083 AVStreams::FlowEndPoint_var fep =
01084 AVStreams::FlowEndPoint::_narrow (fep_obj.in ()
01085 ACE_ENV_ARG_PARAMETER);
01086 ACE_TRY_CHECK;
01087 ACE_CString fep_key (flowname);
01088 result = b_fep_map->bind (fep_key, fep);
01089 if (result == -1)
01090 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_StreamCtrl::bind failed for %s\n", flowname));
01091 }
01092 FlowEndPoint_Map *map_a = 0, *map_b = 0;
01093 if (flow_spec.length () == 0)
01094 {
01095 map_a = a_fep_map;
01096 map_b = b_fep_map;
01097 }
01098 else
01099 {
01100 FlowEndPoint_Map *spec_fep_map_a, *spec_fep_map_b;
01101 ACE_NEW_RETURN (spec_fep_map_a,
01102 FlowEndPoint_Map,
01103 0);
01104 ACE_NEW_RETURN (spec_fep_map_b,
01105 FlowEndPoint_Map,
01106 0);
01107 for (i=0; i< flow_spec.length ();i++)
01108 {
01109 TAO_Forward_FlowSpec_Entry *entry = 0;
01110 ACE_NEW_RETURN (entry,
01111 TAO_Forward_FlowSpec_Entry,
01112 0);
01113 entry->parse (flow_spec[i]);
01114 ACE_CString fep_key (entry->flowname ());
01115 AVStreams::FlowEndPoint_var fep;
01116 result = a_fep_map->find (fep_key, fep);
01117 if (result == -1)
01118 ACE_ERROR_RETURN ((LM_ERROR, "Fep not found on A side for flowname: %s\n", flow_spec[i]), 0);
01119
01120 result = spec_fep_map_a->bind (fep_key, fep);
01121 if (result == -1)
01122 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "Bind failed for %s\n", flow_spec[i]));
01123
01124 result = b_fep_map->find (fep_key, fep);
01125 if (result == -1)
01126 ACE_ERROR_RETURN ((LM_ERROR, "Fep not found on B side for flowname: %s\n", flow_spec[i]), 0);
01127
01128 result = spec_fep_map_b->bind (fep_key, fep);
01129 if (result == -1)
01130 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "Bind failed for %s\n", flow_spec[i]));
01131 }
01132 map_a = spec_fep_map_a;
01133 map_b = spec_fep_map_b;
01134 }
01135
01136 TAO_AV_QoS qos (stream_qos);
01137
01138
01139 FlowEndPoint_Map_Iterator a_feps_iterator (*map_a);
01140 FlowEndPoint_Map_Entry *a_feps_entry, *b_feps_entry;
01141 ACE_TRY_EX (flow_connect)
01142 {
01143
01144 for (;a_feps_iterator.next (a_feps_entry) != 0;
01145 a_feps_iterator.advance ())
01146 {
01147 AVStreams::FlowEndPoint_var fep_a = a_feps_entry->int_id_;
01148 AVStreams::FlowEndPoint_var connected_to =
01149 fep_a->get_connected_fep (ACE_ENV_SINGLE_ARG_PARAMETER);
01150 ACE_TRY_CHECK_EX (flow_connect);
01151
01152 if (!CORBA::is_nil (connected_to.in ()))
01153 {
01154
01155 continue;
01156 }
01157
01158 FlowEndPoint_Map_Iterator b_feps_iterator (*map_b);
01159 for (;b_feps_iterator.next (b_feps_entry) != 0;
01160 b_feps_iterator.advance ())
01161 {
01162 AVStreams::FlowEndPoint_var fep_b = b_feps_entry->int_id_;
01163 AVStreams::FlowConnection_var flow_connection;
01164
01165 AVStreams::FlowEndPoint_var connected_to =
01166 fep_b->get_connected_fep (ACE_ENV_SINGLE_ARG_PARAMETER);
01167 ACE_TRY_CHECK_EX (flow_connect);
01168
01169 if (!CORBA::is_nil (connected_to.in ()))
01170 {
01171
01172 continue;
01173 }
01174
01175 if (fep_a->is_fep_compatible (fep_b.in()
01176 ACE_ENV_ARG_PARAMETER) == 1)
01177 {
01178 ACE_TRY_CHECK_EX (flow_connect);
01179
01180
01181 CORBA::Object_var flow_connection_obj;
01182 CORBA::Any_var flowname_any =
01183 fep_a->get_property_value ("FlowName"
01184 ACE_ENV_ARG_PARAMETER);
01185 ACE_TRY_CHECK_EX (flow_connect);
01186 const char *flowname = 0;
01187 flowname_any.in () >>= flowname;
01188 ACE_TRY_EX (flow_connection)
01189 {
01190 flow_connection_obj =
01191 this->get_flow_connection (flowname
01192 ACE_ENV_ARG_PARAMETER);
01193 ACE_TRY_CHECK_EX (flow_connection);
01194 flow_connection =
01195 AVStreams::FlowConnection::_narrow (flow_connection_obj.in ()
01196 ACE_ENV_ARG_PARAMETER);
01197 ACE_TRY_CHECK_EX (flow_connection);
01198 }
01199 ACE_CATCHANY
01200 {
01201 TAO_FlowConnection *flowConnection;
01202 ACE_NEW_RETURN (flowConnection,
01203 TAO_FlowConnection,
01204 0);
01205 flow_connection = flowConnection->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
01206 ACE_TRY_CHECK_EX (flow_connect);
01207 this->set_flow_connection (flowname,
01208 flow_connection.in ()
01209 ACE_ENV_ARG_PARAMETER);
01210 ACE_TRY_CHECK_EX (flow_connect);
01211 }
01212 ACE_ENDTRY;
01213 ACE_CHECK_RETURN (0);
01214
01215
01216
01217
01218
01219
01220
01221 AVStreams::FlowProducer_var producer;
01222 AVStreams::FlowConsumer_var consumer;
01223
01224 ACE_TRY_EX (producer_check)
01225 {
01226 producer =
01227 AVStreams::FlowProducer::_narrow (fep_a.in()
01228 ACE_ENV_ARG_PARAMETER);
01229 ACE_TRY_CHECK_EX (producer_check);
01230 consumer =
01231 AVStreams::FlowConsumer::_narrow (fep_b.in()
01232 ACE_ENV_ARG_PARAMETER);
01233 ACE_TRY_CHECK_EX (producer_check);
01234
01235
01236
01237 if (CORBA::is_nil (producer.in ()))
01238 {
01239 producer =
01240 AVStreams::FlowProducer::_narrow (fep_b.in()
01241 ACE_ENV_ARG_PARAMETER);
01242 ACE_TRY_CHECK_EX (producer_check);
01243 consumer =
01244 AVStreams::FlowConsumer::_narrow (fep_a.in()
01245 ACE_ENV_ARG_PARAMETER);
01246 ACE_TRY_CHECK_EX (producer_check);
01247 }
01248
01249
01250
01251
01252 ACE_ASSERT (!CORBA::is_nil (producer.in ()));
01253 ACE_ASSERT (!CORBA::is_nil (consumer.in ()));
01254 }
01255 ACE_CATCHANY
01256 {
01257
01258 ACE_RE_THROW;
01259 }
01260 ACE_ENDTRY;
01261 ACE_CHECK_RETURN (0);
01262 CORBA::String_var fep_a_name, fep_b_name;
01263 flowname_any = fep_a->get_property_value ("FlowName"
01264 ACE_ENV_ARG_PARAMETER);
01265 const char *temp_name;
01266 flowname_any.in () >>= temp_name;
01267 fep_a_name = CORBA::string_dup (temp_name);
01268 flowname_any = fep_b->get_property_value ("FlowName"
01269 ACE_ENV_ARG_PARAMETER);
01270 flowname_any.in () >>= temp_name;
01271 fep_b_name = CORBA::string_dup (temp_name);
01272 AVStreams::QoS flow_qos;
01273 flow_qos.QoSType = fep_a_name;
01274 flow_qos.QoSParams.length (0);
01275 result = qos.get_flow_qos (fep_a_name.in (), flow_qos);
01276 if (result == -1)
01277 {
01278 flow_qos.QoSType = fep_b_name;
01279 result = qos.get_flow_qos (fep_b_name.in (),
01280 flow_qos);
01281 if (result == -1 && TAO_debug_level > 0)
01282 ACE_DEBUG ((LM_DEBUG,
01283 "No QoS Specified for this flow <%s>\n", flowname));
01284 }
01285 flow_connection->connect (producer.in (),
01286 consumer.in (),
01287 flow_qos
01288 ACE_ENV_ARG_PARAMETER);
01289 ACE_TRY_CHECK_EX (flow_connect);
01290 }
01291 }
01292 }
01293 }
01294 ACE_CATCHANY
01295 {
01296 ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
01297 "TAO_StreamCtrl::bind:flow_connect block");
01298 return 0;
01299 }
01300 ACE_ENDTRY;
01301 ACE_CHECK_RETURN (0);
01302 }
01303 ACE_CATCHANY
01304 {
01305
01306
01307 this->sep_a_->connect (this->sep_b_.in (),
01308 stream_qos,
01309 flow_spec
01310 ACE_ENV_ARG_PARAMETER);
01311 ACE_TRY_CHECK;
01312 }
01313 ACE_ENDTRY;
01314 ACE_CHECK_RETURN (0);
01315 return 1;
01316 }
01317
01318 void
01319 TAO_StreamCtrl::unbind (ACE_ENV_SINGLE_ARG_DECL)
01320 ACE_THROW_SPEC ((CORBA::SystemException,
01321 AVStreams::streamOpFailed))
01322 {
01323 ACE_TRY
01324 {
01325 if (this->flow_connection_map_.current_size () > 0)
01326 return;
01327
01328 AVStreams::flowSpec flow_spec;
01329 flow_spec.length(0);
01330
01331 MMDevice_Map_Iterator a_iterator (this->mmdevice_a_map_);
01332 MMDevice_Map::ENTRY *entry = 0;
01333 for (;a_iterator.next (entry)!= 0;a_iterator.advance ())
01334 {
01335 entry->int_id_.sep_->destroy (flow_spec ACE_ENV_ARG_PARAMETER);
01336 ACE_TRY_CHECK;
01337 }
01338 MMDevice_Map_Iterator b_iterator (this->mmdevice_b_map_);
01339 for (;b_iterator.next (entry)!= 0;b_iterator.advance ())
01340 {
01341 entry->int_id_.sep_->destroy (flow_spec ACE_ENV_ARG_PARAMETER);
01342 ACE_TRY_CHECK;
01343 }
01344 }
01345 ACE_CATCHANY
01346 {
01347 ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_StreamCtrl::unbind");
01348 return;
01349 }
01350 ACE_ENDTRY;
01351 ACE_CHECK;
01352 }
01353
01354 void
01355 TAO_StreamCtrl::unbind_party (AVStreams::StreamEndPoint_ptr ,
01356 const AVStreams::flowSpec &
01357 ACE_ENV_ARG_DECL_NOT_USED)
01358 ACE_THROW_SPEC ((CORBA::SystemException,
01359 AVStreams::streamOpFailed,
01360 AVStreams::noSuchFlow))
01361 {
01362 }
01363
01364 void
01365 TAO_StreamCtrl::unbind_dev (AVStreams::MMDevice_ptr ,
01366 const AVStreams::flowSpec &
01367 ACE_ENV_ARG_DECL_NOT_USED)
01368 ACE_THROW_SPEC ((CORBA::SystemException,
01369 AVStreams::streamOpFailed,
01370 AVStreams::noSuchFlow))
01371 {
01372 }
01373
01374 AVStreams::VDev_ptr
01375 TAO_StreamCtrl::get_related_vdev (AVStreams::MMDevice_ptr adev,
01376 AVStreams::StreamEndPoint_out sep
01377 ACE_ENV_ARG_DECL_NOT_USED)
01378 ACE_THROW_SPEC ((CORBA::SystemException,
01379 AVStreams::streamOpFailed))
01380 {
01381 MMDevice_Map_Hash_Key key (adev);
01382 MMDevice_Map_Entry entry;
01383 int result = -1;
01384 result = this->mmdevice_a_map_.find (key, entry);
01385 if (result < 0)
01386 {
01387 result = this->mmdevice_a_map_.find (key, entry);
01388 if (result < 0)
01389 return AVStreams::VDev::_nil ();
01390 }
01391 sep = AVStreams::StreamEndPoint::_duplicate (entry.sep_.in ());
01392 return AVStreams::VDev::_duplicate (entry.vdev_.in ());
01393 }
01394
01395 CORBA::Boolean
01396
01397 TAO_StreamCtrl::modify_QoS (AVStreams::streamQoS &new_qos,
01398 const AVStreams::flowSpec &the_spec
01399 ACE_ENV_ARG_DECL)
01400 ACE_THROW_SPEC ((CORBA::SystemException,
01401 AVStreams::noSuchFlow,
01402 AVStreams::QoSRequestFailed))
01403 {
01404 if (TAO_debug_level > 0)
01405 ACE_DEBUG ((LM_DEBUG,
01406 "TAO_StreamCtrl::modify_QoS\n"));
01407
01408
01409 if (this->mcastconfigif_ != 0)
01410 {
01411
01412 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "Cannot Modify the Qos for multipoint streams\n"));
01413 }
01414 else
01415 {
01416 ACE_TRY
01417 {
01418 AVStreams::flowSpec in_flowspec;
01419 AVStreams::flowSpec out_flowspec;
01420
01421 in_flowspec.length (0);
01422 out_flowspec.length (0);
01423
01424 int in_index = 0;
01425 int out_index = 0;
01426
01427 AVStreams::flowSpec flowspec;
01428 if (the_spec.length () == 0)
01429 {
01430
01431 flowspec = this->flows_;
01432 MMDevice_Map_Iterator iterator (this->mmdevice_a_map_);
01433 MMDevice_Map::ENTRY *entry = 0;
01434 for (;iterator.next (entry) != 0;iterator.advance ())
01435 {
01436 flowspec = entry->int_id_.flowspec_;
01437 }
01438 }
01439 else
01440 {
01441 flowspec = the_spec;
01442 }
01443
01444 if (TAO_debug_level > 0)
01445 ACE_DEBUG ((LM_DEBUG,
01446 "TAO_StreamCtrl::modify_QoS\n"));
01447
01448
01449 for (u_int i=0;i < flowspec.length ();i++)
01450 {
01451 TAO_Forward_FlowSpec_Entry entry;
01452 entry.parse (flowspec [i]);
01453 int direction = entry.direction ();
01454 if (direction == 0)
01455 {
01456 in_flowspec.length (in_index + 1);
01457 in_flowspec [in_index++] = CORBA::string_dup (entry.entry_to_string ());
01458 }
01459 else
01460 {
01461 out_flowspec.length (out_index + 1);
01462 out_flowspec [out_index++] = CORBA::string_dup (entry.entry_to_string ());
01463 }
01464 }
01465
01466 if (in_flowspec.length () != 0)
01467 {
01468 this->vdev_a_->modify_QoS (new_qos, in_flowspec ACE_ENV_ARG_PARAMETER);
01469 ACE_TRY_CHECK;
01470 }
01471
01472 if (out_flowspec.length () != 0)
01473 {
01474 this->vdev_b_->modify_QoS (new_qos, out_flowspec ACE_ENV_ARG_PARAMETER);
01475 ACE_TRY_CHECK;
01476 }
01477 }
01478 ACE_CATCHANY
01479 {
01480 ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_StreamCtrl::modify_QoS");
01481 return 0;
01482 }
01483 ACE_ENDTRY;
01484 ACE_CHECK_RETURN (0);
01485
01486 }
01487 return 1;
01488 }
01489
01490
01491
01492
01493
01494 TAO_MCastConfigIf::TAO_MCastConfigIf (void)
01495 :peer_list_iterator_ (peer_list_)
01496 {
01497 }
01498
01499 TAO_MCastConfigIf::~TAO_MCastConfigIf (void)
01500 {
01501
01502 }
01503
01504
01505 CORBA::Boolean
01506 TAO_MCastConfigIf::set_peer (CORBA::Object_ptr peer,
01507 AVStreams::streamQoS & qos,
01508 const AVStreams::flowSpec & flow_spec
01509 ACE_ENV_ARG_DECL)
01510 ACE_THROW_SPEC ((CORBA::SystemException,
01511 AVStreams::QoSRequestFailed,
01512 AVStreams::streamOpFailed))
01513 {
01514 ACE_TRY
01515 {
01516 Peer_Info *info;
01517 ACE_NEW_RETURN (info,
01518 Peer_Info,
01519 0);
01520 info->peer_ = AVStreams::VDev::_narrow (peer ACE_ENV_ARG_PARAMETER);
01521 ACE_TRY_CHECK;
01522 info->qos_ = qos;
01523 info->flow_spec_ = flow_spec;
01524 this->peer_list_.insert_tail (info);
01525 }
01526 ACE_CATCHANY
01527 {
01528 ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_MCastConfigIf::set_peer");
01529 return 0;
01530 }
01531 ACE_ENDTRY;
01532 ACE_CHECK_RETURN (0);
01533 return 1;
01534 }
01535
01536
01537 void
01538 TAO_MCastConfigIf::configure (const CosPropertyService::Property & a_configuration
01539 ACE_ENV_ARG_DECL)
01540 ACE_THROW_SPEC ((CORBA::SystemException))
01541 {
01542 Peer_Info *info;
01543 ACE_TRY
01544 {
01545 for (this->peer_list_iterator_.first ();
01546 (info = this->peer_list_iterator_.next ()) != 0;
01547 this->peer_list_iterator_.advance ())
01548 {
01549 info->peer_->configure (a_configuration ACE_ENV_ARG_PARAMETER);
01550 ACE_TRY_CHECK;
01551 }
01552 }
01553 ACE_CATCHANY
01554 {
01555 ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_MCastConfigIf::set_configure");
01556 return;
01557 }
01558 ACE_ENDTRY;
01559 ACE_CHECK;
01560 }
01561
01562
01563 void
01564 TAO_MCastConfigIf::set_initial_configuration (const CosPropertyService::Properties &initial
01565 ACE_ENV_ARG_DECL_NOT_USED)
01566 ACE_THROW_SPEC ((CORBA::SystemException))
01567 {
01568 this->initial_configuration_ = initial;
01569 }
01570
01571
01572 void
01573 TAO_MCastConfigIf::set_format (const char * flowName,
01574 const char * format_name
01575 ACE_ENV_ARG_DECL)
01576 ACE_THROW_SPEC ((CORBA::SystemException,
01577 AVStreams::notSupported))
01578 {
01579 Peer_Info *info;
01580 ACE_TRY
01581 {
01582 for (this->peer_list_iterator_.first ();
01583 (info = this->peer_list_iterator_.next ()) != 0;
01584 this->peer_list_iterator_.advance ())
01585 {
01586 if (this->in_flowSpec (info->flow_spec_, flowName))
01587 {
01588 info->peer_->set_format (flowName, format_name ACE_ENV_ARG_PARAMETER);
01589 ACE_TRY_CHECK;
01590 }
01591 }
01592 }
01593 ACE_CATCHANY
01594 {
01595 ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_MCastConfigIf::set_format");
01596 return;
01597 }
01598 ACE_ENDTRY;
01599 ACE_CHECK;
01600 }
01601
01602
01603 void
01604 TAO_MCastConfigIf::set_dev_params (const char * flowName,
01605 const CosPropertyService::Properties & new_params
01606 ACE_ENV_ARG_DECL)
01607 ACE_THROW_SPEC ((CORBA::SystemException,
01608 AVStreams::PropertyException,
01609 AVStreams::streamOpFailed))
01610 {
01611 Peer_Info *info;
01612 ACE_TRY
01613 {
01614
01615 for (this->peer_list_iterator_.first ();
01616 (info = this->peer_list_iterator_.next ()) != 0;
01617 this->peer_list_iterator_.advance ())
01618 {
01619 if (this->in_flowSpec (info->flow_spec_, flowName))
01620 {
01621 info->peer_->set_dev_params (flowName, new_params ACE_ENV_ARG_PARAMETER);
01622 ACE_TRY_CHECK;
01623 }
01624 }
01625 }
01626 ACE_CATCHANY
01627 {
01628 ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_MCastConfigIf::set_dev_params");
01629 return;
01630 }
01631 ACE_ENDTRY;
01632 ACE_CHECK;
01633 }
01634
01635 int
01636 TAO_MCastConfigIf::in_flowSpec (const AVStreams::flowSpec& flow_spec, const char *flow_name)
01637 {
01638 size_t len = ACE_OS::strlen (flow_name);
01639 for (CORBA::ULong i = 0; i < flow_spec.length (); i++)
01640 if (ACE_OS::strncmp (flow_spec[i], flow_name, len) == 0)
01641 {
01642 return 1;
01643 }
01644 return 0;
01645 }
01646
01647
01648
01649
01650
01651 TAO_Base_StreamEndPoint::TAO_Base_StreamEndPoint (void)
01652 : protocol_object_set_ (0)
01653 {
01654 }
01655
01656 TAO_Base_StreamEndPoint::~TAO_Base_StreamEndPoint (void)
01657 {
01658 }
01659
01660 int
01661 TAO_Base_StreamEndPoint::handle_close (void)
01662 {
01663
01664
01665 return -1;
01666 }
01667
01668 int
01669 TAO_Base_StreamEndPoint::handle_open (void)
01670 {
01671 return 0;
01672 }
01673
01674 int
01675 TAO_Base_StreamEndPoint::handle_stop (const AVStreams::flowSpec &
01676 ACE_ENV_ARG_DECL_NOT_USED)
01677 {
01678 return 0;
01679 }
01680
01681 int
01682 TAO_Base_StreamEndPoint::handle_start (const AVStreams::flowSpec &
01683 ACE_ENV_ARG_DECL_NOT_USED)
01684 {
01685 return 0;
01686 }
01687
01688 int
01689 TAO_Base_StreamEndPoint::handle_destroy (const AVStreams::flowSpec &
01690 ACE_ENV_ARG_DECL_NOT_USED)
01691 {
01692 return 0;
01693 }
01694
01695
01696 CORBA::Boolean
01697 TAO_Base_StreamEndPoint::handle_preconnect (AVStreams::flowSpec &)
01698 {
01699 return 1;
01700 }
01701
01702
01703 CORBA::Boolean
01704 TAO_Base_StreamEndPoint::handle_postconnect (AVStreams::flowSpec &)
01705 {
01706
01707 while (!this->is_protocol_object_set ())
01708 TAO_AV_CORE::instance ()->orb ()->perform_work ();
01709 return 1;
01710 }
01711
01712
01713 CORBA::Boolean
01714 TAO_Base_StreamEndPoint::handle_connection_requested (AVStreams::flowSpec &
01715 ACE_ENV_ARG_DECL_NOT_USED)
01716 {
01717 return 1;
01718 }
01719
01720 int
01721 TAO_Base_StreamEndPoint::set_protocol_object (const char * ,
01722 TAO_AV_Protocol_Object * )
01723 {
01724 return -1;
01725 }
01726
01727 void
01728 TAO_Base_StreamEndPoint::protocol_object_set (void)
01729 {
01730 this->protocol_object_set_ = 1;
01731 }
01732
01733
01734 int
01735 TAO_Base_StreamEndPoint::is_protocol_object_set (void)
01736 {
01737 return this->protocol_object_set_;
01738 }
01739
01740 int
01741 TAO_Base_StreamEndPoint::get_callback (const char * ,
01742 TAO_AV_Callback *&)
01743 {
01744 return -1;
01745 }
01746
01747 int
01748 TAO_Base_StreamEndPoint::get_control_callback (const char * ,
01749 TAO_AV_Callback *&)
01750 {
01751 return -1;
01752 }
01753
01754 void
01755 TAO_Base_StreamEndPoint::set_flow_handler (const char *flowname,
01756 TAO_AV_Flow_Handler *handler)
01757 {
01758 if(TAO_debug_level > 1)
01759 {
01760 ACE_DEBUG ((LM_DEBUG, "(%N,%l) TAO_Base_StreamEndPoint::set_flow_handler(), flowname: %s\n", flowname));
01761 }
01762 ACE_CString flow_name_key (flowname);
01763 if (this->flow_handler_map_.bind (flow_name_key, handler) != 0)
01764 ACE_ERROR ((LM_ERROR,
01765 "Error in storing flow handler\n"));
01766 }
01767
01768 void
01769 TAO_Base_StreamEndPoint::set_control_flow_handler (const char *flowname,
01770 TAO_AV_Flow_Handler *handler)
01771 {
01772 ACE_CString flow_name_key (flowname);
01773 if (this->control_flow_handler_map_.bind (flow_name_key, handler) != 0)
01774 ACE_ERROR ((LM_ERROR,
01775 "Error in storing control flow handler\n"));
01776 }
01777
01778
01779
01780
01781
01782
01783
01784 TAO_StreamEndPoint::TAO_StreamEndPoint (void)
01785 :flow_count_ (0),
01786 flow_num_ (0),
01787 mcast_port_ (ACE_DEFAULT_MULTICAST_PORT+1)
01788 {
01789
01790 this->mcast_addr_.set (ACE_DEFAULT_MULTICAST_ADDR);
01791 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_StreamEndPoint::TAO_StreamEndPoint::mcast_addr = %s", this->mcast_addr_.c_str ()));
01792
01793 }
01794
01795
01796 CORBA::Boolean
01797 TAO_StreamEndPoint::connect (AVStreams::StreamEndPoint_ptr responder,
01798 AVStreams::streamQoS &qos,
01799 const AVStreams::flowSpec &the_spec
01800 ACE_ENV_ARG_DECL)
01801 ACE_THROW_SPEC ((CORBA::SystemException,
01802 AVStreams::noSuchFlow,
01803 AVStreams::QoSRequestFailed,
01804 AVStreams::streamOpFailed))
01805 {
01806 if (TAO_debug_level > 0)
01807 ACE_DEBUG ((LM_DEBUG, "TAO_StreamEndPoint::Connect ()\n"));
01808 CORBA::Boolean retv = 0;
01809 this->peer_sep_ = AVStreams::StreamEndPoint::_duplicate (responder);
01810 ACE_TRY_EX (negotiate)
01811 {
01812 if (!CORBA::is_nil (this->negotiator_.in ()))
01813 {
01814 ACE_DEBUG ((LM_DEBUG,
01815 "NEGOTIATOR AVIALABLE\n"));
01816
01817 CORBA::Any_var negotiator_any = responder->get_property_value ("Negotiator");
01818
01819 AVStreams::Negotiator_ptr peer_negotiator;
01820 negotiator_any.in () >>= peer_negotiator;
01821 if (!CORBA::is_nil (peer_negotiator))
01822 {
01823 CORBA::Boolean result =
01824 this->negotiator_->negotiate (peer_negotiator,
01825 qos
01826 ACE_ENV_ARG_PARAMETER);
01827 ACE_TRY_CHECK_EX (negotiate);
01828 if (!result)
01829 if (TAO_debug_level > 0)
01830 ACE_DEBUG ((LM_DEBUG, "TAO_StreamEndPoint::Connect (): negotiate failed\n"));
01831 }
01832 }
01833 }
01834 ACE_CATCHANY
01835 {
01836 ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_StreamEndPoint::negotiate");
01837 }
01838 ACE_ENDTRY;
01839 ACE_CHECK_RETURN (0);
01840
01841 ACE_TRY_EX (available_protocols)
01842 {
01843 if (this->protocols_.length () > 0)
01844 {
01845
01846 CORBA::Any_var protocols_any =
01847 responder->get_property_value ("AvailableProtocols" ACE_ENV_ARG_PARAMETER);
01848 ACE_TRY_CHECK_EX (available_protocols);
01849 AVStreams::protocolSpec peer_protocols;
01850 AVStreams::protocolSpec *temp_protocols;
01851 protocols_any.in () >>= temp_protocols;
01852 peer_protocols = *temp_protocols;
01853 for (u_int i=0;i<peer_protocols.length ();i++)
01854 {
01855 for (u_int j=0;j<this->protocols_.length ();j++)
01856 if (ACE_OS::strcmp (peer_protocols [i],
01857 this->protocols_[j]) == 0)
01858 {
01859
01860 this->protocol_ = CORBA::string_dup (peer_protocols [i]);
01861 break;
01862 }
01863 }
01864 }
01865 }
01866 ACE_CATCHANY
01867 {
01868 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "Availableprotocols property not defined\n"));
01869 }
01870 ACE_ENDTRY;
01871 ACE_CHECK_RETURN (0);
01872 ACE_TRY
01873 {
01874 AVStreams::streamQoS network_qos;
01875 if (qos.length () > 0)
01876 {
01877 if (TAO_debug_level > 0)
01878 ACE_DEBUG ((LM_DEBUG,
01879 "QoS is Specified\n"));
01880
01881 int result = this->translate_qos (qos,
01882 network_qos);
01883 if (result != 0)
01884 if (TAO_debug_level > 0)
01885 ACE_DEBUG ((LM_DEBUG,
01886 "QoS translation failed\n"));
01887
01888 this->qos ().set (network_qos);
01889 }
01890
01891
01892 AVStreams::flowSpec flow_spec (the_spec);
01893 this->handle_preconnect (flow_spec);
01894
01895 if (TAO_debug_level > 0)
01896 ACE_DEBUG ((LM_DEBUG,
01897 "TAO_StreamEndPoint::connect: flow_spec_length = %d\n",
01898 flow_spec.length ()));
01899 u_int i;
01900 for (i=0;i<flow_spec.length ();i++)
01901 {
01902 TAO_Forward_FlowSpec_Entry *entry = 0;
01903 ACE_NEW_RETURN (entry,
01904 TAO_Forward_FlowSpec_Entry,
01905 0);
01906
01907 if (entry->parse (flow_spec[i]) == -1)
01908 return 0;
01909
01910 if (TAO_debug_level > 0)
01911 ACE_DEBUG ((LM_DEBUG, "TAO_StreamEndPoint::Connect: %s\n", entry->entry_to_string ()));
01912
01913 this->forward_flow_spec_set.insert (entry);
01914 }
01915
01916 int result =TAO_AV_CORE::instance ()->init_forward_flows (this,
01917 this->forward_flow_spec_set,
01918 TAO_AV_Core::TAO_AV_ENDPOINT_A,
01919 flow_spec);
01920
01921
01922 if (result < 0)
01923 ACE_ERROR_RETURN ((LM_ERROR, "%N:%l TAO_AV_Core::init_forward_flows failed\n"), 0);
01924
01925
01926 AVStreams::StreamEndPoint_var streamendpoint = this->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
01927 ACE_TRY_CHECK;
01928
01929 retv = responder->request_connection (streamendpoint.in (),
01930 0,
01931 network_qos,
01932 flow_spec
01933 ACE_ENV_ARG_PARAMETER);
01934 ACE_TRY_CHECK;
01935
01936 if (TAO_debug_level > 0)
01937 ACE_DEBUG ((LM_DEBUG, "%N:%l request_connection returned %d\n", retv));
01938
01939 if (retv == 0)
01940 return retv;
01941 for (i=0;i<flow_spec.length ();i++)
01942 {
01943 TAO_Reverse_FlowSpec_Entry *entry = 0;
01944 ACE_NEW_RETURN (entry,
01945 TAO_Reverse_FlowSpec_Entry,
01946 0);
01947 if (entry->parse (flow_spec[i]) == -1)
01948 ACE_ERROR_RETURN ((LM_ERROR,
01949 "Reverse_Flow_Spec_Set::parse failed\n"),
01950 0);
01951
01952 if (TAO_debug_level > 0)
01953 ACE_DEBUG ((LM_DEBUG,
01954 "TAO_StreamEndPoint::Connect: Reverse Flow Spec %s\n",
01955 entry->entry_to_string ()));
01956
01957 this->reverse_flow_spec_set.insert (entry);
01958 }
01959
01960 result = TAO_AV_CORE::instance ()->init_reverse_flows (this,
01961 this->forward_flow_spec_set,
01962 this->reverse_flow_spec_set,
01963 TAO_AV_Core::TAO_AV_ENDPOINT_A);
01964 if (result < 0)
01965 ACE_ERROR_RETURN ((LM_ERROR,
01966 "TAO_AV_Core::init_reverse_flows failed\n"),
01967 0);
01968
01969
01970 retv = this->handle_postconnect (flow_spec);
01971 }
01972 ACE_CATCHANY
01973 {
01974 ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_StreamEndPoint::connect");
01975 return 0;
01976 }
01977 ACE_ENDTRY;
01978 ACE_CHECK_RETURN (0);
01979 return retv;
01980 }
01981
01982 int
01983 TAO_StreamEndPoint::translate_qos (const AVStreams::streamQoS& application_qos,
01984 AVStreams::streamQoS& network_qos)
01985 {
01986 u_int len = application_qos.length ();
01987 network_qos.length (len);
01988 for (u_int i=0;i<len;i++)
01989 {
01990 network_qos [i].QoSType = application_qos [i].QoSType;
01991 network_qos [i].QoSParams = application_qos [i].QoSParams;
01992 }
01993 return 0;
01994 }
01995
01996
01997
01998
01999 void
02000 TAO_StreamEndPoint::stop (const AVStreams::flowSpec &flow_spec
02001 ACE_ENV_ARG_DECL)
02002 ACE_THROW_SPEC ((CORBA::SystemException,
02003 AVStreams::noSuchFlow))
02004 {
02005
02006 this->handle_stop (flow_spec ACE_ENV_ARG_PARAMETER);
02007 ACE_CHECK;
02008
02009 if (flow_spec.length () > 0)
02010 {
02011
02012 for (u_int i=0;i<flow_spec.length ();i++)
02013 {
02014 TAO_AV_FlowSpecSetItor end = this->forward_flow_spec_set.end ();
02015 for (TAO_AV_FlowSpecSetItor begin = this->forward_flow_spec_set.begin ();
02016 begin != end; ++begin)
02017 {
02018 TAO_Forward_FlowSpec_Entry entry;
02019 entry.parse (flow_spec[i]);
02020 if (ACE_OS::strcmp ((*begin)->flowname (), entry.flowname ()) == 0)
02021 {
02022 TAO_FlowSpec_Entry *entry = *begin;
02023
02024 if (entry->handler() != 0)
02025 entry->handler ()->stop (entry->role ());
02026 if (entry->control_handler () != 0)
02027 entry->control_handler ()->stop (entry->role ());
02028 break;
02029 }
02030 }
02031 }
02032 }
02033 else
02034 {
02035 TAO_AV_FlowSpecSetItor end = this->forward_flow_spec_set.end ();
02036 for (TAO_AV_FlowSpecSetItor begin = this->forward_flow_spec_set.begin ();
02037 begin != end; ++begin)
02038 {
02039 TAO_FlowSpec_Entry *entry = *begin;
02040
02041 if (entry->handler() != 0)
02042 entry->handler ()->stop (entry->role ());
02043 if (entry->control_handler () != 0)
02044 entry->control_handler ()->stop (entry->role ());
02045 }
02046 }
02047 }
02048
02049
02050
02051 void
02052 TAO_StreamEndPoint::start (const AVStreams::flowSpec &flow_spec
02053 ACE_ENV_ARG_DECL)
02054 ACE_THROW_SPEC ((CORBA::SystemException,
02055 AVStreams::noSuchFlow))
02056 {
02057 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_StreamEndPoint::start\n"));
02058
02059 this->handle_start (flow_spec ACE_ENV_ARG_PARAMETER);
02060 ACE_CHECK;
02061
02062 if (flow_spec.length () > 0)
02063 {
02064
02065 for (u_int i=0;i<flow_spec.length ();i++)
02066 {
02067 TAO_AV_FlowSpecSetItor end = this->forward_flow_spec_set.end ();
02068 for (TAO_AV_FlowSpecSetItor forward_begin = this->forward_flow_spec_set.begin ();
02069 forward_begin != end; ++forward_begin)
02070 {
02071 TAO_FlowSpec_Entry *entry = *forward_begin;
02072 if (ACE_OS::strcmp (entry->flowname (), flow_spec [i]) == 0)
02073 {
02074
02075 if (entry->handler () != 0)
02076 {
02077 entry->handler ()->start (entry->role ());
02078 }
02079 if (entry->control_handler () != 0)
02080 {
02081 entry->control_handler ()->start (entry->role ());
02082 }
02083 }
02084 }
02085
02086 end = this->reverse_flow_spec_set.end ();
02087 for (TAO_AV_FlowSpecSetItor reverse_begin = this->reverse_flow_spec_set.begin ();
02088 reverse_begin != end; ++reverse_begin)
02089 {
02090 TAO_FlowSpec_Entry *entry = *reverse_begin;
02091 if (ACE_OS::strcmp (entry->flowname (), flow_spec [i]) == 0)
02092 {
02093
02094 if (entry->handler () != 0)
02095 {
02096 entry->handler ()->start (entry->role ());
02097 }
02098 if (entry->control_handler () != 0)
02099 {
02100 entry->control_handler ()->start (entry->role ());
02101 }
02102 }
02103 }
02104 }
02105 }
02106 else
02107 {
02108 TAO_AV_FlowSpecSetItor end = this->forward_flow_spec_set.end ();
02109 for (TAO_AV_FlowSpecSetItor forwardbegin = this->forward_flow_spec_set.begin ();
02110 forwardbegin != end; ++forwardbegin)
02111 {
02112 TAO_FlowSpec_Entry *entry = *forwardbegin;
02113 if (entry->handler () != 0)
02114 {
02115 entry->handler ()->start (entry->role ());
02116 }
02117 if (entry->control_handler () != 0)
02118 {
02119 entry->control_handler ()->start (entry->role ());
02120 }
02121 }
02122
02123 end = this->reverse_flow_spec_set.end ();
02124 for (TAO_AV_FlowSpecSetItor reversebegin = this->reverse_flow_spec_set.begin ();
02125 reversebegin != end; ++reversebegin)
02126 {
02127 TAO_FlowSpec_Entry *entry = *reversebegin;
02128
02129 if (entry->handler () != 0)
02130 {
02131 entry->handler ()->start (entry->role ());
02132 }
02133 if (entry->control_handler () != 0)
02134 {
02135 entry->control_handler ()->start (entry->role ());
02136 }
02137 }
02138 }
02139 }
02140
02141
02142 void
02143 TAO_StreamEndPoint::destroy (const AVStreams::flowSpec &flow_spec
02144 ACE_ENV_ARG_DECL)
02145 ACE_THROW_SPEC ((CORBA::SystemException,
02146 AVStreams::noSuchFlow))
02147 {
02148 CORBA::Any_var vdev_any = this->get_property_value ("Related_VDev"
02149 ACE_ENV_ARG_PARAMETER);
02150
02151 AVStreams::VDev_ptr vdev;
02152
02153 vdev_any.in() >>= vdev;
02154 CORBA::Any_var mc_any = vdev->get_property_value ("Related_MediaCtrl"
02155 ACE_ENV_ARG_PARAMETER);
02156
02157
02158
02159 CORBA::Object_var obj;
02160 mc_any.in() >>= CORBA::Any::to_object( obj.out() );
02161
02162 AVStreams::MediaControl_var media_ctrl =
02163 AVStreams::MediaControl::_narrow( obj.in() );
02164
02165
02166
02167 if ( !CORBA::is_nil( vdev ) )
02168 {
02169 PortableServer::ServantBase_var vdev_servant =
02170 TAO_AV_CORE::instance()->poa()->reference_to_servant ( vdev );
02171 TAO_AV_Core::deactivate_servant (vdev_servant.in());
02172 }
02173
02174 if ( !CORBA::is_nil ( media_ctrl.in () ) )
02175 {
02176 PortableServer::ServantBase_var mc_servant =
02177 TAO_AV_CORE::instance()->poa()->reference_to_servant (media_ctrl.in());
02178 TAO_AV_Core::deactivate_servant (mc_servant.in());
02179 }
02180
02181 int result = TAO_AV_Core::deactivate_servant (this);
02182 if (result < 0)
02183 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_StreamEndPoint::destroy failed\n"));
02184
02185 if (flow_spec.length () > 0)
02186 {
02187 for (u_int i=0;i<flow_spec.length ();i++)
02188 {
02189 {
02190 TAO_AV_FlowSpecSetItor end = this->forward_flow_spec_set.end ();
02191 for (TAO_AV_FlowSpecSetItor begin = this->forward_flow_spec_set.begin ();
02192 begin != end; ++begin)
02193 {
02194 TAO_FlowSpec_Entry *entry = *begin;
02195 TAO_Tokenizer flow_name (flow_spec [i], '\\');
02196 if (ACE_OS::strcmp (entry->flowname (), flow_name[0]) == 0)
02197 {
02198 if (entry->protocol_object ())
02199 {
02200 entry->protocol_object ()->destroy ();
02201 }
02202 break;
02203 }
02204 }
02205 }
02206 {
02207 TAO_AV_FlowSpecSetItor end = this->reverse_flow_spec_set.end ();
02208 for (TAO_AV_FlowSpecSetItor begin = this->reverse_flow_spec_set.begin ();
02209 begin != end; ++begin)
02210 {
02211 TAO_FlowSpec_Entry *entry = *begin;
02212 TAO_Tokenizer flow_name (flow_spec [i], '\\');
02213 if (ACE_OS::strcmp (entry->flowname (), flow_name[0]) == 0)
02214 {
02215 if (entry->protocol_object ())
02216 {
02217 entry->protocol_object ()->destroy ();
02218 }
02219 break;
02220 }
02221 }
02222 }
02223 }
02224 }
02225 else
02226 {
02227 {
02228 TAO_AV_FlowSpecSetItor end = this->forward_flow_spec_set.end ();
02229 for (TAO_AV_FlowSpecSetItor begin = this->forward_flow_spec_set.begin ();
02230 begin != end; ++begin)
02231 {
02232 TAO_FlowSpec_Entry *entry = *begin;
02233 if (entry->protocol_object ())
02234 {
02235 entry->protocol_object ()->stop ();
02236
02237 ACE_CString control_flowname =
02238 TAO_AV_Core::get_control_flowname (entry->flowname ());
02239 TAO_AV_CORE::instance()->remove_acceptor(entry->flowname());
02240 TAO_AV_CORE::instance()->remove_acceptor(control_flowname.c_str());
02241
02242 entry->protocol_object ()->destroy ();
02243 }
02244 }
02245 }
02246 {
02247 TAO_AV_FlowSpecSetItor end = this->reverse_flow_spec_set.end ();
02248 for (TAO_AV_FlowSpecSetItor begin = this->reverse_flow_spec_set.begin ();
02249 begin != end; ++begin)
02250 {
02251 TAO_FlowSpec_Entry *entry = *begin;
02252 if (entry->protocol_object ())
02253 {
02254 entry->protocol_object ()->stop ();
02255
02256 ACE_CString control_flowname =
02257 TAO_AV_Core::get_control_flowname (entry->flowname ());
02258 TAO_AV_CORE::instance()->remove_connector(entry->flowname());
02259 TAO_AV_CORE::instance()->remove_connector(control_flowname.c_str());
02260 entry->protocol_object ()->destroy ();
02261
02262 }
02263 }
02264 }
02265 }
02266
02267
02268
02269
02270 }
02271
02272
02273
02274 CORBA::Boolean
02275 TAO_StreamEndPoint::request_connection (AVStreams::StreamEndPoint_ptr ,
02276 CORBA::Boolean ,
02277 AVStreams::streamQoS &qos,
02278 AVStreams::flowSpec &flow_spec
02279 ACE_ENV_ARG_DECL)
02280 ACE_THROW_SPEC ((CORBA::SystemException,
02281 AVStreams::streamOpDenied,
02282 AVStreams::noSuchFlow,
02283 AVStreams::QoSRequestFailed,
02284 AVStreams::FPError))
02285
02286 {
02287 if (TAO_debug_level > 0)
02288 ACE_DEBUG ((LM_DEBUG,
02289 "\n(%P|%t) TAO_StreamEndPoint::request_connection called"));
02290
02291 int result = 0;
02292 ACE_TRY
02293 {
02294 AVStreams::streamQoS network_qos;
02295 if (qos.length () > 0)
02296 {
02297 if (TAO_debug_level > 0)
02298 ACE_DEBUG ((LM_DEBUG,
02299 "QoS is Specified\n"));
02300
02301 int result = this->translate_qos (qos, network_qos);
02302 if (result != 0)
02303 if (TAO_debug_level > 0)
02304 ACE_DEBUG ((LM_DEBUG, "QoS translation failed\n"));
02305
02306 this->qos ().set (network_qos);
02307 }
02308
02309 if (TAO_debug_level > 0)
02310 ACE_DEBUG ((LM_DEBUG,
02311 "\n(%P|%t) TAO_StreamEndPoint::request_connection: "
02312 "flowspec has length = %d and the strings are:\n",
02313 flow_spec.length ()));
02314 CORBA::ULong i;
02315
02316 for (i=0;i<flow_spec.length ();i++)
02317 {
02318 TAO_Forward_FlowSpec_Entry *entry = 0;
02319 ACE_NEW_RETURN (entry,
02320 TAO_Forward_FlowSpec_Entry,
02321 0);
02322
02323 CORBA::String_var string_entry = CORBA::string_dup (flow_spec[i]);
02324
02325 if(TAO_debug_level > 0)
02326 ACE_DEBUG(( LM_DEBUG,
02327 "%N:%l Parsing flow spec: [%s]\n",
02328 string_entry.in ()));
02329
02330 if (entry->parse (string_entry.in ()) == -1)
02331 {
02332 if (TAO_debug_level > 0)
02333 ACE_DEBUG ((LM_DEBUG,
02334 "%N:%l Error parsing flow_spec: [%s]\n",
02335 string_entry.in ()));
02336 return 0;
02337 }
02338 if (TAO_debug_level > 0)
02339 ACE_DEBUG ((LM_DEBUG,
02340 "TAO_StreamEndPoint::request_connection flow spec [%s]\n",
02341 entry->entry_to_string ()));
02342
02343 this->forward_flow_spec_set.insert (entry);
02344 }
02345
02346 result = TAO_AV_CORE::instance ()->init_forward_flows (this,
02347 this->forward_flow_spec_set,
02348 TAO_AV_Core::TAO_AV_ENDPOINT_B,
02349 flow_spec);
02350
02351 if (result < 0)
02352 return 0;
02353
02354
02355 result = this->handle_connection_requested (flow_spec ACE_ENV_ARG_PARAMETER);
02356 ACE_TRY_CHECK;
02357 }
02358 ACE_CATCHANY
02359 {
02360 ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
02361 "TAO_StreamEndpoint::request_connection");
02362 return 0;
02363 }
02364 ACE_ENDTRY;
02365 ACE_CHECK_RETURN (0);
02366 return result;
02367 }
02368
02369 int
02370 TAO_StreamEndPoint::change_qos (AVStreams::streamQoS &new_qos,
02371 const AVStreams::flowSpec &the_flows
02372 ACE_ENV_ARG_DECL_NOT_USED)
02373 {
02374 if (TAO_debug_level > 0)
02375 ACE_DEBUG ((LM_DEBUG,
02376 "TAO_StreamEndPoint::change_qos\n"));
02377
02378 TAO_AV_QoS qos (new_qos);
02379 for (int i = 0; (unsigned) i < the_flows.length (); i++)
02380 {
02381 TAO_Forward_FlowSpec_Entry entry;
02382 entry.parse (the_flows [i]);
02383 ACE_CString flow_name_key (entry.flowname ());
02384 Flow_Handler_Map_Entry *handler_entry;
02385 if (this->flow_handler_map_.find (flow_name_key,
02386 handler_entry) == 0)
02387 {
02388 AVStreams::QoS flow_qos;
02389 if (qos.get_flow_qos (entry.flowname (), flow_qos) != 0)
02390 ACE_DEBUG ((LM_DEBUG,
02391 "New QoS for the flow %s is not specified\n",
02392 entry.flowname ()));
02393 int result;
02394 result = handler_entry->int_id_->change_qos (flow_qos);
02395 if (result != 0)
02396 ACE_ERROR_RETURN ((LM_ERROR,
02397 "Modifying QoS Failed\n"),
02398 -1);
02399
02400 }
02401 }
02402 return 0;
02403 }
02404
02405
02406 CORBA::Boolean
02407 TAO_StreamEndPoint::modify_QoS (AVStreams::streamQoS &new_qos,
02408 const AVStreams::flowSpec &the_flows
02409 ACE_ENV_ARG_DECL)
02410 ACE_THROW_SPEC ((CORBA::SystemException,
02411 AVStreams::noSuchFlow,
02412 AVStreams::QoSRequestFailed))
02413 {
02414 if (TAO_debug_level > 0)
02415 ACE_DEBUG ((LM_DEBUG,
02416 "TAO_StreamEndPoint::modify_QoS\n"));
02417
02418 int result = this->change_qos (new_qos, the_flows ACE_ENV_ARG_PARAMETER);
02419 ACE_CHECK_RETURN (0);
02420
02421 if (result != 0)
02422 return 0;
02423
02424 return 1;
02425
02426 }
02427
02428
02429
02430 CORBA::Boolean
02431 TAO_StreamEndPoint::set_protocol_restriction (const AVStreams::protocolSpec &protocols
02432 ACE_ENV_ARG_DECL)
02433 ACE_THROW_SPEC ((CORBA::SystemException))
02434 {
02435 ACE_TRY
02436 {
02437 CORBA::Any protocol_restriction_any;
02438
02439 protocol_restriction_any <<= protocols;
02440 this->define_property ("ProtocolRestriction",
02441 protocol_restriction_any
02442 ACE_ENV_ARG_PARAMETER);
02443 ACE_TRY_CHECK;
02444 this->protocols_ = protocols;
02445 }
02446 ACE_CATCHANY
02447 {
02448 ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_StreamEndPoint::set_protocol_restriction");
02449 return 0;
02450 }
02451 ACE_ENDTRY;
02452 ACE_CHECK_RETURN (0);
02453 return 1;
02454 }
02455
02456
02457 void
02458 TAO_StreamEndPoint::disconnect (const AVStreams::flowSpec &the_spec
02459 ACE_ENV_ARG_DECL_NOT_USED)
02460 ACE_THROW_SPEC ((CORBA::SystemException,
02461 AVStreams::noSuchFlow,
02462 AVStreams::streamOpFailed))
02463 {
02464 ACE_UNUSED_ARG (the_spec);
02465 }
02466
02467
02468
02469 void
02470 TAO_StreamEndPoint::set_FPStatus (const AVStreams::flowSpec &,
02471 const char *fp_name,
02472 const CORBA::Any &fp_settings
02473 ACE_ENV_ARG_DECL_NOT_USED)
02474 ACE_THROW_SPEC ((CORBA::SystemException,
02475 AVStreams::noSuchFlow,
02476 AVStreams::FPError))
02477 {
02478 if (ACE_OS::strcmp (fp_name, "SFP1.0") != 0)
02479 return;
02480 fp_settings >>= this->sfp_status_;
02481
02482 }
02483
02484
02485 CORBA::Object_ptr
02486 TAO_StreamEndPoint::get_fep (const char *flow_name
02487 ACE_ENV_ARG_DECL_NOT_USED)
02488 ACE_THROW_SPEC ((CORBA::SystemException,
02489 AVStreams::notSupported,
02490 AVStreams::noSuchFlow))
02491 {
02492 ACE_CString fep_name_key (flow_name);
02493 AVStreams::FlowEndPoint_var fep_entry;
02494 if (this->fep_map_.find (fep_name_key, fep_entry) == 0)
02495 return fep_entry._retn();
02496 return 0;
02497 }
02498
02499 char*
02500 TAO_StreamEndPoint::add_fep_i_add_property (AVStreams::FlowEndPoint_ptr fep
02501 ACE_ENV_ARG_DECL)
02502 ACE_THROW_SPEC ((CORBA::SystemException,
02503 AVStreams::notSupported,
02504 AVStreams::streamOpFailed))
02505 {
02506 ACE_CString flow_name;
02507
02508 ACE_TRY
02509 {
02510
02511
02512 flow_name = "flow";
02513 char tmp[255];
02514 ACE_OS::sprintf (tmp, "%u", this->flow_num_++);
02515 flow_name += tmp;
02516
02517 CORBA::Any flowname_any;
02518 flowname_any <<= flow_name.c_str ();
02519 fep->define_property ("Flow",
02520 flowname_any
02521 ACE_ENV_ARG_PARAMETER);
02522 ACE_TRY_CHECK;
02523 }
02524 ACE_CATCHANY
02525 {
02526 ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
02527 "TAO_StreamEndPoint::add_fep");
02528 return 0;
02529 }
02530 ACE_ENDTRY;
02531 return ACE_OS::strdup( flow_name.c_str () );
02532 }
02533
02534 char*
02535 TAO_StreamEndPoint::add_fep_i (AVStreams::FlowEndPoint_ptr fep
02536 ACE_ENV_ARG_DECL)
02537 ACE_THROW_SPEC ((CORBA::SystemException,
02538 AVStreams::notSupported,
02539 AVStreams::streamOpFailed))
02540 {
02541 CORBA::String_var flow_name;
02542 ACE_TRY
02543 {
02544 CORBA::Any_var flow_name_any =
02545 fep->get_property_value ("FlowName" ACE_ENV_ARG_PARAMETER);
02546 ACE_TRY_CHECK;
02547
02548 const char *tmp;
02549 flow_name_any >>= tmp;
02550 flow_name = CORBA::string_dup (tmp);
02551 }
02552 ACE_CATCHANY
02553 {
02554 flow_name =
02555 this->add_fep_i_add_property (fep ACE_ENV_ARG_PARAMETER);
02556 ACE_CHECK_RETURN (0);
02557 }
02558 ACE_ENDTRY;
02559 return flow_name._retn ();
02560 }
02561
02562 char *
02563 TAO_StreamEndPoint::add_fep (CORBA::Object_ptr fep_obj
02564 ACE_ENV_ARG_DECL)
02565 ACE_THROW_SPEC ((CORBA::SystemException,
02566 AVStreams::notSupported,
02567 AVStreams::streamOpFailed))
02568 {
02569 AVStreams::FlowEndPoint_var fep =
02570 AVStreams::FlowEndPoint::_narrow (fep_obj ACE_ENV_ARG_PARAMETER);
02571 ACE_CHECK_RETURN (0);
02572
02573 CORBA::String_var flow_name =
02574 this->add_fep_i (fep.in () ACE_ENV_ARG_PARAMETER);
02575 ACE_CHECK_RETURN (0);
02576
02577 ACE_TRY
02578 {
02579 fep->lock (ACE_ENV_SINGLE_ARG_PARAMETER);
02580 ACE_TRY_CHECK;
02581
02582
02583 ACE_CString fep_name_key (CORBA::string_dup (flow_name.in ()));
02584 if (this->fep_map_.bind (fep_name_key, AVStreams::FlowEndPoint::_duplicate (fep.in ())) != 0)
02585 {
02586 ACE_THROW_RETURN (AVStreams::streamOpFailed (), 0);
02587 }
02588
02589 this->flow_count_++;
02590 this->flows_.length (this->flow_count_);
02591 this->flows_[this->flow_count_-1] = flow_name;
02592
02593 CORBA::Any flows_any;
02594 flows_any <<= this->flows_;
02595 this->define_property ("Flows",
02596 flows_any
02597 ACE_ENV_ARG_PARAMETER);
02598 ACE_TRY_CHECK;
02599 }
02600 ACE_CATCHANY
02601 {
02602 ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_StreamEndPoint::add_fep");
02603 return 0;
02604 }
02605 ACE_ENDTRY;
02606 return flow_name._retn ();
02607 }
02608
02609
02610 void
02611 TAO_StreamEndPoint::remove_fep (const char *flow_name
02612 ACE_ENV_ARG_DECL)
02613 ACE_THROW_SPEC ((CORBA::SystemException,
02614 AVStreams::notSupported,
02615 AVStreams::streamOpFailed))
02616 {
02617 ACE_TRY
02618 {
02619 ACE_CString fep_name_key (flow_name);
02620 AVStreams::FlowEndPoint_var fep_entry;
02621
02622 if (this->fep_map_.unbind (fep_name_key, fep_entry)!= 0)
02623 ACE_THROW (AVStreams::streamOpFailed ());
02624
02625 AVStreams::flowSpec new_flows (this->flows_.length ());
02626 for (u_int i=0, j=0 ; i <this->flows_.length (); i++)
02627 if (ACE_OS::strcmp (flow_name, this->flows_[i]) != 0)
02628 new_flows[j++] = this->flows_[i];
02629
02630 CORBA::Any flows;
02631 flows <<= new_flows;
02632 this->flows_ = new_flows;
02633 this->define_property ("Flows",
02634 flows
02635 ACE_ENV_ARG_PARAMETER);
02636 ACE_TRY_CHECK;
02637 }
02638 ACE_CATCHANY
02639 {
02640 ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_StreamEndPoint::remove_fep");
02641 }
02642 ACE_ENDTRY;
02643 ACE_CHECK;
02644 }
02645
02646
02647 void
02648 TAO_StreamEndPoint::set_negotiator (AVStreams::Negotiator_ptr new_negotiator
02649 ACE_ENV_ARG_DECL)
02650 ACE_THROW_SPEC ((CORBA::SystemException))
02651 {
02652 ACE_TRY
02653 {
02654 CORBA::Any negotiator;
02655 negotiator <<= new_negotiator;
02656 this->define_property ("Negotiator",
02657 negotiator
02658 ACE_ENV_ARG_PARAMETER);
02659 ACE_TRY_CHECK;
02660 this->negotiator_ = AVStreams::Negotiator::_duplicate (new_negotiator);
02661 }
02662 ACE_CATCHANY
02663 {
02664 ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_StreamEndPoint::set_negotiator");
02665 }
02666 ACE_ENDTRY;
02667 ACE_CHECK;
02668 }
02669
02670
02671
02672 void
02673 TAO_StreamEndPoint::set_key (const char *flow_name,
02674 const AVStreams::key & the_key
02675 ACE_ENV_ARG_DECL)
02676 ACE_THROW_SPEC ((CORBA::SystemException))
02677 {
02678 ACE_TRY
02679 {
02680 this->key_ = the_key;
02681 CORBA::Any PublicKey;
02682 PublicKey <<= the_key;
02683 char PublicKey_property [BUFSIZ];
02684 ACE_OS::sprintf (PublicKey_property, "%s_PublicKey", flow_name);
02685 this->define_property (PublicKey_property,
02686 PublicKey
02687 ACE_ENV_ARG_PARAMETER);
02688 ACE_TRY_CHECK;
02689 }
02690 ACE_CATCHANY
02691 {
02692 ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_StreamEndPoint::set_key");
02693 }
02694 ACE_ENDTRY;
02695 ACE_CHECK;
02696 }
02697
02698
02699 void
02700 TAO_StreamEndPoint::set_source_id (CORBA::Long source_id
02701 ACE_ENV_ARG_DECL_NOT_USED)
02702 ACE_THROW_SPEC ((CORBA::SystemException))
02703 {
02704 this->source_id_ = source_id;
02705 }
02706
02707 CORBA::Boolean
02708 TAO_StreamEndPoint::multiconnect (AVStreams::streamQoS &,
02709 AVStreams::flowSpec &
02710 ACE_ENV_ARG_DECL_NOT_USED)
02711 {
02712 if (TAO_debug_level > 0)
02713 ACE_DEBUG ((LM_DEBUG, "TAO_StreamEndPoint::multiconnect\n"));
02714 return 0;
02715 }
02716
02717 TAO_StreamEndPoint::~TAO_StreamEndPoint (void)
02718 {
02719
02720 TAO_AV_FlowSpecSetItor begin = this->forward_flow_spec_set.begin ();
02721 TAO_AV_FlowSpecSetItor end = this->forward_flow_spec_set.end ();
02722
02723 int i=0;
02724
02725
02726 for ( ; begin != end; ++begin, ++i)
02727 {
02728
02729
02730 TAO_FlowSpec_Entry *entry = *begin;
02731 delete entry;
02732
02733 }
02734 begin = this->reverse_flow_spec_set.begin ();
02735 end = this->reverse_flow_spec_set.end ();
02736 i = 0;
02737 for (; begin != end; ++begin)
02738 {
02739
02740
02741 TAO_FlowSpec_Entry *entry = *begin;
02742 delete entry;
02743
02744 }
02745 }
02746
02747
02748
02749
02750
02751
02752 TAO_StreamEndPoint_A::TAO_StreamEndPoint_A (void)
02753 {
02754 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "(%P|%t) TAO_StreamEndPoint_A::TAO_StreamEndPoint_A: created\n"));
02755 }
02756
02757
02758 CORBA::Boolean
02759 TAO_StreamEndPoint_A::multiconnect (AVStreams::streamQoS &stream_qos,
02760 AVStreams::flowSpec &flow_spec
02761 ACE_ENV_ARG_DECL)
02762 ACE_THROW_SPEC ((CORBA::SystemException,
02763 AVStreams::noSuchFlow,
02764 AVStreams::QoSRequestFailed,
02765 AVStreams::streamOpFailed))
02766 {
02767 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_StreamEndPointA::multiconnect\n"));
02768 ACE_TRY
02769 {
02770 int result = 0;
02771 TAO_AV_QoS qos (stream_qos);
02772 for (u_int i=0;i< flow_spec.length ();i++)
02773 {
02774 TAO_Forward_FlowSpec_Entry *forward_entry = 0;
02775 ACE_NEW_RETURN (forward_entry,
02776 TAO_Forward_FlowSpec_Entry,
02777 0);
02778 forward_entry->parse (flow_spec[i]);
02779 ACE_CString mcast_key (forward_entry->flowname ());
02780 AVStreams::FlowEndPoint_var flow_endpoint;
02781
02782
02783
02784
02785
02786
02787
02788
02789
02790 if (this->fep_map_.find (mcast_key, flow_endpoint) == 0)
02791 {
02792 ACE_TRY_EX (narrow)
02793 {
02794 AVStreams::QoS flow_qos;
02795 result = qos.get_flow_qos (forward_entry->flowname (), flow_qos);
02796 if (result < 0)
02797 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "QoS not found for %s\n", forward_entry->flowname ()));
02798
02799 AVStreams::FlowProducer_var producer;
02800 producer = AVStreams::FlowProducer::_narrow (flow_endpoint.in() ACE_ENV_ARG_PARAMETER);
02801
02802 ACE_TRY_CHECK_EX (narrow);
02803
02804 if (!CORBA::is_nil (producer.in ()))
02805 {
02806 AVStreams::FlowConnection_var flow_connection;
02807 ACE_TRY_EX (flow_connection)
02808 {
02809 if (CORBA::is_nil (this->streamctrl_.in ()))
02810 {
02811 CORBA::Any_var streamctrl_any;
02812 streamctrl_any = this->get_property_value ("Related_StreamCtrl"
02813 ACE_ENV_ARG_PARAMETER);
02814 ACE_TRY_CHECK;
02815 AVStreams::StreamCtrl_ptr streamctrl;
02816 streamctrl_any.in () >>= streamctrl;
02817 this->streamctrl_ = AVStreams::StreamCtrl::_duplicate (streamctrl);
02818 }
02819
02820 CORBA::Object_var flow_connection_obj =
02821 this->streamctrl_->get_flow_connection (forward_entry->flowname ()
02822 ACE_ENV_ARG_PARAMETER);
02823 ACE_TRY_CHECK_EX (flow_connection);
02824 flow_connection = AVStreams::FlowConnection::_narrow (flow_connection_obj.in ()
02825 ACE_ENV_ARG_PARAMETER);
02826 ACE_TRY_CHECK_EX (flow_connection);
02827 }
02828 ACE_CATCHANY
02829 {
02830 TAO_FlowConnection *flowConnection;
02831 ACE_NEW_RETURN (flowConnection,
02832 TAO_FlowConnection,
02833 0);
02834
02835 flowConnection->set_mcast_addr (this->mcast_addr_, this->mcast_port_);
02836 this->mcast_port_++;
02837 flowConnection->set_protocol (forward_entry->carrier_protocol_str ());
02838 flow_connection = flowConnection->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
02839 ACE_TRY_CHECK;
02840 this->streamctrl_->set_flow_connection (forward_entry->flowname (),
02841 flow_connection.in ()
02842 ACE_ENV_ARG_PARAMETER);
02843 ACE_TRY_CHECK;
02844 }
02845 ACE_ENDTRY;
02846 ACE_CHECK_RETURN (0);
02847 if (ACE_OS::strcmp (forward_entry->flow_protocol_str (), "") != 0)
02848 {
02849 CORBA::Any fp_settings;
02850 flow_connection->use_flow_protocol (forward_entry->flow_protocol_str (),
02851 fp_settings
02852 ACE_ENV_ARG_PARAMETER);
02853 ACE_TRY_CHECK;
02854 }
02855 result = flow_connection->add_producer (producer.in (),
02856 flow_qos
02857 ACE_ENV_ARG_PARAMETER);
02858 ACE_TRY_CHECK;
02859 if (result == 0)
02860 ACE_ERROR_RETURN ((LM_ERROR, "TAO_StreamEndPoint_A::multiconnect: add_producer failed\n"), 0);
02861 }
02862 }
02863 ACE_CATCHANY
02864 {
02865
02866 ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "FlowProducer::_narrow");
02867 ACE_ERROR_RETURN ((LM_ERROR, "sep_a doesn't contain a flowproducer"), 0);
02868 }
02869 ACE_ENDTRY;
02870 ACE_CHECK_RETURN (0);
02871 }
02872 else
02873 {
02874 ACE_INET_Addr *mcast_addr;
02875 TAO_FlowSpec_Entry *entry = 0;
02876 result = this->mcast_entry_map_.find (mcast_key, entry);
02877 if (result == 0)
02878 {
02879 mcast_addr = dynamic_cast<ACE_INET_Addr *> (entry->address ());
02880 char str_addr [BUFSIZ];
02881 result = mcast_addr->addr_to_string (str_addr, BUFSIZ);
02882 if (result < 0)
02883 ACE_ERROR_RETURN ((LM_ERROR, "TAO_StreamEndPointA::multiconnect ::addr_to_string failed\n"), 0);
02884 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_StreamEndPoint_A::multiconnect:%s\n", str_addr));
02885 TAO_Forward_FlowSpec_Entry new_entry (entry->flowname (),
02886 entry->direction_str (),
02887 entry->format (),
02888 entry->flow_protocol_str (),
02889 entry->carrier_protocol_str (),
02890 entry->address ());
02891 flow_spec[i] = CORBA::string_dup (new_entry.entry_to_string ());
02892 }
02893 else
02894 {
02895
02896 switch (forward_entry->direction ())
02897 {
02898 case TAO_FlowSpec_Entry::TAO_AV_DIR_IN:
02899 {
02900 ACE_NEW_RETURN (mcast_addr,
02901 ACE_INET_Addr,
02902 0);
02903 mcast_addr->set (this->mcast_port_, this->mcast_addr_.c_str ());
02904 this->mcast_port_++;
02905 char buf[BUFSIZ];
02906 mcast_addr->addr_to_string (buf, BUFSIZ);
02907 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "%s\n", buf));
02908 TAO_Forward_FlowSpec_Entry *new_entry;
02909 ACE_NEW_RETURN (new_entry,
02910 TAO_Forward_FlowSpec_Entry (forward_entry->flowname (),
02911 forward_entry->direction_str (),
02912 forward_entry->format (),
02913 forward_entry->flow_protocol_str (),
02914 forward_entry->carrier_protocol_str (),
02915 mcast_addr),
02916 0);
02917 flow_spec[i] = CORBA::string_dup (new_entry->entry_to_string ());
02918
02919
02920 this->forward_flow_spec_set.insert (new_entry);
02921 TAO_AV_Acceptor_Registry *acceptor_registry = TAO_AV_CORE::instance ()->acceptor_registry ();
02922 result = acceptor_registry->open (this,
02923 TAO_AV_CORE::instance (),
02924 this->forward_flow_spec_set);
02925 if (result < 0)
02926 ACE_ERROR_RETURN ((LM_ERROR, "Acceptor_Registry::open failed\n"), 0);
02927 result = this->mcast_entry_map_.bind (mcast_key, new_entry);
02928 if (result < 0)
02929 ACE_ERROR_RETURN ((LM_ERROR, "mcast_entry::bind failed"), 0);
02930 }
02931 break;
02932 case TAO_FlowSpec_Entry::TAO_AV_DIR_OUT:
02933
02934 break;
02935 default:
02936 break;
02937 }
02938 }
02939 }
02940 }
02941 }
02942 ACE_CATCHANY
02943 {
02944 ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_StreamEndPoint_A::multiconnect");
02945 return 0;
02946 }
02947 ACE_ENDTRY;
02948 ACE_CHECK_RETURN (0);
02949 return 1;
02950 }
02951
02952
02953 CORBA::Boolean
02954 TAO_StreamEndPoint_A::connect_leaf (AVStreams::StreamEndPoint_B_ptr ,
02955 AVStreams::streamQoS & ,
02956 const AVStreams::flowSpec &
02957 ACE_ENV_ARG_DECL)
02958 ACE_THROW_SPEC ((CORBA::SystemException,
02959 AVStreams::streamOpFailed,
02960 AVStreams::noSuchFlow,
02961 AVStreams::QoSRequestFailed,
02962 AVStreams::notSupported))
02963 {
02964 ACE_THROW_RETURN (AVStreams::notSupported (), 0);
02965 }
02966
02967
02968 void
02969 TAO_StreamEndPoint_A::disconnect_leaf (AVStreams::StreamEndPoint_B_ptr ,
02970 const AVStreams::flowSpec &
02971 ACE_ENV_ARG_DECL)
02972
02973 ACE_THROW_SPEC ((CORBA::SystemException,
02974 AVStreams::streamOpFailed,
02975 AVStreams::noSuchFlow,
02976 AVStreams::notSupported))
02977 {
02978
02979 ACE_THROW (AVStreams::notSupported ());
02980
02981 }
02982
02983 TAO_StreamEndPoint_A::~TAO_StreamEndPoint_A (void)
02984 {
02985 }
02986
02987
02988
02989
02990
02991 TAO_StreamEndPoint_B::TAO_StreamEndPoint_B (void)
02992 {
02993 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,
02994 "\n(%P|%t) TAO_StreamEndPoint_B::TAO_StreamEndPoint_B: created"));
02995 }
02996
02997 CORBA::Boolean
02998 TAO_StreamEndPoint_B::multiconnect (AVStreams::streamQoS &stream_qos,
02999 AVStreams::flowSpec &flow_spec
03000 ACE_ENV_ARG_DECL)
03001 ACE_THROW_SPEC ((CORBA::SystemException,
03002 AVStreams::streamOpFailed,
03003 AVStreams::noSuchFlow,
03004 AVStreams::QoSRequestFailed,
03005 AVStreams::FPError))
03006 {
03007 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_StreamEndPoint_B::multiconnect\n"));
03008 ACE_TRY
03009 {
03010 int result = 0;
03011 TAO_AV_QoS qos (stream_qos);
03012 for (u_int i=0;i< flow_spec.length ();i++)
03013 {
03014 TAO_Forward_FlowSpec_Entry *forward_entry;
03015 ACE_NEW_RETURN (forward_entry,
03016 TAO_Forward_FlowSpec_Entry,
03017 0);
03018 forward_entry->parse (flow_spec[i]);
03019 ACE_CString mcast_key (forward_entry->flowname ());
03020 AVStreams::FlowEndPoint_var flow_endpoint;
03021 if (this->fep_map_.find (mcast_key, flow_endpoint ) == 0)
03022 {
03023 AVStreams::FlowConsumer_var consumer;
03024 ACE_TRY_EX (narrow)
03025 {
03026 consumer = AVStreams::FlowConsumer::_narrow (flow_endpoint.in () ACE_ENV_ARG_PARAMETER);
03027 ACE_TRY_CHECK_EX (narrow);
03028 }
03029 ACE_CATCHANY
03030 {
03031 ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "FlowConsumer::_narrow");
03032 ACE_ERROR_RETURN ((LM_ERROR, "sep_b doesn't contain a flowconsumer"), 0);
03033 }
03034 ACE_ENDTRY;
03035 ACE_CHECK_RETURN (0);
03036 AVStreams::QoS flow_qos;
03037 result = qos.get_flow_qos (forward_entry->flowname (), flow_qos);
03038 if (result < 0)
03039 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "QoS not found for %s", forward_entry->flowname ()));
03040 AVStreams::FlowConnection_var flow_connection;
03041 ACE_TRY_EX (flow_connection)
03042 {
03043 if (CORBA::is_nil (this->streamctrl_.in ()))
03044 {
03045 CORBA::Any_var streamctrl_any;
03046 streamctrl_any = this->get_property_value ("Related_StreamCtrl"
03047 ACE_ENV_ARG_PARAMETER);
03048 ACE_TRY_CHECK;
03049 AVStreams::StreamCtrl_ptr streamctrl;
03050 streamctrl_any.in () >>= streamctrl;
03051 this->streamctrl_ = AVStreams::StreamCtrl::_duplicate (streamctrl);
03052 }
03053 CORBA::Object_var flow_connection_obj =
03054 this->streamctrl_->get_flow_connection (forward_entry->flowname ()
03055 ACE_ENV_ARG_PARAMETER);
03056 ACE_TRY_CHECK_EX (flow_connection);
03057 flow_connection = AVStreams::FlowConnection::_narrow (flow_connection_obj.in ()
03058 ACE_ENV_ARG_PARAMETER);
03059 ACE_TRY_CHECK_EX (flow_connection);
03060 }
03061 ACE_CATCHANY
03062 {
03063 ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_StreamEndPoint_B::multiconnect::get_flow_connection");
03064 return 0;
03065 }
03066 ACE_ENDTRY;
03067 ACE_CHECK_RETURN (0);
03068 result = flow_connection->add_consumer (consumer.in (),
03069 flow_qos
03070 ACE_ENV_ARG_PARAMETER);
03071 ACE_TRY_CHECK;
03072 if (result == 0)
03073 ACE_ERROR_RETURN ((LM_ERROR, "TAO_StreamEndPoint_B::multiconnect:add_consumer failed\n"), 0);
03074 }
03075 else
03076 {
03077 TAO_FlowSpec_Entry *mcast_entry = 0;
03078 ACE_INET_Addr *mcast_addr;
03079 mcast_addr = dynamic_cast<ACE_INET_Addr *> (forward_entry->address ());
03080 if (mcast_addr == 0)
03081 ACE_ERROR_RETURN ((LM_ERROR, "TAO_StreamEndPoint_B::multiconnect::Address missing in flowspec_entry\n"), 0);
03082 result = this->mcast_entry_map_.find (mcast_key, mcast_entry);
03083 if (result == 0)
03084 {
03085 ACE_ERROR_RETURN ((LM_ERROR, "TAO_StreamEndPoint_B::multiconnect::handler already found\n"), 0);
03086 }
03087 else
03088 {
03089 switch (forward_entry->direction ())
03090 {
03091 case TAO_FlowSpec_Entry::TAO_AV_DIR_IN:
03092 {
03093
03094
03095
03096
03097
03098
03099 this->forward_flow_spec_set.insert (forward_entry);
03100 TAO_AV_Connector_Registry *connector_registry = TAO_AV_CORE::instance ()->connector_registry ();
03101 result = connector_registry->open (this,
03102 TAO_AV_CORE::instance (),
03103 this->forward_flow_spec_set);
03104 if (result < 0)
03105 ACE_ERROR_RETURN ((LM_ERROR, "connector_registry::open failed\n"), 0);
03106 result = this->mcast_entry_map_.bind (mcast_key, forward_entry);
03107 if (result < 0)
03108 ACE_ERROR_RETURN ((LM_ERROR, "dgram_mcast_handler::bind failed"), 0);
03109 }
03110 break;
03111 case TAO_FlowSpec_Entry::TAO_AV_DIR_OUT:
03112
03113 break;
03114 default:
03115 break;
03116 }
03117 }
03118 }
03119 }
03120 }
03121 ACE_CATCHANY
03122 {
03123 ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_StreamEndPoint_B::multiconnect");
03124 return 0;
03125 }
03126 ACE_ENDTRY;
03127 ACE_CHECK_RETURN (0);
03128 return 1;
03129 }
03130
03131 TAO_StreamEndPoint_B::~TAO_StreamEndPoint_B (void)
03132 {
03133 }
03134
03135
03136
03137
03138
03139 TAO_VDev::TAO_VDev (void)
03140 {
03141 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,
03142 "(%P|%t) TAO_VDev::TAO_VDev: created\n"));
03143 }
03144
03145
03146
03147 CORBA::Boolean
03148 TAO_VDev::set_peer (AVStreams::StreamCtrl_ptr the_ctrl,
03149 AVStreams::VDev_ptr the_peer_dev,
03150 AVStreams::streamQoS &the_qos,
03151 const AVStreams::flowSpec &the_spec
03152 ACE_ENV_ARG_DECL)
03153 ACE_THROW_SPEC ((CORBA::SystemException,
03154 AVStreams::noSuchFlow,
03155 AVStreams::QoSRequestFailed,
03156 AVStreams::streamOpFailed))
03157 {
03158 ACE_UNUSED_ARG (the_qos);
03159 ACE_UNUSED_ARG (the_spec);
03160
03161 CORBA::Boolean result = 0;
03162 ACE_TRY
03163 {
03164 if (TAO_debug_level > 0)
03165 ACE_DEBUG ((LM_DEBUG, "(%P|%t) TAO_VDev::set_peer: called\n"));
03166
03167
03168 CORBA::Any anyval;
03169 anyval <<= the_peer_dev;
03170 this->define_property ("Related_VDev",
03171 anyval
03172 ACE_ENV_ARG_PARAMETER);
03173
03174 ACE_TRY_CHECK;
03175
03176 this->streamctrl_ = AVStreams::StreamCtrl::_duplicate (the_ctrl);
03177 this->peer_ = AVStreams::VDev::_duplicate (the_peer_dev);
03178
03179 CORBA::Any_var anyptr;
03180 anyptr = this->peer_->get_property_value ("Related_MediaCtrl"
03181 ACE_ENV_ARG_PARAMETER);
03182 ACE_TRY_CHECK;
03183
03184 CORBA::Object_ptr media_ctrl_obj = 0;
03185
03186 anyptr.in () >>= CORBA::Any::to_object(media_ctrl_obj);
03187 ACE_TRY_CHECK;
03188
03189 result = this->set_media_ctrl (media_ctrl_obj ACE_ENV_ARG_PARAMETER);
03190 ACE_TRY_CHECK;
03191 }
03192 ACE_CATCHANY
03193 {
03194 ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_VDev::set_peer");
03195 return 0;
03196 }
03197 ACE_ENDTRY;
03198 ACE_CHECK_RETURN (0);
03199 return result;
03200 }
03201
03202 CORBA::Boolean
03203 TAO_VDev::set_media_ctrl (CORBA::Object_ptr media_ctrl
03204 ACE_ENV_ARG_DECL_NOT_USED)
03205
03206 {
03207
03208
03209 CORBA::release( media_ctrl);
03210
03211 return 1;
03212 }
03213
03214
03215 CORBA::Boolean
03216 TAO_VDev::set_Mcast_peer (AVStreams::StreamCtrl_ptr ,
03217 AVStreams::MCastConfigIf_ptr mcast_peer,
03218 AVStreams::streamQoS &,
03219 const AVStreams::flowSpec &
03220 ACE_ENV_ARG_DECL_NOT_USED)
03221 ACE_THROW_SPEC ((CORBA::SystemException,
03222 AVStreams::noSuchFlow,
03223 AVStreams::QoSRequestFailed,
03224 AVStreams::streamOpFailed))
03225 {
03226 this->mcast_peer_ = AVStreams::MCastConfigIf::_duplicate (mcast_peer);
03227 return 1;
03228 }
03229
03230
03231 void
03232 TAO_VDev::configure (const CosPropertyService::Property &
03233 ACE_ENV_ARG_DECL_NOT_USED)
03234 ACE_THROW_SPEC ((CORBA::SystemException,
03235 AVStreams::PropertyException,
03236 AVStreams::streamOpFailed))
03237 {
03238 }
03239
03240
03241 void
03242 TAO_VDev::set_format (const char *flowName,
03243 const char *format_name
03244 ACE_ENV_ARG_DECL)
03245 ACE_THROW_SPEC ((CORBA::SystemException,
03246 AVStreams::notSupported))
03247 {
03248 ACE_TRY
03249 {
03250 if (flowName == 0 || format_name == 0)
03251 ACE_ERROR ((LM_ERROR, "TAO_VDev::set_format: flowName or format_name is null\n"));
03252 char format_property [BUFSIZ];
03253 ACE_OS::sprintf (format_property, "%s_currFormat", flowName);
03254 CORBA::Any format;
03255 format <<= format_name;
03256 this->define_property (format_property,
03257 format
03258 ACE_ENV_ARG_PARAMETER);
03259 ACE_TRY_CHECK;
03260 }
03261 ACE_CATCHANY
03262 {
03263 ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_VDev::set_format");
03264 return;
03265 }
03266 ACE_ENDTRY;
03267 ACE_CHECK;
03268 return;
03269 }
03270
03271
03272 void
03273 TAO_VDev::set_dev_params (const char *flowName,
03274 const CosPropertyService::Properties &new_params
03275 ACE_ENV_ARG_DECL)
03276 ACE_THROW_SPEC ((CORBA::SystemException,
03277 AVStreams::PropertyException,
03278 AVStreams::streamOpFailed))
03279 {
03280 ACE_TRY
03281 {
03282 if (flowName == 0)
03283 ACE_ERROR ((LM_ERROR, "TAO_VDev::set_dev_params:flowName is null\n"));
03284 char devParams_property[BUFSIZ];
03285 ACE_OS::sprintf (devParams_property, "%s_devParams", flowName);
03286 CORBA::Any devParams;
03287 devParams <<= new_params;
03288 this->define_property (devParams_property,
03289 devParams
03290 ACE_ENV_ARG_PARAMETER);
03291 ACE_TRY_CHECK;
03292 }
03293 ACE_CATCHANY
03294 {
03295 ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_VDev::set_dev_params");
03296 return;
03297 }
03298 ACE_ENDTRY;
03299 ACE_CHECK;
03300 return;
03301 }
03302
03303
03304 CORBA::Boolean
03305 TAO_VDev::modify_QoS (AVStreams::streamQoS &the_qos,
03306 const AVStreams::flowSpec &flowspec
03307 ACE_ENV_ARG_DECL)
03308 ACE_THROW_SPEC ((CORBA::SystemException,
03309 AVStreams::noSuchFlow,
03310 AVStreams::QoSRequestFailed))
03311 {
03312 if (TAO_debug_level > 0)
03313 ACE_DEBUG ((LM_DEBUG,
03314 "TAO_VDev::modify_QoS\n"));
03315
03316 if (flowspec.length () != 0)
03317 {
03318 TAO_Forward_FlowSpec_Entry entry;
03319 entry.parse (flowspec [0]);
03320 int direction = entry.direction ();
03321 if (direction == 0)
03322 {
03323 AVStreams::StreamEndPoint_A_ptr sep_a;
03324
03325 CORBA::Any_ptr streamendpoint_a_any =
03326 this->get_property_value ("Related_StreamEndpoint"
03327 ACE_ENV_ARG_PARAMETER);
03328 ACE_CHECK_RETURN (0);
03329
03330 *streamendpoint_a_any >>= sep_a;
03331 if (sep_a != 0)
03332 {
03333 sep_a->modify_QoS (the_qos, flowspec ACE_ENV_ARG_PARAMETER);
03334 ACE_CHECK_RETURN (0);
03335 }
03336 else ACE_DEBUG ((LM_DEBUG,
03337 "Stream EndPoint Not Found\n"));
03338 }
03339 else
03340 {
03341 AVStreams::StreamEndPoint_B_ptr sep_b;
03342
03343 CORBA::Any_ptr streamendpoint_b_any =
03344 this->get_property_value ("Related_StreamEndpoint"
03345 ACE_ENV_ARG_PARAMETER);
03346 ACE_CHECK_RETURN (0);
03347 *streamendpoint_b_any >>= sep_b;
03348 sep_b->modify_QoS (the_qos, flowspec ACE_ENV_ARG_PARAMETER);
03349 ACE_CHECK_RETURN (0);
03350 }
03351 }
03352 return 1;
03353 }
03354
03355 TAO_VDev::~TAO_VDev (void)
03356 {
03357 }
03358
03359
03360
03361
03362
03363
03364 TAO_MMDevice::TAO_MMDevice (TAO_AV_Endpoint_Strategy *endpoint_strategy)
03365 : endpoint_strategy_ (endpoint_strategy),
03366 flow_count_ (0),
03367 flow_num_ (0),
03368 stream_ctrl_ (0)
03369 {
03370 }
03371
03372
03373
03374 AVStreams::StreamCtrl_ptr
03375 TAO_MMDevice::bind (AVStreams::MMDevice_ptr peer_device,
03376 AVStreams::streamQoS &the_qos,
03377 CORBA::Boolean_out is_met,
03378 const AVStreams::flowSpec &the_spec
03379 ACE_ENV_ARG_DECL)
03380 ACE_THROW_SPEC ((CORBA::SystemException,
03381 AVStreams::streamOpFailed,
03382 AVStreams::noSuchFlow,
03383 AVStreams::QoSRequestFailed))
03384 {
03385 AVStreams::StreamCtrl_ptr streamctrl (AVStreams::StreamCtrl::_nil ());
03386 ACE_TRY
03387 {
03388 ACE_UNUSED_ARG (is_met);
03389 ACE_NEW_RETURN (this->stream_ctrl_,
03390 TAO_StreamCtrl,
03391 0);
03392 AVStreams::MMDevice_var mmdevice = this->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
03393 ACE_TRY_CHECK;
03394 this->stream_ctrl_->bind_devs (peer_device,
03395 mmdevice.in (),
03396 the_qos,
03397 the_spec
03398 ACE_ENV_ARG_PARAMETER);
03399 ACE_TRY_CHECK;
03400 streamctrl = this->stream_ctrl_->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
03401 ACE_TRY_CHECK;
03402 }
03403 ACE_CATCHANY
03404 {
03405 ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_MMDevice::bind");
03406 return streamctrl;
03407 }
03408 ACE_ENDTRY;
03409 ACE_CHECK_RETURN (streamctrl);
03410 return streamctrl;
03411 }
03412
03413
03414 AVStreams::StreamCtrl_ptr
03415 TAO_MMDevice::bind_mcast (AVStreams::MMDevice_ptr first_peer,
03416 AVStreams::streamQoS &the_qos,
03417 CORBA::Boolean_out is_met,
03418 const AVStreams::flowSpec &the_spec
03419 ACE_ENV_ARG_DECL_NOT_USED)
03420 ACE_THROW_SPEC ((CORBA::SystemException,
03421 AVStreams::streamOpFailed,
03422 AVStreams::noSuchFlow,
03423 AVStreams::QoSRequestFailed))
03424 {
03425 ACE_UNUSED_ARG (first_peer);
03426 ACE_UNUSED_ARG (the_qos);
03427 ACE_UNUSED_ARG (is_met);
03428 ACE_UNUSED_ARG (the_spec);
03429
03430 return 0;
03431 }
03432
03433 AVStreams::StreamEndPoint_ptr
03434 TAO_MMDevice::create_A_B (MMDevice_Type type,
03435 AVStreams::StreamCtrl_ptr streamctrl,
03436 AVStreams::VDev_out the_vdev,
03437 AVStreams::streamQoS &the_qos,
03438 CORBA::Boolean_out met_qos,
03439 char *&,
03440 const AVStreams::flowSpec &flow_spec
03441 ACE_ENV_ARG_DECL)
03442 {
03443 AVStreams::StreamEndPoint_A_ptr sep_a (AVStreams::StreamEndPoint_A::_nil ());
03444 AVStreams::StreamEndPoint_B_ptr sep_b (AVStreams::StreamEndPoint_B::_nil ());
03445 AVStreams::StreamEndPoint_ptr sep (AVStreams::StreamEndPoint::_nil ());
03446 ACE_TRY
03447 {
03448 switch (type)
03449 {
03450 case MMDEVICE_A:
03451 {
03452 if (this->endpoint_strategy_->create_A (sep_a,
03453 the_vdev
03454 ACE_ENV_ARG_PARAMETER) == -1)
03455 ACE_ERROR_RETURN ((LM_ERROR,
03456 "TAO_MMDevice::create_A_B (%P|%t) - "
03457 "error in create_A\n"),
03458 0);
03459 sep = sep_a;
03460 }
03461 break;
03462 case MMDEVICE_B:
03463 {
03464 if (this->endpoint_strategy_->create_B (sep_b,
03465 the_vdev
03466 ACE_ENV_ARG_PARAMETER) == -1)
03467 ACE_ERROR_RETURN ((LM_ERROR,
03468 "TAO_MMDevice::create_A_B (%P|%t) - "
03469 "error in create_B\n"),
03470 0);
03471 sep = sep_b;
03472 }
03473 break;
03474 default:
03475 break;
03476 }
03477 ACE_TRY_CHECK;
03478 if (this->fdev_map_.current_size () > 0)
03479 {
03480 TAO_AV_QoS qos (the_qos);
03481
03482 for (u_int i=0;i<flow_spec.length ();i++)
03483 {
03484 TAO_Forward_FlowSpec_Entry forward_entry;
03485 forward_entry.parse (flow_spec[i]);
03486 ACE_CString flow_key (forward_entry.flowname ());
03487 AVStreams::FDev_var flow_dev;
03488 AVStreams::FlowConnection_var flowconnection;
03489 ACE_TRY_EX (flowconnection)
03490 {
03491
03492
03493 CORBA::Object_var flowconnection_obj =
03494 streamctrl->get_flow_connection (forward_entry.flowname () ACE_ENV_ARG_PARAMETER);
03495 ACE_TRY_CHECK_EX (flowconnection);
03496 printf("successfully called get_flow_connection\n");
03497 if (!CORBA::is_nil (flowconnection_obj.in ()))
03498 {
03499 flowconnection = AVStreams::FlowConnection::_narrow (flowconnection_obj.in ()
03500 ACE_ENV_ARG_PARAMETER);
03501 ACE_TRY_CHECK_EX (flowconnection);
03502 }
03503 }
03504 ACE_CATCH(AVStreams::noSuchFlow, nsf)
03505 {
03506 TAO_FlowConnection *flowConnection;
03507 ACE_NEW_RETURN (flowConnection,
03508 TAO_FlowConnection,
03509 0);
03510 flowconnection = flowConnection->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
03511 ACE_TRY_CHECK;
03512 streamctrl->set_flow_connection (forward_entry.flowname(),
03513 flowconnection.in ()
03514 ACE_ENV_ARG_PARAMETER);
03515 ACE_TRY_CHECK;
03516 }
03517 ACE_CATCHANY
03518 {
03519
03520 ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_MMDevice::create_a::get_flow_connection");
03521 }
03522 ACE_ENDTRY;
03523 ACE_CHECK_RETURN (0);
03524
03525 int result = this->fdev_map_.find (flow_key, flow_dev);
03526 if (result < 0)
03527 ACE_ERROR_RETURN ((LM_ERROR, "(%N,%l) fdev_map::find failed\n"), 0);
03528
03529 CORBA::String_var named_fdev;
03530 AVStreams::FlowEndPoint_var flow_endpoint;
03531 AVStreams::QoS flow_qos;
03532 result = qos.get_flow_qos (forward_entry.flowname (), flow_qos);
03533 if (result < 0)
03534 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "(%N,%l) get_flow_qos failed for %s\n", forward_entry.flowname ()));
03535 switch (forward_entry.direction ())
03536 {
03537 case TAO_FlowSpec_Entry::TAO_AV_DIR_IN:
03538 {
03539 switch (type)
03540 {
03541 case MMDEVICE_A:
03542 {
03543
03544
03545
03546 flow_endpoint =
03547 flow_dev->create_producer (flowconnection.in (),
03548 flow_qos,
03549 met_qos,
03550 named_fdev.inout ()
03551 ACE_ENV_ARG_PARAMETER);
03552 }
03553 break;
03554 case MMDEVICE_B:
03555 {
03556 flow_endpoint =
03557 flow_dev->create_consumer (flowconnection.in (),
03558 flow_qos,
03559 met_qos,
03560 named_fdev.inout ()
03561 ACE_ENV_ARG_PARAMETER);
03562 }
03563 break;
03564 }
03565 ACE_TRY_CHECK;
03566 }
03567 break;
03568 case TAO_FlowSpec_Entry::TAO_AV_DIR_OUT:
03569 {
03570 switch (type)
03571 {
03572 case MMDEVICE_A:
03573 {
03574
03575
03576
03577 flow_endpoint =
03578 flow_dev->create_consumer (flowconnection.in (),
03579 flow_qos,
03580 met_qos,
03581 named_fdev.inout ()
03582 ACE_ENV_ARG_PARAMETER);
03583 }
03584 break;
03585 case MMDEVICE_B:
03586 {
03587
03588
03589
03590 flow_endpoint =
03591 flow_dev->create_producer (flowconnection.in (),
03592 flow_qos,
03593 met_qos,
03594 named_fdev.inout ()
03595 ACE_ENV_ARG_PARAMETER);
03596 }
03597 break;
03598 }
03599 ACE_TRY_CHECK;
03600 }
03601 break;
03602 default:
03603 break;
03604 }
03605 CORBA::Any flowname_any;
03606 flowname_any <<= forward_entry.flowname ();
03607 flow_endpoint->define_property ("FlowName", flowname_any ACE_ENV_ARG_PARAMETER);
03608 ACE_TRY_CHECK;
03609 sep->add_fep (flow_endpoint.in ()
03610 ACE_ENV_ARG_PARAMETER);
03611 ACE_TRY_CHECK;
03612 }
03613 }
03614 }
03615 ACE_CATCHANY
03616 {
03617 ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_MMDevice::create_A");
03618 return sep;
03619 }
03620 ACE_ENDTRY;
03621 ACE_CHECK_RETURN (sep);
03622 return sep;
03623 }
03624
03625 AVStreams::StreamEndPoint_A_ptr
03626 TAO_MMDevice::create_A (AVStreams::StreamCtrl_ptr streamctrl,
03627 AVStreams::VDev_out the_vdev,
03628 AVStreams::streamQoS &stream_qos,
03629 CORBA::Boolean_out met_qos,
03630 char *&named_vdev,
03631 const AVStreams::flowSpec &flow_spec
03632 ACE_ENV_ARG_DECL)
03633 ACE_THROW_SPEC ((CORBA::SystemException,
03634 AVStreams::streamOpFailed,
03635 AVStreams::streamOpDenied,
03636 AVStreams::notSupported,
03637 AVStreams::QoSRequestFailed,
03638 AVStreams::noSuchFlow))
03639 {
03640 AVStreams::StreamEndPoint_A_ptr sep_a = 0;
03641 AVStreams::StreamEndPoint_var sep;
03642 ACE_TRY
03643 {
03644 sep = this->create_A_B (MMDEVICE_A, streamctrl, the_vdev, stream_qos, met_qos, named_vdev, flow_spec ACE_ENV_ARG_PARAMETER);
03645 ACE_TRY_CHECK;
03646 sep_a = AVStreams::StreamEndPoint_A::_narrow (sep.in() ACE_ENV_ARG_PARAMETER);
03647 ACE_TRY_CHECK;
03648
03649 ACE_ASSERT( !CORBA::is_nil( sep_a ) );
03650 }
03651 ACE_CATCHANY
03652 {
03653 ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_MMDevice::create_A");
03654 return sep_a;
03655 }
03656 ACE_ENDTRY;
03657
03658 return sep_a;
03659 }
03660
03661
03662 AVStreams::StreamEndPoint_B_ptr
03663 TAO_MMDevice::create_B (AVStreams::StreamCtrl_ptr streamctrl,
03664 AVStreams::VDev_out the_vdev,
03665 AVStreams::streamQoS &stream_qos,
03666 CORBA::Boolean_out met_qos,
03667 char *&named_vdev,
03668 const AVStreams::flowSpec &flow_spec
03669 ACE_ENV_ARG_DECL)
03670 ACE_THROW_SPEC ((CORBA::SystemException,
03671 AVStreams::streamOpFailed,
03672 AVStreams::streamOpDenied,
03673 AVStreams::notSupported,
03674 AVStreams::QoSRequestFailed,
03675 AVStreams::noSuchFlow))
03676 {
03677 AVStreams::StreamEndPoint_B_ptr sep_b = AVStreams::StreamEndPoint_B::_nil ();
03678 AVStreams::StreamEndPoint_var sep;
03679
03680 ACE_TRY
03681 {
03682 sep = this->create_A_B (MMDEVICE_B, streamctrl, the_vdev, stream_qos, met_qos, named_vdev, flow_spec ACE_ENV_ARG_PARAMETER);
03683 ACE_TRY_CHECK;
03684 sep_b = AVStreams::StreamEndPoint_B::_narrow (sep.in() ACE_ENV_ARG_PARAMETER);
03685 ACE_TRY_CHECK;
03686
03687 ACE_ASSERT ( !CORBA::is_nil( sep_b ) );
03688 }
03689 ACE_CATCHANY
03690 {
03691 ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_MMDevice::create_B");
03692 return sep_b;
03693 }
03694 ACE_ENDTRY;
03695 ACE_CHECK_RETURN (sep_b);
03696 return sep_b;
03697 }
03698
03699
03700
03701 void
03702 TAO_MMDevice::destroy (AVStreams::StreamEndPoint_ptr ,
03703 const char *
03704 ACE_ENV_ARG_DECL_NOT_USED)
03705 ACE_THROW_SPEC ((CORBA::SystemException,
03706 AVStreams::notSupported))
03707 {
03708
03709
03710
03711 int result = TAO_AV_Core::deactivate_servant (this);
03712 if (result < 0)
03713 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_MMDevice::destroy failed\n"));
03714 }
03715
03716 char *
03717 TAO_MMDevice::add_fdev_i (AVStreams::FDev_ptr fdev
03718 ACE_ENV_ARG_DECL)
03719 ACE_THROW_SPEC ((CORBA::SystemException,
03720 AVStreams::notSupported,
03721 AVStreams::streamOpFailed))
03722 {
03723 char* tmp;
03724 ACE_NEW_RETURN (tmp,
03725 char[64],
03726 0);
03727 CORBA::String_var flow_name = tmp;
03728
03729 ACE_TRY
03730 {
03731
03732
03733 ACE_OS::sprintf (tmp, "flow%d", flow_num_++);
03734 CORBA::Any flowname_any;
03735 flowname_any <<= flow_name.in ();
03736 fdev->define_property ("Flow", flowname_any ACE_ENV_ARG_PARAMETER);
03737 ACE_TRY_CHECK;
03738 }
03739 ACE_CATCHANY
03740 {
03741 ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_MMDevice::add_fdev");
03742 return 0;
03743 }
03744 ACE_ENDTRY;
03745 return flow_name._retn ();
03746 }
03747
03748
03749 char *
03750 TAO_MMDevice::add_fdev (CORBA::Object_ptr fdev_obj
03751 ACE_ENV_ARG_DECL)
03752 ACE_THROW_SPEC ((CORBA::SystemException,
03753 AVStreams::notSupported,
03754 AVStreams::streamOpFailed))
03755 {
03756 CORBA::String_var flow_name;
03757 AVStreams::FDev_var fdev;
03758 ACE_TRY_EX (flow_name)
03759 {
03760 CORBA::Any_ptr flow_name_any;
03761 fdev = AVStreams::FDev::_narrow (fdev_obj ACE_ENV_ARG_PARAMETER);
03762 ACE_TRY_CHECK_EX (flow_name);
03763
03764 if (CORBA::is_nil (fdev.in ()))
03765 return 0;
03766
03767
03768 flow_name_any = fdev->get_property_value ("Flow" ACE_ENV_ARG_PARAMETER);
03769 ACE_TRY_CHECK_EX (flow_name);
03770
03771 const char *tmp;
03772 *flow_name_any >>= tmp;
03773 flow_name = CORBA::string_dup (tmp);
03774 }
03775 ACE_CATCHANY
03776 {
03777 flow_name =
03778 this->add_fdev_i (fdev.in () ACE_ENV_ARG_PARAMETER);
03779 ACE_CHECK_RETURN (0);
03780 }
03781 ACE_ENDTRY;
03782 ACE_CHECK_RETURN (0);
03783
03784
03785
03786
03787 ACE_CString fdev_name_key ( flow_name.in () );
03788
03789
03790 if ( (this->fdev_map_.bind (fdev_name_key, fdev )) != 0)
03791 ACE_THROW_RETURN (AVStreams::streamOpFailed (), 0);
03792
03793 this->flow_count_++;
03794 this->flows_.length (this->flow_count_);
03795 this->flows_ [this->flow_count_-1] = flow_name;
03796
03797 CORBA::Any flows_any;
03798 flows_any <<= this->flows_;
03799 ACE_TRY_EX (flows)
03800 {
03801 this->define_property ("Flows",
03802 flows_any
03803 ACE_ENV_ARG_PARAMETER);
03804 ACE_TRY_CHECK_EX (flows);
03805 }
03806 ACE_CATCHANY
03807 {
03808 ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_MMDevice::add_fdev");
03809 return 0;
03810 }
03811 ACE_ENDTRY;
03812 ACE_CHECK_RETURN (0);
03813 return flow_name._retn ();
03814 }
03815
03816
03817 CORBA::Object_ptr
03818 TAO_MMDevice::get_fdev (const char *flow_name
03819 ACE_ENV_ARG_DECL_NOT_USED)
03820 ACE_THROW_SPEC ((CORBA::SystemException,
03821 AVStreams::notSupported,
03822 AVStreams::noSuchFlow))
03823 {
03824
03825 ACE_CString fdev_name_key (flow_name);
03826 AVStreams::FDev_var fdev_entry;
03827 if (this->fdev_map_.find (fdev_name_key, fdev_entry) == 0)
03828 return fdev_entry._retn() ;
03829 return 0;
03830 }
03831
03832
03833 void
03834 TAO_MMDevice::remove_fdev (const char *flow_name
03835 ACE_ENV_ARG_DECL)
03836 ACE_THROW_SPEC ((CORBA::SystemException,
03837 AVStreams::notSupported,
03838 AVStreams::noSuchFlow,
03839 AVStreams::streamOpFailed))
03840 {
03841 ACE_TRY
03842 {
03843 ACE_CString fdev_name_key (flow_name);
03844 AVStreams::FDev_var fdev_entry;
03845
03846 if (this->fdev_map_.unbind (fdev_name_key, fdev_entry)!= 0)
03847 ACE_THROW (AVStreams::streamOpFailed ());
03848
03849 AVStreams::flowSpec new_flows (this->flows_.length ());
03850 for (u_int i=0, j=0 ; i <this->flows_.length (); i++)
03851 if (ACE_OS::strcmp (flow_name, this->flows_[i]) != 0)
03852 new_flows[j++] = this->flows_[i];
03853
03854 CORBA::Any flows;
03855 flows <<= new_flows;
03856 this->flows_ = new_flows;
03857 this->define_property ("Flows",
03858 flows
03859 ACE_ENV_ARG_PARAMETER);
03860 ACE_TRY_CHECK;
03861 }
03862 ACE_CATCHANY
03863 {
03864 ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_MMDevice::remove_fdev");
03865 }
03866 ACE_ENDTRY;
03867 ACE_CHECK;
03868 }
03869
03870
03871 TAO_MMDevice::~TAO_MMDevice (void)
03872 {
03873 delete this->stream_ctrl_;
03874 }
03875
03876
03877
03878
03879
03880
03881 TAO_FlowConnection::TAO_FlowConnection (void)
03882 :fp_name_ (CORBA::string_dup ("")),
03883 ip_multicast_ (0)
03884 {
03885 }
03886
03887
03888
03889
03890
03891
03892
03893
03894
03895 int
03896 TAO_FlowConnection::set_mcast_addr (ACE_CString mcast_addr, u_short mcast_port)
03897 {
03898 this->mcast_addr_ = mcast_addr;
03899 this->mcast_port_ = mcast_port;
03900 return 0;
03901 }
03902
03903 void
03904 TAO_FlowConnection::set_protocol (const char *protocol)
03905 {
03906 this->protocol_ = protocol;
03907 }
03908
03909
03910 void
03911 TAO_FlowConnection::stop (ACE_ENV_SINGLE_ARG_DECL)
03912 ACE_THROW_SPEC ((CORBA::SystemException))
03913 {
03914 ACE_TRY
03915 {
03916 FlowProducer_SetItor producer_begin = this->flow_producer_set_.begin
03917 ();
03918 for (FlowProducer_SetItor producer_end =
03919 this->flow_producer_set_.end ();
03920 producer_begin != producer_end; ++producer_begin)
03921 {
03922 (*producer_begin)->stop (ACE_ENV_SINGLE_ARG_PARAMETER);
03923 ACE_TRY_CHECK;
03924 }
03925 FlowConsumer_SetItor consumer_begin = this->flow_consumer_set_.begin
03926 ();
03927 for (FlowConsumer_SetItor consumer_end =
03928 this->flow_consumer_set_.end ();
03929 consumer_begin != consumer_end; ++consumer_begin)
03930 {
03931 (*consumer_begin)->stop (ACE_ENV_SINGLE_ARG_PARAMETER);
03932 ACE_TRY_CHECK;
03933 }
03934 }
03935 ACE_CATCHANY
03936 {
03937 ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_FlowConnection::stop");
03938 return;
03939 }
03940 ACE_ENDTRY;
03941 ACE_CHECK;
03942 }
03943
03944
03945 void
03946 TAO_FlowConnection::start (ACE_ENV_SINGLE_ARG_DECL)
03947 ACE_THROW_SPEC ((CORBA::SystemException))
03948 {
03949 ACE_TRY
03950 {
03951 FlowConsumer_SetItor consumer_begin = this->flow_consumer_set_.begin
03952 ();
03953 for (FlowConsumer_SetItor consumer_end =
03954 this->flow_consumer_set_.end ();
03955 consumer_begin != consumer_end; ++consumer_begin)
03956 {
03957 (*consumer_begin)->start (ACE_ENV_SINGLE_ARG_PARAMETER);
03958 ACE_TRY_CHECK;
03959 }
03960 FlowProducer_SetItor producer_begin = this->flow_producer_set_.begin
03961 ();
03962 for (FlowProducer_SetItor producer_end =
03963 this->flow_producer_set_.end ();
03964 producer_begin != producer_end; ++producer_begin)
03965 {
03966 (*producer_begin)->start (ACE_ENV_SINGLE_ARG_PARAMETER);
03967 ACE_TRY_CHECK;
03968 }
03969 }
03970 ACE_CATCHANY
03971 {
03972 ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_FlowConnection::start");
03973 return;
03974 }
03975 ACE_ENDTRY;
03976 ACE_CHECK;
03977 }
03978
03979
03980 void
03981 TAO_FlowConnection::destroy (ACE_ENV_SINGLE_ARG_DECL)
03982 ACE_THROW_SPEC ((CORBA::SystemException))
03983 {
03984 ACE_TRY
03985 {
03986 FlowProducer_SetItor producer_begin = this->flow_producer_set_.begin
03987 ();
03988 for (FlowProducer_SetItor producer_end =
03989 this->flow_producer_set_.end ();
03990 producer_begin != producer_end; ++producer_begin)
03991 {
03992 (*producer_begin)->destroy (ACE_ENV_SINGLE_ARG_PARAMETER);
03993 ACE_TRY_CHECK;
03994 }
03995 FlowConsumer_SetItor consumer_begin = this->flow_consumer_set_.begin
03996 ();
03997 for (FlowConsumer_SetItor consumer_end =
03998 this->flow_consumer_set_.end ();
03999 consumer_begin != consumer_end; ++consumer_begin)
04000 {
04001 (*consumer_begin)->destroy (ACE_ENV_SINGLE_ARG_PARAMETER);
04002 ACE_TRY_CHECK;
04003 }
04004 }
04005 ACE_CATCHANY
04006 {
04007 ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_FlowConnection::destroy");
04008 return;
04009 }
04010 ACE_ENDTRY;
04011 ACE_CHECK;
04012 int result = TAO_AV_Core::deactivate_servant (this);
04013 if (result < 0)
04014 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_FlowConnection::destroy failed\n"));
04015 }
04016
04017
04018 CORBA::Boolean
04019 TAO_FlowConnection::modify_QoS (AVStreams::QoS & new_qos
04020 ACE_ENV_ARG_DECL_NOT_USED)
04021 ACE_THROW_SPEC ((CORBA::SystemException,
04022 AVStreams::QoSRequestFailed))
04023 {
04024 ACE_UNUSED_ARG (new_qos);
04025 return 0;
04026 }
04027
04028
04029 CORBA::Boolean
04030 TAO_FlowConnection::use_flow_protocol (const char * fp_name,
04031 const CORBA::Any & fp_settings
04032 ACE_ENV_ARG_DECL)
04033 ACE_THROW_SPEC ((CORBA::SystemException,
04034 AVStreams::FPError,
04035 AVStreams::notSupported))
04036 {
04037 this->fp_name_ = fp_name;
04038 this->fp_settings_ = fp_settings;
04039 FlowProducer_SetItor producer_begin = this->flow_producer_set_.begin
04040 ();
04041 for (FlowProducer_SetItor producer_end =
04042 this->flow_producer_set_.end ();
04043 producer_begin != producer_end; ++producer_begin)
04044 {
04045 (*producer_begin)->use_flow_protocol
04046 (fp_name, fp_settings ACE_ENV_ARG_PARAMETER);
04047 ACE_CHECK_RETURN (0);
04048 }
04049 FlowConsumer_SetItor consumer_begin = this->flow_consumer_set_.begin
04050 ();
04051 for (FlowConsumer_SetItor consumer_end =
04052 this->flow_consumer_set_.end ();
04053 consumer_begin != consumer_end; ++consumer_begin)
04054 {
04055 (*consumer_begin)->use_flow_protocol
04056 (fp_name, fp_settings ACE_ENV_ARG_PARAMETER);
04057 ACE_CHECK_RETURN (0);
04058 }
04059 return 1;
04060 }
04061
04062 void
04063 TAO_FlowConnection::push_event (const AVStreams::streamEvent & the_event
04064 ACE_ENV_ARG_DECL_NOT_USED)
04065 ACE_THROW_SPEC ((CORBA::SystemException))
04066 {
04067 ACE_UNUSED_ARG (the_event);
04068 }
04069
04070 CORBA::Boolean
04071 TAO_FlowConnection::connect_devs (AVStreams::FDev_ptr a_party,
04072 AVStreams::FDev_ptr b_party,
04073 AVStreams::QoS & flow_qos
04074 ACE_ENV_ARG_DECL)
04075 ACE_THROW_SPEC ((CORBA::SystemException,
04076 AVStreams::streamOpFailed,
04077 AVStreams::streamOpDenied,
04078 AVStreams::QoSRequestFailed))
04079 {
04080 CORBA::Boolean result = 0;
04081 ACE_TRY
04082 {
04083 AVStreams::FlowConnection_var flowconnection = this->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
04084 ACE_TRY_CHECK;
04085 CORBA::Boolean met_qos;
04086 CORBA::String_var named_fdev ((const char *)"");
04087 AVStreams::FlowProducer_var producer =
04088 a_party->create_producer (flowconnection.in (),
04089 flow_qos,
04090 met_qos,
04091 named_fdev.inout ()
04092 ACE_ENV_ARG_PARAMETER);
04093 ACE_TRY_CHECK;
04094 AVStreams::FlowConsumer_var consumer =
04095 b_party->create_consumer (flowconnection.in (),
04096 flow_qos,
04097 met_qos,
04098 named_fdev.inout ()
04099 ACE_ENV_ARG_PARAMETER);
04100 ACE_TRY_CHECK;
04101 result = this->connect (producer.in (),
04102 consumer.in (),
04103 flow_qos
04104 ACE_ENV_ARG_PARAMETER);
04105 ACE_TRY_CHECK;
04106 }
04107 ACE_CATCHANY
04108 {
04109 ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_FlowConnection::connect_devs");
04110 return 0;
04111 }
04112 ACE_ENDTRY;
04113 ACE_CHECK_RETURN (0);
04114 return result;
04115 }
04116
04117
04118 CORBA::Boolean
04119 TAO_FlowConnection::connect (AVStreams::FlowProducer_ptr producer,
04120 AVStreams::FlowConsumer_ptr consumer,
04121 AVStreams::QoS & the_qos
04122 ACE_ENV_ARG_DECL)
04123 ACE_THROW_SPEC ((CORBA::SystemException,
04124 AVStreams::formatMismatch,
04125 AVStreams::FEPMismatch,
04126 AVStreams::alreadyConnected))
04127 {
04128 ACE_TRY
04129 {
04130
04131 AVStreams::FlowProducer_ptr flow_producer =
04132 AVStreams::FlowProducer::_duplicate (producer);
04133 AVStreams::FlowConsumer_ptr flow_consumer =
04134 AVStreams::FlowConsumer::_duplicate (consumer);
04135
04136 this->flow_producer_set_.insert (flow_producer);
04137 this->flow_consumer_set_.insert (flow_consumer);
04138 AVStreams::FlowConnection_var flowconnection =
04139 this->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
04140 ACE_TRY_CHECK;
04141
04142 flow_producer->set_peer (flowconnection.in (),
04143 flow_consumer,
04144 the_qos
04145 ACE_ENV_ARG_PARAMETER);
04146 ACE_TRY_CHECK;
04147
04148 flow_consumer->set_peer (flowconnection.in (),
04149 flow_producer,
04150 the_qos
04151 ACE_ENV_ARG_PARAMETER);
04152 ACE_TRY_CHECK;
04153
04154 char *consumer_address =
04155 flow_consumer->go_to_listen (the_qos,
04156 0,
04157 flow_producer,
04158 this->fp_name_.inout ()
04159 ACE_ENV_ARG_PARAMETER);
04160 ACE_TRY_CHECK;
04161
04162 if (ACE_OS::strcmp (consumer_address, "") == 0)
04163 {
04164
04165 consumer_address = flow_producer->go_to_listen (the_qos,
04166 0,
04167 flow_consumer,
04168 this->fp_name_.inout ()
04169 ACE_ENV_ARG_PARAMETER);
04170 ACE_TRY_CHECK;
04171 flow_consumer->connect_to_peer (the_qos,
04172 consumer_address,
04173 this->fp_name_.inout ()
04174 ACE_ENV_ARG_PARAMETER);
04175 ACE_TRY_CHECK;
04176
04177
04178 }
04179 else
04180 {
04181 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_FlowConnection::connect_to_peer addres: %s", consumer_address));
04182 flow_producer->connect_to_peer (the_qos,
04183 consumer_address,
04184 this->fp_name_.inout ()
04185 ACE_ENV_ARG_PARAMETER);
04186 ACE_TRY_CHECK;
04187 }
04188 }
04189 ACE_CATCHANY
04190 {
04191 ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_FlowConnection::connect");
04192 }
04193 ACE_ENDTRY;
04194 ACE_CHECK_RETURN (0);
04195 return 1;
04196 }
04197
04198
04199 CORBA::Boolean
04200 TAO_FlowConnection::disconnect (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
04201 ACE_THROW_SPEC ((CORBA::SystemException))
04202 {
04203 return 0;
04204 }
04205
04206 CORBA::Boolean
04207 TAO_FlowConnection::add_producer (AVStreams::FlowProducer_ptr producer,
04208 AVStreams::QoS & the_qos
04209 ACE_ENV_ARG_DECL)
04210 ACE_THROW_SPEC ((CORBA::SystemException,
04211 AVStreams::alreadyConnected,
04212 AVStreams::notSupported))
04213 {
04214 ACE_TRY
04215 {
04216 AVStreams::FlowProducer_ptr flow_producer =
04217 AVStreams::FlowProducer::_duplicate (producer);
04218
04219
04220
04221
04222
04223 FlowProducer_SetItor begin = this->flow_producer_set_.begin ();
04224 FlowProducer_SetItor end = this->flow_producer_set_.end ();
04225 for (; begin != end; ++begin)
04226 {
04227 if ((*begin)->_is_equivalent (producer
04228 ACE_ENV_ARG_PARAMETER))
04229
04230 ACE_ERROR_RETURN ((LM_WARNING, "TAO_FlowConnection::add_producer: producer already exists\n"), 1);
04231 }
04232
04233
04234
04235
04236 int result = this->flow_producer_set_.insert (flow_producer);
04237 if (result == 1)
04238 {
04239
04240 ACE_ERROR_RETURN ((LM_WARNING, "TAO_FlowConnection::add_producer: producer already exists\n"), 1);
04241 }
04242 CORBA::Boolean met_qos;
04243 char mcast_address[BUFSIZ];
04244 if (this->producer_address_.in () == 0)
04245 {
04246 ACE_INET_Addr mcast_addr;
04247 mcast_addr.set (this->mcast_port_,
04248 this->mcast_addr_.c_str ()
04249 );
04250
04251 char buf [BUFSIZ];
04252 mcast_addr.addr_to_string (buf, BUFSIZ);
04253 ACE_OS::sprintf (mcast_address, "%s=%s", this->protocol_.in (), buf);
04254 }
04255 else
04256 {
04257 ACE_OS::strcpy (mcast_address, this->producer_address_.in ());
04258 }
04259 char *address = flow_producer->connect_mcast (the_qos,
04260 met_qos,
04261 mcast_address,
04262 this->fp_name_.in ()
04263 ACE_ENV_ARG_PARAMETER);
04264
04265 ACE_TRY_CHECK;
04266 if (this->producer_address_.in () == 0)
04267 {
04268 TAO_Forward_FlowSpec_Entry entry ("", "", "", "", address);
04269 if (entry.address () != 0)
04270 {
04271
04272 this->producer_address_ = address;
04273 }
04274 else
04275 {
04276
04277 this->ip_multicast_ = 0;
04278 }
04279 }
04280
04281 if (CORBA::is_nil (this->mcastconfigif_.in ()))
04282 {
04283 ACE_NEW_RETURN (this->mcastconfigif_i_,
04284 TAO_MCastConfigIf,
04285 0);
04286 this->mcastconfigif_ = this->mcastconfigif_i_->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
04287 ACE_TRY_CHECK;
04288 }
04289 AVStreams::FlowConnection_var flowconnection = this->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
04290 ACE_TRY_CHECK;
04291 flow_producer->set_Mcast_peer (flowconnection.in (),
04292 this->mcastconfigif_.in (),
04293 the_qos
04294 ACE_ENV_ARG_PARAMETER);
04295 ACE_TRY_CHECK;
04296 }
04297 ACE_CATCHANY
04298 {
04299 ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_FlowConnection::add_producer");
04300 return 0;
04301 }
04302 ACE_ENDTRY;
04303 ACE_CHECK_RETURN (0);
04304 return 1;
04305 }
04306
04307 CORBA::Boolean
04308 TAO_FlowConnection::add_consumer (AVStreams::FlowConsumer_ptr consumer,
04309 AVStreams::QoS & the_qos
04310 ACE_ENV_ARG_DECL)
04311 ACE_THROW_SPEC ((CORBA::SystemException,
04312 AVStreams::alreadyConnected))
04313 {
04314 ACE_TRY
04315 {
04316 AVStreams::FlowConsumer_ptr flow_consumer =
04317 AVStreams::FlowConsumer::_duplicate (consumer);
04318 FlowConsumer_SetItor begin = this->flow_consumer_set_.begin ();
04319 FlowConsumer_SetItor end = this->flow_consumer_set_.end ();
04320 for (; begin != end; ++begin)
04321 {
04322 if ((*begin)->_is_equivalent (consumer
04323 ACE_ENV_ARG_PARAMETER))
04324
04325 ACE_ERROR_RETURN ((LM_WARNING, "TAO_FlowConnection::add_Consumer: Consumer already exists\n"), 1);
04326 }
04327 int result = this->flow_consumer_set_.insert (flow_consumer);
04328 if (result == 1)
04329 {
04330
04331 ACE_ERROR_RETURN ((LM_WARNING, "TAO_FlowConnection::add_consumer: consumer already exists\n"), 1);
04332 }
04333
04334 FlowProducer_SetItor producer_begin = this->flow_producer_set_.begin ();
04335
04336
04337
04338
04339 AVStreams::FlowProducer_ptr flow_producer = (*producer_begin);
04340
04341 AVStreams::protocolSpec protocols (1);
04342 protocols.length (1);
04343 protocols [0] = CORBA::string_dup (this->producer_address_.in ());
04344
04345 if (!this->ip_multicast_)
04346 {
04347 flow_consumer->set_protocol_restriction (protocols
04348 ACE_ENV_ARG_PARAMETER);
04349 ACE_TRY_CHECK;
04350 char * address =
04351 flow_consumer->go_to_listen (the_qos,
04352 1,
04353 flow_producer,
04354 this->fp_name_.inout ()
04355 ACE_ENV_ARG_PARAMETER);
04356 ACE_TRY_CHECK;
04357 CORBA::Boolean is_met;
04358 flow_producer->connect_mcast (the_qos,
04359 is_met,
04360 address,
04361 this->fp_name_.inout ()
04362 ACE_ENV_ARG_PARAMETER);
04363 ACE_TRY_CHECK;
04364 }
04365 else
04366 {
04367
04368
04369
04370
04371
04372 flow_consumer->connect_to_peer (the_qos,
04373 this->producer_address_.in (),
04374 this->fp_name_.inout ()
04375 ACE_ENV_ARG_PARAMETER);
04376
04377
04378
04379
04380
04381
04382
04383
04384
04385 }
04386 if (CORBA::is_nil (this->mcastconfigif_.in ()))
04387 ACE_ERROR_RETURN ((LM_ERROR, "TAO_FlowConnection::add_consumer: first add a producer and then a consumer\n"), 0);
04388
04389 AVStreams::flowSpec flow_spec;
04390 AVStreams::streamQoS stream_qos (1);
04391 stream_qos.length (1);
04392 stream_qos [0] = the_qos;
04393 this->mcastconfigif_->set_peer (flow_consumer,
04394 stream_qos,
04395 flow_spec
04396 ACE_ENV_ARG_PARAMETER);
04397 ACE_TRY_CHECK;
04398 }
04399 ACE_CATCHANY
04400 {
04401 ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_FlowConnection::add_consumer");
04402 return 0;
04403 }
04404 ACE_ENDTRY;
04405 ACE_CHECK_RETURN (0);
04406 return 1;
04407 }
04408
04409 CORBA::Boolean
04410 TAO_FlowConnection::drop (AVStreams::FlowEndPoint_ptr target
04411 ACE_ENV_ARG_DECL_NOT_USED)
04412 ACE_THROW_SPEC ((CORBA::SystemException,
04413 AVStreams::notConnected))
04414 {
04415 ACE_UNUSED_ARG (target);
04416 return 0;
04417 }
04418
04419
04420
04421
04422
04423
04424 TAO_FlowEndPoint::TAO_FlowEndPoint (void)
04425 :lock_ (0)
04426 {
04427 }
04428
04429 TAO_FlowEndPoint::TAO_FlowEndPoint (const char *flowname,
04430 AVStreams::protocolSpec &protocols,
04431 const char *format)
04432 {
04433 this->open (flowname, protocols, format);
04434 }
04435
04436 void
04437 TAO_FlowEndPoint::set_flow_handler (const char * ,
04438 TAO_AV_Flow_Handler * )
04439 {
04440 }
04441
04442 int
04443 TAO_FlowEndPoint::open (const char *flowname,
04444 AVStreams::protocolSpec &protocols,
04445 const char *format)
04446 {
04447 this->flowname_ = flowname;
04448 this->format_ = format;
04449
04450 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_FlowEndPoint::open\n"));
04451 ACE_DECLARE_NEW_CORBA_ENV;
04452 ACE_TRY
04453 {
04454 CORBA::Any flowname_any;
04455 flowname_any <<= flowname;
04456 this->define_property ("FlowName",
04457 flowname_any
04458 ACE_ENV_ARG_PARAMETER);
04459 ACE_TRY_CHECK;
04460 this->set_format (format
04461 ACE_ENV_ARG_PARAMETER);
04462 ACE_TRY_CHECK;
04463 this->protocol_addresses_ = protocols;
04464 AVStreams::protocolSpec protocol_spec (protocols.length ());
04465 protocol_spec.length (protocols.length ());
04466 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "%N:%l\n"));
04467 for (u_int i=0;i<protocols.length ();i++)
04468 {
04469 CORBA::String_var address = CORBA::string_dup (protocols [i]);
04470 TAO_Forward_FlowSpec_Entry entry ("", "", "", "", address.in ());
04471 protocol_spec [i] = CORBA::string_dup (entry.carrier_protocol_str ());
04472 if (TAO_debug_level > 0)
04473 ACE_DEBUG ((LM_DEBUG,
04474 "[%s]\n",
04475 static_cast<char const*>(protocol_spec[i])));
04476 }
04477 this->set_protocol_restriction (protocol_spec
04478 ACE_ENV_ARG_PARAMETER);
04479 ACE_TRY_CHECK;
04480 }
04481 ACE_CATCHANY
04482 {
04483 ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_FlowEndPoint::open");
04484 return -1;
04485 }
04486 ACE_ENDTRY;
04487 ACE_CHECK_RETURN (-1);
04488 return 0;
04489 }
04490
04491
04492 int
04493 TAO_FlowEndPoint::set_flowname (const char *flowname)
04494 {
04495 this->flowname_ = flowname;
04496 return 0;
04497 }
04498
04499
04500
04501 CORBA::Boolean
04502 TAO_FlowEndPoint::lock (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
04503 ACE_THROW_SPEC ((CORBA::SystemException))
04504 {
04505
04506
04507 if (this->lock_)
04508 return 0;
04509 this->lock_ = 1;
04510 return 1;
04511 }
04512
04513
04514 void
04515 TAO_FlowEndPoint::unlock (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
04516 ACE_THROW_SPEC ((CORBA::SystemException))
04517 {
04518 this->lock_ = 0;
04519 }
04520
04521
04522 void
04523 TAO_FlowEndPoint::destroy (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
04524 ACE_THROW_SPEC ((CORBA::SystemException))
04525 {
04526 int result = TAO_AV_Core::deactivate_servant (this);
04527 if (result < 0)
04528 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_StreamEndPoint::destroy failed\n"));
04529 TAO_AV_FlowSpecSetItor end = this->flow_spec_set_.end ();
04530 for (TAO_AV_FlowSpecSetItor begin = this->flow_spec_set_.begin ();
04531 begin != end; ++begin)
04532 (*begin)->protocol_object ()->destroy ();
04533 }
04534
04535 AVStreams::StreamEndPoint_ptr
04536 TAO_FlowEndPoint::related_sep (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
04537 ACE_THROW_SPEC ((CORBA::SystemException))
04538 {
04539
04540 return AVStreams::StreamEndPoint::_duplicate (this->related_sep_.in ());
04541 }
04542
04543 void
04544 TAO_FlowEndPoint::related_sep (AVStreams::StreamEndPoint_ptr related_sep
04545 ACE_ENV_ARG_DECL_NOT_USED)
04546 ACE_THROW_SPEC ((CORBA::SystemException))
04547 {
04548 this->related_sep_ = AVStreams::StreamEndPoint::_duplicate (related_sep);
04549 }
04550
04551 AVStreams::FlowConnection_ptr
04552 TAO_FlowEndPoint::related_flow_connection (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
04553 ACE_THROW_SPEC ((CORBA::SystemException))
04554 {
04555 return AVStreams::FlowConnection::_duplicate (this->related_flow_connection_.in ());
04556 }
04557
04558 void
04559 TAO_FlowEndPoint::related_flow_connection (AVStreams::FlowConnection_ptr related_flow_connection
04560 ACE_ENV_ARG_DECL_NOT_USED)
04561 ACE_THROW_SPEC ((CORBA::SystemException))
04562 {
04563 this->related_flow_connection_ = AVStreams::FlowConnection::_duplicate (related_flow_connection);
04564 }
04565
04566
04567 AVStreams::FlowEndPoint_ptr
04568 TAO_FlowEndPoint::get_connected_fep (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
04569 ACE_THROW_SPEC ((CORBA::SystemException,
04570 AVStreams::notConnected,
04571 AVStreams::notSupported))
04572 {
04573 return AVStreams::FlowEndPoint::_duplicate (this->peer_fep_.in ());
04574 }
04575
04576 CORBA::Boolean
04577 TAO_FlowEndPoint::use_flow_protocol (const char * fp_name,
04578 const CORBA::Any &
04579 ACE_ENV_ARG_DECL)
04580 ACE_THROW_SPEC ((CORBA::SystemException,
04581 AVStreams::FPError,
04582 AVStreams::notSupported))
04583 {
04584 ACE_TRY
04585 {
04586
04587 CORBA::Any flowname_property;
04588 flowname_property <<= fp_name;
04589 this->define_property ("FlowProtocol",
04590 flowname_property
04591 ACE_ENV_ARG_PARAMETER);
04592 ACE_TRY_CHECK;
04593 }
04594 ACE_CATCHANY
04595 {
04596 ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_FlowEndPoint::use_flow_protocol");
04597 return 0;
04598 }
04599 ACE_ENDTRY;
04600 ACE_CHECK_RETURN (0);
04601 return 1;
04602 }
04603
04604 void
04605 TAO_FlowEndPoint::set_format (const char * format
04606 ACE_ENV_ARG_DECL)
04607 ACE_THROW_SPEC ((CORBA::SystemException,
04608 AVStreams::notSupported))
04609 {
04610 this->format_ = format;
04611 ACE_TRY
04612 {
04613
04614
04615 CORBA::Any format_val;
04616 format_val <<= format;
04617 this->define_property ("Format",
04618 format_val
04619 ACE_ENV_ARG_PARAMETER);
04620 ACE_TRY_CHECK;
04621 }
04622 ACE_CATCHANY
04623 {
04624 ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_FlowEndpoint::set_format");
04625 }
04626 ACE_ENDTRY;
04627 ACE_CHECK;
04628 }
04629
04630 void
04631 TAO_FlowEndPoint::set_dev_params (const CosPropertyService::Properties & new_settings
04632 ACE_ENV_ARG_DECL)
04633 ACE_THROW_SPEC ((CORBA::SystemException,
04634 AVStreams::PropertyException,
04635 AVStreams::streamOpFailed))
04636 {
04637 this->dev_params_ = new_settings;
04638 ACE_TRY
04639 {
04640 CORBA::Any DevParams_property;
04641 DevParams_property <<= new_settings;
04642 this->define_property ("DevParams",
04643 DevParams_property
04644 ACE_ENV_ARG_PARAMETER);
04645 ACE_TRY_CHECK;
04646 }
04647 ACE_CATCHANY
04648 {
04649 ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_FlowEndPoint::set_dev_params");
04650 }
04651 ACE_ENDTRY;
04652 ACE_CHECK;
04653 }
04654
04655 void
04656 TAO_FlowEndPoint::set_protocol_restriction (const AVStreams::protocolSpec & protocols
04657 ACE_ENV_ARG_DECL)
04658 ACE_THROW_SPEC ((CORBA::SystemException,
04659 AVStreams::notSupported))
04660 {
04661 ACE_TRY
04662 {
04663 u_int i = 0;
04664 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "%N:%l\n"));
04665 for (i=0;i<protocols.length ();i++)
04666 {
04667 const char *protocol = (protocols)[i];
04668 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "%s\n", protocol));
04669 }
04670 CORBA::Any AvailableProtocols_property;
04671 AvailableProtocols_property <<= protocols;
04672 this->define_property ("AvailableProtocols",
04673 AvailableProtocols_property
04674 ACE_ENV_ARG_PARAMETER);
04675 ACE_TRY_CHECK;
04676 AVStreams::protocolSpec *temp_spec;
04677 CORBA::Any_var temp_any = this->get_property_value ("AvailableProtocols"
04678 ACE_ENV_ARG_PARAMETER);
04679 ACE_TRY_CHECK;
04680 temp_any.in () >>= temp_spec;
04681 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "%N:%l\n"));
04682 for (i=0;i<temp_spec->length ();i++)
04683 {
04684 const char *protocol = (*temp_spec)[i];
04685 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "%s\n", protocol));
04686 }
04687 this->protocols_ = protocols;
04688 }
04689 ACE_CATCHANY
04690 {
04691 ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_FlowEndpoint::set_protocol_restriction");
04692 }
04693 ACE_ENDTRY;
04694 ACE_CHECK;
04695 }
04696
04697 CORBA::Boolean
04698 TAO_FlowEndPoint::is_fep_compatible (AVStreams::FlowEndPoint_ptr peer_fep
04699 ACE_ENV_ARG_DECL)
04700 ACE_THROW_SPEC ((CORBA::SystemException,
04701 AVStreams::formatMismatch,
04702 AVStreams::deviceQosMismatch))
04703 {
04704 const char *exception_message = "";
04705 ACE_TRY
04706 {
04707
04708
04709
04710 CORBA::Any_var format_ptr;
04711 CORBA::String_var my_format, peer_format;
04712
04713 exception_message = "TAO_FlowEndPoint::is_fep_compatible - Format";
04714 format_ptr = this->get_property_value ("Format"
04715 ACE_ENV_ARG_PARAMETER);
04716 ACE_TRY_CHECK;
04717
04718 const char *temp_format;
04719 format_ptr.in () >>= temp_format;
04720 my_format = CORBA::string_dup (temp_format);
04721
04722 exception_message = "TAO_FlowEndPoint::is_fep_compatible - Format[2]";
04723 format_ptr = peer_fep->get_property_value ("Format"
04724 ACE_ENV_ARG_PARAMETER);
04725 ACE_TRY_CHECK;
04726 format_ptr.in () >>= temp_format;
04727 peer_format = CORBA::string_dup (temp_format);
04728 if (ACE_OS::strcmp (my_format.in (),
04729 peer_format.in ()) != 0)
04730 return 0;
04731
04732
04733 CORBA::Any_var AvailableProtocols_ptr;
04734 AVStreams::protocolSpec my_protocol_spec, peer_protocol_spec;
04735 AVStreams::protocolSpec *temp_protocols;;
04736
04737 exception_message =
04738 "TAO_FlowEndPoint::is_fep_compatible - AvailableProtocols";
04739 AvailableProtocols_ptr = this->get_property_value ("AvailableProtocols"
04740 ACE_ENV_ARG_PARAMETER);
04741 ACE_TRY_CHECK;
04742 AvailableProtocols_ptr.in () >>= temp_protocols;
04743 my_protocol_spec = *temp_protocols;
04744
04745 exception_message =
04746 "TAO_FlowEndPoint::is_fep_compatible - AvailableProtocols[2]";
04747 AvailableProtocols_ptr = peer_fep->get_property_value ("AvailableProtocols"
04748 ACE_ENV_ARG_PARAMETER);
04749 ACE_TRY_CHECK;
04750 AvailableProtocols_ptr.in () >>= temp_protocols;
04751 peer_protocol_spec = *temp_protocols;
04752
04753 int protocol_match = 0;
04754 for (u_int i=0;i<my_protocol_spec.length ();i++)
04755 {
04756 CORBA::String_var my_protocol_string;
04757 for (u_int j=0;j<peer_protocol_spec.length ();j++)
04758 {
04759 CORBA::String_var peer_protocol_string;
04760 my_protocol_string = CORBA::string_dup (my_protocol_spec[i]);
04761 peer_protocol_string = CORBA::string_dup (peer_protocol_spec[j]);
04762 if (ACE_OS::strcmp (my_protocol_string.in (), peer_protocol_string.in ()) == 0)
04763 {
04764 protocol_match = 1;
04765 break;
04766 }
04767 }
04768 if (protocol_match)
04769 break;
04770 }
04771 if (!protocol_match)
04772 return 0;
04773 }
04774 ACE_CATCH (CosPropertyService::PropertyNotFound, nf)
04775 {
04776 ACE_PRINT_EXCEPTION (nf,
04777 exception_message);
04778 }
04779 ACE_CATCHANY
04780 {
04781 ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
04782 "TAO_FlowEndPoint::is_fep_compatible");
04783 return 0;
04784 }
04785 ACE_ENDTRY;
04786 ACE_CHECK_RETURN (0);
04787 return 1;
04788 }
04789
04790 CORBA::Boolean
04791 TAO_FlowEndPoint::set_peer (AVStreams::FlowConnection_ptr ,
04792 AVStreams::FlowEndPoint_ptr the_peer_fep,
04793 AVStreams::QoS &
04794 ACE_ENV_ARG_DECL_NOT_USED)
04795 ACE_THROW_SPEC ((CORBA::SystemException,
04796 AVStreams::QoSRequestFailed,
04797 AVStreams::streamOpFailed))
04798 {
04799 this->peer_fep_ =
04800 AVStreams::FlowEndPoint::_duplicate (the_peer_fep);
04801 return 1;
04802 }
04803
04804 CORBA::Boolean
04805 TAO_FlowEndPoint::set_Mcast_peer (AVStreams::FlowConnection_ptr ,
04806 AVStreams::MCastConfigIf_ptr mcast_peer,
04807 AVStreams::QoS &
04808 ACE_ENV_ARG_DECL_NOT_USED)
04809 ACE_THROW_SPEC ((CORBA::SystemException,
04810 AVStreams::QoSRequestFailed))
04811 {
04812 this->mcast_peer_ = AVStreams::MCastConfigIf::_duplicate (mcast_peer);
04813 return 0;
04814 }
04815
04816 char *
04817 TAO_FlowEndPoint::go_to_listen_i (TAO_FlowSpec_Entry::Role role,
04818 AVStreams::QoS & ,
04819 CORBA::Boolean ,
04820 AVStreams::FlowEndPoint_ptr peer_fep,
04821 char *& flowProtocol
04822 ACE_ENV_ARG_DECL)
04823 ACE_THROW_SPEC ((CORBA::SystemException,
04824 AVStreams::failedToListen,
04825 AVStreams::FPError,
04826 AVStreams::QoSRequestFailed))
04827 {
04828 char direction [BUFSIZ];
04829 switch (role)
04830 {
04831 case TAO_FlowSpec_Entry::TAO_AV_PRODUCER:
04832 ACE_OS::strcpy (direction, "IN");
04833 break;
04834 case TAO_FlowSpec_Entry::TAO_AV_CONSUMER:
04835 ACE_OS::strcpy (direction, "OUT");
04836 break;
04837 default:
04838 break;
04839 }
04840 AVStreams::protocolSpec my_protocol_spec, peer_protocol_spec;
04841 AVStreams::protocolSpec *temp_protocols;
04842 CORBA::Any_var AvailableProtocols_ptr =
04843 peer_fep->get_property_value ("AvailableProtocols"
04844 ACE_ENV_ARG_PARAMETER);
04845 ACE_CHECK_RETURN (0);
04846 AvailableProtocols_ptr.in () >>= temp_protocols;
04847 peer_protocol_spec = *temp_protocols;
04848 AvailableProtocols_ptr =
04849 this->get_property_value ("AvailableProtocols"
04850 ACE_ENV_ARG_PARAMETER);
04851 ACE_CHECK_RETURN (0);
04852 AvailableProtocols_ptr.in () >>= temp_protocols;
04853 my_protocol_spec = *temp_protocols;
04854 int protocol_match = 0;
04855 CORBA::String_var listen_protocol;
04856 u_int i =0;
04857 for (i=0;i<my_protocol_spec.length ();i++)
04858 {
04859 CORBA::String_var my_protocol_string;
04860 for (u_int j=0;j<peer_protocol_spec.length ();j++)
04861 {
04862 CORBA::String_var peer_protocol_string;
04863 my_protocol_string = CORBA::string_dup (my_protocol_spec[i]);
04864 peer_protocol_string = CORBA::string_dup (peer_protocol_spec[j]);
04865 if (ACE_OS::strcmp (my_protocol_string.in (), peer_protocol_string.in ()) == 0)
04866 {
04867 listen_protocol = my_protocol_string;
04868 protocol_match = 1;
04869 break;
04870 }
04871 }
04872 if (protocol_match)
04873 break;
04874 }
04875 if (!protocol_match)
04876 ACE_ERROR_RETURN ((LM_ERROR, "TAO_FlowEndPoint::go_to_listen failed: no protoocol match\n"), 0);
04877
04878 for (u_int j=0;j<this->protocol_addresses_.length ();j++)
04879 if (ACE_OS::strncmp (this->protocol_addresses_ [j], listen_protocol.in (), ACE_OS::strlen (listen_protocol.in ())) == 0)
04880 {
04881
04882 TAO_Forward_FlowSpec_Entry *entry;
04883 ACE_NEW_RETURN (entry,
04884 TAO_Forward_FlowSpec_Entry (this->flowname_.in (),
04885 direction,
04886 this->format_.in (),
04887 flowProtocol,
04888 this->protocol_addresses_ [j]),
04889 0);
04890
04891 TAO_AV_Acceptor_Registry *acceptor_registry = TAO_AV_CORE::instance ()->acceptor_registry ();
04892 this->flow_spec_set_.insert (entry);
04893 int result = acceptor_registry->open (this,
04894 TAO_AV_CORE::instance (),
04895 this->flow_spec_set_);
04896 if (result < 0)
04897 return 0;
04898 char *listen_address = entry->get_local_addr_str ();
04899 char *address;
04900 ACE_NEW_RETURN (address,
04901 char [BUFSIZ],
04902 0);
04903 ACE_OS::sprintf (address, "%s=%s", listen_protocol.in (), listen_address);
04904 return address;
04905 }
04906 return 0;
04907 }
04908
04909
04910 CORBA::Boolean
04911 TAO_FlowEndPoint::connect_to_peer_i (TAO_FlowSpec_Entry::Role role,
04912 AVStreams::QoS & ,
04913 const char * address,
04914 const char * use_flow_protocol
04915 ACE_ENV_ARG_DECL_NOT_USED)
04916 ACE_THROW_SPEC ((CORBA::SystemException,
04917 AVStreams::failedToConnect,
04918 AVStreams::FPError,
04919 AVStreams::QoSRequestFailed))
04920 {
04921 char direction [BUFSIZ];
04922 switch (role)
04923 {
04924 case TAO_FlowSpec_Entry::TAO_AV_PRODUCER:
04925 ACE_OS::strcpy (direction, "IN");
04926 break;
04927 case TAO_FlowSpec_Entry::TAO_AV_CONSUMER:
04928 ACE_OS::strcpy (direction, "OUT");
04929 break;
04930 default:
04931 break;
04932 }
04933 TAO_Forward_FlowSpec_Entry *entry;
04934 ACE_NEW_RETURN (entry,
04935 TAO_Forward_FlowSpec_Entry (this->flowname_.in (),
04936 direction,
04937 this->format_.in (),
04938 use_flow_protocol,
04939 address),
04940 0);
04941 this->flow_spec_set_.insert (entry);
04942 TAO_AV_Connector_Registry *connector_registry = TAO_AV_CORE::instance ()->connector_registry ();
04943 int result = connector_registry->open (this,
04944 TAO_AV_CORE::instance (),
04945 this->flow_spec_set_);
04946 if (result < 0)
04947 ACE_ERROR_RETURN ((LM_ERROR, "TAO_FlowEndPoint::connector_registry::open failed\n"), 0);
04948 this->reverse_channel_ = entry->get_local_addr_str ();
04949 return 1;
04950 }
04951
04952 int
04953 TAO_FlowEndPoint::set_protocol_object (const char * ,
04954 TAO_AV_Protocol_Object * )
04955 {
04956 return 0;
04957 }
04958
04959
04960
04961
04962
04963
04964
04965 TAO_FlowProducer::TAO_FlowProducer (void)
04966 {
04967 }
04968
04969 TAO_FlowProducer::TAO_FlowProducer (const char *flowname,
04970 AVStreams::protocolSpec protocols,
04971 const char *format)
04972 {
04973 this->open (flowname, protocols, format);
04974 }
04975
04976
04977 char *
04978 TAO_FlowProducer::get_rev_channel (const char *
04979 ACE_ENV_ARG_DECL_NOT_USED)
04980 ACE_THROW_SPEC ((CORBA::SystemException))
04981 {
04982 return 0;
04983 }
04984
04985
04986 void
04987 TAO_FlowProducer::stop (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
04988 ACE_THROW_SPEC ((CORBA::SystemException))
04989 {
04990 TAO_AV_FlowSpecSetItor end = this->flow_spec_set_.end ();
04991 for (TAO_AV_FlowSpecSetItor begin = this->flow_spec_set_.begin ();
04992 begin != end; ++begin)
04993 {
04994 TAO_FlowSpec_Entry *entry = (*begin);
04995 entry->handler ()->stop (TAO_FlowSpec_Entry::TAO_AV_PRODUCER);
04996 }
04997 }
04998
04999 void
05000 TAO_FlowProducer::start (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
05001 ACE_THROW_SPEC ((CORBA::SystemException))
05002 {
05003 TAO_AV_FlowSpecSetItor end = this->flow_spec_set_.end ();
05004 for (TAO_AV_FlowSpecSetItor begin = this->flow_spec_set_.begin ();
05005 begin != end; ++begin)
05006 {
05007 TAO_FlowSpec_Entry *entry = (*begin);
05008 if (entry->handler () != 0)
05009 {
05010 entry->handler ()->start (TAO_FlowSpec_Entry::TAO_AV_PRODUCER);
05011 }
05012 if (entry->control_handler () != 0)
05013 {
05014 entry->control_handler ()->start (TAO_FlowSpec_Entry::TAO_AV_PRODUCER);
05015 }
05016 }
05017 }
05018
05019 char *
05020 TAO_FlowProducer::go_to_listen (AVStreams::QoS & the_qos,
05021 CORBA::Boolean is_mcast,
05022 AVStreams::FlowEndPoint_ptr peer_fep,
05023 char *& flowProtocol
05024 ACE_ENV_ARG_DECL)
05025 ACE_THROW_SPEC ((CORBA::SystemException,
05026 AVStreams::failedToListen,
05027 AVStreams::FPError,
05028 AVStreams::QoSRequestFailed))
05029 {
05030 return this->go_to_listen_i (TAO_FlowSpec_Entry::TAO_AV_PRODUCER,
05031 the_qos,
05032 is_mcast,
05033 peer_fep,
05034 flowProtocol
05035 ACE_ENV_ARG_PARAMETER);
05036 }
05037
05038 CORBA::Boolean
05039 TAO_FlowProducer::connect_to_peer (AVStreams::QoS & the_qos,
05040 const char * address,
05041 const char * use_flow_protocol
05042 ACE_ENV_ARG_DECL)
05043 ACE_THROW_SPEC ((CORBA::SystemException,
05044 AVStreams::failedToConnect,
05045 AVStreams::FPError,
05046 AVStreams::QoSRequestFailed))
05047 {
05048 return this->connect_to_peer_i (TAO_FlowSpec_Entry::TAO_AV_PRODUCER,
05049 the_qos,
05050 address,
05051 use_flow_protocol
05052 ACE_ENV_ARG_PARAMETER);
05053 }
05054
05055 char *
05056 TAO_FlowProducer::connect_mcast (AVStreams::QoS & ,
05057 CORBA::Boolean_out ,
05058 const char *address,
05059 const char * use_flow_protocol
05060 ACE_ENV_ARG_DECL_NOT_USED)
05061 ACE_THROW_SPEC ((CORBA::SystemException,
05062 AVStreams::failedToConnect,
05063 AVStreams::notSupported,
05064 AVStreams::FPError,
05065 AVStreams::QoSRequestFailed))
05066 {
05067
05068 for (u_int i=0;i<this->protocols_.length ();i++)
05069 {
05070
05071 }
05072
05073 if (address == 0)
05074 if (TAO_debug_level > 0)
05075 ACE_DEBUG ((LM_DEBUG, "TAO_FlowProducer::connect_mcast address is 0\n"));
05076 TAO_Forward_FlowSpec_Entry *entry;
05077 ACE_NEW_RETURN (entry,
05078 TAO_Forward_FlowSpec_Entry(this->flowname_.in (),
05079 "IN",
05080 this->format_.in (),
05081 use_flow_protocol,
05082 address),
05083 0);
05084
05085 this->flow_spec_set_.insert (entry);
05086 TAO_AV_Acceptor_Registry *acceptor_registry =
05087 TAO_AV_CORE::instance ()->acceptor_registry ();
05088 int result = acceptor_registry->open (this,
05089 TAO_AV_CORE::instance (),
05090 this->flow_spec_set_);
05091 if (result < 0)
05092 ACE_ERROR_RETURN ((LM_ERROR, "TAO_FlowProducer::connect_mcast:acceptor_registry open failed\n"), 0);
05093
05094
05095 ACE_Event_Handler *event_handler = entry->handler ()->event_handler ();
05096 event_handler->reactor ()->remove_handler (event_handler,
05097 ACE_Event_Handler::READ_MASK);
05098 return CORBA::string_dup (address);
05099 }
05100
05101
05102 void
05103 TAO_FlowProducer::set_key (const AVStreams::key & the_key
05104 ACE_ENV_ARG_DECL)
05105 ACE_THROW_SPEC ((CORBA::SystemException))
05106 {
05107 ACE_TRY
05108 {
05109 CORBA::Any anyval;
05110 anyval <<= the_key;
05111 this->define_property ("PublicKey",
05112 anyval
05113 ACE_ENV_ARG_PARAMETER);
05114 ACE_TRY_CHECK;
05115 }
05116 ACE_CATCHANY
05117 {
05118 ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_FlowProducer::set_key");
05119 }
05120 ACE_ENDTRY;
05121 ACE_CHECK;
05122 }
05123
05124
05125 void
05126 TAO_FlowProducer::set_source_id (CORBA::Long source_id
05127 ACE_ENV_ARG_DECL_NOT_USED)
05128 ACE_THROW_SPEC ((CORBA::SystemException))
05129 {
05130 this->source_id_ = source_id;
05131 }
05132
05133
05134
05135
05136
05137
05138
05139 TAO_FlowConsumer::TAO_FlowConsumer (void)
05140 {
05141 }
05142
05143 TAO_FlowConsumer::TAO_FlowConsumer (const char *flowname,
05144 AVStreams::protocolSpec protocols,
05145 const char *format)
05146 {
05147 this->open (flowname, protocols, format);
05148 }
05149
05150
05151 void
05152 TAO_FlowConsumer::stop (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
05153 ACE_THROW_SPEC ((CORBA::SystemException))
05154 {
05155 TAO_AV_FlowSpecSetItor end = this->flow_spec_set_.end ();
05156 for (TAO_AV_FlowSpecSetItor begin = this->flow_spec_set_.begin ();
05157 begin != end; ++begin)
05158 (*begin)->handler ()->stop (TAO_FlowSpec_Entry::TAO_AV_CONSUMER);
05159 }
05160
05161 void
05162 TAO_FlowConsumer::start (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
05163 ACE_THROW_SPEC ((CORBA::SystemException))
05164 {
05165 TAO_AV_FlowSpecSetItor end = this->flow_spec_set_.end ();
05166 for (TAO_AV_FlowSpecSetItor begin = this->flow_spec_set_.begin ();
05167 begin != end; ++begin)
05168 {
05169 (*begin)->handler ()->start (TAO_FlowSpec_Entry::TAO_AV_CONSUMER);
05170 }
05171 }
05172
05173 char *
05174 TAO_FlowConsumer::go_to_listen (AVStreams::QoS & the_qos,
05175 CORBA::Boolean is_mcast,
05176 AVStreams::FlowEndPoint_ptr peer_fep,
05177 char *& flowProtocol
05178 ACE_ENV_ARG_DECL)
05179 ACE_THROW_SPEC ((CORBA::SystemException,
05180 AVStreams::failedToListen,
05181 AVStreams::FPError,
05182 AVStreams::QoSRequestFailed))
05183 {
05184 return this->go_to_listen_i (TAO_FlowSpec_Entry::TAO_AV_CONSUMER,
05185 the_qos,
05186 is_mcast,
05187 peer_fep,
05188 flowProtocol
05189 ACE_ENV_ARG_PARAMETER);
05190 }
05191
05192 CORBA::Boolean
05193 TAO_FlowConsumer::connect_to_peer (AVStreams::QoS & the_qos,
05194 const char * address,
05195 const char * use_flow_protocol
05196 ACE_ENV_ARG_DECL)
05197 ACE_THROW_SPEC ((CORBA::SystemException,
05198 AVStreams::failedToConnect,
05199 AVStreams::FPError,
05200 AVStreams::QoSRequestFailed))
05201 {
05202 return this->connect_to_peer_i (TAO_FlowSpec_Entry::TAO_AV_CONSUMER,
05203 the_qos,
05204 address,
05205 use_flow_protocol
05206 ACE_ENV_ARG_PARAMETER);
05207 }
05208
05209
05210
05211
05212 TAO_Tokenizer::TAO_Tokenizer (const char *string, char delimiter)
05213 :token_array_ (10),
05214 count_ (0)
05215 {
05216 this->parse (string, delimiter);
05217 }
05218
05219 TAO_Tokenizer::~TAO_Tokenizer ()
05220 {
05221 for (unsigned int i=0; i<this->num_tokens_; i++)
05222 CORBA::string_free (this->token_array_[i]);
05223 }
05224
05225
05226 int
05227 TAO_Tokenizer::parse (const char *string, char delimiter)
05228 {
05229 ACE_CString new_string (string);
05230 u_int pos =0;
05231 int slash_pos = 0;
05232 u_int count = 0;
05233 int result;
05234 while (pos < new_string.length ())
05235 {
05236 slash_pos = new_string.find (delimiter, pos);
05237 ACE_CString substring;
05238 if (slash_pos != new_string.npos)
05239 {
05240 substring = new_string.substring (pos,
05241 slash_pos - pos);
05242 pos = slash_pos + 1;
05243 }
05244 else
05245 {
05246 substring = new_string.substring (pos);
05247 pos = static_cast<int> (new_string.length ());
05248 }
05249 char *token = CORBA::string_dup (substring.c_str ());
05250 result = this->token_array_.set (token, count);
05251 if (result == -1)
05252 {
05253 this->token_array_.size (this->token_array_.size ()*2);
05254 result = this->token_array_.set (token, count);
05255 if (result == -1)
05256 ACE_ERROR_RETURN ((LM_ERROR, "TAO_Tokenizer::parse error"), -1);
05257 }
05258 count++;
05259 }
05260
05261
05262
05263
05264
05265
05266
05267
05268
05269
05270
05271
05272
05273
05274
05275
05276
05277
05278
05279
05280
05281 this->num_tokens_ = count;
05282 return 0;
05283 }
05284
05285 char*
05286 TAO_Tokenizer::token (void)
05287 {
05288 if (count_ < num_tokens_)
05289 return CORBA::string_dup (this->token_array_[this->count_++]);
05290 else
05291 return 0;
05292 }
05293
05294 int
05295 TAO_Tokenizer::num_tokens (void)
05296 {
05297 return static_cast<int> (this->num_tokens_);
05298 }
05299
05300 const char *
05301 TAO_Tokenizer::operator [] (size_t index) const
05302 {
05303 if (index >= this->num_tokens_)
05304 return 0;
05305
05306 return this->token_array_[index];
05307 }
05308
05309 TAO_END_VERSIONED_NAMESPACE_DECL